並行プログラミング入門:初心者向けガイド
公開: 2022-03-11並行プログラミングとは何ですか? 簡単に言えば、それはあなたが同時に複数のことをしているときです。 並列処理と混同しないように、同時実行とは、複数の操作シーケンスが重複する期間に実行される場合です。 プログラミングの分野では、並行性はかなり複雑な問題です。 スレッドやロックなどの構造を処理し、競合状態やデッドロックなどの問題を回避することは非常に面倒であり、並行プログラムの作成が困難になる可能性があります。 並行性により、プログラムは、特定の構成で連携して動作する独立したプロセスとして設計できます。 このような構造は、並列化される場合とされない場合があります。 ただし、プログラムでこのような構造を実現すると、多くの利点があります。
この記事では、並行性のさまざまなモデルと、並行性のために設計されたさまざまなプログラミング言語でそれらを実現する方法について説明します。
共有可変状態モデル
カウンターとそれを増やす2つのスレッドを使用した簡単な例を見てみましょう。 プログラムはそれほど複雑であってはなりません。 メソッドの増加に伴って増加するカウンターを含むオブジェクトがあり、メソッドgetとそれを増加させる2つのスレッドを使用してそれを取得します。
// // Counting.java // public class Counting { public static void main(String[] args) throws InterruptedException { class Counter { int counter = 0; public void increment() { counter++; } public int get() { return counter; } } final Counter counter = new Counter(); class CountingThread extends Thread { public void run() { for (int x = 0; x < 500000; x++) { counter.increment(); } } } CountingThread t1 = new CountingThread(); CountingThread t2 = new CountingThread(); t1.start(); t2.start(); t1.join(); t2.join(); System.out.println(counter.get()); } }
この素朴なプログラムは、一見しただけでは素朴ではありません。 このプログラムを何度も実行すると、異なる結果が得られます。 私のラップトップで3回実行した後、3つの値があります。
java Counting 553706 java Counting 547818 java Counting 613014
この予測できない動作の理由は何ですか? プログラムは、コマンドcounter++を使用するメソッドincreaseで、カウンターを1か所で増やします。 コマンドバイトコードを見ると、いくつかの部分で構成されていることがわかります。
- メモリからカウンター値を読み取る
- ローカルで価値を高める
- カウンター値をメモリに保存する
これで、このシーケンスで何がうまくいかないかを想像できます。 独立してカウンターを増やす2つのスレッドがある場合、次のシナリオが考えられます。
- カウンターバリューは115です
- 最初のスレッドは、メモリからカウンタの値を読み取ります(115)
- 最初のスレッドはローカルカウンター値を増やします(116)
- 2番目のスレッドは、メモリからカウンタの値を読み取ります(115)
- 2番目のスレッドはローカルカウンター値を増やします(116)
- 2番目のスレッドはローカルカウンター値をメモリに保存します(116)
- 最初のスレッドはローカルカウンター値をメモリに保存します(116)
- カウンターの値は116です
このシナリオでは、2つのスレッドが絡み合ってカウンター値が1増加しますが、各スレッドが1ずつ増加するため、カウンター値を2増やす必要があります。異なるスレッドが絡み合っていると、プログラムの結果に影響します。 プログラムが予測できない理由は、プログラムがスレッドの絡み合いを制御できないが、オペレーティングシステムを制御できないためです。 プログラムが実行されるたびに、スレッドは異なる方法で絡み合う可能性があります。 このようにして、プログラムに偶発的な予測不可能性(非決定論)を導入しました。
この偶発的な予測不可能性(非決定論)を修正するには、プログラムがスレッドの絡み合いを制御する必要があります。 1つのスレッドがメソッドに含まれている場合、最初のスレッドが出るまで、別のスレッドが同じメソッドに含まれていてはなりません。 このようにして、メソッド増加へのアクセスをシリアル化します。
// // CountingFixed.java // public class CountingFixed { public static main(String[] args) throws InterruptedException { class Counter { int counter = 0; public synchronized void increase() { counter++; } public synchronized int get() { return counter; } } final Counter counter = new Counter(); class CountingThread extends Thread { public void run() { for (int i = 0; i < 500000; i++) { counter.increment(); } } } CountingThread thread1 = new CountingThread(); CountingThread thread2 = new CountingThread(); thread1.start(); thread2.start(); thread1.join(); thread2.join(); System.out.println(counter.get()); } }
もう1つの解決策は、アトミックに増加できるカウンターを使用することです。つまり、操作を複数の操作に分割することはできません。 このように、同期する必要のあるコードのブロックを用意する必要はありません。 Javaのjava.util.concurrent.atomic名前空間にはアトミックデータ型があり、AtomicIntegerを使用します。
// // CountingBetter.java // import java.util.concurrent.atomic.AtomicInteger; class CountingBetter { public static void main(String[] args) throws InterruptedException { final AtomicInteger counter = new AtomicInteger(0); class CountingThread extends Thread { public viod run() { for (int i = 0; i < 500000; i++) { counter.incrementAndGet(); } } } CountingThread thread1 = new CountingThread(); CountingThread thread2 = new CoutningThread(); thread1.start(); thread2.start(); thread1.join(); thread2.join(); System.out.println(counter.get()); } }
Atomic integerには必要な操作があるため、Counterクラスの代わりに使用できます。 興味深いことに、atomicintegerのすべてのメソッドはロックを使用しないため、プログラムの設計を容易にするデッドロックの可能性はありません。
同期されたキーワードを使用して重要なメソッドを同期すると、すべての問題が解決するはずですよね? 別の口座への入金、引き出し、送金が可能な2つの口座があると想像してみてください。 同時に、ある口座から別の口座に、またはその逆に送金したい場合はどうなりますか? 例を見てみましょう。
// // Deadlock.java // public class Deadlock { public static void main(String[] args) throws InterruptedException { class Account { int balance = 100; public Account(int balance) { this.balance = balance; } public synchronized void deposit(int amount) { balance += amount; } public synchronized boolean withdraw(int amount) { if (balance >= amount) { balance -= amount; return true; } return false; } public synchronized boolean transfer(Account destination, int amount) { if (balance >= amount) { balance -= amount; synchronized(destination) { destination.balance += amount; }; return true; } return false; } public int getBalance() { return balance; } } final Account bob = new Account(200000); final Account joe = new Account(300000); class FirstTransfer extends Thread { public void run() { for (int i = 0; i < 100000; i++) { bob.transfer(joe, 2); } } } class SecondTransfer extends Thread { public void run() { for (int i = 0; i < 100000; i++) { joe.transfer(bob, 1); } } } FirstTransfer thread1 = new FirstTransfer(); SecondTransfer thread2 = new SecondTransfer(); thread1.start(); thread2.start(); thread1.join(); thread2.join(); System.out.println("Bob's balance: " + bob.getBalance()); System.out.println("Joe's balance: " + joe.getBalance()); } }
このプログラムをラップトップで実行すると、通常はスタックします。 なぜこれが起こるのですか? よく見ると、送金するときに、同期された転送メソッドに入り、ソースアカウントのすべての同期されたメソッドへのアクセスをロックしてから、宛先アカウントをロックして、同期されたすべてのメソッドへのアクセスをロックしていることがわかります。
次のシナリオを想像してみてください。
- 最初のスレッド呼び出しは、ボブのアカウントからジョーのアカウントに転送されます
- 2番目のスレッド呼び出しは、JoeのアカウントからBobのアカウントに転送されます
- 2番目のスレッドはジョーのアカウントから金額を減らします
- 2番目のスレッドはBobのアカウントに金額を入金しますが、最初のスレッドが転送を完了するのを待ちます。
- 最初のスレッドはボブのアカウントから金額を減らします
- 最初のスレッドはJoeのアカウントに金額を入金しますが、2番目のスレッドが転送を完了するのを待ちます。
このシナリオでは、1つのスレッドが別のスレッドの転送が完了するのを待機しており、その逆も同様です。 それらは互いにスタックしていて、プログラムを続行できません。 これはデッドロックと呼ばれます。 デッドロックを回避するには、同じ順序でアカウントをロックする必要があります。 プログラムを修正するために、各アカウントに一意の番号を付けて、送金時に同じ順序でアカウントをロックできるようにします。
// // DeadlockFixed.java // import java.util.concurrent.atomic.AtomicInteger; public class DeadlockFixed { public static void main(String[] args) throws InterruptedException { final AtomicInteger counter = new AtomicInteger(0); class Account { int balance = 100; int order; public Account(int balance) { this.balance = balance; this.order = counter.getAndIncrement(); } public synchronized void deposit(int amount) { balance += amount; } public synchronized boolean withdraw(int amount) { if (balance >= amount) { balance -= amount; return true; } return false; } public boolean transfer(Account destination, int amount) { Account first; Account second; if (this.order < destination.order) { first = this; second = destination; } else { first = destination; second = this; } synchronized(first) { synchronized(second) { if (balance >= amount) { balance -= amount; destination.balance += amount; return true; } return false; } } } public synchronized int getBalance() { return balance; } } final Account bob = new Account(200000); final Account joe = new Account(300000); class FirstTransfer extends Thread { public void run() { for (int i = 0; i < 100000; i++) { bob.transfer(joe, 2); } } } class SecondTransfer extends Thread { public void run() { for (int i = 0; i < 100000; i++) { joe.transfer(bob, 1); } } } FirstTransfer thread1 = new FirstTransfer(); SecondTransfer thread2 = new SecondTransfer(); thread1.start(); thread2.start(); thread1.join(); thread2.join(); System.out.println("Bob's balance: " + bob.getBalance()); System.out.println("Joe's balance: " + joe.getBalance()); } }
そのような間違いの予測不可能性のために、それらは時々起こりますが、常にではなく、再現するのは困難です。 プログラムが予期しない動作をする場合、それは通常、偶発的な非決定論を導入する並行性によって引き起こされます。 偶発的な非決定論を回避するために、すべての絡み合いを考慮に入れるように事前に設計プログラムを実行する必要があります。
偶発的な非決定論を持つプログラムの例。
// // NonDeteminism.java // public class NonDeterminism { public static void main(String[] args) throws InterruptedException { class Container { public String value = "Empty"; } final Container container = new Container(); class FastThread extends Thread { public void run() { container.value = "Fast"; } } class SlowThread extends Thread { public void run() { try { Thread.sleep(50); } catch(Exception e) {} container.value = "Slow"; } } FastThread fast = new FastThread(); SlowThread slow = new SlowThread(); fast.start(); slow.start(); fast.join(); slow.join(); System.out.println(container.value); } }
このプログラムには、偶発的な非決定論が含まれています。 コンテナに最後に入力された値が表示されます。
java NonDeterminism Slow
遅いスレッドは後で値を入力し、この値が出力されます(遅い)。 しかし、そうである必要はありません。 コンピュータが大量のCPUリソースを必要とする別のプログラムを同時に実行した場合はどうなりますか? プログラムではなくオペレーティングシステムによって制御されるため、最後に値を入力するのが遅いスレッドになるという保証はありません。 プログラムが一方のコンピューターで動作し、もう一方のコンピューターで動作が異なる状況が発生する可能性があります。 このようなエラーは見つけるのが難しく、開発者にとって頭痛の種になります。 これらすべての理由から、この並行性モデルを正しく実行することは非常に困難です。
機能的な方法
並列処理
関数型言語が使用している別のモデルを見てみましょう。 たとえば、Clojureを使用します。これは、ツールLeiningenを使用して解釈できます。 Clojureは、並行性を適切にサポートする非常に興味深い言語です。 以前の同時実行モデルは、共有された可変状態でした。 私たちが使用するクラスは、APIからは明らかではないため、私たちが知らないことを変更する非表示の状態を持つこともできます。 これまで見てきたように、注意しないと、このモデルは偶発的な非決定論とデッドロックを引き起こす可能性があります。 関数型言語のデータ型は変化しないため、変更されるリスクなしに安全に共有できます。 関数には、プロパティとその他のデータ型があります。 関数は、プログラムの実行中に作成し、パラメーターとして別の関数に渡すか、関数呼び出しの結果として返すことができます。
並行プログラミングの基本的なプリミティブは、futureとpromiseです。 Futureは、別のスレッドでコードのブロックを実行し、ブロックが実行されたときに入力されるfuture値のオブジェクトを返します。
; ; future.clj ; (let [a (future (println "Started A") (Thread/sleep 1000) (println "Finished A") (+ 1 2)) b (future (println "Started B") (Thread/sleep 2000) (println "Finished B") (+ 3 4))] (println "Waiting for futures") (+ @a @b))
このスクリプトを実行すると、出力は次のようになります。
Started A Started B Waiting for futures Finished A Finished B 10
この例では、独立して実行される2つのfutureブロックがあります。 プログラムは、まだ使用できないfutureオブジェクトから値を読み取るときにのみブロックします。 私たちの場合、将来のブロックの両方の結果が合計されるのを待っています。 動作は予測可能(決定論的)であり、共有された可変状態がないため、常に同じ結果が得られます。
並行性に使用されるもう1つのプリミティブは、promiseです。 Promiseは、一度値を入れることができるコンテナです。 promiseを読み取るとき、スレッドはpromiseの値が満たされるまで待機します。
; ; promise.clj ; (def result (promise)) (future (println "The result is: " @result)) (Thread/sleep 2000) (deliver result 42)
この例では、futureは、値が保存されないことが約束されている限り、結果の出力を待機します。 2秒後、promiseには、将来のスレッドで出力される値42が格納されます。 promiseを使用すると、将来ではなくデッドロックが発生する可能性があるため、promiseを使用する場合は注意が必要です。
; ; promise-deadlock.clj ; (def promise-result (promise)) (def future-result (future (println "The result is: " + @promise-result) 13)) (println "Future result is: " @future-result) (deliver result 42)
この例では、futureの結果とpromiseの結果を使用しています。 値の設定と読み取りの順序は、メインスレッドが将来のスレッドからの値を待機し、将来のスレッドがメインスレッドからの値を待機するというものです。 この動作は予測可能(決定論的)であり、プログラムが実行されるたびに再生されるため、エラーの検出と削除が容易になります。
futureを使用すると、futureの実行結果が必要になるまで、プログラムは演習を続行できます。 これにより、プログラムの実行が高速化されます。 将来的に複数のプロセッサがある場合は、予測可能な(決定論的な)動作を持つプログラムを並列実行できます(毎回同じ結果が得られます)。 そうすれば、コンピューターの能力をより有効に活用できます。
; ; fibonacci.clj ; (defn fibonacci[a] (if (<= a 2) 1 (+ (fibonacci (- a 1)) (fibonacci (- a 2))))) (println "Start serial calculation") (time (println "The result is: " (+ (fibonacci 36) (fibonacci 36)))) (println "Start parallel calculation") (defn parallel-fibonacci[] (def result-1 (future (fibonacci 36))) (def result-2 (future (fibonacci 36))) (+ @result-1 @result-2)) (time (println "The result is: " (parallel-fibonacci)))
この例では、futureを使用すると、コンピューターの速度をより有効に活用できることがわかります。 合計すると2つのフィボナッチ数があります。 プログラムが結果を2回計算していることがわかります。1回目は1つのスレッドで順次計算され、2回目は2つのスレッドで並列に計算されます。 私のラップトップにはマルチコアプロセッサが搭載されているため、並列実行は順次計算の2倍の速度で動作します。
私のラップトップでこのスクリプトを実行した結果:
Start serial calculation The result is: 29860704 "Elapsed time: 2568.816524 msecs" Start parallel calculation The result is: 29860704 "Elapsed time: 1216.991448 msecs"
並行性
Clojureプログラミング言語で同時実行性と予測不可能性をサポートするには、他のスレッドが変更を確認できるように可変のデータ型を使用する必要があります。 最も単純な変数データ型はatomです。 Atomは、別の値に置き換えることができる値を常に持つコンテナーです。 この値は、新しい値を入力するか、古い値を取得してより頻繁に使用される新しい値を返す関数を呼び出すことで置き換えることができます。 アトムがロックなしで実装され、スレッドで安全に使用できることは興味深いことです。つまり、デッドロックに達することは不可能です。 内部的には、atomはjava.util.concurrent.AtomicReferenceライブラリを使用します。 アトムで実装された反例を見てみましょう。
; ; atom-counter.clj ; (def counter (atom 0)) (def attempts (atom 0)) (defn counter-increases[] (dotimes [cnt 500000] (swap! counter (fn [counter] (swap! attempts inc) ; side effect DO NOT DO THIS (inc counter))))) (def first-future (future (counter-increases))) (def second-future (future (counter-increases))) ; Wait for futures to complete @first-future @second-future ; Print value of the counter (println "The counter is: " @counter) (println "Number of attempts: " @attempts)
私のラップトップでのスクリプト実行の結果:
The counter is: 1000000 Number of attempts: 1680212
この例では、カウンターの値を含むアトムを使用します。 カウンターは(swap!counter inc)とともに増加します。 スワップ関数は次のように機能します。1。カウンター値を取得して保存します2.この値に対して、新しい値を計算する指定された関数を呼び出します3.新しい値を保存するために、古い値が変更されたかどうかをチェックするアトミック操作を使用します3a。 値が変更されていない場合は、新しい値3bを入力します。 その間に値が変更された場合は、手順1に進みます。その間に値が変更された場合は、関数を再度呼び出すことができることがわかります。 値は別のスレッドからのみ変更できます。 したがって、新しい値を計算する関数には副作用がなく、何度呼び出されてもかまわないことが重要です。 アトムの制限の1つは、変更を1つの値に同期することです。
; ; atom-acocunts.clj ; (def bob (atom 200000)) (def joe (atom 300000)) (def inconsistencies (atom 0)) (defn transfer [source destination amount] (if (not= (+ @bob @joe) 500000) (swap! inconsistencies inc)) (swap! source - amount) (swap! destination + amount)) (defn first-transfer [] (dotimes [cnt 100000] (transfer bob joe 2))) (defn second-transfer [] (dotimes [cnt 100000] (transfer joe bob 1))) (def first-future (future (first-transfer))) (def second-future (future (second-transfer))) @first-future @second-future (println "Bob has in account: " @bob) (println "Joe has in account: " @joe) (println "Inconsistencies while transfer: " @inconsistencies)
このスクリプトを実行すると、次のようになります。
Bob has in account: 100000 Joe has in account: 400000 Inconsistencies while transfer: 36525
この例では、より多くの原子を変更する方法を確認できます。 ある時点で、不整合が発生する可能性があります。 ある時点での2つのアカウントの合計は同じではありません。 複数の値の変更を調整する必要がある場合、2つの解決策があります。
- 1つの原子により多くの値を配置する
- 後で説明するように、参照とソフトウェアトランザクショナルメモリを使用します
; ; atom-accounts-fixed.clj ; (def accounts (atom {:bob 200000, :joe 300000})) (def inconsistencies (atom 0)) (defn transfer [source destination amount] (let [deref-accounts @accounts] (if (not= (+ (get deref-accounts :bob) (get deref-accounts :joe)) 500000) (swap! inconsistencies inc)) (swap! accounts (fn [accs] (update (update accs source - amount) destination + amount))))) (defn first-transfer [] (dotimes [cnt 100000] (transfer :bob :joe 2))) (defn second-transfer [] (dotimes [cnt 100000] (transfer :joe :bob 1))) (def first-future (future (first-transfer))) (def second-future (future (second-transfer))) @first-future @second-future (println "Bob has in account: " (get @accounts :bob)) (println "Joe has in account: " (get @accounts :joe)) (println "Inconsistencies while transfer: " @inconsistencies)
このスクリプトをコンピューターで実行すると、次のようになります。

Bob has in account: 100000 Joe has in account: 400000 Inconsistencies while transfer: 0
この例では、地図を使用してより多くの価値を置くように調整が解決されています。 アカウントから送金する場合、金額が同じにならないように、その時点ですべての口座を変更します。
次の変数データ型はエージェントです。 エージェントは、値を変更する関数が別のスレッドで実行されるという点でのみアトムのように動作するため、変更が表示されるまでに時間がかかります。 したがって、エージェントの値を読み取るときは、エージェントの値を変更するすべての関数が実行されるまで待機する関数を呼び出す必要があります。 値を変更するアトム関数とは異なり、値は1回だけ呼び出されるため、副作用が発生する可能性があります。 このタイプは、1つの値を同期することもでき、デッドロックすることはできません。
; ; agent-counter.clj ; (def counter (agent 0)) (def attempts (atom 0)) (defn counter-increases[] (dotimes [cnt 500000] (send counter (fn [counter] (swap! attempts inc) (inc counter))))) (def first-future (future (counter-increases))) (def second-future (future (counter-increases))) ; wait for futures to complete @first-future @second-future ; wait for counter to be finished with updating (await counter) ; print the value of the counter (println "The counter is: " @counter) (println "Number of attempts: " @attempts)
このスクリプトをラップトップで実行すると、次のようになります。
The counter is: 1000000 Number of attempts: 1000000
この例は、アトムを使用したカウンターの実装と同じです。 唯一の違いは、ここではすべてのエージェントの変更が完了するのを待ってから、awaitを使用して最終値を読み取ることです。
最後の変数データ型は参照です。 アトムとは異なり、参照は変更を複数の値に同期させることができます。 参照の各操作は、dosyncを使用するトランザクションで行う必要があります。 データを変更するこの方法は、ソフトウェアトランザクショナルメモリまたは略称STMと呼ばれます。 アカウントでの送金の例を見てみましょう。
; ; stm-accounts.clj ; (def bob (ref 200000)) (def joe (ref 300000)) (def inconsistencies (atom 0)) (def attempts (atom 0)) (def transfers (agent 0)) (defn transfer [source destination amount] (dosync (swap! attempts inc) ; side effect DO NOT DO THIS (send transfers inc) (when (not= (+ @bob @joe) 500000) (swap! inconsistencies inc)) ; side effect DO NOT DO THIS (alter source - amount) (alter destination + amount))) (defn first-transfer [] (dotimes [cnt 100000] (transfer bob joe 2))) (defn second-transfer [] (dotimes [cnt 100000] (transfer joe bob 1))) (def first-future (future (first-transfer))) (def second-future (future (second-transfer))) @first-future @second-future (await transfers) (println "Bob has in account: " @bob) (println "Joe has in account: " @joe) (println "Inconsistencies while transfer: " @inconsistencies) (println "Attempts: " @attempts) (println "Transfers: " @transfers)
このスクリプトを実行すると、次のようになります。
Bob has in account: 100000 Joe has in account: 400000 Inconsistencies while transfer: 0 Attempts: 330841 Transfers: 200000
興味深いことに、行われたトランザクションの数よりも多くの試行がありました。 これは、STMがロックを使用しないためです。したがって、競合が発生した場合(2つのスレッドが同じ値を変更しようとするなど)、トランザクションが再実行されます。 このため、トランザクションに副作用が発生することはありません。 トランザクション内で値が変化するエージェントが予測どおりに動作することがわかります。 エージェントの値を変更する関数は、トランザクションの回数だけ評価されます。 その理由は、エージェントがトランザクションを認識しているためです。 トランザクションに副作用が必要な場合は、エージェント内で機能させる必要があります。 このようにして、プログラムは予測可能な動作をします。 おそらく常にSTMを使用する必要があると思うかもしれませんが、アトムはSTMよりも単純で高速であるため、経験豊富なプログラマーはアトムを使用することがよくあります。 もちろん、それはそのようにプログラムを作ることが可能であるならばです。 副作用がある場合は、STMとエージェントを使用する以外に選択肢はありません。
アクターモデル
次の並行性のモデルは、アクターモデルです。 このモデルの原理は、現実の世界と似ています。 建物など、たくさんの人と何かを作るという取引をする場合、建設現場の人それぞれにそれぞれの役割があります。 群衆は監督者によって監督されています。 労働者が職場で負傷した場合、監督者は負傷した男性の仕事を利用可能な他の人に割り当てます。 必要に応じて、彼はその場所に新しい男を導くかもしれません。 このサイトには、同時に(同時に)作業を行うだけでなく、同期するために互いに話し合う人が増えています。 建設現場での作業をプログラムに入れると、すべての人が状態を持ち、独自のプロセスで実行される俳優になり、会話はメッセージに置き換えられます。 このモデルに基づく人気のあるプログラミング言語はErlangです。 この興味深い言語には、他のデータ型と同じプロパティを持つ不変のデータ型と関数があります。 関数は、プログラムの実行中に作成し、引数として別の関数に渡すか、関数呼び出しの結果として返すことができます。 Erlang仮想マシンを使用するElixir言語で例を示しますので、Erlangと同じプログラミングモデルを使用しますが、構文が異なります。 Elixirで最も重要な3つのプリミティブは、スポーン、送信、受信です。 spawnは新しいプロセスで機能を実行し、sendはメッセージをプロセスに送信し、receiveは現在のプロセスに送信される受信メッセージを送信します。
アクターモデルの最初の例は、同時にカウンターが増加します。 このモデルでプログラムを作成するには、アクターにカウンターの値を持たせ、カウンターの値を設定および取得するメッセージを受信し、同時にカウンターの値を増やす2人のアクターを作成する必要があります。
# # Counting.exs # defmodule Counting do def counter(value) do receive do {:get, sender} -> send sender, {:counter, value} counter value {:set, new_value} -> counter(new_value) end end def counting(sender, counter, times) do if times > 0 do send counter, {:get, self} receive do {:counter, value} -> send counter, {:set, value + 1} end counting(sender, counter, times - 1) else send sender, {:done, self} end end end counter = spawn fn -> Counting.counter 0 end IO.puts "Starting counting processes" this = self counting1 = spawn fn -> IO.puts "Counting A started" Counting.counting this, counter, 500_000 IO.puts "Counting A finished" end counting2 = spawn fn -> IO.puts "Counting B started" Counting.counting this, counter, 500_000 IO.puts "Counting B finished" end IO.puts "Waiting for counting to be done" receive do {:done, ^counting1} -> nil end receive do {:done, ^counting2} -> nil end send counter, {:get, self} receive do {:counter, value} -> IO.puts "Counter is: #{value}" end
この例を実行すると、次のようになります。
Starting counting processes Counting A started Waiting for counting to be done Counting B started Counting A finished Counting B finished Counter is: 516827
最終的に、カウンターは516827であり、予想どおり1000000ではないことがわかります。 次回スクリプトを実行したときに、511010を受け取りました。この動作の理由は、カウンターが2つのメッセージを受信するためです。現在の値を取得することと新しい値を設定することです。 カウンタを増やすには、プログラムは現在の値を取得し、それを1増やして、増やした値を設定する必要があります。 2つのプロセスは、カウンタープロセスに送信されるメッセージを使用して、カウンターの値を同時に読み取りおよび書き込みます。 カウンタが受信するメッセージの順序は予測できず、プログラムはそれを制御できません。 このシナリオを想像することができます:
- カウンターバリューは115です
- プロセスAはカウンターの値を読み取ります(115)
- プロセスBはカウンターの値を読み取ります(115)
- プロセスBは値をローカルに増加させます(116)
- プロセスBは、増加した値をカウンターに設定します(116)
- プロセスAは、カウンターの値を増やします(116)
- プロセスAは、増加した値をカウンターに設定します(116)
- カウンターバリューは116
シナリオを見ると、2つのプロセスでカウンターが1増加し、最終的にカウンターは2ではなく1増加します。このような絡み合いは予測できない回数発生する可能性があるため、カウンターの値は予測できません。 この動作を防ぐには、増加操作を1つのメッセージで実行する必要があります。
# # CountingFixed.exs # defmodule Counting do def counter(value) do receive do :increase -> counter(value + 1) {:get, sender} -> send sender, {:counter, value} counter value end end def counting(sender, counter, times) do if times > 0 do send counter, :increase counting(sender, counter, times - 1) else send sender, {:done, self} end end end counter = spawn fn -> Counting.counter 0 end IO.puts "Starting counting processes" this = self counting1 = spawn fn -> IO.puts "Counting A started" Counting.counting this, counter, 500_000 IO.puts "Counting A finished" end counting2 = spawn fn -> IO.puts "Counting B started" Counting.counting this, counter, 500_000 IO.puts "Counting B finished" end IO.puts "Waiting for counting to be done" receive do {:done, ^counting1} -> nil end receive do {:done, ^counting2} -> nil end send counter, {:get, self} receive do {:counter, value} -> IO.puts "Counter is: #{value}" end
このスクリプトを実行すると、次のようになります。
Starting counting processes Counting A started Waiting for counting to be done Counting B started Counting A finished Counting B finished Counter is: 1000000
カウンターの値が正しいことがわかります。 予測可能な(決定論的な)動作の理由は、カウンターの値が1メッセージずつ増加するため、カウンターを増やすためのメッセージのシーケンスが最終的な値に影響を与えないためです。 アクターモデルを使用して、偶発的な予測不可能性(非決定論)を回避するために、メッセージがどのように絡み合い、メッセージとメッセージに対するアクションを注意深く設計できるかに注意を払う必要があります。
このモデルを使用して、2つのアカウント間でどのように送金できますか?
# # Accounts.exs # defmodule Accounts do def accounts(state) do receive do {:transfer, source, destination, amount} -> accounts %{state | source => state[source] - amount , destination => state[destination] + amount} {:amounts, accounts, sender } -> send sender, {:amounts, for account <- accounts do {account, state[account]} end} accounts(state) end end def transfer(sender, accounts, source, destination, amount, times, inconsistencies) do if times > 0 do send accounts, {:amounts, [source, destination], self} receive do {:amounts, amounts} -> if amounts[source] + amounts[destination] != 500_000 do Agent.update(inconsistencies, fn value -> value + 1 end) end end send accounts, {:transfer, source, destination, amount} transfer(sender, accounts, source, destination, amount, times - 1, inconsistencies) else send sender, {:done, self} end end end accounts = spawn fn -> Accounts.accounts(%{bob: 200_000, joe: 300_000 }) end {:ok, inconsistencies} = Agent.start(fn -> 0 end) this = self transfer1 = spawn fn -> IO.puts "Transfer A started" Accounts.transfer(this, accounts, :bob, :joe, 2, 100_000, inconsistencies) IO.puts "Transfer A finished" end transfer2 = spawn fn -> IO.puts "Transfer B started" Accounts.transfer(this, accounts, :joe, :bob, 1, 100_000, inconsistencies) IO.puts "Transfer B finished" end IO.puts "Waiting for transfers to be done" receive do {:done, ^transfer1} -> nil end receive do {:done, ^transfer2} -> nil end send accounts, {:amounts, [:bob, :joe], self} receive do {:amounts, amounts} -> IO.puts "Bob has in account: #{amounts[:bob]}" IO.puts "Joe has in account: #{amounts[:joe]}" IO.puts "Inconsistencies while transfer: #{Agent.get(inconsistencies, fn x -> x end)}" end
このスクリプトを実行すると、次のようになります。
Waiting for transfers to be done Transfer A started Transfer B started Transfer B finished Transfer A finished Bob has in account: 100000 Joe has in account: 400000 Inconsistencies while transfer: 0
プログラムの予測可能な動作を提供するアカウントの価値を取得するために送金とメッセージ金額を転送するメッセージ転送を選択したため、送金は矛盾なく機能することがわかります。 私たちが送金を行うときはいつでも、いつでも合計金額は同じでなければなりません。
アクターモデルはロックを引き起こし、デッドロックを引き起こす可能性があるため、プログラムを設計する際には注意が必要です。 次のスクリプトは、ロックとデッドロックのシナリオをシミュレートする方法を示しています。
# # Deadlock.exs # defmodule Lock do def loop(state) do receive do {:lock, sender} -> case state do [] -> send sender, :locked loop([sender]) _ -> loop(state ++ [sender]) end {:unlock, sender} -> case state do [] -> loop(state) [^sender | []] -> loop([]) [^sender | [next | tail]] -> send next, :locked loop([next | tail]) _ -> loop(state) end end end def lock(pid) do send pid, {:lock, self} receive do :locked -> nil # This will block until we receive message end end def unlock(pid) do send pid, {:unlock, self} end def locking(first, second, times) do if times > 0 do lock(first) lock(second) unlock(second) unlock(first) locking(first, second, times - 1) end end end a_lock = spawn fn -> Lock.loop([]) end b_lock = spawn fn -> Lock.loop([]) end this = self IO.puts "Locking A, B started" spawn fn -> Lock.locking(a_lock, b_lock, 1_000) IO.puts "Locking A, B finished" send this, :done end IO.puts "Locking B, A started" spawn fn -> Lock.locking(b_lock, a_lock, 1_000) IO.puts "Locking B, A finished" send this, :done end IO.puts "Waiting for locking to be done" receive do :done -> nil end receive do :done -> nil End
このスクリプトをラップトップで実行すると、次のようになります。
Locking A, B started Locking B, A started Waiting for locking to be done
出力から、AとBをロックするプロセスがスタックしていることがわかります。 これは、最初のプロセスが2番目のプロセスがBを解放するのを待機し、2番目のプロセスが最初のプロセスがAを解放するのを待機しているために発生します。 このロックを回避するには、順序を常に同じにするか、ロックを使用しないようにプログラムを設計する必要があります(つまり、特定のメッセージを待機しません)。 次のリストは常に最初にA、次にBをロックします。
# # Deadlock fixed # defmodule Lock do def loop(state) do receive do {:lock, sender} -> case state do [] -> send sender, :locked loop([sender]) _ -> loop(state ++ [sender]) end {:unlock, sender} -> case state do [] -> loop(state) [^sender | []] -> loop([]) [^sender | [next | tail]] -> send next, :locked loop([next | tail]) _ -> loop(state) end end end def lock(pid) do send pid, {:lock, self} receive do :locked -> nil # This will block until we receive message end end def unlock(pid) do send pid, {:unlock, self} end def locking(first, second, times) do if times > 0 do lock(first) lock(second) unlock(second) unlock(first) locking(first, second, times - 1) end end end a_lock = spawn fn -> Lock.loop([]) end b_lock = spawn fn -> Lock.loop([]) end this = self IO.puts "Locking A, B started" spawn fn -> Lock.locking(a_lock, b_lock, 1_000) IO.puts "Locking A, B finished" send this, :done end IO.puts "Locking A, B started" spawn fn -> Lock.locking(a_lock, b_lock, 1_000) IO.puts "Locking A, B finished" send this, :done end IO.puts "Waiting for locking to be done" receive do :done -> nil end receive do :done -> nil End
When I run this script on my laptop I get:
Locking A, B started Locking A, B started Waiting for locking to be done Locking A, B finished Locking A, B finished
And now, there is no longer a deadlock.
要約
As an introduction to concurrent programming, we have covered a few concurrency models. We haven't covered all models, as this article would be too big. Just to name a few, channels and reactive streams are some of the other popularly used concurrency models. Channels and reactive streams have many similarities with the actor model. All of them transmit messages, but many threads can receive messages from one channel, and reactive streams transmit messages in one direction to form directed graph that receive messages from one end and send messages from the other end as a result of the processing.
Shared mutable state models can easily go wrong if we don't think ahead. It has problems of race condition and deadlock. If we have a choice between different concurrent programming models, it would be easier to implement and maintain but otherwise we have to be very careful what we do.
The functional way is a lot easier to reason about and implement. It cannot have deadlock. This model may have worse performance than shared mutable state model, but a program that works is always faster than one that does not work.
Actor model is a good choice for concurrent programming. Although there are problems of race condition and deadlock, they can happen less than in shared mutable state model since the only way for processes to communicate is via messages. With good message design between processes, that can be avoided. If a problem occurs it is then in the order or meaning of messages in communication between the processes and you know where to look.
I hope this article has given you some insight to what concurrent programming is and how it gives structure to the programs you write.