Eşzamanlı Programlamaya Giriş: Başlangıç ​​Kılavuzu

Yayınlanan: 2022-03-11

Eşzamanlı programlama nedir? Basitçe tarif, aynı anda birden fazla şey yaptığınız zamandır. Paralellikle karıştırılmaması gereken eşzamanlılık, çakışan zaman dilimlerinde birden çok işlem dizisinin çalıştırılmasıdır. Programlama alanında, eşzamanlılık oldukça karmaşık bir konudur. İplikler ve kilitler gibi yapılarla uğraşmak ve yarış koşulları ve kilitlenmeler gibi sorunlardan kaçınmak oldukça zahmetli olabilir ve eşzamanlı programların yazılmasını zorlaştırabilir. Eşzamanlılık sayesinde programlar, belirli bir bileşimde birlikte çalışan bağımsız süreçler olarak tasarlanabilir. Böyle bir yapı paralel yapılabilir veya yapılmayabilir; bununla birlikte, programınızda böyle bir yapıya ulaşmak sayısız avantaj sunar.

Eşzamanlı Programlamaya Giriş

Bu yazıda, eşzamanlılık için tasarlanmış çeşitli programlama dillerinde bunların nasıl elde edileceğine dair bir dizi farklı eşzamanlılık modeline göz atacağız.

Paylaşılan Değişken Durum Modeli

Bir sayaç ve onu artıran iki iş parçacığı ile basit bir örneğe bakalım. Program çok karmaşık olmamalıdır. Metod arttırma ile artan ve onu metod get ile alan bir sayaç ve onu artıran iki thread içeren bir nesnemiz var.

 // // Counting.java // public class Counting { public static void main(String[] args) throws InterruptedException { class Counter { int counter = 0; public void increment() { counter++; } public int get() { return counter; } } final Counter counter = new Counter(); class CountingThread extends Thread { public void run() { for (int x = 0; x < 500000; x++) { counter.increment(); } } } CountingThread t1 = new CountingThread(); CountingThread t2 = new CountingThread(); t1.start(); t2.start(); t1.join(); t2.join(); System.out.println(counter.get()); } }

Bu saf program, ilk bakışta göründüğü kadar saf değildir. Bu programı daha fazla çalıştırdığımda farklı sonuçlar alıyorum. Dizüstü bilgisayarımda üç infazdan sonra üç değer var.

 java Counting 553706 java Counting 547818 java Counting 613014

Bu öngörülemeyen davranışın nedeni nedir? Program, sayaç++ komutunu kullanan yöntem artırmada sayacı tek bir yerde artırır. Komut baytı koduna bakarsak, birkaç bölümden oluştuğunu görürüz:

  1. Hafızadan sayaç değerini oku
  2. Değeri yerel olarak artırın
  3. Sayaç değerini hafızaya kaydet

Şimdi bu dizide neyin yanlış gidebileceğini hayal edebiliyoruz. Sayacı bağımsız olarak artıran iki iş parçacığımız varsa, şu senaryoya sahip olabiliriz:

  1. Sayaç değeri 115
  2. İlk iş parçacığı sayaç değerini bellekten okur (115)
  3. İlk iş parçacığı yerel sayaç değerini artırır (116)
  4. İkinci iş parçacığı sayaç değerini bellekten okur (115)
  5. İkinci iş parçacığı yerel sayaç değerini artırır (116)
  6. İkinci iş parçacığı yerel sayaç değerini belleğe kaydeder (116)
  7. İlk iş parçacığı yerel sayaç değerini belleğe kaydeder (116)
  8. Sayacın değeri 116

Bu senaryoda, iki iş parçacığı iç içe geçmiştir, böylece sayaç değeri 1 artar, ancak her iş parçacığı onu 1 artırdığı için sayaç değeri 2 artırılmalıdır. Farklı iş parçacıklarının iç içe geçmesi programın sonucunu etkiler. Programın öngörülemezliğinin nedeni, programın iş parçacığı iç içe geçmesini değil, işletim sistemini kontrol etmesidir. Program her çalıştırıldığında, iplikler farklı şekilde iç içe geçebilir. Bu şekilde, programa tesadüfi öngörülemezliği (belirlenemezlik) getirdik.

Bu tesadüfi öngörülemezliği (belirleyici olmama) düzeltmek için programın iş parçacığı iç içe geçmesini kontrol etmesi gerekir. Bir iş parçacığı artırma yöntemindeyken, ilki çıkana kadar başka bir iş parçacığı aynı yöntemde olmamalıdır. Bu şekilde yöntem artışına erişimi seri hale getiriyoruz.

 // // CountingFixed.java // public class CountingFixed { public static main(String[] args) throws InterruptedException { class Counter { int counter = 0; public synchronized void increase() { counter++; } public synchronized int get() { return counter; } } final Counter counter = new Counter(); class CountingThread extends Thread { public void run() { for (int i = 0; i < 500000; i++) { counter.increment(); } } } CountingThread thread1 = new CountingThread(); CountingThread thread2 = new CountingThread(); thread1.start(); thread2.start(); thread1.join(); thread2.join(); System.out.println(counter.get()); } }

Başka bir çözüm, atomik olarak artabilen bir sayaç kullanmaktır, yani işlem birden fazla işleme ayrılamaz. Bu şekilde, senkronize edilmesi gereken kod bloklarına ihtiyacımız yok. Java, java.util.concurrent.atomic ad alanında atomik veri türlerine sahiptir ve biz AtomicInteger'ı kullanacağız.

 // // CountingBetter.java // import java.util.concurrent.atomic.AtomicInteger; class CountingBetter { public static void main(String[] args) throws InterruptedException { final AtomicInteger counter = new AtomicInteger(0); class CountingThread extends Thread { public viod run() { for (int i = 0; i < 500000; i++) { counter.incrementAndGet(); } } } CountingThread thread1 = new CountingThread(); CountingThread thread2 = new CoutningThread(); thread1.start(); thread2.start(); thread1.join(); thread2.join(); System.out.println(counter.get()); } }

Atomik tamsayı ihtiyacımız olan işlemlere sahiptir, bu yüzden onu Counter sınıfı yerine kullanabiliriz. Tüm atom tamsayı yöntemlerinin kilitleme kullanmadığını, bu nedenle programın tasarımını kolaylaştıran kilitlenme olasılığının bulunmadığını belirtmek ilginçtir.

Kritik yöntemleri senkronize etmek için senkronize anahtar kelimeler kullanmak tüm sorunları çözmelidir, değil mi? Diyelim ki para yatırma, çekme ve başka bir hesaba aktarma yapabilen iki hesabımız var. Aynı anda bir hesaptan diğerine para aktarmak istersek ve bunun tersini yaparsak ne olur? Bir örneğe bakalım.

 // // Deadlock.java // public class Deadlock { public static void main(String[] args) throws InterruptedException { class Account { int balance = 100; public Account(int balance) { this.balance = balance; } public synchronized void deposit(int amount) { balance += amount; } public synchronized boolean withdraw(int amount) { if (balance >= amount) { balance -= amount; return true; } return false; } public synchronized boolean transfer(Account destination, int amount) { if (balance >= amount) { balance -= amount; synchronized(destination) { destination.balance += amount; }; return true; } return false; } public int getBalance() { return balance; } } final Account bob = new Account(200000); final Account joe = new Account(300000); class FirstTransfer extends Thread { public void run() { for (int i = 0; i < 100000; i++) { bob.transfer(joe, 2); } } } class SecondTransfer extends Thread { public void run() { for (int i = 0; i < 100000; i++) { joe.transfer(bob, 1); } } } FirstTransfer thread1 = new FirstTransfer(); SecondTransfer thread2 = new SecondTransfer(); thread1.start(); thread2.start(); thread1.join(); thread2.join(); System.out.println("Bob's balance: " + bob.getBalance()); System.out.println("Joe's balance: " + joe.getBalance()); } }

Bu programı dizüstü bilgisayarımda çalıştırdığımda genellikle takılıyor. Bu neden oluyor? Yakından bakarsak, para transfer ettiğimizde, kaynak hesapta senkronize olan ve tüm senkronize yöntemlere erişimi kilitleyen transfer yöntemine girdiğimizi ve ardından hedef hesabı kilitleyen tüm senkronize yöntemlere erişimi kilitlediğimizi görebiliriz.

Aşağıdaki senaryoyu hayal edin:

  1. Bob'un hesabındaki ilk iş parçacığı çağrıları Joe'nun hesabına aktarılıyor
  2. İkinci iş parçacığı çağrıları Joe'nun hesabındaki Bob'un hesabına transfer
  3. İkinci iş parçacığı Joe'nun hesabındaki miktarı azaltır
  4. İkinci ileti dizisi, Bob'un hesabına yatırılan tutara gidiyor ancak ilk ileti dizisinin aktarımı tamamlamasını bekliyor.
  5. İlk ileti dizisi Bob'un hesabındaki miktarı azaltıyor
  6. İlk ileti dizisi, Joe'nun hesabına para yatırmaya gidiyor, ancak ikinci ileti dizisinin aktarımı tamamlamasını bekliyor.

Bu senaryoda, bir iş parçacığı başka bir iş parçacığının aktarımını tamamlamasını bekler ve bunun tersi de geçerlidir. Birbirlerine takılıp kalırlar ve program devam edemez. Buna kilitlenme denir. Kilitlenmeyi önlemek için hesapları aynı sırayla kilitlemek gerekir. Programı düzeltmek için, parayı transfer ederken hesapları aynı sırayla kilitleyebilmemiz için her hesaba benzersiz bir numara vereceğiz.

 // // DeadlockFixed.java // import java.util.concurrent.atomic.AtomicInteger; public class DeadlockFixed { public static void main(String[] args) throws InterruptedException { final AtomicInteger counter = new AtomicInteger(0); class Account { int balance = 100; int order; public Account(int balance) { this.balance = balance; this.order = counter.getAndIncrement(); } public synchronized void deposit(int amount) { balance += amount; } public synchronized boolean withdraw(int amount) { if (balance >= amount) { balance -= amount; return true; } return false; } public boolean transfer(Account destination, int amount) { Account first; Account second; if (this.order < destination.order) { first = this; second = destination; } else { first = destination; second = this; } synchronized(first) { synchronized(second) { if (balance >= amount) { balance -= amount; destination.balance += amount; return true; } return false; } } } public synchronized int getBalance() { return balance; } } final Account bob = new Account(200000); final Account joe = new Account(300000); class FirstTransfer extends Thread { public void run() { for (int i = 0; i < 100000; i++) { bob.transfer(joe, 2); } } } class SecondTransfer extends Thread { public void run() { for (int i = 0; i < 100000; i++) { joe.transfer(bob, 1); } } } FirstTransfer thread1 = new FirstTransfer(); SecondTransfer thread2 = new SecondTransfer(); thread1.start(); thread2.start(); thread1.join(); thread2.join(); System.out.println("Bob's balance: " + bob.getBalance()); System.out.println("Joe's balance: " + joe.getBalance()); } }

Bu tür hataların öngörülemezliği nedeniyle, bazen olur, ancak her zaman değil ve yeniden üretilmeleri zordur. Program öngörülemeyen bir şekilde davranıyorsa, bunun nedeni genellikle tesadüfi olmayan determinizm getiren eşzamanlılıktır. Tesadüfi olmayan determinizmden kaçınmak için, tüm iç içe geçmeleri hesaba katacak şekilde önceden tasarım programı yapmalıyız.

Tesadüfi olmayan determinizmi olan bir program örneği.

 // // NonDeteminism.java // public class NonDeterminism { public static void main(String[] args) throws InterruptedException { class Container { public String value = "Empty"; } final Container container = new Container(); class FastThread extends Thread { public void run() { container.value = "Fast"; } } class SlowThread extends Thread { public void run() { try { Thread.sleep(50); } catch(Exception e) {} container.value = "Slow"; } } FastThread fast = new FastThread(); SlowThread slow = new SlowThread(); fast.start(); slow.start(); fast.join(); slow.join(); System.out.println(container.value); } }

Bu programın içinde tesadüfi olmayan determinizm var. Kapsayıcıya girilen son değer görüntülenecektir.

 java NonDeterminism Slow

Daha yavaş iş parçacıkları değeri daha sonra girecek ve bu değer yazdırılacaktır (Yavaş). Ancak bunun böyle olması gerekmiyor. Bilgisayar aynı anda çok fazla CPU kaynağına ihtiyaç duyan başka bir programı çalıştırırsa ne olur? Program tarafından değil, işletim sistemi tarafından kontrol edildiğinden, değeri en son giren daha yavaş iş parçacığı olacağının garantisi yoktur. Programın bir bilgisayarda çalıştığı ve diğerinde farklı davrandığı durumlar olabilir. Bu tür hataların bulunması zordur ve geliştiriciler için baş ağrısına neden olurlar. Tüm bu nedenlerden dolayı bu eşzamanlılık modelini doğru yapmak çok zordur.

Fonksiyonel Yol

paralellik

İşlevsel dillerin kullandığı başka bir modele bakalım. Örneğin, Leiningen aracı kullanılarak yorumlanabilen Clojure kullanacağız. Clojure, eşzamanlılık için iyi bir desteğe sahip çok ilginç bir dildir. Önceki eşzamanlılık modeli, paylaşılan değişebilir durumdaydı. Kullandığımız sınıflar, API'lerinden belli olmadığı için bilmediğimiz bir mutasyona uğrayan gizli bir duruma da sahip olabilir. Gördüğümüz gibi, dikkatli olmazsak bu model kazara determinizm ve kilitlenmelere neden olabilir. İşlevsel diller, değişmeyen veri türlerine sahiptir, böylece değişme riski olmadan güvenli bir şekilde paylaşılabilirler. Fonksiyonlar, diğer veri tiplerinin yanı sıra özelliklere de sahiptir. Fonksiyonlar, program yürütme sırasında oluşturulabilir ve başka bir fonksiyona parametre olarak geçirilebilir veya fonksiyon çağrısı sonucunda geri dönebilir.

Eşzamanlı programlamanın temel ilkeleri gelecek ve vaattir. Future, başka bir iş parçacığında bir kod bloğu yürütür ve blok yürütüldüğünde girilecek olan gelecekteki değer için bir nesne döndürür.

 ; ; future.clj ; (let [a (future (println "Started A") (Thread/sleep 1000) (println "Finished A") (+ 1 2)) b (future (println "Started B") (Thread/sleep 2000) (println "Finished B") (+ 3 4))] (println "Waiting for futures") (+ @a @b))

Bu betiği çalıştırdığımda çıktı:

 Started A Started B Waiting for futures Finished A Finished B 10

Bu örnekte, bağımsız olarak yürütülen iki gelecek bloğumuz var. Program yalnızca, henüz mevcut olmayan gelecekteki nesneden değeri okurken bloklar. Bizim durumumuzda, gelecek blokların her iki sonucunun da toplanması bekleniyor. Davranış tahmin edilebilirdir (belirleyicidir) ve paylaşılan değişken bir durum olmadığı için her zaman aynı sonucu verecektir.

Eşzamanlılık için kullanılan başka bir ilkel bir sözdür. Promise, kişinin bir kez değer koyabileceği bir kapsayıcıdır. Sözleri okurken, iş parçacığı sözün değeri dolana kadar bekleyecektir.

 ; ; promise.clj ; (def result (promise)) (future (println "The result is: " @result)) (Thread/sleep 2000) (deliver result 42)

Bu örnekte, gelecek vaat edilen değer kaydedilmediği sürece sonucu yazdırmak için bekleyecektir. İki saniye sonra, gelecek iş parçacığında yazdırılmak üzere sözde 42 değeri saklanacaktır. Sözleri kullanmak, geleceğin aksine kilitlenmeye yol açabilir, bu nedenle söz kullanırken dikkatli olun.

 ; ; promise-deadlock.clj ; (def promise-result (promise)) (def future-result (future (println "The result is: " + @promise-result) 13)) (println "Future result is: " @future-result) (deliver result 42)

Bu örnekte, geleceğin sonucunu ve sözün sonucunu kullanıyoruz. Değerleri ayarlama ve okuma sırası, ana iş parçacığının gelecekteki iş parçacığından bir değer beklemesi ve gelecekteki iş parçacığının ana iş parçacığından bir değer beklemesidir. Bu davranış tahmin edilebilir (belirleyici) olacak ve program her çalıştığında oynatılacak, bu da hatayı bulmayı ve kaldırmayı kolaylaştırıyor.

Geleceği kullanmak, programın, geleceğin yürütülmesinin sonucuna ihtiyaç duyana kadar alıştırmaya devam etmesine izin verir. Bu, daha hızlı program yürütme ile sonuçlanır. Geleceği olan birden fazla işlemciniz varsa, öngörülebilir (deterministik) davranışa sahip (her seferinde aynı sonucu veren) programın paralel yürütülmesini sağlayabilirsiniz. Bu şekilde bilgisayarın gücünü daha iyi kullanırız.

 ; ; fibonacci.clj ; (defn fibonacci[a] (if (<= a 2) 1 (+ (fibonacci (- a 1)) (fibonacci (- a 2))))) (println "Start serial calculation") (time (println "The result is: " (+ (fibonacci 36) (fibonacci 36)))) (println "Start parallel calculation") (defn parallel-fibonacci[] (def result-1 (future (fibonacci 36))) (def result-2 (future (fibonacci 36))) (+ @result-1 @result-2)) (time (println "The result is: " (parallel-fibonacci)))

Bu örnekte, geleceğin kullanımının bir bilgisayarın hızını nasıl daha iyi kullanabileceğini görebilirsiniz. Toplayan iki Fibonacci sayımız var. Programın sonucu iki kez hesapladığını görebiliriz, birincisi tek bir iş parçacığında sıralı olarak, ikincisi iki iş parçacığında paralel olarak. Dizüstü bilgisayarımın çok çekirdekli bir işlemcisi olduğundan paralel yürütme, sıralı hesaplamadan iki kat daha hızlı çalışır.

Bu betiği dizüstü bilgisayarımda yürütmenin sonucu:

 Start serial calculation The result is: 29860704 "Elapsed time: 2568.816524 msecs" Start parallel calculation The result is: 29860704 "Elapsed time: 1216.991448 msecs"

eşzamanlılık

Clojure programlama dilinde eşzamanlılığı ve öngörülemezliği desteklemek için, diğer iş parçacıklarının değişiklikleri görebilmesi için değişken bir veri türü kullanmalıyız. En basit değişken veri türü atomdur. Atom, her zaman başka bir değerle değiştirilebilecek değere sahip bir kaptır. Değer, yeni bir değer girilerek veya eski değeri alan ve daha sık kullanılan yeni değeri döndüren bir işlev çağırılarak değiştirilebilir. Atomun kilitlenmeden uygulanması ilginçtir ve threadlerde güvenle kullanılabilir, yani kilitlenmeye ulaşmak imkansızdır. Dahili olarak atom, java.util.concurrent.AtomicReference kitaplığını kullanır. Atom ile gerçekleştirilmiş bir karşı örneğe bakalım.

 ; ; atom-counter.clj ; (def counter (atom 0)) (def attempts (atom 0)) (defn counter-increases[] (dotimes [cnt 500000] (swap! counter (fn [counter] (swap! attempts inc) ; side effect DO NOT DO THIS (inc counter))))) (def first-future (future (counter-increases))) (def second-future (future (counter-increases))) ; Wait for futures to complete @first-future @second-future ; Print value of the counter (println "The counter is: " @counter) (println "Number of attempts: " @attempts)

Dizüstü bilgisayarımda komut dosyası yürütmenin sonucu:

 The counter is: 1000000 Number of attempts: 1680212

Bu örnekte, sayacın değerini içeren bir atom kullanıyoruz. Sayaç (takas! sayaç inc) ile artar. Swap fonksiyonu şu şekilde çalışır: 1. Sayaç değerini al ve koru 2. Bu değer için yeni değeri hesaplayan verilen fonksiyonu çağırır 3. Yeni değeri kaydetmek için eski değerin değişip değişmediğini kontrol eden atomik işlemi kullanır 3a. değer değişmediyse yeni bir 3b değeri girer. Bu arada değer değiştirilirse adım 1'e gidin. Bu arada değer değiştirilirse fonksiyonun tekrar çağrılabileceğini görüyoruz. Değer yalnızca başka bir iş parçacığından değiştirilebilir. Bu nedenle, yeni bir değer hesaplayan fonksiyonun herhangi bir yan etkisinin olmaması önemlidir, böylece daha fazla çağrılmasının bir önemi yoktur. Atomun bir sınırlaması, değişiklikleri tek bir değere senkronize etmesidir.

 ; ; atom-acocunts.clj ; (def bob (atom 200000)) (def joe (atom 300000)) (def inconsistencies (atom 0)) (defn transfer [source destination amount] (if (not= (+ @bob @joe) 500000) (swap! inconsistencies inc)) (swap! source - amount) (swap! destination + amount)) (defn first-transfer [] (dotimes [cnt 100000] (transfer bob joe 2))) (defn second-transfer [] (dotimes [cnt 100000] (transfer joe bob 1))) (def first-future (future (first-transfer))) (def second-future (future (second-transfer))) @first-future @second-future (println "Bob has in account: " @bob) (println "Joe has in account: " @joe) (println "Inconsistencies while transfer: " @inconsistencies)

Bu betiği çalıştırdığımda şunu alıyorum:

 Bob has in account: 100000 Joe has in account: 400000 Inconsistencies while transfer: 36525

Bu örnekte daha fazla atomu nasıl değiştirdiğimizi görebiliriz. Bir noktada, tutarsızlık olabilir. Aynı anda iki hesabın toplamı aynı değildir. Birden çok değerdeki değişiklikleri koordine etmemiz gerekirse, iki çözüm vardır:

  1. Bir atoma daha fazla değer yerleştirin
  2. Daha sonra göreceğimiz gibi, referansları ve yazılım işlem belleğini kullanın
 ; ; atom-accounts-fixed.clj ; (def accounts (atom {:bob 200000, :joe 300000})) (def inconsistencies (atom 0)) (defn transfer [source destination amount] (let [deref-accounts @accounts] (if (not= (+ (get deref-accounts :bob) (get deref-accounts :joe)) 500000) (swap! inconsistencies inc)) (swap! accounts (fn [accs] (update (update accs source - amount) destination + amount))))) (defn first-transfer [] (dotimes [cnt 100000] (transfer :bob :joe 2))) (defn second-transfer [] (dotimes [cnt 100000] (transfer :joe :bob 1))) (def first-future (future (first-transfer))) (def second-future (future (second-transfer))) @first-future @second-future (println "Bob has in account: " (get @accounts :bob)) (println "Joe has in account: " (get @accounts :joe)) (println "Inconsistencies while transfer: " @inconsistencies)

Bu betiği bilgisayarımda çalıştırdığımda şunu alıyorum:

 Bob has in account: 100000 Joe has in account: 400000 Inconsistencies while transfer: 0

Örnekte, bir harita kullanarak daha fazla değer vermemiz için koordinasyon çözüldü. Hesaptan para transfer ettiğimizde, tüm hesapları o anda değiştiriyoruz, böylece para toplamının aynı olmaması asla olmayacak.

Sonraki değişken veri türü aracıdır. Aracı, yalnızca değeri değiştiren işlevin farklı bir iş parçacığında yürütüldüğü için bir atom gibi davranır, bu nedenle değişikliğin görünür hale gelmesi biraz zaman alır. Bu nedenle, etmen değerini okurken, etmen değerini değiştiren tüm işlevlerin yürütülmesini bekleyen bir işlevi çağırmak gerekir. Atomlardan farklı olarak değeri değiştiren fonksiyon sadece bir kez çağrılır ve bu nedenle yan etkileri olabilir. Bu tip ayrıca bir değeri senkronize edebilir ve kilitlenemez.

 ; ; agent-counter.clj ; (def counter (agent 0)) (def attempts (atom 0)) (defn counter-increases[] (dotimes [cnt 500000] (send counter (fn [counter] (swap! attempts inc) (inc counter))))) (def first-future (future (counter-increases))) (def second-future (future (counter-increases))) ; wait for futures to complete @first-future @second-future ; wait for counter to be finished with updating (await counter) ; print the value of the counter (println "The counter is: " @counter) (println "Number of attempts: " @attempts)

Bu betiği dizüstü bilgisayarımda çalıştırdığımda şunu alıyorum:

 The counter is: 1000000 Number of attempts: 1000000

Bu örnek, sayacın atom ile uygulanması ile aynıdır. Tek fark, burada beklemeyi kullanarak son değeri okumadan önce tüm aracı değişikliklerinin tamamlanmasını beklememizdir.

Son değişken veri türü referanslardır. Atomlardan farklı olarak referanslar, değişiklikleri birden çok değere senkronize edebilir. Başvurudaki her işlem, dosync kullanan bir işlemde olmalıdır. Bu veri değiştirme yöntemine yazılım işlem belleği veya kısaltılmış STM denir. Hesaplardaki para transferi ile bir örneğe bakalım.

 ; ; stm-accounts.clj ; (def bob (ref 200000)) (def joe (ref 300000)) (def inconsistencies (atom 0)) (def attempts (atom 0)) (def transfers (agent 0)) (defn transfer [source destination amount] (dosync (swap! attempts inc) ; side effect DO NOT DO THIS (send transfers inc) (when (not= (+ @bob @joe) 500000) (swap! inconsistencies inc)) ; side effect DO NOT DO THIS (alter source - amount) (alter destination + amount))) (defn first-transfer [] (dotimes [cnt 100000] (transfer bob joe 2))) (defn second-transfer [] (dotimes [cnt 100000] (transfer joe bob 1))) (def first-future (future (first-transfer))) (def second-future (future (second-transfer))) @first-future @second-future (await transfers) (println "Bob has in account: " @bob) (println "Joe has in account: " @joe) (println "Inconsistencies while transfer: " @inconsistencies) (println "Attempts: " @attempts) (println "Transfers: " @transfers)

Bu betiği çalıştırdığımda şunu alıyorum:

 Bob has in account: 100000 Joe has in account: 400000 Inconsistencies while transfer: 0 Attempts: 330841 Transfers: 200000

İlginç bir şekilde, yapılan işlem sayısından daha fazla girişim oldu. Bunun nedeni, STM'nin kilit kullanmamasıdır, bu nedenle bir çakışma varsa (aynı değeri değiştirmeye çalışan iki iş parçacığı gibi) işlem yeniden yürütülür. Bu nedenle işlemin yan etkileri olmamalıdır. İşlem içerisinde değeri değişen aracının öngörülebilir davrandığını görebiliriz. Aracının değerini değiştiren bir fonksiyon, işlem sayısı kadar değerlendirilecektir. Bunun nedeni, acentenin işlemden haberdar olmasıdır. İşlemin yan etkileri olması gerekiyorsa, bunlar acente bünyesinde faaliyete geçirilmelidir. Bu şekilde, program öngörülebilir davranışa sahip olacaktır. Muhtemelen her zaman STM kullanmanız gerektiğini düşünüyorsunuz, ancak deneyimli programcılar genellikle atomları kullanır çünkü atomlar STM'den daha basit ve daha hızlıdır. Tabii bu şekilde bir program yapmak mümkünse. Yan etkileriniz varsa, STM ve ajanları kullanmaktan başka seçeneğiniz yoktur.

Aktör Modeli

Aşağıdaki eşzamanlılık modeli bir aktör modelidir. Bu modelin prensibi gerçek dünyaya benzer. Birçok insanla bir şey yaratmak için bir anlaşma yaparsak, örneğin bir bina, o zaman şantiyedeki her adamın kendi rolü vardır. Bir insan kalabalığı denetçi tarafından denetlenir. Bir işçi işyerinde yaralanırsa, denetçi yaralı adamın işini mevcut diğerlerine devreder. Gerekirse siteye yeni bir adam yönlendirebilir. Sitede işi aynı anda (eşzamanlı olarak) yapan ve aynı zamanda senkronize etmek için birbirleriyle konuşan daha fazla insan var. Şantiyede çalışmayı programa koyarsak, o zaman her insan devleti olan ve kendi işleyişini yürüten bir aktör olur ve konuşmanın yerini mesajlar alırdı. Bu modele dayalı popüler programlama dili Erlang'dır. Bu ilginç dil, değişmez veri tiplerine ve diğer veri tipleriyle aynı özelliklere sahip fonksiyonlara sahiptir. Fonksiyonlar, programın yürütülmesi sırasında oluşturulabilir ve başka bir fonksiyona argüman olarak iletilebilir veya fonksiyon çağrısının sonucu olarak döndürülebilir. Erlang sanal makinesini kullanan Elixir dilinde örnekler vereceğim, bu yüzden Erlang ile aynı programlama modeline sahip olacağım sadece sözdizimi farklı. Elixir'deki en önemli üç ilkel yumurtlama, gönderme ve almadır. spawn, yeni işlemde işlevi yürütür, gönder işlemi işleme gönderir ve mevcut işleme gönderilen mesajları alır.

Aktör modeli ile ilk örnek, eşzamanlı olarak karşı artırılacaktır. Bu model ile program yapabilmek için, bir aktörün sayacın değerine sahip olması ve sayacın değerini ayarlayıp alması için mesaj alması ve aynı anda sayacın değerini artıracak iki aktöre sahip olması gerekir.

 # # Counting.exs # defmodule Counting do def counter(value) do receive do {:get, sender} -> send sender, {:counter, value} counter value {:set, new_value} -> counter(new_value) end end def counting(sender, counter, times) do if times > 0 do send counter, {:get, self} receive do {:counter, value} -> send counter, {:set, value + 1} end counting(sender, counter, times - 1) else send sender, {:done, self} end end end counter = spawn fn -> Counting.counter 0 end IO.puts "Starting counting processes" this = self counting1 = spawn fn -> IO.puts "Counting A started" Counting.counting this, counter, 500_000 IO.puts "Counting A finished" end counting2 = spawn fn -> IO.puts "Counting B started" Counting.counting this, counter, 500_000 IO.puts "Counting B finished" end IO.puts "Waiting for counting to be done" receive do {:done, ^counting1} -> nil end receive do {:done, ^counting2} -> nil end send counter, {:get, self} receive do {:counter, value} -> IO.puts "Counter is: #{value}" end

Bu örneği yürüttüğümde şunu alıyorum:

 Starting counting processes Counting A started Waiting for counting to be done Counting B started Counting A finished Counting B finished Counter is: 516827

Sonunda sayacın beklediğimiz gibi 1000000 değil 516827 olduğunu görebiliriz. Komut dosyasını bir sonraki çalıştırdığımda 511010 aldım. Bu davranışın nedeni sayacın iki mesaj almasıdır: mevcut değeri al ve yeni değeri ayarla. Sayacı artırmak için programın mevcut değeri alması, 1 artırması ve artan değeri ayarlaması gerekir. Sayaç prosesine gönderilen mesajı kullanarak iki proses aynı anda sayacın değerini okur ve yazar. Sayacın alacağı mesajların sırası tahmin edilemez ve program bunu kontrol edemez. Bu senaryoyu hayal edebiliriz:

  1. Sayaç değeri 115
  2. İşlem A, sayacın değerini okur (115)
  3. Proses B sayacın değerini okur (115)
  4. Proses B değeri lokal olarak arttırır (116)
  5. Proses B sayaca artan değeri ayarlar (116)
  6. İşlem A, sayacın değerini arttırır (116)
  7. İşlem A, sayaca artan değeri ayarlar (116)
  8. Sayaç değeri 116

Senaryoya bakarsak, iki işlem sayacı 1 artırır ve sayaç sonunda 2 değil 1 artar. Bu tür iç içe geçmeler önceden tahmin edilemeyen sayıda olabilir ve bu nedenle sayacın değeri tahmin edilemez. Bu davranışı engellemek için tek mesaj ile artırma işlemi yapılmalıdır.

 # # CountingFixed.exs # defmodule Counting do def counter(value) do receive do :increase -> counter(value + 1) {:get, sender} -> send sender, {:counter, value} counter value end end def counting(sender, counter, times) do if times > 0 do send counter, :increase counting(sender, counter, times - 1) else send sender, {:done, self} end end end counter = spawn fn -> Counting.counter 0 end IO.puts "Starting counting processes" this = self counting1 = spawn fn -> IO.puts "Counting A started" Counting.counting this, counter, 500_000 IO.puts "Counting A finished" end counting2 = spawn fn -> IO.puts "Counting B started" Counting.counting this, counter, 500_000 IO.puts "Counting B finished" end IO.puts "Waiting for counting to be done" receive do {:done, ^counting1} -> nil end receive do {:done, ^counting2} -> nil end send counter, {:get, self} receive do {:counter, value} -> IO.puts "Counter is: #{value}" end

Bu betiği çalıştırarak şunu elde ederim:

 Starting counting processes Counting A started Waiting for counting to be done Counting B started Counting A finished Counting B finished Counter is: 1000000

Sayacın doğru değere sahip olduğunu görebiliriz. Öngörülebilir (deterministik) davranışın nedeni, sayacın değerinin bir mesaj artmasıdır, böylece sayacı artırmak için mesaj dizisinin nihai değerini etkilemeyecektir. Aktör modeliyle çalışırken, mesajların nasıl iç içe geçebileceğine dikkat etmeli ve yanlışlıkla öngörülemezlikten (belirleyici olmama) kaçınmak için mesajların ve mesajlar üzerindeki eylemlerin dikkatli bir şekilde tasarlanmasına dikkat etmeliyiz.

Bu model ile iki hesap arasında nasıl para transferi yapabiliriz?

 # # Accounts.exs # defmodule Accounts do def accounts(state) do receive do {:transfer, source, destination, amount} -> accounts %{state | source => state[source] - amount , destination => state[destination] + amount} {:amounts, accounts, sender } -> send sender, {:amounts, for account <- accounts do {account, state[account]} end} accounts(state) end end def transfer(sender, accounts, source, destination, amount, times, inconsistencies) do if times > 0 do send accounts, {:amounts, [source, destination], self} receive do {:amounts, amounts} -> if amounts[source] + amounts[destination] != 500_000 do Agent.update(inconsistencies, fn value -> value + 1 end) end end send accounts, {:transfer, source, destination, amount} transfer(sender, accounts, source, destination, amount, times - 1, inconsistencies) else send sender, {:done, self} end end end accounts = spawn fn -> Accounts.accounts(%{bob: 200_000, joe: 300_000 }) end {:ok, inconsistencies} = Agent.start(fn -> 0 end) this = self transfer1 = spawn fn -> IO.puts "Transfer A started" Accounts.transfer(this, accounts, :bob, :joe, 2, 100_000, inconsistencies) IO.puts "Transfer A finished" end transfer2 = spawn fn -> IO.puts "Transfer B started" Accounts.transfer(this, accounts, :joe, :bob, 1, 100_000, inconsistencies) IO.puts "Transfer B finished" end IO.puts "Waiting for transfers to be done" receive do {:done, ^transfer1} -> nil end receive do {:done, ^transfer2} -> nil end send accounts, {:amounts, [:bob, :joe], self} receive do {:amounts, amounts} -> IO.puts "Bob has in account: #{amounts[:bob]}" IO.puts "Joe has in account: #{amounts[:joe]}" IO.puts "Inconsistencies while transfer: #{Agent.get(inconsistencies, fn x -> x end)}" end

Bu betiği çalıştırdığımda şunu alıyorum:

 Waiting for transfers to be done Transfer A started Transfer B started Transfer B finished Transfer A finished Bob has in account: 100000 Joe has in account: 400000 Inconsistencies while transfer: 0

Para transferinin tutarsızlıklar olmadan çalıştığını görebiliriz, çünkü para transferi için mesaj transferini ve hesapların değerini almak için mesaj miktarlarını seçtik, bu da bize programın öngörülebilir davranışını verir. Ne zaman para transferi yapsak, herhangi bir zamanda toplam para miktarı aynı olmalıdır.

Aktör modeli kilitlenmeye ve dolayısıyla kilitlenmeye neden olabilir, bu nedenle programı tasarlarken dikkatli olun. Aşağıdaki komut dosyası, kilitleme ve kilitlenme senaryosunu nasıl simüle edebileceğinizi gösterir.

 # # Deadlock.exs # defmodule Lock do def loop(state) do receive do {:lock, sender} -> case state do [] -> send sender, :locked loop([sender]) _ -> loop(state ++ [sender]) end {:unlock, sender} -> case state do [] -> loop(state) [^sender | []] -> loop([]) [^sender | [next | tail]] -> send next, :locked loop([next | tail]) _ -> loop(state) end end end def lock(pid) do send pid, {:lock, self} receive do :locked -> nil # This will block until we receive message end end def unlock(pid) do send pid, {:unlock, self} end def locking(first, second, times) do if times > 0 do lock(first) lock(second) unlock(second) unlock(first) locking(first, second, times - 1) end end end a_lock = spawn fn -> Lock.loop([]) end b_lock = spawn fn -> Lock.loop([]) end this = self IO.puts "Locking A, B started" spawn fn -> Lock.locking(a_lock, b_lock, 1_000) IO.puts "Locking A, B finished" send this, :done end IO.puts "Locking B, A started" spawn fn -> Lock.locking(b_lock, a_lock, 1_000) IO.puts "Locking B, A finished" send this, :done end IO.puts "Waiting for locking to be done" receive do :done -> nil end receive do :done -> nil End

When I run this script on my laptop I get:

 Locking A, B started Locking B, A started Waiting for locking to be done

From the output we can see that the processes that lock A and B are stuck. This happens because the first process waits for the second process to release B while second process waiting first process to release A. They are waiting for each other and are stuck forever. To avoid this locking, order should always be the same, or design a program so that it doesn't use lock (meaning that it doesn't wait for a specific message). The following listing always locks first A then B.

 # # Deadlock fixed # defmodule Lock do def loop(state) do receive do {:lock, sender} -> case state do [] -> send sender, :locked loop([sender]) _ -> loop(state ++ [sender]) end {:unlock, sender} -> case state do [] -> loop(state) [^sender | []] -> loop([]) [^sender | [next | tail]] -> send next, :locked loop([next | tail]) _ -> loop(state) end end end def lock(pid) do send pid, {:lock, self} receive do :locked -> nil # This will block until we receive message end end def unlock(pid) do send pid, {:unlock, self} end def locking(first, second, times) do if times > 0 do lock(first) lock(second) unlock(second) unlock(first) locking(first, second, times - 1) end end end a_lock = spawn fn -> Lock.loop([]) end b_lock = spawn fn -> Lock.loop([]) end this = self IO.puts "Locking A, B started" spawn fn -> Lock.locking(a_lock, b_lock, 1_000) IO.puts "Locking A, B finished" send this, :done end IO.puts "Locking A, B started" spawn fn -> Lock.locking(a_lock, b_lock, 1_000) IO.puts "Locking A, B finished" send this, :done end IO.puts "Waiting for locking to be done" receive do :done -> nil end receive do :done -> nil End

When I run this script on my laptop I get:

 Locking A, B started Locking A, B started Waiting for locking to be done Locking A, B finished Locking A, B finished

And now, there is no longer a deadlock.

Sarmak

As an introduction to concurrent programming, we have covered a few concurrency models. We haven't covered all models, as this article would be too big. Just to name a few, channels and reactive streams are some of the other popularly used concurrency models. Channels and reactive streams have many similarities with the actor model. All of them transmit messages, but many threads can receive messages from one channel, and reactive streams transmit messages in one direction to form directed graph that receive messages from one end and send messages from the other end as a result of the processing.

Shared mutable state models can easily go wrong if we don't think ahead. It has problems of race condition and deadlock. If we have a choice between different concurrent programming models, it would be easier to implement and maintain but otherwise we have to be very careful what we do.

The functional way is a lot easier to reason about and implement. It cannot have deadlock. This model may have worse performance than shared mutable state model, but a program that works is always faster than one that does not work.

Actor model is a good choice for concurrent programming. Although there are problems of race condition and deadlock, they can happen less than in shared mutable state model since the only way for processes to communicate is via messages. With good message design between processes, that can be avoided. If a problem occurs it is then in the order or meaning of messages in communication between the processes and you know where to look.

I hope this article has given you some insight to what concurrent programming is and how it gives structure to the programs you write.

Related: Ruby Concurrency and Parallelism: A Practical Tutorial