並發編程簡介:初學者指南

已發表: 2022-03-11

什麼是並發編程? 簡單地說,就是你同時做不止一件事。 不要與並行性混淆,並發是指在重疊的時間段內運行多個操作序列。 在編程領域,並發是一個相當複雜的主題。 處理諸如線程和鎖之類的結構並避免諸如競爭條件和死鎖之類的問題可能非常麻煩,使得併發程序難以編寫。 通過並發,程序可以設計為在特定組合中一起工作的獨立進程。 這種結構可以平行也可以不平行; 但是,在您的程序中實現這樣的結構提供了許多優勢。

並發編程簡介

在本文中,我們將了解一些不同的並發模型,以及如何在為並發設計的各種編程語言中實現它們。

共享可變狀態模型

讓我們看一個簡單的例子,它有一個計數器和兩個增加它的線程。 程序不應該太複雜。 我們有一個對象,它包含一個隨著方法增加而增加的計數器,並使用 get 方法和兩個增加它的線程來檢索它。

 // // Counting.java // public class Counting { public static void main(String[] args) throws InterruptedException { class Counter { int counter = 0; public void increment() { counter++; } public int get() { return counter; } } final Counter counter = new Counter(); class CountingThread extends Thread { public void run() { for (int x = 0; x < 500000; x++) { counter.increment(); } } } CountingThread t1 = new CountingThread(); CountingThread t2 = new CountingThread(); t1.start(); t2.start(); t1.join(); t2.join(); System.out.println(counter.get()); } }

這個幼稚的程序並不像乍一看那樣幼稚。 當我多次運行這個程序時,我會得到不同的結果。 在我的筆記本電腦上執行 3 次後有三個值。

 java Counting 553706 java Counting 547818 java Counting 613014

這種不可預測的行為的原因是什麼? 程序在一個地方增加計數器,在使用命令 counter++ 的方法增加中。 如果我們查看命令字節碼,我們會發現它由幾個部分組成:

  1. 從內存中讀取計數器值
  2. 在當地增加價值
  3. 將計數器值存儲在內存中

現在我們可以想像在這個序列中會出現什麼問題。 如果我們有兩個獨立增加計數器的線程,那麼我們可能會遇到這種情況:

  1. 計數器值為 115
  2. 第一個線程從內存中讀取計數器的值 (115)
  3. 第一個線程增加本地計數器值 (116)
  4. 第二個線程從內存中讀取計數器的值 (115)
  5. 第二個線程增加本地計數器值 (116)
  6. 第二個線程將本地計數器值保存到內存 (116)
  7. 第一個線程將本地計數器值保存到內存 (116)
  8. 計數器的值為 116

在這種情況下,兩個線程交織在一起,因此計數器值增加1,但是計數器值應該增加2,因為每個線程都增加1。不同的線程交織影響程序的結果。 程序不可預測的原因是程序無法控制線程交織,而是操作系統。 每次執行程序時,線程都可以以不同的方式交織在一起。 通過這種方式,我們向程序引入了意外的不可預測性(非確定性)。

為了解決這種意外的不可預測性(非確定性),程序必須控制線程交織。 當一個線程在方法中增加時,另一個線程不能在同一個方法中,直到第一個線程出來。 這樣我們就可以增加對方法的序列化訪問。

 // // CountingFixed.java // public class CountingFixed { public static main(String[] args) throws InterruptedException { class Counter { int counter = 0; public synchronized void increase() { counter++; } public synchronized int get() { return counter; } } final Counter counter = new Counter(); class CountingThread extends Thread { public void run() { for (int i = 0; i < 500000; i++) { counter.increment(); } } } CountingThread thread1 = new CountingThread(); CountingThread thread2 = new CountingThread(); thread1.start(); thread2.start(); thread1.join(); thread2.join(); System.out.println(counter.get()); } }

另一種解決方案是使用可以原子增加的計數器,這意味著操作不能分成多個操作。 這樣,我們就不需要有需要同步的代碼塊了。 Java 在 java.util.concurrent.atomic 命名空間中有原子數據類型,我們將使用 AtomicInteger。

 // // CountingBetter.java // import java.util.concurrent.atomic.AtomicInteger; class CountingBetter { public static void main(String[] args) throws InterruptedException { final AtomicInteger counter = new AtomicInteger(0); class CountingThread extends Thread { public viod run() { for (int i = 0; i < 500000; i++) { counter.incrementAndGet(); } } } CountingThread thread1 = new CountingThread(); CountingThread thread2 = new CoutningThread(); thread1.start(); thread2.start(); thread1.join(); thread2.join(); System.out.println(counter.get()); } }

原子整數具有我們需要的操作,因此我們可以使用它來代替 Counter 類。 有趣的是,atomicinteger的所有方法都沒有使用鎖,這樣就沒有死鎖的可能,方便了程序的設計。

使用 synchronized 關鍵字來同步關鍵方法應該可以解決所有問題,對吧? 假設我們有兩個賬戶可以存款、取款和轉賬到另一個賬戶。 如果同時我們想將錢從一個賬戶轉移到另一個賬戶,反之亦然,會發生什麼? 讓我們看一個例子。

 // // Deadlock.java // public class Deadlock { public static void main(String[] args) throws InterruptedException { class Account { int balance = 100; public Account(int balance) { this.balance = balance; } public synchronized void deposit(int amount) { balance += amount; } public synchronized boolean withdraw(int amount) { if (balance >= amount) { balance -= amount; return true; } return false; } public synchronized boolean transfer(Account destination, int amount) { if (balance >= amount) { balance -= amount; synchronized(destination) { destination.balance += amount; }; return true; } return false; } public int getBalance() { return balance; } } final Account bob = new Account(200000); final Account joe = new Account(300000); class FirstTransfer extends Thread { public void run() { for (int i = 0; i < 100000; i++) { bob.transfer(joe, 2); } } } class SecondTransfer extends Thread { public void run() { for (int i = 0; i < 100000; i++) { joe.transfer(bob, 1); } } } FirstTransfer thread1 = new FirstTransfer(); SecondTransfer thread2 = new SecondTransfer(); thread1.start(); thread2.start(); thread1.join(); thread2.join(); System.out.println("Bob's balance: " + bob.getBalance()); System.out.println("Joe's balance: " + joe.getBalance()); } }

當我在筆記本電腦上運行這個程序時,它通常會卡住。 為什麼會這樣? 如果我們仔細觀察,我們可以看到,當我們轉賬時,我們正在進入同步的轉賬方法,並鎖定源賬戶上所有同步方法的訪問,然後鎖定目標賬戶,鎖定目標賬戶上所有同步方法的訪問。

想像以下場景:

  1. 第一個線程調用將 Bob 的帳戶轉移到 Joe 的帳戶
  2. 第二個線程調用將喬的帳戶轉移到鮑勃的帳戶
  3. 第二個線程減少了喬賬戶的金額
  4. 第二個線程將金額存入 Bob 的賬戶,但等待第一個線程完成轉賬。
  5. 第一個線程從 Bob 的帳戶中減少金額
  6. 第一個線程將金額存入 Joe 的帳戶,但等待第二個線程完成轉賬。

在這種情況下,一個線程正在等待另一個線程完成傳輸,反之亦然。 它們相互卡住,程序無法繼續。 這稱為死鎖。 為了避免死鎖,有必要以相同的順序鎖定帳戶。 為了修復該程序,我們將為每個帳戶分配一個唯一編號,以便我們在轉賬時可以按相同順序鎖定帳戶。

 // // DeadlockFixed.java // import java.util.concurrent.atomic.AtomicInteger; public class DeadlockFixed { public static void main(String[] args) throws InterruptedException { final AtomicInteger counter = new AtomicInteger(0); class Account { int balance = 100; int order; public Account(int balance) { this.balance = balance; this.order = counter.getAndIncrement(); } public synchronized void deposit(int amount) { balance += amount; } public synchronized boolean withdraw(int amount) { if (balance >= amount) { balance -= amount; return true; } return false; } public boolean transfer(Account destination, int amount) { Account first; Account second; if (this.order < destination.order) { first = this; second = destination; } else { first = destination; second = this; } synchronized(first) { synchronized(second) { if (balance >= amount) { balance -= amount; destination.balance += amount; return true; } return false; } } } public synchronized int getBalance() { return balance; } } final Account bob = new Account(200000); final Account joe = new Account(300000); class FirstTransfer extends Thread { public void run() { for (int i = 0; i < 100000; i++) { bob.transfer(joe, 2); } } } class SecondTransfer extends Thread { public void run() { for (int i = 0; i < 100000; i++) { joe.transfer(bob, 1); } } } FirstTransfer thread1 = new FirstTransfer(); SecondTransfer thread2 = new SecondTransfer(); thread1.start(); thread2.start(); thread1.join(); thread2.join(); System.out.println("Bob's balance: " + bob.getBalance()); System.out.println("Joe's balance: " + joe.getBalance()); } }

由於此類錯誤的不可預測性,它們有時會發生,但並非總是如此,並且難以重現。 如果程序的行為不可預測,通常是由並發引起的,它引入了意外的非確定性。 為避免意外的非確定性,我們應該提前設計程序以考慮所有相互交織的因素。

具有意外非確定性的程序示例。

 // // NonDeteminism.java // public class NonDeterminism { public static void main(String[] args) throws InterruptedException { class Container { public String value = "Empty"; } final Container container = new Container(); class FastThread extends Thread { public void run() { container.value = "Fast"; } } class SlowThread extends Thread { public void run() { try { Thread.sleep(50); } catch(Exception e) {} container.value = "Slow"; } } FastThread fast = new FastThread(); SlowThread slow = new SlowThread(); fast.start(); slow.start(); fast.join(); slow.join(); System.out.println(container.value); } }

該程序具有偶然的非確定性。 將顯示在容器中輸入的最後一個值。

 java NonDeterminism Slow

較慢的線程稍後會輸入該值,並且將打印該值(Slow)。 但情況並非如此。 如果計算機同時執行另一個需要大量 CPU 資源的程序怎麼辦? 我們不能保證最後輸入值的是速度較慢的線程,因為它是由操作系統而不是程序控制的。 我們可能會遇到程序在一台計算機上運行而在另一台計算機上表現不同的情況。 這樣的錯誤很難找到,它們給開發人員帶來了麻煩。 由於所有這些原因,這種並發模型很難正確執行。

功能方式

並行性

讓我們看一下函數式語言正在使用的另一個模型。 例如,我們將使用 Clojure,它可以使用 Leiningen 工具進行解釋。 Clojure 是一種非常有趣的語言,它對並發的支持很好。 以前的並發模型是共享可變狀態。 我們使用的類也可能有一個隱藏狀態,它會發生我們不知道的變異,因為從它們的 API 中看不到這一點。 正如我們所見,如果我們不小心,這種模型可能會導致意外的非確定性和死鎖。 函數式語言具有不會發生變化的數據類型,因此可以安全地共享它,而無需擔心它們會發生變化。 函數具有屬性以及其他數據類型。 函數可以在程序執行期間創建並作為參數傳遞給另一個函數或作為函數調用的結果返回。

並發編程的基本原語是未來和承諾。 Future 在另一個線程中執行一個代碼塊,並返回一個對象,該對象表示執行該塊時將輸入的未來值。

 ; ; future.clj ; (let [a (future (println "Started A") (Thread/sleep 1000) (println "Finished A") (+ 1 2)) b (future (println "Started B") (Thread/sleep 2000) (println "Finished B") (+ 3 4))] (println "Waiting for futures") (+ @a @b))

當我執行這個腳本時,輸出是:

 Started A Started B Waiting for futures Finished A Finished B 10

在這個例子中,我們有兩個獨立執行的未來塊。 僅當從未來對像中讀取尚不可用的值時,程序才會阻塞。 在我們的例子中,等待對未來塊的兩個結果求和。 行為是可預測的(確定性的)並且總是會給出相同的結果,因為沒有共享的可變狀態。

另一個用於並發的原語是 promise。 Promise 是一個容器,可以在其中放置一次值。 讀取 Promise 時,線程將等待直到 Promise 的值被填充。

 ; ; promise.clj ; (def result (promise)) (future (println "The result is: " @result)) (Thread/sleep 2000) (deliver result 42)

在這個例子中,只要 promise not be saved value,future 就會等待打印結果。 兩秒後,promise 中將存儲值 42 以在將來的線程中打印。 使用 Promise 會導致死鎖而不是未來,所以在使用 Promise 時要小心。

 ; ; promise-deadlock.clj ; (def promise-result (promise)) (def future-result (future (println "The result is: " + @promise-result) 13)) (println "Future result is: " @future-result) (deliver result 42)

在這個例子中,我們使用了未來的結果和承諾的結果。 設置和讀取值的順序是主線程在等待來自未來線程的值,而未來線程在等待來自主線程的值。 這種行為將是可預測的(確定性的),並且會在每次程序執行時播放,從而更容易發現和消除錯誤。

使用 future 允許程序繼續練習,直到它需要 future 的執行結果。 這導致更快的程序執行。 如果未來有多個處理器,則可以並行執行具有可預測(確定性)行為的程序(每次都給出相同的結果)。 這樣我們就能更好地利用計算機的力量。

 ; ; fibonacci.clj ; (defn fibonacci[a] (if (<= a 2) 1 (+ (fibonacci (- a 1)) (fibonacci (- a 2))))) (println "Start serial calculation") (time (println "The result is: " (+ (fibonacci 36) (fibonacci 36)))) (println "Start parallel calculation") (defn parallel-fibonacci[] (def result-1 (future (fibonacci 36))) (def result-2 (future (fibonacci 36))) (+ @result-1 @result-2)) (time (println "The result is: " (parallel-fibonacci)))

在這個例子中,您可以看到使用future 可以如何更好地利用計算機的速度。 我們有兩個斐波那契數相加。 我們可以看到程序計算了兩次結果,第一次是單線程順序計算的,第二次是兩個線程並行計算的。 由於我的筆記本電腦有一個多核處理器,並行執行的速度是順序計算的兩倍。

在我的筆記本電腦上執行此腳本的結果:

 Start serial calculation The result is: 29860704 "Elapsed time: 2568.816524 msecs" Start parallel calculation The result is: 29860704 "Elapsed time: 1216.991448 msecs"

並發

為了支持 Clojure 編程語言中的並發性和不可預測性,我們必須使用可變的數據類型,以便其他線程可以看到更改。 最簡單的變量數據類型是原子。 Atom 是一個容器,它始終具有可以被另一個值替換的值。 可以通過輸入一個新值或調用一個接受舊值並返回更常用的新值的函數來替換該值。 有趣的是,atom 是在沒有鎖定的情況下實現的,並且在線程中使用是安全的,這意味著不可能達到死鎖。 在內部,atom 使用 java.util.concurrent.AtomicReference 庫。 讓我們看一個用 atom 實現的反例。

 ; ; atom-counter.clj ; (def counter (atom 0)) (def attempts (atom 0)) (defn counter-increases[] (dotimes [cnt 500000] (swap! counter (fn [counter] (swap! attempts inc) ; side effect DO NOT DO THIS (inc counter))))) (def first-future (future (counter-increases))) (def second-future (future (counter-increases))) ; Wait for futures to complete @first-future @second-future ; Print value of the counter (println "The counter is: " @counter) (println "Number of attempts: " @attempts)

在我的筆記本電腦上執行腳本的結果:

 The counter is: 1000000 Number of attempts: 1680212

在這個例子中,我們使用了一個包含計數器值的原子。 計數器隨著 (swap!counter inc) 的增加而增加。 交換函數的工作原理如下: 1. 獲取計數器值並保存它 2. 為此值調用計算新值的給定函數 3. 為了保存新值,它使用原子操作檢查舊值是否已更改 3a。 如果該值沒有改變,則輸入一個新值 3b。 如果同時改變了值,則轉到步驟 1 我們看到如果同時改變了值,則可以再次調用該函數。 該值只能從另一個線程更改。 因此,計算新值的函數必須沒有副作用,這樣即使它被調用更多次也沒關係。 atom 的一個限制是它將更改同步到一個值。

 ; ; atom-acocunts.clj ; (def bob (atom 200000)) (def joe (atom 300000)) (def inconsistencies (atom 0)) (defn transfer [source destination amount] (if (not= (+ @bob @joe) 500000) (swap! inconsistencies inc)) (swap! source - amount) (swap! destination + amount)) (defn first-transfer [] (dotimes [cnt 100000] (transfer bob joe 2))) (defn second-transfer [] (dotimes [cnt 100000] (transfer joe bob 1))) (def first-future (future (first-transfer))) (def second-future (future (second-transfer))) @first-future @second-future (println "Bob has in account: " @bob) (println "Joe has in account: " @joe) (println "Inconsistencies while transfer: " @inconsistencies)

當我執行這個腳本時,我得到:

 Bob has in account: 100000 Joe has in account: 400000 Inconsistencies while transfer: 36525

在這個例子中,我們可以看到我們如何改變更多的原子。 在某一時刻,可能會發生不一致。 兩個賬戶的總和在某個時候是不一樣的。 如果我們必須協調多個值的變化,有兩種解決方案:

  1. 在一個原子中放置更多值
  2. 使用引用和軟件事務內存,我們將在後面看到
; ; atom-accounts-fixed.clj ; (def accounts (atom {:bob 200000, :joe 300000})) (def inconsistencies (atom 0)) (defn transfer [source destination amount] (let [deref-accounts @accounts] (if (not= (+ (get deref-accounts :bob) (get deref-accounts :joe)) 500000) (swap! inconsistencies inc)) (swap! accounts (fn [accs] (update (update accs source - amount) destination + amount))))) (defn first-transfer [] (dotimes [cnt 100000] (transfer :bob :joe 2))) (defn second-transfer [] (dotimes [cnt 100000] (transfer :joe :bob 1))) (def first-future (future (first-transfer))) (def second-future (future (second-transfer))) @first-future @second-future (println "Bob has in account: " (get @accounts :bob)) (println "Joe has in account: " (get @accounts :joe)) (println "Inconsistencies while transfer: " @inconsistencies)

當我在我的計算機上運行這個腳本時,我得到:

 Bob has in account: 100000 Joe has in account: 400000 Inconsistencies while transfer: 0

在示例中,協調已得到解決,因此我們使用地圖賦予了更多價值。 當我們從賬戶轉賬時,我們會同時更改所有賬戶,這樣就永遠不會發生金額不一樣的情況。

下一個變量數據類型是代理。 代理的行為類似於原子,只是因為更改值的函數在不同的線程中執行,因此更改需要一些時間才能變得可見。 因此,在讀取代理的值時,有必要調用一個函數,該函數等待所有更改代理值的函數都執行完畢。 與改變值的原子函數不同,它只調用一次,因此會產生副作用。 這種類型也可以同步一個值,不會死鎖。

 ; ; agent-counter.clj ; (def counter (agent 0)) (def attempts (atom 0)) (defn counter-increases[] (dotimes [cnt 500000] (send counter (fn [counter] (swap! attempts inc) (inc counter))))) (def first-future (future (counter-increases))) (def second-future (future (counter-increases))) ; wait for futures to complete @first-future @second-future ; wait for counter to be finished with updating (await counter) ; print the value of the counter (println "The counter is: " @counter) (println "Number of attempts: " @attempts)

當我在筆記本電腦上運行此腳本時,我得到:

 The counter is: 1000000 Number of attempts: 1000000

這個例子與使用原子的計數器的實現相同。 唯一的區別是,在使用 await 讀取最終值之前,我們正在等待所有代理更改完成。

最後一個變量數據類型是引用。 與原子不同,引用可以同步對多個值的更改。 引用上的每個操作都應該在使用 dosync 的事務中。 這種改變數據的方式稱為軟件事務存儲器或簡稱 STM。 讓我們看一個賬戶轉賬的例子。

 ; ; stm-accounts.clj ; (def bob (ref 200000)) (def joe (ref 300000)) (def inconsistencies (atom 0)) (def attempts (atom 0)) (def transfers (agent 0)) (defn transfer [source destination amount] (dosync (swap! attempts inc) ; side effect DO NOT DO THIS (send transfers inc) (when (not= (+ @bob @joe) 500000) (swap! inconsistencies inc)) ; side effect DO NOT DO THIS (alter source - amount) (alter destination + amount))) (defn first-transfer [] (dotimes [cnt 100000] (transfer bob joe 2))) (defn second-transfer [] (dotimes [cnt 100000] (transfer joe bob 1))) (def first-future (future (first-transfer))) (def second-future (future (second-transfer))) @first-future @second-future (await transfers) (println "Bob has in account: " @bob) (println "Joe has in account: " @joe) (println "Inconsistencies while transfer: " @inconsistencies) (println "Attempts: " @attempts) (println "Transfers: " @transfers)

當我運行這個腳本時,我得到:

 Bob has in account: 100000 Joe has in account: 400000 Inconsistencies while transfer: 0 Attempts: 330841 Transfers: 200000

有趣的是,嘗試的次數比交易的次數還要多。 這是因為 STM 不使用鎖,所以如果發生衝突(如兩個線程試圖更改相同的值),事務將重新執行。 因此,交易不應該有副作用。 我們可以看到,在交易中價值發生變化的代理行為是可預測的。 更改代理值的函數將被評估的次數與事務的次數一樣多。 原因是代理具有交易意識。 如果事務必須有副作用,它們應該在代理內部發揮作用。 這樣,程序將具有可預測的行為。 您可能認為您應該始終使用 STM,但有經驗的程序員會經常使用 atom,因為 atom 比 STM 更簡單、更快。 當然,那是如果有可能以這種方式製作程序。 如果您有副作用,那麼除了使用 STM 和代理之外別無選擇。

演員模型

下面的並發模型是一個actor模型。 該模型的原理與現實世界相似。 如果我們與許多人達成協議,例如建造一座建築物,那麼建築工地的每個人都有自己的角色。 一群人由主管監督。 如果工人在工作中受傷,主管會將受傷男子的工作分配給其他有空的人。 如有必要,他可能會帶領一個新人到該站點。 在網站上,我們有更多的人同時(同時)做工作,但也互相交談以同步。 如果我們把建築工地的工作放到程序中,那麼每個人都是一個有狀態的演員,在自己的流程中執行,談話將被消息取代。 基於此模型的流行編程語言是 Erlang。 這種有趣的語言具有不可變的數據類型和與其他數據類型具有相同屬性的函數。 函數可以在程序執行期間創建並作為參數傳遞給另一個函數或作為函數調用的結果返回。 我將給出使用 Erlang 虛擬機的 Elixir 語言示例,因此我將擁有與 Erlang 相同的編程模型,只是語法不同。 Elixir 中三個最重要的原語是 spawn、send 和 receive。 spawn 在新進程中執行函數,send 將消息發送到進程,receive 接收發送到當前進程的消息。

使用actor模型的第一個示例將同時增加計數器。 用這個模型做一個程序,需要讓一個actor擁有計數器的值並接收消息來設置和檢索計數器的值,並且有兩個actor同時增加計數器的值。

 # # Counting.exs # defmodule Counting do def counter(value) do receive do {:get, sender} -> send sender, {:counter, value} counter value {:set, new_value} -> counter(new_value) end end def counting(sender, counter, times) do if times > 0 do send counter, {:get, self} receive do {:counter, value} -> send counter, {:set, value + 1} end counting(sender, counter, times - 1) else send sender, {:done, self} end end end counter = spawn fn -> Counting.counter 0 end IO.puts "Starting counting processes" this = self counting1 = spawn fn -> IO.puts "Counting A started" Counting.counting this, counter, 500_000 IO.puts "Counting A finished" end counting2 = spawn fn -> IO.puts "Counting B started" Counting.counting this, counter, 500_000 IO.puts "Counting B finished" end IO.puts "Waiting for counting to be done" receive do {:done, ^counting1} -> nil end receive do {:done, ^counting2} -> nil end send counter, {:get, self} receive do {:counter, value} -> IO.puts "Counter is: #{value}" end

當我執行這個例子時,我得到:

 Starting counting processes Counting A started Waiting for counting to be done Counting B started Counting A finished Counting B finished Counter is: 516827

我們可以看到最終計數器是 516827 而不是我們預期的 1000000。 下次運行腳本時,收到 511010。出現這種行為的原因是計數器收到兩條消息:檢索當前值和設置新值。 要增加計數器,程序需要獲取當前值,將其增加 1 並設置增加的值。 兩個進程使用發送給計數器進程的消息同時讀取和寫入計數器的值。 計數器將接收到的消息的順序是不可預測的,程序無法控制它。 我們可以想像這樣的場景:

  1. 計數器值為 115
  2. 進程 A 讀取計數器的值 (115)
  3. 進程 B 讀取計數器的值 (115)
  4. 過程 B 在本地增加值 (116)
  5. 進程 B 將增加的值設置為計數器 (116)
  6. 進程 A 增加計數器的值 (116)
  7. 進程 A 將增加的值設置為計數器 (116)
  8. 計數器值為 116

如果我們看一下場景,兩個進程將計數器增加 1,最後計數器增加 1 而不是 2。這種交織可能會發生不可預測的次數,因此計數器的值是不可預測的。 為了防止這種行為,增加操作必須由一條消息完成。

 # # CountingFixed.exs # defmodule Counting do def counter(value) do receive do :increase -> counter(value + 1) {:get, sender} -> send sender, {:counter, value} counter value end end def counting(sender, counter, times) do if times > 0 do send counter, :increase counting(sender, counter, times - 1) else send sender, {:done, self} end end end counter = spawn fn -> Counting.counter 0 end IO.puts "Starting counting processes" this = self counting1 = spawn fn -> IO.puts "Counting A started" Counting.counting this, counter, 500_000 IO.puts "Counting A finished" end counting2 = spawn fn -> IO.puts "Counting B started" Counting.counting this, counter, 500_000 IO.puts "Counting B finished" end IO.puts "Waiting for counting to be done" receive do {:done, ^counting1} -> nil end receive do {:done, ^counting2} -> nil end send counter, {:get, self} receive do {:counter, value} -> IO.puts "Counter is: #{value}" end

通過運行這個腳本,我得到:

 Starting counting processes Counting A started Waiting for counting to be done Counting B started Counting A finished Counting B finished Counter is: 1000000

我們可以看到計數器的值是正確的。 可預測(確定性)行為的原因是計數器的值增加一個消息,因此增加計數器的消息序列不會影響其最終值。 使用 Actor 模型時,我們必須注意消息如何相互交織,並仔細設計消息和消息上的操作,以避免意外的不可預測性(非確定性)。

我們如何使用這種模型在兩個賬戶之間轉賬?

 # # Accounts.exs # defmodule Accounts do def accounts(state) do receive do {:transfer, source, destination, amount} -> accounts %{state | source => state[source] - amount , destination => state[destination] + amount} {:amounts, accounts, sender } -> send sender, {:amounts, for account <- accounts do {account, state[account]} end} accounts(state) end end def transfer(sender, accounts, source, destination, amount, times, inconsistencies) do if times > 0 do send accounts, {:amounts, [source, destination], self} receive do {:amounts, amounts} -> if amounts[source] + amounts[destination] != 500_000 do Agent.update(inconsistencies, fn value -> value + 1 end) end end send accounts, {:transfer, source, destination, amount} transfer(sender, accounts, source, destination, amount, times - 1, inconsistencies) else send sender, {:done, self} end end end accounts = spawn fn -> Accounts.accounts(%{bob: 200_000, joe: 300_000 }) end {:ok, inconsistencies} = Agent.start(fn -> 0 end) this = self transfer1 = spawn fn -> IO.puts "Transfer A started" Accounts.transfer(this, accounts, :bob, :joe, 2, 100_000, inconsistencies) IO.puts "Transfer A finished" end transfer2 = spawn fn -> IO.puts "Transfer B started" Accounts.transfer(this, accounts, :joe, :bob, 1, 100_000, inconsistencies) IO.puts "Transfer B finished" end IO.puts "Waiting for transfers to be done" receive do {:done, ^transfer1} -> nil end receive do {:done, ^transfer2} -> nil end send accounts, {:amounts, [:bob, :joe], self} receive do {:amounts, amounts} -> IO.puts "Bob has in account: #{amounts[:bob]}" IO.puts "Joe has in account: #{amounts[:joe]}" IO.puts "Inconsistencies while transfer: #{Agent.get(inconsistencies, fn x -> x end)}" end

當我運行這個腳本時,我得到:

 Waiting for transfers to be done Transfer A started Transfer B started Transfer B finished Transfer A finished Bob has in account: 100000 Joe has in account: 400000 Inconsistencies while transfer: 0

我們可以看到匯款沒有不一致的情況,因為我們選擇了消息轉移來轉移資金和消息金額以獲得賬戶的價值,這使我們可以預測程序的行為。 每當我們進行轉賬時,任何時候的總金額都應該是一樣的。

Actor 模型會導致鎖,從而導致死鎖,因此在設計程序時要小心。 以下腳本顯示瞭如何模擬鎖定和死鎖場景。

 # # Deadlock.exs # defmodule Lock do def loop(state) do receive do {:lock, sender} -> case state do [] -> send sender, :locked loop([sender]) _ -> loop(state ++ [sender]) end {:unlock, sender} -> case state do [] -> loop(state) [^sender | []] -> loop([]) [^sender | [next | tail]] -> send next, :locked loop([next | tail]) _ -> loop(state) end end end def lock(pid) do send pid, {:lock, self} receive do :locked -> nil # This will block until we receive message end end def unlock(pid) do send pid, {:unlock, self} end def locking(first, second, times) do if times > 0 do lock(first) lock(second) unlock(second) unlock(first) locking(first, second, times - 1) end end end a_lock = spawn fn -> Lock.loop([]) end b_lock = spawn fn -> Lock.loop([]) end this = self IO.puts "Locking A, B started" spawn fn -> Lock.locking(a_lock, b_lock, 1_000) IO.puts "Locking A, B finished" send this, :done end IO.puts "Locking B, A started" spawn fn -> Lock.locking(b_lock, a_lock, 1_000) IO.puts "Locking B, A finished" send this, :done end IO.puts "Waiting for locking to be done" receive do :done -> nil end receive do :done -> nil End

When I run this script on my laptop I get:

 Locking A, B started Locking B, A started Waiting for locking to be done

From the output we can see that the processes that lock A and B are stuck. This happens because the first process waits for the second process to release B while second process waiting first process to release A. They are waiting for each other and are stuck forever. To avoid this locking, order should always be the same, or design a program so that it doesn't use lock (meaning that it doesn't wait for a specific message). The following listing always locks first A then B.

 # # Deadlock fixed # defmodule Lock do def loop(state) do receive do {:lock, sender} -> case state do [] -> send sender, :locked loop([sender]) _ -> loop(state ++ [sender]) end {:unlock, sender} -> case state do [] -> loop(state) [^sender | []] -> loop([]) [^sender | [next | tail]] -> send next, :locked loop([next | tail]) _ -> loop(state) end end end def lock(pid) do send pid, {:lock, self} receive do :locked -> nil # This will block until we receive message end end def unlock(pid) do send pid, {:unlock, self} end def locking(first, second, times) do if times > 0 do lock(first) lock(second) unlock(second) unlock(first) locking(first, second, times - 1) end end end a_lock = spawn fn -> Lock.loop([]) end b_lock = spawn fn -> Lock.loop([]) end this = self IO.puts "Locking A, B started" spawn fn -> Lock.locking(a_lock, b_lock, 1_000) IO.puts "Locking A, B finished" send this, :done end IO.puts "Locking A, B started" spawn fn -> Lock.locking(a_lock, b_lock, 1_000) IO.puts "Locking A, B finished" send this, :done end IO.puts "Waiting for locking to be done" receive do :done -> nil end receive do :done -> nil End

When I run this script on my laptop I get:

 Locking A, B started Locking A, B started Waiting for locking to be done Locking A, B finished Locking A, B finished

And now, there is no longer a deadlock.

包起來

As an introduction to concurrent programming, we have covered a few concurrency models. We haven't covered all models, as this article would be too big. Just to name a few, channels and reactive streams are some of the other popularly used concurrency models. Channels and reactive streams have many similarities with the actor model. All of them transmit messages, but many threads can receive messages from one channel, and reactive streams transmit messages in one direction to form directed graph that receive messages from one end and send messages from the other end as a result of the processing.

Shared mutable state models can easily go wrong if we don't think ahead. It has problems of race condition and deadlock. If we have a choice between different concurrent programming models, it would be easier to implement and maintain but otherwise we have to be very careful what we do.

The functional way is a lot easier to reason about and implement. It cannot have deadlock. This model may have worse performance than shared mutable state model, but a program that works is always faster than one that does not work.

Actor model is a good choice for concurrent programming. Although there are problems of race condition and deadlock, they can happen less than in shared mutable state model since the only way for processes to communicate is via messages. With good message design between processes, that can be avoided. If a problem occurs it is then in the order or meaning of messages in communication between the processes and you know where to look.

I hope this article has given you some insight to what concurrent programming is and how it gives structure to the programs you write.

Related: Ruby Concurrency and Parallelism: A Practical Tutorial