Was ist Thread-Safe BlockingQueue in Java? Wann sollten Sie es verwenden? Umsetzung beigefügt
Veröffentlicht: 2021-04-30
Bisher habe ich zwei Artikel zum Producer-Consumer-Konzept auf Crunchify geschrieben. Das erste erklärt Java Semaphore und Mutex und das zweite erklärt Concurrent Read/Write.
In diesem Java-Tutorial gehen wir auf dasselbe Producer/Consumer-Konzept ein, um die BlockingQueue in Java
zu erklären.
Was sind die Vorteile der Sperrwarteschlange in Java?
Eine java.util.Queue
unterstützt Operationen, die darauf warten, dass die Warteschlange beim Abrufen eines Elements nicht leer wird, und warten, bis Speicherplatz in der Warteschlange verfügbar wird, wenn ein Element gespeichert wird.

Wir müssen vier Java-Klassen erstellen:
- CrunchifyMessage.java zum Setzen und Abrufen von Nachrichten
- CrunchifyBlockingProducer.java, um die Nachricht in die Warteschlange zu stellen
- CrunchifyBlockingConsumer.java, um Nachrichten aus der Warteschlange abzurufen
- CrunchifyBlockingMain.java, um den Test zu starten
BlockingQueue-Implementierungen sind thread-safe
. Alle Warteschlangenmethoden sind atomarer Natur und verwenden interne Sperren.
Beginnen wir mit der Thread-Safe BlockingQueue-Implementierung in Java
Schritt 1
Erstellen Sie die Klasse CrunchifyMessage.java
. Dies ist ein einfaches Java-Objekt.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 |
package com . crunchify . example ; /** * @author Crunchify.com * simple Message class to put and get message into queue */ public class CrunchifyMessage { private String crunchifyMsg ; public CrunchifyMessage ( String string ) { this . crunchifyMsg = string ; } public String getMsg ( ) { return crunchifyMsg ; } } |
Schritt 2
Erstellen Sie den Producer CrunchifyBlockingProducer.java
, der eine einfache msg erstellt und in die Warteschlange gestellt hat.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 |
package com . crunchify . example ; import java . util . concurrent . BlockingQueue ; /** * @author Crunchify.com * */ public class CrunchifyBlockingProducer implements Runnable { private BlockingQueue <CrunchifyMessage> crunchQueue ; public CrunchifyBlockingProducer ( BlockingQueue <CrunchifyMessage> queue ) { this . crunchQueue = queue ; } @Override public void run ( ) { // producing CrunchifyMessage messages for ( int i = 1 ; i < = 5 ; i ++ ) { CrunchifyMessage msg = new CrunchifyMessage ( "i'm msg " + i ) ; try { Thread . sleep ( 10 ) ; crunchQueue . put ( msg ) ; System . out . println ( "CrunchifyBlockingProducer: Message - " + msg . getMsg ( ) + " produced." ) ; } catch ( Exception e ) { System . out . println ( "Exception:" + e ) ; } } // adding exit message CrunchifyMessage msg = new CrunchifyMessage ( "All done from Producer side. Produced 50 CrunchifyMessages" ) ; try { crunchQueue . put ( msg ) ; System . out . println ( "CrunchifyBlockingProducer: Exit Message - " + msg . getMsg ( ) ) ; } catch ( Exception e ) { System . out . println ( "Exception:" + e ) ; } } } |
Schritt 3
Erstellen Sie die Klasse CrunchifyBlockingConsumer.java
, die die Nachricht aus der Warteschlange verarbeitet.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 |
package com . crunchify . example ; import java . util . concurrent . BlockingQueue ; /** * @author Crunchify.com * */ public class CrunchifyBlockingConsumer implements Runnable { private BlockingQueue <CrunchifyMessage> queue ; public CrunchifyBlockingConsumer ( BlockingQueue <CrunchifyMessage> queue ) { this . queue = queue ; } @Override public void run ( ) { try { CrunchifyMessage msg ; // consuming messages until exit message is received while ( ( msg = queue . take ( ) ) . getMsg ( ) ! = "exit" ) { Thread . sleep ( 10 ) ; System . out . println ( "CrunchifyBlockingConsumer: Message - " + msg . getMsg ( ) + " consumed." ) ; } } catch ( InterruptedException e ) { e . printStackTrace ( ) ; } } } |
Schritt 4
Erstellen Sie eine einfache CrunchifyBlockingMain.java
Methode, die den BlockingQueue-Test ausführt. Führen Sie dieses Programm aus, um das Verhalten von BlockingQueue zu überprüfen.

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 |
package com . crunchify . example ; import java . util . concurrent . ArrayBlockingQueue ; import java . util . concurrent . BlockingQueue ; /** * @author Crunchify.com * */ public class CrunchifyBlockingMain { public static void main ( String [ ] args ) { // Creating BlockingQueue of size 10 // BlockingQueue supports operations that wait for the queue to become non-empty when retrieving an element, and // wait for space to become available in the queue when storing an element. BlockingQueue <CrunchifyMessage> crunchQueue = new ArrayBlockingQueue < > ( 10 ) ; CrunchifyBlockingProducer crunchProducer = new CrunchifyBlockingProducer ( crunchQueue ) ; CrunchifyBlockingConsumer crunchConsumer = new CrunchifyBlockingConsumer ( crunchQueue ) ; // starting producer to produce messages in queue new Thread ( crunchProducer ) . start ( ) ; // starting consumer to consume messages from queue new Thread ( crunchConsumer ) . start ( ) ; System . out . println ( "Let's get started. Producer / Consumer Test Started.\n" ) ; } } |
Eine BlockingQueue akzeptiert keine Null- Elemente. Implementierungen lösen NullPointerException bei Versuchen aus , eine Null hinzuzufügen , einzufügen oder anzubieten .
Eine Null wird als Sentinel-Wert verwendet, um das Fehlschlagen von Abfrageoperationen anzuzeigen.
Ergebnis:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 |
Let 's get started. Producer / Consumer Test Started. CrunchifyBlockingProducer: Message - i' m msg 1 produced . CrunchifyBlockingProducer : Message - i 'm msg 2 produced. CrunchifyBlockingConsumer: Message - i' m msg 1 consumed . CrunchifyBlockingConsumer : Message - i 'm msg 2 consumed. CrunchifyBlockingProducer: Message - i' m msg 3 produced . CrunchifyBlockingConsumer : Message - i 'm msg 3 consumed. CrunchifyBlockingProducer: Message - i' m msg 4 produced . CrunchifyBlockingConsumer : Message - i 'm msg 4 consumed. CrunchifyBlockingProducer: Message - i' m msg 5 produced . CrunchifyBlockingProducer : Exit Message - All done from Producer side . Produced 50 CrunchifyMessages CrunchifyBlockingConsumer : Message - i ' m msg 5 consumed . CrunchifyBlockingConsumer : Message - All done from Producer side . Produced 50 CrunchifyMessages consumed . |
Wann sollten wir java.util.concurrent.BlockingQueue verwenden?
- Wenn Sie eine eingehende Anfrage drosseln möchten, sollten Sie dasselbe verwenden
- Ein Erzeuger kann den Verbrauchern mit einer unbegrenzten Warteschlange weit voraus sein. Wenn der Consumer den Producer nicht einholt, kann dies zu einem
OutOfMemoryError
. In solchen Situationen kann es besser sein, einem potenziellen Produzenten zu signalisieren, dass die Warteschlange voll ist, und bei einem Fehler schnell aufzugeben.- Mit anderen Worten: Die Erzeuger werden natürlich gedrosselt.
- Blocking Queue wird normalerweise in der gleichzeitigen Anwendung verwendet
- Es bietet eine korrekte, Thread-sichere Implementierung
- Der Speicherverbrauch sollte ebenfalls begrenzt werden