并发编程简介:初学者指南
已发表: 2022-03-11什么是并发编程? 简单地说,就是你同时做不止一件事。 不要与并行性混淆,并发是指在重叠的时间段内运行多个操作序列。 在编程领域,并发是一个相当复杂的主题。 处理诸如线程和锁之类的结构并避免诸如竞争条件和死锁之类的问题可能非常麻烦,使得并发程序难以编写。 通过并发,程序可以设计为在特定组合中一起工作的独立进程。 这种结构可以平行也可以不平行; 但是,在您的程序中实现这样的结构提供了许多优势。
在本文中,我们将了解一些不同的并发模型,以及如何在为并发设计的各种编程语言中实现它们。
共享可变状态模型
让我们看一个简单的例子,它有一个计数器和两个增加它的线程。 程序不应该太复杂。 我们有一个对象,它包含一个随着方法增加而增加的计数器,并使用 get 方法和两个增加它的线程来检索它。
// // Counting.java // public class Counting { public static void main(String[] args) throws InterruptedException { class Counter { int counter = 0; public void increment() { counter++; } public int get() { return counter; } } final Counter counter = new Counter(); class CountingThread extends Thread { public void run() { for (int x = 0; x < 500000; x++) { counter.increment(); } } } CountingThread t1 = new CountingThread(); CountingThread t2 = new CountingThread(); t1.start(); t2.start(); t1.join(); t2.join(); System.out.println(counter.get()); } }
这个幼稚的程序并不像乍一看那样幼稚。 当我多次运行这个程序时,我会得到不同的结果。 在我的笔记本电脑上执行 3 次后有三个值。
java Counting 553706 java Counting 547818 java Counting 613014
这种不可预测的行为的原因是什么? 程序在一个地方增加计数器,在使用命令 counter++ 的方法增加中。 如果我们查看命令字节码,我们会发现它由几个部分组成:
- 从内存中读取计数器值
- 在当地增加价值
- 将计数器值存储在内存中
现在我们可以想象在这个序列中会出现什么问题。 如果我们有两个独立增加计数器的线程,那么我们可能会遇到这种情况:
- 计数器值为 115
- 第一个线程从内存中读取计数器的值 (115)
- 第一个线程增加本地计数器值 (116)
- 第二个线程从内存中读取计数器的值 (115)
- 第二个线程增加本地计数器值 (116)
- 第二个线程将本地计数器值保存到内存 (116)
- 第一个线程将本地计数器值保存到内存 (116)
- 计数器的值为 116
在这种情况下,两个线程交织在一起,因此计数器值增加1,但是计数器值应该增加2,因为每个线程都增加1。不同的线程交织影响程序的结果。 程序不可预测的原因是程序无法控制线程交织,而是操作系统。 每次执行程序时,线程都可以以不同的方式交织在一起。 通过这种方式,我们向程序引入了意外的不可预测性(非确定性)。
为了解决这种意外的不可预测性(非确定性),程序必须控制线程交织。 当一个线程在方法中增加时,另一个线程不能在同一个方法中,直到第一个线程出来。 这样我们就可以增加对方法的序列化访问。
// // CountingFixed.java // public class CountingFixed { public static main(String[] args) throws InterruptedException { class Counter { int counter = 0; public synchronized void increase() { counter++; } public synchronized int get() { return counter; } } final Counter counter = new Counter(); class CountingThread extends Thread { public void run() { for (int i = 0; i < 500000; i++) { counter.increment(); } } } CountingThread thread1 = new CountingThread(); CountingThread thread2 = new CountingThread(); thread1.start(); thread2.start(); thread1.join(); thread2.join(); System.out.println(counter.get()); } }
另一种解决方案是使用可以原子增加的计数器,这意味着操作不能分成多个操作。 这样,我们就不需要有需要同步的代码块了。 Java 在 java.util.concurrent.atomic 命名空间中有原子数据类型,我们将使用 AtomicInteger。
// // CountingBetter.java // import java.util.concurrent.atomic.AtomicInteger; class CountingBetter { public static void main(String[] args) throws InterruptedException { final AtomicInteger counter = new AtomicInteger(0); class CountingThread extends Thread { public viod run() { for (int i = 0; i < 500000; i++) { counter.incrementAndGet(); } } } CountingThread thread1 = new CountingThread(); CountingThread thread2 = new CoutningThread(); thread1.start(); thread2.start(); thread1.join(); thread2.join(); System.out.println(counter.get()); } }
原子整数具有我们需要的操作,因此我们可以使用它来代替 Counter 类。 有趣的是,atomicinteger的所有方法都没有使用锁,这样就没有死锁的可能,方便了程序的设计。
使用 synchronized 关键字来同步关键方法应该可以解决所有问题,对吧? 假设我们有两个账户可以存款、取款和转账到另一个账户。 如果同时我们想将钱从一个账户转移到另一个账户,反之亦然,会发生什么? 让我们看一个例子。
// // Deadlock.java // public class Deadlock { public static void main(String[] args) throws InterruptedException { class Account { int balance = 100; public Account(int balance) { this.balance = balance; } public synchronized void deposit(int amount) { balance += amount; } public synchronized boolean withdraw(int amount) { if (balance >= amount) { balance -= amount; return true; } return false; } public synchronized boolean transfer(Account destination, int amount) { if (balance >= amount) { balance -= amount; synchronized(destination) { destination.balance += amount; }; return true; } return false; } public int getBalance() { return balance; } } final Account bob = new Account(200000); final Account joe = new Account(300000); class FirstTransfer extends Thread { public void run() { for (int i = 0; i < 100000; i++) { bob.transfer(joe, 2); } } } class SecondTransfer extends Thread { public void run() { for (int i = 0; i < 100000; i++) { joe.transfer(bob, 1); } } } FirstTransfer thread1 = new FirstTransfer(); SecondTransfer thread2 = new SecondTransfer(); thread1.start(); thread2.start(); thread1.join(); thread2.join(); System.out.println("Bob's balance: " + bob.getBalance()); System.out.println("Joe's balance: " + joe.getBalance()); } }
当我在笔记本电脑上运行这个程序时,它通常会卡住。 为什么会这样? 如果我们仔细观察,我们可以看到,当我们转账时,我们正在进入同步的转账方法,并锁定源账户上所有同步方法的访问,然后锁定目标账户,锁定目标账户上所有同步方法的访问。
想象以下场景:
- 第一个线程调用将 Bob 的帐户转移到 Joe 的帐户
- 第二个线程调用将乔的帐户转移到鲍勃的帐户
- 第二个线程减少了乔账户的金额
- 第二个线程将金额存入 Bob 的账户,但等待第一个线程完成转账。
- 第一个线程从 Bob 的帐户中减少金额
- 第一个线程将金额存入 Joe 的帐户,但等待第二个线程完成转账。
在这种情况下,一个线程正在等待另一个线程完成传输,反之亦然。 它们相互卡住,程序无法继续。 这称为死锁。 为了避免死锁,有必要以相同的顺序锁定帐户。 为了修复该程序,我们将为每个帐户分配一个唯一编号,以便我们在转账时可以按相同顺序锁定帐户。
// // DeadlockFixed.java // import java.util.concurrent.atomic.AtomicInteger; public class DeadlockFixed { public static void main(String[] args) throws InterruptedException { final AtomicInteger counter = new AtomicInteger(0); class Account { int balance = 100; int order; public Account(int balance) { this.balance = balance; this.order = counter.getAndIncrement(); } public synchronized void deposit(int amount) { balance += amount; } public synchronized boolean withdraw(int amount) { if (balance >= amount) { balance -= amount; return true; } return false; } public boolean transfer(Account destination, int amount) { Account first; Account second; if (this.order < destination.order) { first = this; second = destination; } else { first = destination; second = this; } synchronized(first) { synchronized(second) { if (balance >= amount) { balance -= amount; destination.balance += amount; return true; } return false; } } } public synchronized int getBalance() { return balance; } } final Account bob = new Account(200000); final Account joe = new Account(300000); class FirstTransfer extends Thread { public void run() { for (int i = 0; i < 100000; i++) { bob.transfer(joe, 2); } } } class SecondTransfer extends Thread { public void run() { for (int i = 0; i < 100000; i++) { joe.transfer(bob, 1); } } } FirstTransfer thread1 = new FirstTransfer(); SecondTransfer thread2 = new SecondTransfer(); thread1.start(); thread2.start(); thread1.join(); thread2.join(); System.out.println("Bob's balance: " + bob.getBalance()); System.out.println("Joe's balance: " + joe.getBalance()); } }
由于此类错误的不可预测性,它们有时会发生,但并非总是如此,并且难以重现。 如果程序的行为不可预测,通常是由并发引起的,它引入了意外的非确定性。 为避免意外的非确定性,我们应该提前设计程序以考虑所有相互交织的因素。
具有意外非确定性的程序示例。
// // NonDeteminism.java // public class NonDeterminism { public static void main(String[] args) throws InterruptedException { class Container { public String value = "Empty"; } final Container container = new Container(); class FastThread extends Thread { public void run() { container.value = "Fast"; } } class SlowThread extends Thread { public void run() { try { Thread.sleep(50); } catch(Exception e) {} container.value = "Slow"; } } FastThread fast = new FastThread(); SlowThread slow = new SlowThread(); fast.start(); slow.start(); fast.join(); slow.join(); System.out.println(container.value); } }
该程序具有偶然的非确定性。 将显示在容器中输入的最后一个值。
java NonDeterminism Slow
较慢的线程稍后会输入该值,并且将打印该值(Slow)。 但情况并非如此。 如果计算机同时执行另一个需要大量 CPU 资源的程序怎么办? 我们不能保证最后输入值的是速度较慢的线程,因为它是由操作系统而不是程序控制的。 我们可能会遇到程序在一台计算机上运行而在另一台计算机上表现不同的情况。 这样的错误很难找到,它们给开发人员带来了麻烦。 由于所有这些原因,这种并发模型很难正确执行。
功能方式
并行性
让我们看一下函数式语言正在使用的另一个模型。 例如,我们将使用 Clojure,它可以使用 Leiningen 工具进行解释。 Clojure 是一种非常有趣的语言,它对并发的支持很好。 以前的并发模型是共享可变状态。 我们使用的类也可能有一个隐藏状态,它会发生我们不知道的变异,因为从它们的 API 中看不到这一点。 正如我们所见,如果我们不小心,这种模型可能会导致意外的非确定性和死锁。 函数式语言具有不会发生变化的数据类型,因此可以安全地共享它,而无需担心它们会发生变化。 函数具有属性以及其他数据类型。 函数可以在程序执行期间创建并作为参数传递给另一个函数或作为函数调用的结果返回。
并发编程的基本原语是未来和承诺。 Future 在另一个线程中执行一个代码块,并返回一个对象,该对象表示执行该块时将输入的未来值。
; ; future.clj ; (let [a (future (println "Started A") (Thread/sleep 1000) (println "Finished A") (+ 1 2)) b (future (println "Started B") (Thread/sleep 2000) (println "Finished B") (+ 3 4))] (println "Waiting for futures") (+ @a @b))
当我执行这个脚本时,输出是:
Started A Started B Waiting for futures Finished A Finished B 10
在这个例子中,我们有两个独立执行的未来块。 仅当从未来对象中读取尚不可用的值时,程序才会阻塞。 在我们的例子中,等待对未来块的两个结果求和。 行为是可预测的(确定性的)并且总是会给出相同的结果,因为没有共享的可变状态。
另一个用于并发的原语是 promise。 Promise 是一个容器,可以在其中放置一次值。 读取 Promise 时,线程将等待直到 Promise 的值被填充。
; ; promise.clj ; (def result (promise)) (future (println "The result is: " @result)) (Thread/sleep 2000) (deliver result 42)
在这个例子中,只要 promise not be saved value,future 就会等待打印结果。 两秒后,promise 中将存储值 42 以在将来的线程中打印。 使用 Promise 会导致死锁而不是未来,所以在使用 Promise 时要小心。
; ; promise-deadlock.clj ; (def promise-result (promise)) (def future-result (future (println "The result is: " + @promise-result) 13)) (println "Future result is: " @future-result) (deliver result 42)
在这个例子中,我们使用了未来的结果和承诺的结果。 设置和读取值的顺序是主线程在等待来自未来线程的值,而未来线程在等待来自主线程的值。 这种行为将是可预测的(确定性的),并且会在每次程序执行时播放,从而更容易发现和消除错误。
使用 future 允许程序继续练习,直到它需要 future 的执行结果。 这导致更快的程序执行。 如果未来有多个处理器,则可以并行执行具有可预测(确定性)行为的程序(每次都给出相同的结果)。 这样我们就能更好地利用计算机的力量。
; ; fibonacci.clj ; (defn fibonacci[a] (if (<= a 2) 1 (+ (fibonacci (- a 1)) (fibonacci (- a 2))))) (println "Start serial calculation") (time (println "The result is: " (+ (fibonacci 36) (fibonacci 36)))) (println "Start parallel calculation") (defn parallel-fibonacci[] (def result-1 (future (fibonacci 36))) (def result-2 (future (fibonacci 36))) (+ @result-1 @result-2)) (time (println "The result is: " (parallel-fibonacci)))
在这个例子中,您可以看到使用future 可以如何更好地利用计算机的速度。 我们有两个斐波那契数相加。 我们可以看到程序计算了两次结果,第一次是单线程顺序计算的,第二次是两个线程并行计算的。 由于我的笔记本电脑有一个多核处理器,并行执行的速度是顺序计算的两倍。
在我的笔记本电脑上执行此脚本的结果:
Start serial calculation The result is: 29860704 "Elapsed time: 2568.816524 msecs" Start parallel calculation The result is: 29860704 "Elapsed time: 1216.991448 msecs"
并发
为了支持 Clojure 编程语言中的并发性和不可预测性,我们必须使用可变的数据类型,以便其他线程可以看到更改。 最简单的变量数据类型是原子。 Atom 是一个容器,它始终具有可以被另一个值替换的值。 可以通过输入一个新值或调用一个接受旧值并返回更常用的新值的函数来替换该值。 有趣的是,atom 是在没有锁定的情况下实现的,并且在线程中使用是安全的,这意味着不可能达到死锁。 在内部,atom 使用 java.util.concurrent.AtomicReference 库。 让我们看一个用 atom 实现的反例。
; ; atom-counter.clj ; (def counter (atom 0)) (def attempts (atom 0)) (defn counter-increases[] (dotimes [cnt 500000] (swap! counter (fn [counter] (swap! attempts inc) ; side effect DO NOT DO THIS (inc counter))))) (def first-future (future (counter-increases))) (def second-future (future (counter-increases))) ; Wait for futures to complete @first-future @second-future ; Print value of the counter (println "The counter is: " @counter) (println "Number of attempts: " @attempts)
在我的笔记本电脑上执行脚本的结果:
The counter is: 1000000 Number of attempts: 1680212
在这个例子中,我们使用了一个包含计数器值的原子。 计数器随着 (swap!counter inc) 的增加而增加。 交换函数的工作原理如下: 1. 获取计数器值并保存它 2. 为此值调用计算新值的给定函数 3. 为了保存新值,它使用原子操作检查旧值是否已更改 3a。 如果该值没有改变,则输入一个新值 3b。 如果同时改变了值,则转到步骤 1 我们看到如果同时改变了值,则可以再次调用该函数。 该值只能从另一个线程更改。 因此,计算新值的函数必须没有副作用,这样即使它被调用更多次也没关系。 atom 的一个限制是它将更改同步到一个值。
; ; atom-acocunts.clj ; (def bob (atom 200000)) (def joe (atom 300000)) (def inconsistencies (atom 0)) (defn transfer [source destination amount] (if (not= (+ @bob @joe) 500000) (swap! inconsistencies inc)) (swap! source - amount) (swap! destination + amount)) (defn first-transfer [] (dotimes [cnt 100000] (transfer bob joe 2))) (defn second-transfer [] (dotimes [cnt 100000] (transfer joe bob 1))) (def first-future (future (first-transfer))) (def second-future (future (second-transfer))) @first-future @second-future (println "Bob has in account: " @bob) (println "Joe has in account: " @joe) (println "Inconsistencies while transfer: " @inconsistencies)
当我执行这个脚本时,我得到:
Bob has in account: 100000 Joe has in account: 400000 Inconsistencies while transfer: 36525
在这个例子中,我们可以看到我们如何改变更多的原子。 在某一时刻,可能会发生不一致。 两个账户的总和在某个时候是不一样的。 如果我们必须协调多个值的变化,有两种解决方案:
- 在一个原子中放置更多值
- 使用引用和软件事务内存,我们将在后面看到
; ; atom-accounts-fixed.clj ; (def accounts (atom {:bob 200000, :joe 300000})) (def inconsistencies (atom 0)) (defn transfer [source destination amount] (let [deref-accounts @accounts] (if (not= (+ (get deref-accounts :bob) (get deref-accounts :joe)) 500000) (swap! inconsistencies inc)) (swap! accounts (fn [accs] (update (update accs source - amount) destination + amount))))) (defn first-transfer [] (dotimes [cnt 100000] (transfer :bob :joe 2))) (defn second-transfer [] (dotimes [cnt 100000] (transfer :joe :bob 1))) (def first-future (future (first-transfer))) (def second-future (future (second-transfer))) @first-future @second-future (println "Bob has in account: " (get @accounts :bob)) (println "Joe has in account: " (get @accounts :joe)) (println "Inconsistencies while transfer: " @inconsistencies)
当我在我的计算机上运行这个脚本时,我得到:

Bob has in account: 100000 Joe has in account: 400000 Inconsistencies while transfer: 0
在示例中,协调已得到解决,因此我们使用地图赋予了更多价值。 当我们从账户转账时,我们会同时更改所有账户,这样就永远不会发生金额不一样的情况。
下一个变量数据类型是代理。 代理的行为类似于原子,只是因为更改值的函数在不同的线程中执行,因此更改需要一些时间才能变得可见。 因此,在读取代理的值时,有必要调用一个函数,该函数等待所有更改代理值的函数都执行完毕。 与改变值的原子函数不同,它只调用一次,因此会产生副作用。 这种类型也可以同步一个值,不会死锁。
; ; agent-counter.clj ; (def counter (agent 0)) (def attempts (atom 0)) (defn counter-increases[] (dotimes [cnt 500000] (send counter (fn [counter] (swap! attempts inc) (inc counter))))) (def first-future (future (counter-increases))) (def second-future (future (counter-increases))) ; wait for futures to complete @first-future @second-future ; wait for counter to be finished with updating (await counter) ; print the value of the counter (println "The counter is: " @counter) (println "Number of attempts: " @attempts)
当我在笔记本电脑上运行此脚本时,我得到:
The counter is: 1000000 Number of attempts: 1000000
这个例子与使用原子的计数器的实现相同。 唯一的区别是,在使用 await 读取最终值之前,我们正在等待所有代理更改完成。
最后一个变量数据类型是引用。 与原子不同,引用可以同步对多个值的更改。 引用上的每个操作都应该在使用 dosync 的事务中。 这种改变数据的方式称为软件事务存储器或简称 STM。 让我们看一个账户转账的例子。
; ; stm-accounts.clj ; (def bob (ref 200000)) (def joe (ref 300000)) (def inconsistencies (atom 0)) (def attempts (atom 0)) (def transfers (agent 0)) (defn transfer [source destination amount] (dosync (swap! attempts inc) ; side effect DO NOT DO THIS (send transfers inc) (when (not= (+ @bob @joe) 500000) (swap! inconsistencies inc)) ; side effect DO NOT DO THIS (alter source - amount) (alter destination + amount))) (defn first-transfer [] (dotimes [cnt 100000] (transfer bob joe 2))) (defn second-transfer [] (dotimes [cnt 100000] (transfer joe bob 1))) (def first-future (future (first-transfer))) (def second-future (future (second-transfer))) @first-future @second-future (await transfers) (println "Bob has in account: " @bob) (println "Joe has in account: " @joe) (println "Inconsistencies while transfer: " @inconsistencies) (println "Attempts: " @attempts) (println "Transfers: " @transfers)
当我运行这个脚本时,我得到:
Bob has in account: 100000 Joe has in account: 400000 Inconsistencies while transfer: 0 Attempts: 330841 Transfers: 200000
有趣的是,尝试的次数比交易的次数还要多。 这是因为 STM 不使用锁,所以如果发生冲突(如两个线程试图更改相同的值),事务将重新执行。 因此,交易不应该有副作用。 我们可以看到,在交易中价值发生变化的代理行为是可预测的。 更改代理值的函数将被评估的次数与事务的次数一样多。 原因是代理具有交易意识。 如果事务必须有副作用,它们应该在代理内部发挥作用。 这样,程序将具有可预测的行为。 您可能认为您应该始终使用 STM,但有经验的程序员会经常使用 atom,因为 atom 比 STM 更简单、更快。 当然,那是如果有可能以这种方式制作程序。 如果您有副作用,那么除了使用 STM 和代理之外别无选择。
演员模型
下面的并发模型是一个actor模型。 该模型的原理与现实世界相似。 如果我们与许多人达成协议,例如建造一座建筑物,那么建筑工地的每个人都有自己的角色。 一群人由主管监督。 如果工人在工作中受伤,主管会将受伤男子的工作分配给其他有空的人。 如有必要,他可能会带领一个新人到该站点。 在网站上,我们有更多的人同时(同时)做工作,但也互相交谈以同步。 如果我们把建筑工地的工作放到程序中,那么每个人都是一个有状态的演员,在自己的流程中执行,谈话将被消息取代。 基于此模型的流行编程语言是 Erlang。 这种有趣的语言具有不可变的数据类型和与其他数据类型具有相同属性的函数。 函数可以在程序执行期间创建并作为参数传递给另一个函数或作为函数调用的结果返回。 我将给出使用 Erlang 虚拟机的 Elixir 语言示例,因此我将拥有与 Erlang 相同的编程模型,只是语法不同。 Elixir 中三个最重要的原语是 spawn、send 和 receive。 spawn 在新进程中执行函数,send 将消息发送到进程,receive 接收发送到当前进程的消息。
使用actor模型的第一个示例将同时增加计数器。 用这个模型做一个程序,需要让一个actor拥有计数器的值并接收消息来设置和检索计数器的值,并且有两个actor同时增加计数器的值。
# # Counting.exs # defmodule Counting do def counter(value) do receive do {:get, sender} -> send sender, {:counter, value} counter value {:set, new_value} -> counter(new_value) end end def counting(sender, counter, times) do if times > 0 do send counter, {:get, self} receive do {:counter, value} -> send counter, {:set, value + 1} end counting(sender, counter, times - 1) else send sender, {:done, self} end end end counter = spawn fn -> Counting.counter 0 end IO.puts "Starting counting processes" this = self counting1 = spawn fn -> IO.puts "Counting A started" Counting.counting this, counter, 500_000 IO.puts "Counting A finished" end counting2 = spawn fn -> IO.puts "Counting B started" Counting.counting this, counter, 500_000 IO.puts "Counting B finished" end IO.puts "Waiting for counting to be done" receive do {:done, ^counting1} -> nil end receive do {:done, ^counting2} -> nil end send counter, {:get, self} receive do {:counter, value} -> IO.puts "Counter is: #{value}" end
当我执行这个例子时,我得到:
Starting counting processes Counting A started Waiting for counting to be done Counting B started Counting A finished Counting B finished Counter is: 516827
我们可以看到最终计数器是 516827 而不是我们预期的 1000000。 下次运行脚本时,收到 511010。出现这种行为的原因是计数器收到两条消息:检索当前值和设置新值。 要增加计数器,程序需要获取当前值,将其增加 1 并设置增加的值。 两个进程使用发送给计数器进程的消息同时读取和写入计数器的值。 计数器将接收到的消息的顺序是不可预测的,程序无法控制它。 我们可以想象这样的场景:
- 计数器值为 115
- 进程 A 读取计数器的值 (115)
- 进程 B 读取计数器的值 (115)
- 过程 B 在本地增加值 (116)
- 进程 B 将增加的值设置为计数器 (116)
- 进程 A 增加计数器的值 (116)
- 进程 A 将增加的值设置为计数器 (116)
- 计数器值为 116
如果我们看一下场景,两个进程将计数器增加 1,最后计数器增加 1 而不是 2。这种交织可能会发生不可预测的次数,因此计数器的值是不可预测的。 为了防止这种行为,增加操作必须由一条消息完成。
# # CountingFixed.exs # defmodule Counting do def counter(value) do receive do :increase -> counter(value + 1) {:get, sender} -> send sender, {:counter, value} counter value end end def counting(sender, counter, times) do if times > 0 do send counter, :increase counting(sender, counter, times - 1) else send sender, {:done, self} end end end counter = spawn fn -> Counting.counter 0 end IO.puts "Starting counting processes" this = self counting1 = spawn fn -> IO.puts "Counting A started" Counting.counting this, counter, 500_000 IO.puts "Counting A finished" end counting2 = spawn fn -> IO.puts "Counting B started" Counting.counting this, counter, 500_000 IO.puts "Counting B finished" end IO.puts "Waiting for counting to be done" receive do {:done, ^counting1} -> nil end receive do {:done, ^counting2} -> nil end send counter, {:get, self} receive do {:counter, value} -> IO.puts "Counter is: #{value}" end
通过运行这个脚本,我得到:
Starting counting processes Counting A started Waiting for counting to be done Counting B started Counting A finished Counting B finished Counter is: 1000000
我们可以看到计数器的值是正确的。 可预测(确定性)行为的原因是计数器的值增加一个消息,因此增加计数器的消息序列不会影响其最终值。 使用 Actor 模型时,我们必须注意消息如何相互交织,并仔细设计消息和消息上的操作,以避免意外的不可预测性(非确定性)。
我们如何使用这种模型在两个账户之间转账?
# # Accounts.exs # defmodule Accounts do def accounts(state) do receive do {:transfer, source, destination, amount} -> accounts %{state | source => state[source] - amount , destination => state[destination] + amount} {:amounts, accounts, sender } -> send sender, {:amounts, for account <- accounts do {account, state[account]} end} accounts(state) end end def transfer(sender, accounts, source, destination, amount, times, inconsistencies) do if times > 0 do send accounts, {:amounts, [source, destination], self} receive do {:amounts, amounts} -> if amounts[source] + amounts[destination] != 500_000 do Agent.update(inconsistencies, fn value -> value + 1 end) end end send accounts, {:transfer, source, destination, amount} transfer(sender, accounts, source, destination, amount, times - 1, inconsistencies) else send sender, {:done, self} end end end accounts = spawn fn -> Accounts.accounts(%{bob: 200_000, joe: 300_000 }) end {:ok, inconsistencies} = Agent.start(fn -> 0 end) this = self transfer1 = spawn fn -> IO.puts "Transfer A started" Accounts.transfer(this, accounts, :bob, :joe, 2, 100_000, inconsistencies) IO.puts "Transfer A finished" end transfer2 = spawn fn -> IO.puts "Transfer B started" Accounts.transfer(this, accounts, :joe, :bob, 1, 100_000, inconsistencies) IO.puts "Transfer B finished" end IO.puts "Waiting for transfers to be done" receive do {:done, ^transfer1} -> nil end receive do {:done, ^transfer2} -> nil end send accounts, {:amounts, [:bob, :joe], self} receive do {:amounts, amounts} -> IO.puts "Bob has in account: #{amounts[:bob]}" IO.puts "Joe has in account: #{amounts[:joe]}" IO.puts "Inconsistencies while transfer: #{Agent.get(inconsistencies, fn x -> x end)}" end
当我运行这个脚本时,我得到:
Waiting for transfers to be done Transfer A started Transfer B started Transfer B finished Transfer A finished Bob has in account: 100000 Joe has in account: 400000 Inconsistencies while transfer: 0
我们可以看到汇款没有不一致的情况,因为我们选择了消息转移来转移资金和消息金额以获得账户的价值,这使我们可以预测程序的行为。 每当我们进行转账时,任何时候的总金额都应该是一样的。
Actor 模型会导致锁,从而导致死锁,因此在设计程序时要小心。 以下脚本显示了如何模拟锁定和死锁场景。
# # Deadlock.exs # defmodule Lock do def loop(state) do receive do {:lock, sender} -> case state do [] -> send sender, :locked loop([sender]) _ -> loop(state ++ [sender]) end {:unlock, sender} -> case state do [] -> loop(state) [^sender | []] -> loop([]) [^sender | [next | tail]] -> send next, :locked loop([next | tail]) _ -> loop(state) end end end def lock(pid) do send pid, {:lock, self} receive do :locked -> nil # This will block until we receive message end end def unlock(pid) do send pid, {:unlock, self} end def locking(first, second, times) do if times > 0 do lock(first) lock(second) unlock(second) unlock(first) locking(first, second, times - 1) end end end a_lock = spawn fn -> Lock.loop([]) end b_lock = spawn fn -> Lock.loop([]) end this = self IO.puts "Locking A, B started" spawn fn -> Lock.locking(a_lock, b_lock, 1_000) IO.puts "Locking A, B finished" send this, :done end IO.puts "Locking B, A started" spawn fn -> Lock.locking(b_lock, a_lock, 1_000) IO.puts "Locking B, A finished" send this, :done end IO.puts "Waiting for locking to be done" receive do :done -> nil end receive do :done -> nil End
When I run this script on my laptop I get:
Locking A, B started Locking B, A started Waiting for locking to be done
From the output we can see that the processes that lock A and B are stuck. This happens because the first process waits for the second process to release B while second process waiting first process to release A. They are waiting for each other and are stuck forever. To avoid this locking, order should always be the same, or design a program so that it doesn't use lock (meaning that it doesn't wait for a specific message). The following listing always locks first A then B.
# # Deadlock fixed # defmodule Lock do def loop(state) do receive do {:lock, sender} -> case state do [] -> send sender, :locked loop([sender]) _ -> loop(state ++ [sender]) end {:unlock, sender} -> case state do [] -> loop(state) [^sender | []] -> loop([]) [^sender | [next | tail]] -> send next, :locked loop([next | tail]) _ -> loop(state) end end end def lock(pid) do send pid, {:lock, self} receive do :locked -> nil # This will block until we receive message end end def unlock(pid) do send pid, {:unlock, self} end def locking(first, second, times) do if times > 0 do lock(first) lock(second) unlock(second) unlock(first) locking(first, second, times - 1) end end end a_lock = spawn fn -> Lock.loop([]) end b_lock = spawn fn -> Lock.loop([]) end this = self IO.puts "Locking A, B started" spawn fn -> Lock.locking(a_lock, b_lock, 1_000) IO.puts "Locking A, B finished" send this, :done end IO.puts "Locking A, B started" spawn fn -> Lock.locking(a_lock, b_lock, 1_000) IO.puts "Locking A, B finished" send this, :done end IO.puts "Waiting for locking to be done" receive do :done -> nil end receive do :done -> nil End
When I run this script on my laptop I get:
Locking A, B started Locking A, B started Waiting for locking to be done Locking A, B finished Locking A, B finished
And now, there is no longer a deadlock.
包起来
As an introduction to concurrent programming, we have covered a few concurrency models. We haven't covered all models, as this article would be too big. Just to name a few, channels and reactive streams are some of the other popularly used concurrency models. Channels and reactive streams have many similarities with the actor model. All of them transmit messages, but many threads can receive messages from one channel, and reactive streams transmit messages in one direction to form directed graph that receive messages from one end and send messages from the other end as a result of the processing.
Shared mutable state models can easily go wrong if we don't think ahead. It has problems of race condition and deadlock. If we have a choice between different concurrent programming models, it would be easier to implement and maintain but otherwise we have to be very careful what we do.
The functional way is a lot easier to reason about and implement. It cannot have deadlock. This model may have worse performance than shared mutable state model, but a program that works is always faster than one that does not work.
Actor model is a good choice for concurrent programming. Although there are problems of race condition and deadlock, they can happen less than in shared mutable state model since the only way for processes to communicate is via messages. With good message design between processes, that can be avoided. If a problem occurs it is then in the order or meaning of messages in communication between the processes and you know where to look.
I hope this article has given you some insight to what concurrent programming is and how it gives structure to the programs you write.