Ce este Thread-Safe BlockingQueue în Java? Când ar trebui să-l folosești? Implementare atașată
Publicat: 2021-04-30
Până acum am scris două articole despre conceptul Producer Consumer pe Crunchify. Primul pentru a explica Java Semaphore și Mutex și al doilea pentru a explica Concurrent Read/Write.
În acest tutorial Java vom trece peste același concept de producător/consumator pentru a explica BlockingQueue in Java
.
Care sunt avantajele Blocking Queue în Java?
Un java.util.Queue
acceptă operațiuni care așteaptă ca coada să nu devină goală la preluarea unui element și așteaptă ca spațiu să devină disponibil în coadă atunci când stochează un element.

Trebuie să creăm patru clase Java:
- CrunchifyMessage.java pentru a pune și a primi mesaj
- CrunchifyBlockingProducer.java pentru a pune mesajul în coadă
- CrunchifyBlockingConsumer.java pentru a primi mesajul din coadă
- CrunchifyBlockingMain.java pentru a începe testul
Implementările BlockingQueue sunt thread-safe
. Toate metodele de așteptare sunt de natură atomică și folosesc blocări interne.
Să începem cu implementarea Thread-Safe BlockingQueue în Java
Pasul 1
Creați clasa CrunchifyMessage.java
. Acesta este un simplu obiect Java.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 |
package com . crunchify . example ; /** * @author Crunchify.com * simple Message class to put and get message into queue */ public class CrunchifyMessage { private String crunchifyMsg ; public CrunchifyMessage ( String string ) { this . crunchifyMsg = string ; } public String getMsg ( ) { return crunchifyMsg ; } } |
Pasul 2
Creați producătorul CrunchifyBlockingProducer.java
care a creat mesaje simple și a pus-o în coadă.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 |
package com . crunchify . example ; import java . util . concurrent . BlockingQueue ; /** * @author Crunchify.com * */ public class CrunchifyBlockingProducer implements Runnable { private BlockingQueue <CrunchifyMessage> crunchQueue ; public CrunchifyBlockingProducer ( BlockingQueue <CrunchifyMessage> queue ) { this . crunchQueue = queue ; } @Override public void run ( ) { // producing CrunchifyMessage messages for ( int i = 1 ; i < = 5 ; i ++ ) { CrunchifyMessage msg = new CrunchifyMessage ( "i'm msg " + i ) ; try { Thread . sleep ( 10 ) ; crunchQueue . put ( msg ) ; System . out . println ( "CrunchifyBlockingProducer: Message - " + msg . getMsg ( ) + " produced." ) ; } catch ( Exception e ) { System . out . println ( "Exception:" + e ) ; } } // adding exit message CrunchifyMessage msg = new CrunchifyMessage ( "All done from Producer side. Produced 50 CrunchifyMessages" ) ; try { crunchQueue . put ( msg ) ; System . out . println ( "CrunchifyBlockingProducer: Exit Message - " + msg . getMsg ( ) ) ; } catch ( Exception e ) { System . out . println ( "Exception:" + e ) ; } } } |
Pasul 3
Creați clasa CrunchifyBlockingConsumer.java
care consumă mesajul din coadă.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 |
package com . crunchify . example ; import java . util . concurrent . BlockingQueue ; /** * @author Crunchify.com * */ public class CrunchifyBlockingConsumer implements Runnable { private BlockingQueue <CrunchifyMessage> queue ; public CrunchifyBlockingConsumer ( BlockingQueue <CrunchifyMessage> queue ) { this . queue = queue ; } @Override public void run ( ) { try { CrunchifyMessage msg ; // consuming messages until exit message is received while ( ( msg = queue . take ( ) ) . getMsg ( ) ! = "exit" ) { Thread . sleep ( 10 ) ; System . out . println ( "CrunchifyBlockingConsumer: Message - " + msg . getMsg ( ) + " consumed." ) ; } } catch ( InterruptedException e ) { e . printStackTrace ( ) ; } } } |
Pasul-4
Creați metoda simplă CrunchifyBlockingMain.java
care rulează testul BlockingQueue. Rulați acest program pentru a verifica comportamentul BlockingQueue.

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 |
package com . crunchify . example ; import java . util . concurrent . ArrayBlockingQueue ; import java . util . concurrent . BlockingQueue ; /** * @author Crunchify.com * */ public class CrunchifyBlockingMain { public static void main ( String [ ] args ) { // Creating BlockingQueue of size 10 // BlockingQueue supports operations that wait for the queue to become non-empty when retrieving an element, and // wait for space to become available in the queue when storing an element. BlockingQueue <CrunchifyMessage> crunchQueue = new ArrayBlockingQueue < > ( 10 ) ; CrunchifyBlockingProducer crunchProducer = new CrunchifyBlockingProducer ( crunchQueue ) ; CrunchifyBlockingConsumer crunchConsumer = new CrunchifyBlockingConsumer ( crunchQueue ) ; // starting producer to produce messages in queue new Thread ( crunchProducer ) . start ( ) ; // starting consumer to consume messages from queue new Thread ( crunchConsumer ) . start ( ) ; System . out . println ( "Let's get started. Producer / Consumer Test Started.\n" ) ; } } |
Un BlockingQueue nu acceptă elemente nule . Implementările aruncă NullPointerException la încercările de a adăuga , pune sau oferi un null .
O valoare nulă este folosită ca valoare sentinelă pentru a indica eșecul operațiunilor de sondare .
Rezultat:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 |
Let 's get started. Producer / Consumer Test Started. CrunchifyBlockingProducer: Message - i' m msg 1 produced . CrunchifyBlockingProducer : Message - i 'm msg 2 produced. CrunchifyBlockingConsumer: Message - i' m msg 1 consumed . CrunchifyBlockingConsumer : Message - i 'm msg 2 consumed. CrunchifyBlockingProducer: Message - i' m msg 3 produced . CrunchifyBlockingConsumer : Message - i 'm msg 3 consumed. CrunchifyBlockingProducer: Message - i' m msg 4 produced . CrunchifyBlockingConsumer : Message - i 'm msg 4 consumed. CrunchifyBlockingProducer: Message - i' m msg 5 produced . CrunchifyBlockingProducer : Exit Message - All done from Producer side . Produced 50 CrunchifyMessages CrunchifyBlockingConsumer : Message - i ' m msg 5 consumed . CrunchifyBlockingConsumer : Message - All done from Producer side . Produced 50 CrunchifyMessages consumed . |
Când ar trebui să folosim java.util.concurrent.BlockingQueue?
- Când doriți să reduceți un fel de solicitare primită, ar trebui să utilizați același lucru
- Un producător poate ajunge cu mult înaintea consumatorilor cu o coadă nelimitată. Dacă consumatorul nu ajunge din urmă cu producătorul, atunci poate provoca o
OutOfMemoryError
. În situații ca acestea, poate fi mai bine să semnalați unui potențial producător că coada este plină și să renunțe rapid la un eșec.- Cu alte cuvinte: producătorii sunt în mod firesc throttled.
- Coada de blocare este utilizată în mod normal în aplicațiile concurente
- Oferă o implementare corectă, sigură pentru fire
- Consumul de memorie ar trebui, de asemenea, limitat