什麼是 Java 中的線程安全阻塞隊列? 你應該什麼時候使用它? 附執行
已發表: 2021-04-30
到目前為止,我已經在 Crunchify 上寫了兩篇關於生產者消費者概念的文章。 第一個解釋 Java Semaphore 和 Mutex,第二個解釋並發讀/寫。
在本 Java 教程中,我們將通過相同的生產者/消費者概念來解釋BlockingQueue in Java
。
Java中阻塞隊列的優點是什麼?
java.util.Queue
支持在檢索元素時等待隊列變為非空,並在存儲元素時等待隊列中可用空間的操作。

我們需要創建四個 Java 類:
- CrunchifyMessage.java 放置和獲取消息
- CrunchifyBlockingProducer.java 將消息放入隊列
- CrunchifyBlockingConsumer.java 從隊列中獲取消息
- CrunchifyBlockingMain.java 開始測試
BlockingQueue 實現是thread-safe
。 所有排隊方法本質上都是原子的並且使用內部鎖。
讓我們開始在 Java 中實現線程安全的 BlockingQueue
第1步
創建類CrunchifyMessage.java
。 這是一個簡單的 Java 對象。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 |
package com . crunchify . example ; /** * @author Crunchify.com * simple Message class to put and get message into queue */ public class CrunchifyMessage { private String crunchifyMsg ; public CrunchifyMessage ( String string ) { this . crunchifyMsg = string ; } public String getMsg ( ) { return crunchifyMsg ; } } |
第2步
創建生產者CrunchifyBlockingProducer.java
,它創建了簡單的 msg 並將其放入隊列。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 |
package com . crunchify . example ; import java . util . concurrent . BlockingQueue ; /** * @author Crunchify.com * */ public class CrunchifyBlockingProducer implements Runnable { private BlockingQueue <CrunchifyMessage> crunchQueue ; public CrunchifyBlockingProducer ( BlockingQueue <CrunchifyMessage> queue ) { this . crunchQueue = queue ; } @Override public void run ( ) { // producing CrunchifyMessage messages for ( int i = 1 ; i < = 5 ; i ++ ) { CrunchifyMessage msg = new CrunchifyMessage ( "i'm msg " + i ) ; try { Thread . sleep ( 10 ) ; crunchQueue . put ( msg ) ; System . out . println ( "CrunchifyBlockingProducer: Message - " + msg . getMsg ( ) + " produced." ) ; } catch ( Exception e ) { System . out . println ( "Exception:" + e ) ; } } // adding exit message CrunchifyMessage msg = new CrunchifyMessage ( "All done from Producer side. Produced 50 CrunchifyMessages" ) ; try { crunchQueue . put ( msg ) ; System . out . println ( "CrunchifyBlockingProducer: Exit Message - " + msg . getMsg ( ) ) ; } catch ( Exception e ) { System . out . println ( "Exception:" + e ) ; } } } |
第三步
創建從隊列中消費消息的類CrunchifyBlockingConsumer.java
。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 |
package com . crunchify . example ; import java . util . concurrent . BlockingQueue ; /** * @author Crunchify.com * */ public class CrunchifyBlockingConsumer implements Runnable { private BlockingQueue <CrunchifyMessage> queue ; public CrunchifyBlockingConsumer ( BlockingQueue <CrunchifyMessage> queue ) { this . queue = queue ; } @Override public void run ( ) { try { CrunchifyMessage msg ; // consuming messages until exit message is received while ( ( msg = queue . take ( ) ) . getMsg ( ) ! = "exit" ) { Thread . sleep ( 10 ) ; System . out . println ( "CrunchifyBlockingConsumer: Message - " + msg . getMsg ( ) + " consumed." ) ; } } catch ( InterruptedException e ) { e . printStackTrace ( ) ; } } } |
第四步
創建運行 BlockingQueue 測試的簡單CrunchifyBlockingMain.java
方法。 運行這個程序來檢查 BlockingQueue 的行為。

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 |
package com . crunchify . example ; import java . util . concurrent . ArrayBlockingQueue ; import java . util . concurrent . BlockingQueue ; /** * @author Crunchify.com * */ public class CrunchifyBlockingMain { public static void main ( String [ ] args ) { // Creating BlockingQueue of size 10 // BlockingQueue supports operations that wait for the queue to become non-empty when retrieving an element, and // wait for space to become available in the queue when storing an element. BlockingQueue <CrunchifyMessage> crunchQueue = new ArrayBlockingQueue < > ( 10 ) ; CrunchifyBlockingProducer crunchProducer = new CrunchifyBlockingProducer ( crunchQueue ) ; CrunchifyBlockingConsumer crunchConsumer = new CrunchifyBlockingConsumer ( crunchQueue ) ; // starting producer to produce messages in queue new Thread ( crunchProducer ) . start ( ) ; // starting consumer to consume messages from queue new Thread ( crunchConsumer ) . start ( ) ; System . out . println ( "Let's get started. Producer / Consumer Test Started.\n" ) ; } } |
BlockingQueue不接受空元素。 在嘗試添加、放置或提供null時,實現會拋出NullPointerException 。
null用作標記值以指示輪詢操作失敗。
結果:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 |
Let 's get started. Producer / Consumer Test Started. CrunchifyBlockingProducer: Message - i' m msg 1 produced . CrunchifyBlockingProducer : Message - i 'm msg 2 produced. CrunchifyBlockingConsumer: Message - i' m msg 1 consumed . CrunchifyBlockingConsumer : Message - i 'm msg 2 consumed. CrunchifyBlockingProducer: Message - i' m msg 3 produced . CrunchifyBlockingConsumer : Message - i 'm msg 3 consumed. CrunchifyBlockingProducer: Message - i' m msg 4 produced . CrunchifyBlockingConsumer : Message - i 'm msg 4 consumed. CrunchifyBlockingProducer: Message - i' m msg 5 produced . CrunchifyBlockingProducer : Exit Message - All done from Producer side . Produced 50 CrunchifyMessages CrunchifyBlockingConsumer : Message - i ' m msg 5 consumed . CrunchifyBlockingConsumer : Message - All done from Producer side . Produced 50 CrunchifyMessages consumed . |
什麼時候應該使用 java.util.concurrent.BlockingQueue?
- 當您想限制某種傳入請求時,您應該使用相同的
- 生產者可以通過無限隊列遠遠領先於消費者。 如果消費者沒有趕上生產者,那麼它可能會導致
OutOfMemoryError
。 在這種情況下,最好向潛在的生產者發出隊列已滿的信號,並在失敗後迅速放棄。- 換句話說:生產者自然受到限制。
- 阻塞隊列通常用於並發應用程序
- 它提供了正確的、線程安全的實現
- 內存消耗也應該受到限制