Java'da Thread-Safe BlockingQueue nedir? Ne zaman kullanmalısın? Uygulama Eklendi
Yayınlanan: 2021-04-30
Şimdiye kadar Crunchify'da Üretici Tüketici konsepti üzerine iki makale yazdım. Birincisi Java Semaphore ve Mutex'i açıklar ve ikincisi Eşzamanlı Okuma/Yazmayı açıklar.
Bu Java BlockingQueue in Java
açıklamak için aynı Üretici/Tüketici konseptini inceleyeceğiz.
Java'da Engelleme Sırasının avantajları nelerdir?
Bir java.util.Queue
, bir öğe alınırken sıranın boş olmamasını ve bir öğe depolanırken kuyrukta boş alanın olmasını bekleyen işlemleri destekler.

Dört Java Sınıfı oluşturmamız gerekiyor:
- Mesaj koymak ve almak için CrunchifyMessage.java
- Mesajı kuyruğa almak için CrunchifyBlockingProducer.java
- Sıradan mesaj almak için CrunchifyBlockingConsumer.java
- Testi başlatmak için CrunchifyBlockingMain.java
BlockingQueue uygulamaları thread-safe
. Tüm kuyruğa alma yöntemleri, doğası gereği atomiktir ve dahili kilitler kullanır.
Java'da Thread-Safe BlockingQueue uygulamasına başlayalım
Aşama 1
CrunchifyMessage.java
sınıfını oluşturun. Bu basit Java Nesnesidir.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 |
package com . crunchify . example ; /** * @author Crunchify.com * simple Message class to put and get message into queue */ public class CrunchifyMessage { private String crunchifyMsg ; public CrunchifyMessage ( String string ) { this . crunchifyMsg = string ; } public String getMsg ( ) { return crunchifyMsg ; } } |
Adım 2
Basit msg oluşturan ve kuyruğa koyan yapımcı CrunchifyBlockingProducer.java
oluşturun.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 |
package com . crunchify . example ; import java . util . concurrent . BlockingQueue ; /** * @author Crunchify.com * */ public class CrunchifyBlockingProducer implements Runnable { private BlockingQueue <CrunchifyMessage> crunchQueue ; public CrunchifyBlockingProducer ( BlockingQueue <CrunchifyMessage> queue ) { this . crunchQueue = queue ; } @Override public void run ( ) { // producing CrunchifyMessage messages for ( int i = 1 ; i < = 5 ; i ++ ) { CrunchifyMessage msg = new CrunchifyMessage ( "i'm msg " + i ) ; try { Thread . sleep ( 10 ) ; crunchQueue . put ( msg ) ; System . out . println ( "CrunchifyBlockingProducer: Message - " + msg . getMsg ( ) + " produced." ) ; } catch ( Exception e ) { System . out . println ( "Exception:" + e ) ; } } // adding exit message CrunchifyMessage msg = new CrunchifyMessage ( "All done from Producer side. Produced 50 CrunchifyMessages" ) ; try { crunchQueue . put ( msg ) ; System . out . println ( "CrunchifyBlockingProducer: Exit Message - " + msg . getMsg ( ) ) ; } catch ( Exception e ) { System . out . println ( "Exception:" + e ) ; } } } |
Aşama 3
Kuyruktan gelen mesajı tüketen CrunchifyBlockingConsumer.java
sınıfını oluşturun.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 |
package com . crunchify . example ; import java . util . concurrent . BlockingQueue ; /** * @author Crunchify.com * */ public class CrunchifyBlockingConsumer implements Runnable { private BlockingQueue <CrunchifyMessage> queue ; public CrunchifyBlockingConsumer ( BlockingQueue <CrunchifyMessage> queue ) { this . queue = queue ; } @Override public void run ( ) { try { CrunchifyMessage msg ; // consuming messages until exit message is received while ( ( msg = queue . take ( ) ) . getMsg ( ) ! = "exit" ) { Thread . sleep ( 10 ) ; System . out . println ( "CrunchifyBlockingConsumer: Message - " + msg . getMsg ( ) + " consumed." ) ; } } catch ( InterruptedException e ) { e . printStackTrace ( ) ; } } } |
4. Adım
BlockingQueue testini çalıştıran basit CrunchifyBlockingMain.java
yöntemi oluşturun. BlockingQueue davranışını kontrol etmek için bu programı çalıştırın.

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 |
package com . crunchify . example ; import java . util . concurrent . ArrayBlockingQueue ; import java . util . concurrent . BlockingQueue ; /** * @author Crunchify.com * */ public class CrunchifyBlockingMain { public static void main ( String [ ] args ) { // Creating BlockingQueue of size 10 // BlockingQueue supports operations that wait for the queue to become non-empty when retrieving an element, and // wait for space to become available in the queue when storing an element. BlockingQueue <CrunchifyMessage> crunchQueue = new ArrayBlockingQueue < > ( 10 ) ; CrunchifyBlockingProducer crunchProducer = new CrunchifyBlockingProducer ( crunchQueue ) ; CrunchifyBlockingConsumer crunchConsumer = new CrunchifyBlockingConsumer ( crunchQueue ) ; // starting producer to produce messages in queue new Thread ( crunchProducer ) . start ( ) ; // starting consumer to consume messages from queue new Thread ( crunchConsumer ) . start ( ) ; System . out . println ( "Let's get started. Producer / Consumer Test Started.\n" ) ; } } |
BlockingQueue boş öğeleri kabul etmez. Uygulamalar, bir null ekleme , koyma veya sunma girişimlerinde NullPointerException oluşturur.
Yoklama işlemlerinin başarısızlığını belirtmek için bir sentinel değeri olarak bir boş değer kullanılır.
Sonuç:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 |
Let 's get started. Producer / Consumer Test Started. CrunchifyBlockingProducer: Message - i' m msg 1 produced . CrunchifyBlockingProducer : Message - i 'm msg 2 produced. CrunchifyBlockingConsumer: Message - i' m msg 1 consumed . CrunchifyBlockingConsumer : Message - i 'm msg 2 consumed. CrunchifyBlockingProducer: Message - i' m msg 3 produced . CrunchifyBlockingConsumer : Message - i 'm msg 3 consumed. CrunchifyBlockingProducer: Message - i' m msg 4 produced . CrunchifyBlockingConsumer : Message - i 'm msg 4 consumed. CrunchifyBlockingProducer: Message - i' m msg 5 produced . CrunchifyBlockingProducer : Exit Message - All done from Producer side . Produced 50 CrunchifyMessages CrunchifyBlockingConsumer : Message - i ' m msg 5 consumed . CrunchifyBlockingConsumer : Message - All done from Producer side . Produced 50 CrunchifyMessages consumed . |
Java.util.concurrent.BlockingQueue'yu ne zaman kullanmalıyız?
- Bir tür gelen isteği kısmak istediğinizde, aynısını kullanmalısınız.
- Bir üretici sınırsız bir kuyrukla tüketicilerin çok önüne geçebilir. Tüketici üreticiye yetişemezse, bu bir
OutOfMemoryError
neden olabilir. Bu gibi durumlarda, olası bir üreticiye kuyruğun dolduğunu bildirmek ve bir arıza ile çabucak pes etmek daha iyi olabilir.- Başka bir deyişle: üreticiler doğal olarak kısılır.
- Engelleme Sırası normalde eşzamanlı uygulamada kullanılır
- Doğru, iş parçacığı için güvenli bir uygulama sağlar
- Bellek tüketimi de sınırlı olmalıdır