Введение в параллельное программирование: руководство для начинающих

Опубликовано: 2022-03-11

Что такое параллельное программирование? Проще говоря, это когда вы делаете несколько вещей одновременно. Не путать с параллелизмом, параллелизм — это когда несколько последовательностей операций выполняются в перекрывающиеся периоды времени. В области программирования параллелизм — довольно сложная тема. Работа с такими конструкциями, как потоки и блокировки, и предотвращение таких проблем, как условия гонки и тупиковые ситуации, могут быть довольно громоздкими, что затрудняет написание параллельных программ. Благодаря параллелизму программы могут быть разработаны как независимые процессы, работающие вместе в определенной композиции. Такая структура может быть или не быть параллельной; однако создание такой структуры в вашей программе дает многочисленные преимущества.

Введение в параллельное программирование

В этой статье мы рассмотрим несколько различных моделей параллелизма, а также способы их достижения на различных языках программирования, предназначенных для параллелизма.

Общая модель изменяемого состояния

Давайте рассмотрим простой пример со счетчиком и двумя потоками, которые его увеличивают. Программа не должна быть слишком сложной. У нас есть объект, который содержит счетчик, который увеличивается с увеличением метода, и извлекает его с помощью метода get и двух потоков, которые увеличивают его.

 // // Counting.java // public class Counting { public static void main(String[] args) throws InterruptedException { class Counter { int counter = 0; public void increment() { counter++; } public int get() { return counter; } } final Counter counter = new Counter(); class CountingThread extends Thread { public void run() { for (int x = 0; x < 500000; x++) { counter.increment(); } } } CountingThread t1 = new CountingThread(); CountingThread t2 = new CountingThread(); t1.start(); t2.start(); t1.join(); t2.join(); System.out.println(counter.get()); } }

Эта наивная программа не так наивна, как кажется на первый взгляд. Когда я запускаю эту программу несколько раз, я получаю разные результаты. Есть три значения после трех исполнений на моем ноутбуке.

 java Counting 553706 java Counting 547818 java Counting 613014

В чем причина такого непредсказуемого поведения? Программа увеличивает счетчик в одном месте, в методе увеличения, который использует команду counter++. Если мы посмотрим на байт-код команды, то увидим, что он состоит из нескольких частей:

  1. Чтение значения счетчика из памяти
  2. Увеличивайте ценность локально
  3. Сохранить значение счетчика в памяти

Теперь мы можем представить, что может пойти не так в этой последовательности. Если у нас есть два потока, которые независимо увеличивают счетчик, у нас может быть такой сценарий:

  1. Значение счетчика 115
  2. Первый поток считывает значение счетчика из памяти (115)
  3. Первый поток увеличивает значение локального счетчика (116)
  4. Второй поток считывает значение счетчика из памяти (115)
  5. Второй поток увеличивает значение локального счетчика (116)
  6. Второй поток сохраняет значение локального счетчика в памяти (116)
  7. Первый поток сохраняет значение локального счетчика в памяти (116)
  8. Значение счетчика 116

В этом сценарии два потока переплетаются, так что значение счетчика увеличивается на 1, но значение счетчика должно быть увеличено на 2, потому что каждый поток увеличивает его на 1. Различные переплетения потоков влияют на результат программы. Причина непредсказуемости программы в том, что не программа контролирует переплетение нитей, а операционная система. Каждый раз, когда программа выполняется, потоки могут переплетаться по-разному. Таким образом мы привнесли в программу случайную непредсказуемость (недетерминизм).

Чтобы исправить эту случайную непредсказуемость (недетерминизм), программа должна иметь контроль над переплетением нитей. Когда один поток находится в методе, другой поток не должен находиться в том же методе, пока первый не выйдет из него. Таким образом мы сериализуем доступ к методу увеличения.

 // // CountingFixed.java // public class CountingFixed { public static main(String[] args) throws InterruptedException { class Counter { int counter = 0; public synchronized void increase() { counter++; } public synchronized int get() { return counter; } } final Counter counter = new Counter(); class CountingThread extends Thread { public void run() { for (int i = 0; i < 500000; i++) { counter.increment(); } } } CountingThread thread1 = new CountingThread(); CountingThread thread2 = new CountingThread(); thread1.start(); thread2.start(); thread1.join(); thread2.join(); System.out.println(counter.get()); } }

Другим решением является использование счетчика, который может увеличиваться атомарно, то есть операция не может быть разделена на несколько операций. Таким образом, нам не нужны блоки кода, которые необходимо синхронизировать. В Java есть атомарные типы данных в пространстве имен java.util.concurrent.atomic, и мы будем использовать AtomicInteger.

 // // CountingBetter.java // import java.util.concurrent.atomic.AtomicInteger; class CountingBetter { public static void main(String[] args) throws InterruptedException { final AtomicInteger counter = new AtomicInteger(0); class CountingThread extends Thread { public viod run() { for (int i = 0; i < 500000; i++) { counter.incrementAndGet(); } } } CountingThread thread1 = new CountingThread(); CountingThread thread2 = new CoutningThread(); thread1.start(); thread2.start(); thread1.join(); thread2.join(); System.out.println(counter.get()); } }

Атомарное целое имеет необходимые нам операции, поэтому мы можем использовать его вместо класса Counter. Интересно отметить, что все методы atomicinteger не используют блокировку, так что нет возможности взаимоблокировок, что облегчает дизайн программы.

Использование синхронизированных ключевых слов для синхронизации критических методов должно решить все проблемы, верно? Давайте представим, что у нас есть две учетные записи, которые можно вносить, снимать и переводить на другую учетную запись. Что произойдет, если мы одновременно захотим перевести деньги с одного счета на другой и наоборот? Давайте посмотрим на пример.

 // // Deadlock.java // public class Deadlock { public static void main(String[] args) throws InterruptedException { class Account { int balance = 100; public Account(int balance) { this.balance = balance; } public synchronized void deposit(int amount) { balance += amount; } public synchronized boolean withdraw(int amount) { if (balance >= amount) { balance -= amount; return true; } return false; } public synchronized boolean transfer(Account destination, int amount) { if (balance >= amount) { balance -= amount; synchronized(destination) { destination.balance += amount; }; return true; } return false; } public int getBalance() { return balance; } } final Account bob = new Account(200000); final Account joe = new Account(300000); class FirstTransfer extends Thread { public void run() { for (int i = 0; i < 100000; i++) { bob.transfer(joe, 2); } } } class SecondTransfer extends Thread { public void run() { for (int i = 0; i < 100000; i++) { joe.transfer(bob, 1); } } } FirstTransfer thread1 = new FirstTransfer(); SecondTransfer thread2 = new SecondTransfer(); thread1.start(); thread2.start(); thread1.join(); thread2.join(); System.out.println("Bob's balance: " + bob.getBalance()); System.out.println("Joe's balance: " + joe.getBalance()); } }

Когда я запускаю эту программу на своем ноутбуке, она обычно зависает. Почему это происходит? Если мы внимательно присмотримся, то увидим, что когда мы переводим деньги, мы используем синхронизированный метод перевода и блокируем доступ ко всем синхронизированным методам исходной учетной записи, а затем блокируем целевую учетную запись, которая блокирует доступ ко всем синхронизированным методам на ней.

Представьте себе следующий сценарий:

  1. Первый поток вызывает перевод со счета Боба на счет Джо.
  2. Второй поток вызывает перевод со счета Джо на счет Боба.
  3. Второй поток уменьшает сумму со счета Джо
  4. Второй поток переходит к внесению суммы на счет Боба, но ждет, пока первый поток завершит перевод.
  5. Первый поток уменьшает сумму со счета Боба
  6. Первый поток переходит к внесению суммы на счет Джо, но ждет, пока второй поток завершит перевод.

В этом сценарии один поток ожидает, пока другой поток завершит передачу, и наоборот. Они застряли друг с другом, и программа не может продолжаться. Это называется тупиком. Во избежание взаимоблокировки необходимо блокировать учетные записи в том же порядке. Чтобы исправить программу, мы дадим каждой учетной записи уникальный номер, чтобы мы могли блокировать учетные записи в том же порядке при переводе денег.

 // // DeadlockFixed.java // import java.util.concurrent.atomic.AtomicInteger; public class DeadlockFixed { public static void main(String[] args) throws InterruptedException { final AtomicInteger counter = new AtomicInteger(0); class Account { int balance = 100; int order; public Account(int balance) { this.balance = balance; this.order = counter.getAndIncrement(); } public synchronized void deposit(int amount) { balance += amount; } public synchronized boolean withdraw(int amount) { if (balance >= amount) { balance -= amount; return true; } return false; } public boolean transfer(Account destination, int amount) { Account first; Account second; if (this.order < destination.order) { first = this; second = destination; } else { first = destination; second = this; } synchronized(first) { synchronized(second) { if (balance >= amount) { balance -= amount; destination.balance += amount; return true; } return false; } } } public synchronized int getBalance() { return balance; } } final Account bob = new Account(200000); final Account joe = new Account(300000); class FirstTransfer extends Thread { public void run() { for (int i = 0; i < 100000; i++) { bob.transfer(joe, 2); } } } class SecondTransfer extends Thread { public void run() { for (int i = 0; i < 100000; i++) { joe.transfer(bob, 1); } } } FirstTransfer thread1 = new FirstTransfer(); SecondTransfer thread2 = new SecondTransfer(); thread1.start(); thread2.start(); thread1.join(); thread2.join(); System.out.println("Bob's balance: " + bob.getBalance()); System.out.println("Joe's balance: " + joe.getBalance()); } }

Из-за непредсказуемости таких ошибок они иногда случаются, но не всегда и их сложно воспроизвести. Если программа ведет себя непредсказуемо, это обычно вызвано параллелизмом, который вносит случайный недетерминизм. Чтобы избежать случайной недетерминированности, мы должны заранее разработать программу, чтобы учесть все переплетения.

Пример программы со случайным недетерминизмом.

 // // NonDeteminism.java // public class NonDeterminism { public static void main(String[] args) throws InterruptedException { class Container { public String value = "Empty"; } final Container container = new Container(); class FastThread extends Thread { public void run() { container.value = "Fast"; } } class SlowThread extends Thread { public void run() { try { Thread.sleep(50); } catch(Exception e) {} container.value = "Slow"; } } FastThread fast = new FastThread(); SlowThread slow = new SlowThread(); fast.start(); slow.start(); fast.join(); slow.join(); System.out.println(container.value); } }

В этой программе присутствует случайный недетерминизм. Отобразится последнее введенное в контейнер значение.

 java NonDeterminism Slow

Более медленные потоки введут значение позже, и это значение будет напечатано (Slow). Но это не обязательно так. Что делать, если компьютер одновременно выполняет другую программу, которая требует много ресурсов процессора? У нас нет гарантии, что более медленный поток будет вводить значение последним, потому что он управляется операционной системой, а не программой. У нас могут быть ситуации, когда программа работает на одном компьютере, а на другом ведет себя по-разному. Такие ошибки трудно найти, и они вызывают головную боль у разработчиков. По всем этим причинам эту модель параллелизма очень трудно сделать правильно.

Функциональный путь

Параллелизм

Давайте посмотрим на другую модель, которую используют функциональные языки. Например, мы будем использовать Clojure, который можно интерпретировать с помощью инструмента Leiningen. Clojure — очень интересный язык с хорошей поддержкой параллелизма. Предыдущая модель параллелизма была с общим изменяемым состоянием. Классы, которые мы используем, также могут иметь скрытое состояние, которое мутирует, о чем мы не знаем, потому что это не очевидно из их API. Как мы видели, эта модель может вызвать случайную недетерминированность и тупиковые ситуации, если мы не будем осторожны. Функциональные языки имеют типы данных, которые не изменяются, поэтому ими можно безопасно делиться без риска их изменения. Функции имеют свойства, как и другие типы данных. Функции могут создаваться во время выполнения программы и передаваться в качестве параметра другой функции или возвращаться в результате вызова функции.

Базовые примитивы для параллельного программирования — это будущее и надежды. Future выполняет блок кода в другом потоке и возвращает объект для будущего значения, которое будет введено при выполнении блока.

 ; ; future.clj ; (let [a (future (println "Started A") (Thread/sleep 1000) (println "Finished A") (+ 1 2)) b (future (println "Started B") (Thread/sleep 2000) (println "Finished B") (+ 3 4))] (println "Waiting for futures") (+ @a @b))

Когда я выполняю этот скрипт, вывод:

 Started A Started B Waiting for futures Finished A Finished B 10

В этом примере у нас есть два будущих блока, которые выполняются независимо. Программа блокируется только при чтении значения из будущего объекта, который еще не доступен. В нашем случае ожидание суммирования обоих результатов будущих блоков. Поведение предсказуемо (детерминировано) и всегда будет давать один и тот же результат, потому что нет общего изменяемого состояния.

Еще один примитив, который используется для параллелизма, — это промис. Обещание — это контейнер, в который можно один раз поместить значение. При чтении промисов поток будет ждать, пока значение промиса не будет заполнено.

 ; ; promise.clj ; (def result (promise)) (future (println "The result is: " @result)) (Thread/sleep 2000) (deliver result 42)

В этом примере будущее будет ждать печати результата до тех пор, пока обещание не будет сохранено. Через две секунды в промисе будет сохранено значение 42, которое будет напечатано в будущем потоке. Использование промисов может привести к взаимоблокировке, а не к будущему, поэтому будьте осторожны при использовании промисов.

 ; ; promise-deadlock.clj ; (def promise-result (promise)) (def future-result (future (println "The result is: " + @promise-result) 13)) (println "Future result is: " @future-result) (deliver result 42)

В этом примере мы используем результат будущего и результат обещания. Порядок установки и чтения значений таков, что основной поток ожидает значения от будущего потока, а будущий поток ожидает значения от основного потока. Это поведение будет предсказуемым (детерминированным) и будет воспроизводиться каждый раз при выполнении программы, что упрощает поиск и устранение ошибок.

Использование будущего позволяет программе продолжать выполнение упражнения до тех пор, пока ей не понадобится результат выполнения будущего. Это приводит к более быстрому выполнению программы. Если у вас есть несколько процессоров с будущим, вы можете сделать параллельное выполнение программы, которая имеет предсказуемое (детерминированное) поведение (каждый раз дает один и тот же результат). Таким образом, мы лучше используем мощь компьютера.

 ; ; fibonacci.clj ; (defn fibonacci[a] (if (<= a 2) 1 (+ (fibonacci (- a 1)) (fibonacci (- a 2))))) (println "Start serial calculation") (time (println "The result is: " (+ (fibonacci 36) (fibonacci 36)))) (println "Start parallel calculation") (defn parallel-fibonacci[] (def result-1 (future (fibonacci 36))) (def result-2 (future (fibonacci 36))) (+ @result-1 @result-2)) (time (println "The result is: " (parallel-fibonacci)))

В этом примере вы можете увидеть, как использование future позволяет лучше использовать скорость компьютера. У нас есть два числа Фибоначчи, которые складываются. Мы видим, что программа вычисляет результат дважды, первый раз последовательно в одном потоке, а второй раз параллельно в двух потоках. Поскольку у моего ноутбука многоядерный процессор, параллельное выполнение работает в два раза быстрее, чем последовательное вычисление.

Результат выполнения этого скрипта на моем ноутбуке:

 Start serial calculation The result is: 29860704 "Elapsed time: 2568.816524 msecs" Start parallel calculation The result is: 29860704 "Elapsed time: 1216.991448 msecs"

параллелизм

Для поддержки параллелизма и непредсказуемости в языке программирования Clojure мы должны использовать переменный тип данных, чтобы другие потоки могли видеть изменения. Самый простой тип переменных данных — это атом. Атом — это контейнер, в котором всегда есть значение, которое можно заменить другим значением. Значение можно заменить, введя новое значение или вызвав функцию, которая принимает старое значение и возвращает новое значение, которое используется чаще. Интересно, что атом реализован без блокировок и его безопасно использовать в потоках, а значит дойти до взаимоблокировки невозможно. Внутри атом использует библиотеку java.util.concurrent.AtomicReference. Давайте посмотрим на встречный пример, реализованный с помощью атома.

 ; ; atom-counter.clj ; (def counter (atom 0)) (def attempts (atom 0)) (defn counter-increases[] (dotimes [cnt 500000] (swap! counter (fn [counter] (swap! attempts inc) ; side effect DO NOT DO THIS (inc counter))))) (def first-future (future (counter-increases))) (def second-future (future (counter-increases))) ; Wait for futures to complete @first-future @second-future ; Print value of the counter (println "The counter is: " @counter) (println "Number of attempts: " @attempts)

Результат выполнения скрипта на моем ноутбуке:

 The counter is: 1000000 Number of attempts: 1680212

В этом примере мы используем атом, который содержит значение счетчика. Счетчик увеличивается на (swap! counter inc). Функция обмена работает следующим образом: 1. берет значение счетчика и сохраняет его 2. для этого значения вызывается заданная функция, которая вычисляет новое значение 3. для сохранения нового значения используется атомарная операция, которая проверяет, изменилось ли старое значение 3a. если значение не изменилось, вводится новое значение 3b. если за это время значение изменилось, то переходим к шагу 1. Мы видим, что функция может быть вызвана снова, если за это время значение изменилось. Значение может быть изменено только из другого потока. Поэтому важно, чтобы функция, вычисляющая новое значение, не имела побочных эффектов, чтобы не имело значения, будет ли она вызываться больше раз. Одним из ограничений атома является то, что он синхронизирует изменения одного значения.

 ; ; atom-acocunts.clj ; (def bob (atom 200000)) (def joe (atom 300000)) (def inconsistencies (atom 0)) (defn transfer [source destination amount] (if (not= (+ @bob @joe) 500000) (swap! inconsistencies inc)) (swap! source - amount) (swap! destination + amount)) (defn first-transfer [] (dotimes [cnt 100000] (transfer bob joe 2))) (defn second-transfer [] (dotimes [cnt 100000] (transfer joe bob 1))) (def first-future (future (first-transfer))) (def second-future (future (second-transfer))) @first-future @second-future (println "Bob has in account: " @bob) (println "Joe has in account: " @joe) (println "Inconsistencies while transfer: " @inconsistencies)

Когда я выполняю этот скрипт, я получаю:

 Bob has in account: 100000 Joe has in account: 400000 Inconsistencies while transfer: 36525

В этом примере мы можем видеть, как мы меняем больше атомов. В какой-то момент может произойти несоответствие. Сумма двух счетов в какой-то момент времени не одинакова. Если нам нужно координировать изменения нескольких значений, есть два решения:

  1. Поместите больше значений в один атом
  2. Используйте ссылки и программную транзакционную память, как мы увидим позже.
 ; ; atom-accounts-fixed.clj ; (def accounts (atom {:bob 200000, :joe 300000})) (def inconsistencies (atom 0)) (defn transfer [source destination amount] (let [deref-accounts @accounts] (if (not= (+ (get deref-accounts :bob) (get deref-accounts :joe)) 500000) (swap! inconsistencies inc)) (swap! accounts (fn [accs] (update (update accs source - amount) destination + amount))))) (defn first-transfer [] (dotimes [cnt 100000] (transfer :bob :joe 2))) (defn second-transfer [] (dotimes [cnt 100000] (transfer :joe :bob 1))) (def first-future (future (first-transfer))) (def second-future (future (second-transfer))) @first-future @second-future (println "Bob has in account: " (get @accounts :bob)) (println "Joe has in account: " (get @accounts :joe)) (println "Inconsistencies while transfer: " @inconsistencies)

Когда я запускаю этот скрипт на своем компьютере, я получаю:

 Bob has in account: 100000 Joe has in account: 400000 Inconsistencies while transfer: 0

В этом примере координация была решена, поэтому мы придаем большее значение использованию карты. Когда мы переводим деньги со счета, мы одновременно меняем все счета, чтобы никогда не случилось, что сумма денег не будет одинаковой.

Следующий тип переменных данных — агент. Агент ведет себя как атом только в том смысле, что функция, которая изменяет значение, выполняется в другом потоке, так что требуется некоторое время, чтобы изменение стало видимым. Поэтому при чтении значения агента необходимо вызвать функцию, которая ждет, пока не будут выполнены все функции, изменяющие значение агента. В отличие от атомов функция, меняющая значение, вызывается только один раз и поэтому может иметь побочные эффекты. Этот тип также может синхронизировать одно значение и не может блокироваться.

 ; ; agent-counter.clj ; (def counter (agent 0)) (def attempts (atom 0)) (defn counter-increases[] (dotimes [cnt 500000] (send counter (fn [counter] (swap! attempts inc) (inc counter))))) (def first-future (future (counter-increases))) (def second-future (future (counter-increases))) ; wait for futures to complete @first-future @second-future ; wait for counter to be finished with updating (await counter) ; print the value of the counter (println "The counter is: " @counter) (println "Number of attempts: " @attempts)

Когда я запускаю этот скрипт на своем ноутбуке, я получаю:

 The counter is: 1000000 Number of attempts: 1000000

Этот пример аналогичен реализации счетчика с атомом. Единственное отличие состоит в том, что здесь мы ждем завершения всех изменений агента, прежде чем считывать окончательное значение с помощью await.

Последний тип переменных данных — это ссылки. В отличие от атомов, ссылки могут синхронизировать изменения нескольких значений. Каждая операция по ссылке должна быть в транзакции с использованием dosync. Такой способ изменения данных называется программной транзакционной памятью или сокращенно STM. Рассмотрим пример с переводом денег на счета.

 ; ; stm-accounts.clj ; (def bob (ref 200000)) (def joe (ref 300000)) (def inconsistencies (atom 0)) (def attempts (atom 0)) (def transfers (agent 0)) (defn transfer [source destination amount] (dosync (swap! attempts inc) ; side effect DO NOT DO THIS (send transfers inc) (when (not= (+ @bob @joe) 500000) (swap! inconsistencies inc)) ; side effect DO NOT DO THIS (alter source - amount) (alter destination + amount))) (defn first-transfer [] (dotimes [cnt 100000] (transfer bob joe 2))) (defn second-transfer [] (dotimes [cnt 100000] (transfer joe bob 1))) (def first-future (future (first-transfer))) (def second-future (future (second-transfer))) @first-future @second-future (await transfers) (println "Bob has in account: " @bob) (println "Joe has in account: " @joe) (println "Inconsistencies while transfer: " @inconsistencies) (println "Attempts: " @attempts) (println "Transfers: " @transfers)

Когда я запускаю этот скрипт, я получаю:

 Bob has in account: 100000 Joe has in account: 400000 Inconsistencies while transfer: 0 Attempts: 330841 Transfers: 200000

Интересно, что попыток было больше, чем количество совершенных транзакций. Это связано с тем, что STM не использует блокировки, поэтому в случае конфликта (например, двух потоков, пытающихся изменить одно и то же значение) транзакция будет выполнена повторно. По этой причине транзакция не должна иметь побочных эффектов. Мы видим, что агент, значение которого изменяется в рамках транзакции, ведет себя предсказуемо. Функция, меняющая значение агента, будет оцениваться столько раз, сколько транзакций. Причина в том, что агент осведомлен о транзакциях. Если транзакция должна иметь побочные эффекты, они должны быть реализованы внутри агента. Таким образом, программа будет иметь предсказуемое поведение. Вы, вероятно, думаете, что всегда должны использовать STM, но опытные программисты часто будут использовать атомы, потому что атомы проще и быстрее, чем STM. Конечно, если можно сделать программу таким образом. Если у вас есть побочные эффекты, то нет другого выбора, кроме как использовать СТМ и агенты.

Актер Модель

Следующая модель параллелизма — это акторная модель. Принцип этой модели аналогичен реальному миру. Если мы договариваемся создать что-то со многими людьми, например здание, то у каждого человека на стройке своя роль. Толпа людей находится под присмотром надзирателя. Если работник получил травму на работе, руководитель назначит работу пострадавшего другим, имеющимся в наличии. При необходимости он может привести на сайт нового человека. На сайте у нас больше людей, которые делают работу одновременно (параллельно), но и общаются друг с другом для синхронизации. Если бы мы заложили в программу работу на стройке, то каждый человек был бы актором, имеющим состояние и исполняющим в своем процессе, а разговоры заменились бы сообщениями. Популярным языком программирования, основанным на этой модели, является Erlang. Этот интересный язык имеет неизменяемые типы данных и функции, обладающие теми же свойствами, что и другие типы данных. Функции могут быть созданы во время выполнения программы и переданы в качестве аргументов другой функции или возвращены в результате вызова функции. Я приведу примеры на языке Elixir, который использует виртуальную машину Erlang, поэтому у меня будет та же модель программирования, что и у Erlang, только с другим синтаксисом. Три самых важных примитива в Эликсире — это создание, отправка и получение. spawn выполняет функцию в новом процессе, send отправляет сообщение процессу, а Receive получает сообщения, отправленные текущему процессу.

Параллельно будет увеличиваться счетчик первого примера с акторной моделью. Чтобы сделать программу с этой моделью, необходимо, чтобы актор имел значение счетчика и получал сообщение для установки и получения значения счетчика, а также иметь двух акторов, которые будут одновременно увеличивать значение счетчика.

 # # Counting.exs # defmodule Counting do def counter(value) do receive do {:get, sender} -> send sender, {:counter, value} counter value {:set, new_value} -> counter(new_value) end end def counting(sender, counter, times) do if times > 0 do send counter, {:get, self} receive do {:counter, value} -> send counter, {:set, value + 1} end counting(sender, counter, times - 1) else send sender, {:done, self} end end end counter = spawn fn -> Counting.counter 0 end IO.puts "Starting counting processes" this = self counting1 = spawn fn -> IO.puts "Counting A started" Counting.counting this, counter, 500_000 IO.puts "Counting A finished" end counting2 = spawn fn -> IO.puts "Counting B started" Counting.counting this, counter, 500_000 IO.puts "Counting B finished" end IO.puts "Waiting for counting to be done" receive do {:done, ^counting1} -> nil end receive do {:done, ^counting2} -> nil end send counter, {:get, self} receive do {:counter, value} -> IO.puts "Counter is: #{value}" end

Когда я выполняю этот пример, я получаю:

 Starting counting processes Counting A started Waiting for counting to be done Counting B started Counting A finished Counting B finished Counter is: 516827

Мы видим, что в итоге счетчик равен 516827, а не 1000000, как мы ожидали. При следующем запуске скрипта я получил 511010. Причина такого поведения в том, что счетчик получает два сообщения: получить текущее значение и установить новое значение. Для увеличения счетчика программе необходимо получить текущее значение, увеличить его на 1 и установить увеличенное значение. Два процесса одновременно читают и записывают значение счетчика, используя сообщение, отправляемое процессу счетчика. Порядок сообщений, которые будет получать счетчик, непредсказуем, и программа не может его контролировать. Мы можем представить такой сценарий:

  1. Значение счетчика 115
  2. Процесс A считывает значение счетчика (115)
  3. Процесс B считывает значение счетчика (115)
  4. Процесс B локально увеличивает значение (116)
  5. Процесс B устанавливает увеличенное значение счетчика (116)
  6. Процесс A увеличивает значение счетчика (116)
  7. Процесс A устанавливает увеличенное значение счетчика (116)
  8. Значение счетчика 116

Если мы посмотрим на сценарий, два процесса увеличивают счетчик на 1, и счетчик увеличивается в итоге на 1, а не на 2. Такие переплетения могут происходить непредсказуемое количество раз, и поэтому значение счетчика непредсказуемо. Чтобы предотвратить такое поведение, операция увеличения должна выполняться одним сообщением.

 # # CountingFixed.exs # defmodule Counting do def counter(value) do receive do :increase -> counter(value + 1) {:get, sender} -> send sender, {:counter, value} counter value end end def counting(sender, counter, times) do if times > 0 do send counter, :increase counting(sender, counter, times - 1) else send sender, {:done, self} end end end counter = spawn fn -> Counting.counter 0 end IO.puts "Starting counting processes" this = self counting1 = spawn fn -> IO.puts "Counting A started" Counting.counting this, counter, 500_000 IO.puts "Counting A finished" end counting2 = spawn fn -> IO.puts "Counting B started" Counting.counting this, counter, 500_000 IO.puts "Counting B finished" end IO.puts "Waiting for counting to be done" receive do {:done, ^counting1} -> nil end receive do {:done, ^counting2} -> nil end send counter, {:get, self} receive do {:counter, value} -> IO.puts "Counter is: #{value}" end

Запустив этот скрипт, я получаю:

 Starting counting processes Counting A started Waiting for counting to be done Counting B started Counting A finished Counting B finished Counter is: 1000000

Мы видим, что счетчик имеет правильное значение. Причина предсказуемого (детерминированного) поведения заключается в том, что значение счетчика увеличивается на одно сообщение, так что последовательность сообщений для увеличения счетчика не повлияет на его конечное значение. Работая с акторной моделью, мы должны обращать внимание на то, как сообщения могут переплетаться, и тщательно разрабатывать сообщения и действия над сообщениями, чтобы избежать случайной непредсказуемости (недетерминизма).

Как мы можем перевести деньги между двумя счетами с этой моделью?

 # # Accounts.exs # defmodule Accounts do def accounts(state) do receive do {:transfer, source, destination, amount} -> accounts %{state | source => state[source] - amount , destination => state[destination] + amount} {:amounts, accounts, sender } -> send sender, {:amounts, for account <- accounts do {account, state[account]} end} accounts(state) end end def transfer(sender, accounts, source, destination, amount, times, inconsistencies) do if times > 0 do send accounts, {:amounts, [source, destination], self} receive do {:amounts, amounts} -> if amounts[source] + amounts[destination] != 500_000 do Agent.update(inconsistencies, fn value -> value + 1 end) end end send accounts, {:transfer, source, destination, amount} transfer(sender, accounts, source, destination, amount, times - 1, inconsistencies) else send sender, {:done, self} end end end accounts = spawn fn -> Accounts.accounts(%{bob: 200_000, joe: 300_000 }) end {:ok, inconsistencies} = Agent.start(fn -> 0 end) this = self transfer1 = spawn fn -> IO.puts "Transfer A started" Accounts.transfer(this, accounts, :bob, :joe, 2, 100_000, inconsistencies) IO.puts "Transfer A finished" end transfer2 = spawn fn -> IO.puts "Transfer B started" Accounts.transfer(this, accounts, :joe, :bob, 1, 100_000, inconsistencies) IO.puts "Transfer B finished" end IO.puts "Waiting for transfers to be done" receive do {:done, ^transfer1} -> nil end receive do {:done, ^transfer2} -> nil end send accounts, {:amounts, [:bob, :joe], self} receive do {:amounts, amounts} -> IO.puts "Bob has in account: #{amounts[:bob]}" IO.puts "Joe has in account: #{amounts[:joe]}" IO.puts "Inconsistencies while transfer: #{Agent.get(inconsistencies, fn x -> x end)}" end

Когда я запускаю этот скрипт, я получаю:

 Waiting for transfers to be done Transfer A started Transfer B started Transfer B finished Transfer A finished Bob has in account: 100000 Joe has in account: 400000 Inconsistencies while transfer: 0

Мы видим, что перевод денег работает без несоответствий, потому что мы выбрали передачу сообщения для перевода денег и суммы сообщения, чтобы получить значение счетов, что дает нам предсказуемое поведение программы. Всякий раз, когда мы делаем перевод денег, общая сумма денег в любое время должна быть одинаковой.

Модель актора может вызвать блокировку и, следовательно, взаимоблокировку, поэтому будьте осторожны при разработке программы. В следующем сценарии показано, как можно имитировать сценарий блокировки и взаимоблокировки.

 # # Deadlock.exs # defmodule Lock do def loop(state) do receive do {:lock, sender} -> case state do [] -> send sender, :locked loop([sender]) _ -> loop(state ++ [sender]) end {:unlock, sender} -> case state do [] -> loop(state) [^sender | []] -> loop([]) [^sender | [next | tail]] -> send next, :locked loop([next | tail]) _ -> loop(state) end end end def lock(pid) do send pid, {:lock, self} receive do :locked -> nil # This will block until we receive message end end def unlock(pid) do send pid, {:unlock, self} end def locking(first, second, times) do if times > 0 do lock(first) lock(second) unlock(second) unlock(first) locking(first, second, times - 1) end end end a_lock = spawn fn -> Lock.loop([]) end b_lock = spawn fn -> Lock.loop([]) end this = self IO.puts "Locking A, B started" spawn fn -> Lock.locking(a_lock, b_lock, 1_000) IO.puts "Locking A, B finished" send this, :done end IO.puts "Locking B, A started" spawn fn -> Lock.locking(b_lock, a_lock, 1_000) IO.puts "Locking B, A finished" send this, :done end IO.puts "Waiting for locking to be done" receive do :done -> nil end receive do :done -> nil End

When I run this script on my laptop I get:

 Locking A, B started Locking B, A started Waiting for locking to be done

From the output we can see that the processes that lock A and B are stuck. This happens because the first process waits for the second process to release B while second process waiting first process to release A. They are waiting for each other and are stuck forever. To avoid this locking, order should always be the same, or design a program so that it doesn't use lock (meaning that it doesn't wait for a specific message). The following listing always locks first A then B.

 # # Deadlock fixed # defmodule Lock do def loop(state) do receive do {:lock, sender} -> case state do [] -> send sender, :locked loop([sender]) _ -> loop(state ++ [sender]) end {:unlock, sender} -> case state do [] -> loop(state) [^sender | []] -> loop([]) [^sender | [next | tail]] -> send next, :locked loop([next | tail]) _ -> loop(state) end end end def lock(pid) do send pid, {:lock, self} receive do :locked -> nil # This will block until we receive message end end def unlock(pid) do send pid, {:unlock, self} end def locking(first, second, times) do if times > 0 do lock(first) lock(second) unlock(second) unlock(first) locking(first, second, times - 1) end end end a_lock = spawn fn -> Lock.loop([]) end b_lock = spawn fn -> Lock.loop([]) end this = self IO.puts "Locking A, B started" spawn fn -> Lock.locking(a_lock, b_lock, 1_000) IO.puts "Locking A, B finished" send this, :done end IO.puts "Locking A, B started" spawn fn -> Lock.locking(a_lock, b_lock, 1_000) IO.puts "Locking A, B finished" send this, :done end IO.puts "Waiting for locking to be done" receive do :done -> nil end receive do :done -> nil End

When I run this script on my laptop I get:

 Locking A, B started Locking A, B started Waiting for locking to be done Locking A, B finished Locking A, B finished

And now, there is no longer a deadlock.

Заворачивать

As an introduction to concurrent programming, we have covered a few concurrency models. We haven't covered all models, as this article would be too big. Just to name a few, channels and reactive streams are some of the other popularly used concurrency models. Channels and reactive streams have many similarities with the actor model. All of them transmit messages, but many threads can receive messages from one channel, and reactive streams transmit messages in one direction to form directed graph that receive messages from one end and send messages from the other end as a result of the processing.

Shared mutable state models can easily go wrong if we don't think ahead. It has problems of race condition and deadlock. If we have a choice between different concurrent programming models, it would be easier to implement and maintain but otherwise we have to be very careful what we do.

The functional way is a lot easier to reason about and implement. It cannot have deadlock. This model may have worse performance than shared mutable state model, but a program that works is always faster than one that does not work.

Actor model is a good choice for concurrent programming. Although there are problems of race condition and deadlock, they can happen less than in shared mutable state model since the only way for processes to communicate is via messages. With good message design between processes, that can be avoided. If a problem occurs it is then in the order or meaning of messages in communication between the processes and you know where to look.

I hope this article has given you some insight to what concurrent programming is and how it gives structure to the programs you write.

Related: Ruby Concurrency and Parallelism: A Practical Tutorial