Thread-Safe BlockingQueue ใน Java คืออะไร คุณควรใช้เมื่อใด แนบการดำเนินการ
เผยแพร่แล้ว: 2021-04-30
จนถึงตอนนี้ ฉันได้เขียนบทความสองบทความเกี่ยวกับแนวคิดของ Producer Consumer เกี่ยวกับ Crunchify อันแรกเพื่ออธิบาย Java Semaphore และ Mutex และอันที่ 2 เพื่ออธิบายการอ่าน/เขียนพร้อมกัน
ในบทช่วยสอน Java นี้ เราจะพูดถึงแนวคิด Producer/Consumer เดียวกันเพื่ออธิบาย BlockingQueue in Java
ข้อดีของ Blocking Queue ใน Java คืออะไร?
java.util.Queue
รองรับการดำเนินการที่รอให้คิวว่างเมื่อดึงข้อมูลองค์ประกอบ และรอให้พื้นที่ว่างในคิวเมื่อจัดเก็บองค์ประกอบ

เราจำเป็นต้องสร้างคลาส Java สี่คลาส:
- CrunchifyMessage.java เพื่อใส่และรับข้อความ
- CrunchifyBlockingProducer.java เพื่อใส่ข้อความลงในคิว
- CrunchifyBlockingConsumer.java เพื่อรับข้อความจากคิว
- CrunchifyBlockingMain.java เพื่อเริ่มการทดสอบ
การใช้งาน BlockingQueue นั้น thread-safe
วิธีการเข้าคิวทั้งหมดมีลักษณะเป็นอะตอมและใช้การล็อกภายใน
มาเริ่มกันเลยกับการใช้งาน Thread-Safe BlockingQueue ใน Java
ขั้นตอนที่ 1
สร้างคลาส CrunchifyMessage.java
นี่คือ Java Object อย่างง่าย
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 |
package com . crunchify . example ; /** * @author Crunchify.com * simple Message class to put and get message into queue */ public class CrunchifyMessage { private String crunchifyMsg ; public CrunchifyMessage ( String string ) { this . crunchifyMsg = string ; } public String getMsg ( ) { return crunchifyMsg ; } } |
ขั้นตอนที่ 2
สร้างผู้ผลิต CrunchifyBlockingProducer.java
ซึ่งสร้าง msg อย่างง่ายและใส่ลงในคิว
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 |
package com . crunchify . example ; import java . util . concurrent . BlockingQueue ; /** * @author Crunchify.com * */ public class CrunchifyBlockingProducer implements Runnable { private BlockingQueue <CrunchifyMessage> crunchQueue ; public CrunchifyBlockingProducer ( BlockingQueue <CrunchifyMessage> queue ) { this . crunchQueue = queue ; } @Override public void run ( ) { // producing CrunchifyMessage messages for ( int i = 1 ; i < = 5 ; i ++ ) { CrunchifyMessage msg = new CrunchifyMessage ( "i'm msg " + i ) ; try { Thread . sleep ( 10 ) ; crunchQueue . put ( msg ) ; System . out . println ( "CrunchifyBlockingProducer: Message - " + msg . getMsg ( ) + " produced." ) ; } catch ( Exception e ) { System . out . println ( "Exception:" + e ) ; } } // adding exit message CrunchifyMessage msg = new CrunchifyMessage ( "All done from Producer side. Produced 50 CrunchifyMessages" ) ; try { crunchQueue . put ( msg ) ; System . out . println ( "CrunchifyBlockingProducer: Exit Message - " + msg . getMsg ( ) ) ; } catch ( Exception e ) { System . out . println ( "Exception:" + e ) ; } } } |
ขั้นตอนที่ 3
สร้างคลาส CrunchifyBlockingConsumer.java
ซึ่งใช้ข้อความจากคิว
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 |
package com . crunchify . example ; import java . util . concurrent . BlockingQueue ; /** * @author Crunchify.com * */ public class CrunchifyBlockingConsumer implements Runnable { private BlockingQueue <CrunchifyMessage> queue ; public CrunchifyBlockingConsumer ( BlockingQueue <CrunchifyMessage> queue ) { this . queue = queue ; } @Override public void run ( ) { try { CrunchifyMessage msg ; // consuming messages until exit message is received while ( ( msg = queue . take ( ) ) . getMsg ( ) ! = "exit" ) { Thread . sleep ( 10 ) ; System . out . println ( "CrunchifyBlockingConsumer: Message - " + msg . getMsg ( ) + " consumed." ) ; } } catch ( InterruptedException e ) { e . printStackTrace ( ) ; } } } |
ขั้นตอนที่ 4
สร้างเมธอด CrunchifyBlockingMain.java
อย่างง่าย ซึ่งรันการทดสอบ BlockingQueue เรียกใช้โปรแกรมนี้เพื่อตรวจสอบพฤติกรรมของ BlockingQueue

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 |
package com . crunchify . example ; import java . util . concurrent . ArrayBlockingQueue ; import java . util . concurrent . BlockingQueue ; /** * @author Crunchify.com * */ public class CrunchifyBlockingMain { public static void main ( String [ ] args ) { // Creating BlockingQueue of size 10 // BlockingQueue supports operations that wait for the queue to become non-empty when retrieving an element, and // wait for space to become available in the queue when storing an element. BlockingQueue <CrunchifyMessage> crunchQueue = new ArrayBlockingQueue < > ( 10 ) ; CrunchifyBlockingProducer crunchProducer = new CrunchifyBlockingProducer ( crunchQueue ) ; CrunchifyBlockingConsumer crunchConsumer = new CrunchifyBlockingConsumer ( crunchQueue ) ; // starting producer to produce messages in queue new Thread ( crunchProducer ) . start ( ) ; // starting consumer to consume messages from queue new Thread ( crunchConsumer ) . start ( ) ; System . out . println ( "Let's get started. Producer / Consumer Test Started.\n" ) ; } } |
BlockingQueue ไม่ยอมรับองค์ประกอบที่เป็น โมฆะ การใช้งานจะส่ง NullPointerException เมื่อพยายาม เพิ่ม วาง หรือ เสนอ ค่า null
ค่า Null ถูกใช้เป็นค่า Sentinel เพื่อบ่งชี้ความล้มเหลวของการดำเนินการ สำรวจความคิดเห็น
ผลลัพธ์:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 |
Let 's get started. Producer / Consumer Test Started. CrunchifyBlockingProducer: Message - i' m msg 1 produced . CrunchifyBlockingProducer : Message - i 'm msg 2 produced. CrunchifyBlockingConsumer: Message - i' m msg 1 consumed . CrunchifyBlockingConsumer : Message - i 'm msg 2 consumed. CrunchifyBlockingProducer: Message - i' m msg 3 produced . CrunchifyBlockingConsumer : Message - i 'm msg 3 consumed. CrunchifyBlockingProducer: Message - i' m msg 4 produced . CrunchifyBlockingConsumer : Message - i 'm msg 4 consumed. CrunchifyBlockingProducer: Message - i' m msg 5 produced . CrunchifyBlockingProducer : Exit Message - All done from Producer side . Produced 50 CrunchifyMessages CrunchifyBlockingConsumer : Message - i ' m msg 5 consumed . CrunchifyBlockingConsumer : Message - All done from Producer side . Produced 50 CrunchifyMessages consumed . |
เมื่อใดที่เราควรใช้ java.util.concurrent.BlockingQueue
- เมื่อคุณต้องการเค้นคำขอที่เข้ามาคุณควรใช้เหมือนกัน
- ผู้ผลิตสามารถล้ำหน้าผู้บริโภคได้ไกลด้วยคิวที่ไม่จำกัด หากผู้บริโภคไม่ทันกับผู้ผลิต อาจทำให้เกิด
OutOfMemoryError
ในสถานการณ์เช่นนี้ อาจเป็นการดีกว่าที่จะส่งสัญญาณให้ผู้ผลิตทราบว่าคิวเต็ม และเลิกล้มอย่างรวดเร็วหากเกิดความล้มเหลว- กล่าวอีกนัยหนึ่ง: ผู้ผลิตถูกควบคุมอย่างเป็นธรรมชาติ
- ปกติใช้ Blocking Queue ในแอพพลิเคชั่นพร้อมกัน
- มีการใช้งานที่ถูกต้องและปลอดภัยสำหรับเธรด
- การใช้หน่วยความจำก็ควรถูกจำกัดเช่นกัน