Apa itu Thread-Safe BlockingQueue di Java? Kapan Anda harus menggunakannya? Implementasi Terlampir
Diterbitkan: 2021-04-30
Sejauh ini saya sudah menulis dua artikel tentang konsep Producer Consumer di Crunchify. Yang pertama untuk menjelaskan Java Semaphore dan Mutex dan yang kedua untuk menjelaskan Baca/Tulis Bersamaan.
Dalam Tutorial Java ini kita akan membahas konsep Producer/Consumer yang sama untuk menjelaskan BlockingQueue in Java
.
Apa keuntungan dari Memblokir Antrian di Jawa?
java.util.Queue
mendukung operasi yang menunggu antrian menjadi tidak kosong saat mengambil elemen, dan menunggu ruang tersedia dalam antrian saat menyimpan elemen.

Kita perlu membuat empat Kelas Java:
- CrunchifyMessage.java untuk menempatkan dan menerima pesan
- CrunchifyBlockingProducer.java untuk memasukkan pesan ke dalam antrian
- CrunchifyBlockingConsumer.java untuk mendapatkan pesan dari antrian
- CrunchifyBlockingMain.java untuk memulai pengujian
Implementasi BlockingQueue adalah thread-safe
. Semua metode antrian bersifat atomik dan menggunakan kunci internal.
Mari kita mulai implementasi Thread-Safe BlockingQueue di Java
Langkah 1
Buat kelas CrunchifyMessage.java
. Ini adalah Objek Java sederhana.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 |
package com . crunchify . example ; /** * @author Crunchify.com * simple Message class to put and get message into queue */ public class CrunchifyMessage { private String crunchifyMsg ; public CrunchifyMessage ( String string ) { this . crunchifyMsg = string ; } public String getMsg ( ) { return crunchifyMsg ; } } |
Langkah 2
Buat produser CrunchifyBlockingProducer.java
yang membuat pesan sederhana dan memasukkannya ke dalam antrian.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 |
package com . crunchify . example ; import java . util . concurrent . BlockingQueue ; /** * @author Crunchify.com * */ public class CrunchifyBlockingProducer implements Runnable { private BlockingQueue <CrunchifyMessage> crunchQueue ; public CrunchifyBlockingProducer ( BlockingQueue <CrunchifyMessage> queue ) { this . crunchQueue = queue ; } @Override public void run ( ) { // producing CrunchifyMessage messages for ( int i = 1 ; i < = 5 ; i ++ ) { CrunchifyMessage msg = new CrunchifyMessage ( "i'm msg " + i ) ; try { Thread . sleep ( 10 ) ; crunchQueue . put ( msg ) ; System . out . println ( "CrunchifyBlockingProducer: Message - " + msg . getMsg ( ) + " produced." ) ; } catch ( Exception e ) { System . out . println ( "Exception:" + e ) ; } } // adding exit message CrunchifyMessage msg = new CrunchifyMessage ( "All done from Producer side. Produced 50 CrunchifyMessages" ) ; try { crunchQueue . put ( msg ) ; System . out . println ( "CrunchifyBlockingProducer: Exit Message - " + msg . getMsg ( ) ) ; } catch ( Exception e ) { System . out . println ( "Exception:" + e ) ; } } } |
Langkah-3
Buat kelas CrunchifyBlockingConsumer.java
yang menggunakan pesan dari antrian.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 |
package com . crunchify . example ; import java . util . concurrent . BlockingQueue ; /** * @author Crunchify.com * */ public class CrunchifyBlockingConsumer implements Runnable { private BlockingQueue <CrunchifyMessage> queue ; public CrunchifyBlockingConsumer ( BlockingQueue <CrunchifyMessage> queue ) { this . queue = queue ; } @Override public void run ( ) { try { CrunchifyMessage msg ; // consuming messages until exit message is received while ( ( msg = queue . take ( ) ) . getMsg ( ) ! = "exit" ) { Thread . sleep ( 10 ) ; System . out . println ( "CrunchifyBlockingConsumer: Message - " + msg . getMsg ( ) + " consumed." ) ; } } catch ( InterruptedException e ) { e . printStackTrace ( ) ; } } } |
Langkah-4
Buat metode CrunchifyBlockingMain.java
sederhana yang menjalankan tes BlockingQueue. Jalankan program ini untuk memeriksa perilaku BlockingQueue.

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 |
package com . crunchify . example ; import java . util . concurrent . ArrayBlockingQueue ; import java . util . concurrent . BlockingQueue ; /** * @author Crunchify.com * */ public class CrunchifyBlockingMain { public static void main ( String [ ] args ) { // Creating BlockingQueue of size 10 // BlockingQueue supports operations that wait for the queue to become non-empty when retrieving an element, and // wait for space to become available in the queue when storing an element. BlockingQueue <CrunchifyMessage> crunchQueue = new ArrayBlockingQueue < > ( 10 ) ; CrunchifyBlockingProducer crunchProducer = new CrunchifyBlockingProducer ( crunchQueue ) ; CrunchifyBlockingConsumer crunchConsumer = new CrunchifyBlockingConsumer ( crunchQueue ) ; // starting producer to produce messages in queue new Thread ( crunchProducer ) . start ( ) ; // starting consumer to consume messages from queue new Thread ( crunchConsumer ) . start ( ) ; System . out . println ( "Let's get started. Producer / Consumer Test Started.\n" ) ; } } |
BlockingQueue tidak menerima elemen null . Implementasi melempar NullPointerException pada upaya untuk menambahkan , menempatkan atau menawarkan null .
Sebuah null digunakan sebagai nilai sentinel untuk menunjukkan kegagalan operasi polling .
Hasil:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 |
Let 's get started. Producer / Consumer Test Started. CrunchifyBlockingProducer: Message - i' m msg 1 produced . CrunchifyBlockingProducer : Message - i 'm msg 2 produced. CrunchifyBlockingConsumer: Message - i' m msg 1 consumed . CrunchifyBlockingConsumer : Message - i 'm msg 2 consumed. CrunchifyBlockingProducer: Message - i' m msg 3 produced . CrunchifyBlockingConsumer : Message - i 'm msg 3 consumed. CrunchifyBlockingProducer: Message - i' m msg 4 produced . CrunchifyBlockingConsumer : Message - i 'm msg 4 consumed. CrunchifyBlockingProducer: Message - i' m msg 5 produced . CrunchifyBlockingProducer : Exit Message - All done from Producer side . Produced 50 CrunchifyMessages CrunchifyBlockingConsumer : Message - i ' m msg 5 consumed . CrunchifyBlockingConsumer : Message - All done from Producer side . Produced 50 CrunchifyMessages consumed . |
Kapan kita harus menggunakan java.util.concurrent.BlockingQueue?
- Ketika Anda ingin mencekik semacam permintaan yang masuk maka Anda harus menggunakan yang sama
- Seorang produsen dapat berada jauh di depan konsumen dengan antrian yang tidak terbatas. Jika konsumen tidak mengejar produsen maka dapat menyebabkan
OutOfMemoryError
. Dalam situasi seperti ini, mungkin lebih baik untuk memberi sinyal kepada calon produsen bahwa antrian sudah penuh, dan cepat menyerah jika gagal.- Dengan kata lain: para produsen secara alami tercekik.
- Antrian Pemblokiran biasanya digunakan dalam aplikasi bersamaan
- Ini memberikan implementasi thread-safe yang benar
- Konsumsi memori juga harus dibatasi