Wprowadzenie do programowania współbieżnego: przewodnik dla początkujących
Opublikowany: 2022-03-11Co to jest programowanie współbieżne? Mówiąc prosto, dzieje się tak, gdy robisz więcej niż jedną rzecz w tym samym czasie. Nie mylić z równoległością, współbieżność ma miejsce, gdy wiele sekwencji operacji jest uruchamianych w nakładających się okresach czasu. W dziedzinie programowania współbieżność to dość złożony temat. Radzenie sobie z konstrukcjami, takimi jak wątki i blokady, oraz unikanie problemów, takich jak wyścigi i zakleszczenia, może być dość kłopotliwe, co utrudnia pisanie programów współbieżnych. Dzięki współbieżności programy można projektować jako niezależne procesy współpracujące ze sobą w określonej kompozycji. Taka struktura może, ale nie musi być równoległa; jednak osiągnięcie takiej struktury w programie ma wiele zalet.
W tym artykule przyjrzymy się wielu różnym modelom współbieżności, jak je osiągnąć w różnych językach programowania zaprojektowanych dla współbieżności.
Wspólny model stanu zmienności
Spójrzmy na prosty przykład z licznikiem i dwoma wątkami, które go zwiększają. Program nie powinien być zbyt skomplikowany. Mamy obiekt, który zawiera licznik, który zwiększa się wraz ze wzrostem metody i pobiera go za pomocą metody get oraz dwa wątki, które go zwiększają.
// // Counting.java // public class Counting { public static void main(String[] args) throws InterruptedException { class Counter { int counter = 0; public void increment() { counter++; } public int get() { return counter; } } final Counter counter = new Counter(); class CountingThread extends Thread { public void run() { for (int x = 0; x < 500000; x++) { counter.increment(); } } } CountingThread t1 = new CountingThread(); CountingThread t2 = new CountingThread(); t1.start(); t2.start(); t1.join(); t2.join(); System.out.println(counter.get()); } }Ten naiwny program nie jest tak naiwny, jak się wydaje na pierwszy rzut oka. Kiedy uruchamiam ten program więcej razy, uzyskuję różne wyniki. Po trzech wykonaniach na moim laptopie są trzy wartości.
java Counting 553706 java Counting 547818 java Counting 613014Jaki jest powód tego nieprzewidywalnego zachowania? Program zwiększa licznik w jednym miejscu, w metodzie zwiększania wykorzystującej komendę counter++. Jeśli spojrzymy na kod bajtowy polecenia, zobaczymy, że składa się on z kilku części:
- Odczytaj wartość licznika z pamięci
- Zwiększ wartość lokalnie
- Zapisz wartość licznika w pamięci
Teraz możemy sobie wyobrazić, co może pójść nie tak w tej kolejności. Jeśli mamy dwa wątki, które niezależnie zwiększają licznik, możemy mieć taki scenariusz:
- Wartość licznika to 115
- Pierwszy wątek odczytuje wartość licznika z pamięci (115)
- Pierwszy wątek zwiększa lokalną wartość licznika (116)
- Drugi wątek odczytuje wartość licznika z pamięci (115)
- Drugi wątek zwiększa lokalną wartość licznika (116)
- Drugi wątek zapisuje do pamięci lokalną wartość licznika (116)
- Pierwszy wątek zapisuje do pamięci wartość lokalnego licznika (116)
- Wartość licznika to 116
W tym scenariuszu dwa wątki są splecione, tak aby wartość licznika została zwiększona o 1, ale wartość licznika powinna zostać zwiększona o 2, ponieważ każdy wątek zwiększa ją o 1. Różne przeplatające się wątki wpływają na wynik programu. Powodem nieprzewidywalności programu jest to, że program nie kontroluje przeplatającego się wątku, ale system operacyjny. Za każdym razem, gdy program jest wykonywany, wątki mogą przeplatać się w inny sposób. W ten sposób wprowadziliśmy do programu przypadkową nieprzewidywalność (niedeterminizm).
Aby naprawić tę przypadkową nieprzewidywalność (niedeterminizm), program musi mieć kontrolę nad przeplataniem się wątków. Gdy jeden wątek jest w metodzie zwiększania, inny wątek nie może być w tej samej metodzie, dopóki pierwszy z niego nie wyjdzie. W ten sposób serializujemy dostęp do wzrostu metody.
// // CountingFixed.java // public class CountingFixed { public static main(String[] args) throws InterruptedException { class Counter { int counter = 0; public synchronized void increase() { counter++; } public synchronized int get() { return counter; } } final Counter counter = new Counter(); class CountingThread extends Thread { public void run() { for (int i = 0; i < 500000; i++) { counter.increment(); } } } CountingThread thread1 = new CountingThread(); CountingThread thread2 = new CountingThread(); thread1.start(); thread2.start(); thread1.join(); thread2.join(); System.out.println(counter.get()); } }Innym rozwiązaniem jest użycie licznika, który może zwiększać się atomowo, co oznacza, że operacji nie można rozdzielić na wiele operacji. W ten sposób nie musimy mieć bloków kodu, które wymagają synchronizacji. Java ma atomowe typy danych w przestrzeni nazw java.util.concurrent.atomic, a my użyjemy AtomicInteger.
// // CountingBetter.java // import java.util.concurrent.atomic.AtomicInteger; class CountingBetter { public static void main(String[] args) throws InterruptedException { final AtomicInteger counter = new AtomicInteger(0); class CountingThread extends Thread { public viod run() { for (int i = 0; i < 500000; i++) { counter.incrementAndGet(); } } } CountingThread thread1 = new CountingThread(); CountingThread thread2 = new CoutningThread(); thread1.start(); thread2.start(); thread1.join(); thread2.join(); System.out.println(counter.get()); } }Atomowa liczba całkowita ma operacje, których potrzebujemy, więc możemy jej użyć zamiast klasy Counter. Warto zauważyć, że wszystkie metody atomicinteger nie wykorzystują blokowania, więc nie ma możliwości wystąpienia zakleszczeń, co ułatwia projektowanie programu.
Używanie zsynchronizowanych słów kluczowych do synchronizacji krytycznych metod powinno rozwiązać wszystkie problemy, prawda? Wyobraźmy sobie, że mamy dwa konta, na których możemy wpłacać, wypłacać i przelewać na inne konto. Co się stanie, jeśli w tym samym czasie będziemy chcieli przelać pieniądze z jednego konta na drugie i odwrotnie? Spójrzmy na przykład.
// // Deadlock.java // public class Deadlock { public static void main(String[] args) throws InterruptedException { class Account { int balance = 100; public Account(int balance) { this.balance = balance; } public synchronized void deposit(int amount) { balance += amount; } public synchronized boolean withdraw(int amount) { if (balance >= amount) { balance -= amount; return true; } return false; } public synchronized boolean transfer(Account destination, int amount) { if (balance >= amount) { balance -= amount; synchronized(destination) { destination.balance += amount; }; return true; } return false; } public int getBalance() { return balance; } } final Account bob = new Account(200000); final Account joe = new Account(300000); class FirstTransfer extends Thread { public void run() { for (int i = 0; i < 100000; i++) { bob.transfer(joe, 2); } } } class SecondTransfer extends Thread { public void run() { for (int i = 0; i < 100000; i++) { joe.transfer(bob, 1); } } } FirstTransfer thread1 = new FirstTransfer(); SecondTransfer thread2 = new SecondTransfer(); thread1.start(); thread2.start(); thread1.join(); thread2.join(); System.out.println("Bob's balance: " + bob.getBalance()); System.out.println("Joe's balance: " + joe.getBalance()); } }Kiedy uruchamiam ten program na moim laptopie, zwykle się zacina. Dlaczego to się dzieje? Jeśli przyjrzymy się uważnie, zobaczymy, że kiedy przesyłamy pieniądze, wchodzimy w metodę przelewu, która jest zsynchronizowana i blokuje dostęp do wszystkich zsynchronizowanych metod na koncie źródłowym, a następnie blokuje konto docelowe, które blokuje dostęp do wszystkich zsynchronizowanych metod na nim.
Wyobraź sobie następujący scenariusz:
- Przeniesienie połączeń z pierwszego wątku z konta Boba na konto Joego
- Przeniesienie połączeń z drugiego wątku z konta Joe na konto Boba
- Drugi wątek zmniejsza kwotę z konta Joe
- Drugi wątek idzie do wpłaty kwoty na konto Boba, ale czeka, aż pierwszy wątek zakończy transfer.
- Pierwszy wątek zmniejsza kwotę z konta Boba
- Pierwszy wątek idzie na wpłatę kwoty na konto Joe, ale czeka, aż drugi wątek zakończy transfer.
W tym scenariuszu jeden wątek czeka na zakończenie transferu innego wątku i na odwrót. Utknęli ze sobą i program nie może być kontynuowany. Nazywa się to impasem. Aby uniknąć impasu, konieczne jest zablokowanie kont w tej samej kolejności. Aby naprawić program, nadamy każdemu kontu unikalny numer, dzięki czemu będziemy mogli blokować konta w tej samej kolejności podczas przelewania pieniędzy.
// // DeadlockFixed.java // import java.util.concurrent.atomic.AtomicInteger; public class DeadlockFixed { public static void main(String[] args) throws InterruptedException { final AtomicInteger counter = new AtomicInteger(0); class Account { int balance = 100; int order; public Account(int balance) { this.balance = balance; this.order = counter.getAndIncrement(); } public synchronized void deposit(int amount) { balance += amount; } public synchronized boolean withdraw(int amount) { if (balance >= amount) { balance -= amount; return true; } return false; } public boolean transfer(Account destination, int amount) { Account first; Account second; if (this.order < destination.order) { first = this; second = destination; } else { first = destination; second = this; } synchronized(first) { synchronized(second) { if (balance >= amount) { balance -= amount; destination.balance += amount; return true; } return false; } } } public synchronized int getBalance() { return balance; } } final Account bob = new Account(200000); final Account joe = new Account(300000); class FirstTransfer extends Thread { public void run() { for (int i = 0; i < 100000; i++) { bob.transfer(joe, 2); } } } class SecondTransfer extends Thread { public void run() { for (int i = 0; i < 100000; i++) { joe.transfer(bob, 1); } } } FirstTransfer thread1 = new FirstTransfer(); SecondTransfer thread2 = new SecondTransfer(); thread1.start(); thread2.start(); thread1.join(); thread2.join(); System.out.println("Bob's balance: " + bob.getBalance()); System.out.println("Joe's balance: " + joe.getBalance()); } }Ze względu na nieprzewidywalność takich błędów czasem się zdarzają, ale nie zawsze i są trudne do odtworzenia. Jeśli program zachowuje się nieprzewidywalnie, jest to zwykle spowodowane współbieżnością, która wprowadza przypadkowy niedeterminizm. Aby uniknąć przypadkowego niedeterminizmu, powinniśmy wcześniej zaprojektować program uwzględniający wszystkie sploty.
Przykład programu, który ma przypadkowy niedeterminizm.
// // NonDeteminism.java // public class NonDeterminism { public static void main(String[] args) throws InterruptedException { class Container { public String value = "Empty"; } final Container container = new Container(); class FastThread extends Thread { public void run() { container.value = "Fast"; } } class SlowThread extends Thread { public void run() { try { Thread.sleep(50); } catch(Exception e) {} container.value = "Slow"; } } FastThread fast = new FastThread(); SlowThread slow = new SlowThread(); fast.start(); slow.start(); fast.join(); slow.join(); System.out.println(container.value); } }Ten program ma w sobie przypadkowy niedeterminizm. Wyświetlona zostanie ostatnia wartość wprowadzona do kontenera.
java NonDeterminism SlowWolniejsze wątki wprowadzą tę wartość później, a ta wartość zostanie wydrukowana (Slow). Ale tak nie musi być. Co się stanie, jeśli komputer jednocześnie uruchomi inny program, który wymaga dużej ilości zasobów procesora? Nie mamy gwarancji, że będzie to wolniejszy wątek, który wprowadzi wartość jako ostatni, ponieważ jest kontrolowany przez system operacyjny, a nie program. Możemy mieć sytuacje, w których program działa na jednym komputerze, a na drugim zachowuje się inaczej. Takie błędy są trudne do znalezienia i przyprawiają o ból głowy programistów. Z tych wszystkich powodów ten model współbieżności jest bardzo trudny do prawidłowego wykonania.
Funkcjonalny sposób
Równoległość
Spójrzmy na inny model, z którego korzystają języki funkcjonalne. Na przykład użyjemy Clojure, które można zinterpretować za pomocą narzędzia Leiningen. Clojure to bardzo ciekawy język z dobrą obsługą współbieżności. Poprzedni model współbieżności był ze współdzielonym stanem mutowalnym. Klasy, których używamy, mogą również mieć ukryty stan, który mutuje, o którym nie wiemy, ponieważ nie jest to widoczne w ich interfejsie API. Jak widzieliśmy, ten model może powodować przypadkowy niedeterminizm i impas, jeśli nie będziemy ostrożni. Języki funkcjonalne mają typy danych, które nie mutują, dzięki czemu można je bezpiecznie udostępniać bez ryzyka, że ulegną zmianie. Funkcje mają właściwości, a także inne typy danych. Funkcje mogą być tworzone podczas wykonywania programu i przekazywane jako parametr do innej funkcji lub zwracane w wyniku wywołania funkcji.
Podstawowe prymitywy programowania współbieżnego są obiecujące i przyszłościowe. Future wykonuje blok kodu w innym wątku i zwraca obiekt dla przyszłej wartości, która zostanie wprowadzona, gdy blok zostanie wykonany.
; ; future.clj ; (let [a (future (println "Started A") (Thread/sleep 1000) (println "Finished A") (+ 1 2)) b (future (println "Started B") (Thread/sleep 2000) (println "Finished B") (+ 3 4))] (println "Waiting for futures") (+ @a @b))Kiedy wykonuję ten skrypt, dane wyjściowe są następujące:
Started A Started B Waiting for futures Finished A Finished B 10W tym przykładzie mamy dwa przyszłe bloki, które są wykonywane niezależnie. Program blokuje się tylko podczas odczytywania wartości z przyszłego obiektu, który nie jest jeszcze dostępny. W naszym przypadku oczekiwanie na zsumowanie obu wyników przyszłych bloków. Zachowanie jest przewidywalne (deterministyczne) i zawsze da ten sam wynik, ponieważ nie ma wspólnego stanu mutowalnego.
Innym prymitywem używanym do współbieżności jest obietnica. Promise to pojemnik, w którym można jednorazowo umieścić wartość. Podczas czytania obietnic wątek będzie czekał, aż wartość obietnicy zostanie wypełniona.
; ; promise.clj ; (def result (promise)) (future (println "The result is: " @result)) (Thread/sleep 2000) (deliver result 42)W tym przykładzie przyszłość będzie czekać na wydrukowanie wyniku, o ile obiecuje, że nie zostanie zapisana wartość. Po dwóch sekundach w obietnicy zostanie zapisana wartość 42 do wydrukowania w przyszłym wątku. Korzystanie z obietnic może prowadzić do impasu, w przeciwieństwie do przyszłości, dlatego należy zachować ostrożność podczas korzystania z obietnicy.
; ; promise-deadlock.clj ; (def promise-result (promise)) (def future-result (future (println "The result is: " + @promise-result) 13)) (println "Future result is: " @future-result) (deliver result 42)W tym przykładzie używamy wyniku przyszłości i wyniku obietnicy. Kolejność ustawiania i odczytywania wartości jest taka, że wątek główny czeka na wartość z przyszłego wątku, a przyszły wątek czeka na wartość z wątku głównego. To zachowanie będzie przewidywalne (deterministyczne) i będzie odtwarzane za każdym razem, gdy program zostanie uruchomiony, co ułatwi znalezienie i usunięcie błędu.
Korzystanie z przyszłości pozwala programowi kontynuować ćwiczenie, dopóki nie będzie potrzebować wyniku realizacji przyszłości. Powoduje to szybsze wykonanie programu. Jeśli masz wiele procesorów z przyszłością, możesz wykonać równoległe wykonanie programu, który ma przewidywalne (deterministyczne) zachowanie (za każdym razem daje ten sam wynik). W ten sposób lepiej wykorzystujemy moc komputera.
; ; fibonacci.clj ; (defn fibonacci[a] (if (<= a 2) 1 (+ (fibonacci (- a 1)) (fibonacci (- a 2))))) (println "Start serial calculation") (time (println "The result is: " (+ (fibonacci 36) (fibonacci 36)))) (println "Start parallel calculation") (defn parallel-fibonacci[] (def result-1 (future (fibonacci 36))) (def result-2 (future (fibonacci 36))) (+ @result-1 @result-2)) (time (println "The result is: " (parallel-fibonacci)))W tym przykładzie możesz zobaczyć, jak wykorzystanie przyszłości może lepiej wykorzystać prędkość komputera. Mamy dwie liczby Fibonacciego, które się sumują. Widzimy, że program oblicza wynik dwukrotnie, pierwszy raz sekwencyjnie w jednym wątku, a drugi raz równolegle w dwóch wątkach. Ponieważ mój laptop ma procesor wielordzeniowy, wykonywanie równoległe działa dwa razy szybciej niż obliczenia sekwencyjne.
Wynik wykonania tego skryptu na moim laptopie:
Start serial calculation The result is: 29860704 "Elapsed time: 2568.816524 msecs" Start parallel calculation The result is: 29860704 "Elapsed time: 1216.991448 msecs"Konkurencja
Aby obsługiwać współbieżność i nieprzewidywalność w języku programowania Clojure, musimy użyć typu danych, który jest zmienny, aby inne wątki mogły zobaczyć zmiany. Najprostszym typem danych zmiennych jest atom. Atom to pojemnik, który zawsze ma wartość, którą można zastąpić inną wartością. Wartość można zastąpić, wprowadzając nową wartość lub wywołując funkcję, która pobiera starą wartość i zwraca nową, częściej używaną. Interesujące jest to, że atom jest zaimplementowany bez blokowania i jest bezpieczny w użyciu w wątkach, co oznacza, że niemożliwe jest osiągnięcie impasu. Atom używa wewnętrznie biblioteki java.util.concurrent.AtomicReference. Przyjrzyjmy się kontrprzykładowi zaimplementowanemu za pomocą atomu.
; ; atom-counter.clj ; (def counter (atom 0)) (def attempts (atom 0)) (defn counter-increases[] (dotimes [cnt 500000] (swap! counter (fn [counter] (swap! attempts inc) ; side effect DO NOT DO THIS (inc counter))))) (def first-future (future (counter-increases))) (def second-future (future (counter-increases))) ; Wait for futures to complete @first-future @second-future ; Print value of the counter (println "The counter is: " @counter) (println "Number of attempts: " @attempts)Wynik wykonania skryptu na moim laptopie:
The counter is: 1000000 Number of attempts: 1680212W tym przykładzie używamy atomu, który zawiera wartość licznika. Licznik zwiększa się wraz z (zamień! licznik inc). Funkcja swap działa w następujący sposób: 1. weź wartość licznika i zachowaj ją 2. dla tej wartości wywołuje daną funkcję, która oblicza nową wartość 3. aby zapisać nową wartość, używa operacji atomowej, która sprawdza, czy stara wartość uległa zmianie 3a. jeśli wartość nie uległa zmianie, wprowadza nową wartość 3b. jeśli w międzyczasie wartość zostanie zmieniona, przejdź do kroku 1. Widzimy, że funkcję można wywołać ponownie, jeśli w międzyczasie zmieni się wartość. Wartość można zmienić tylko z innego wątku. Dlatego ważne jest, aby funkcja obliczająca nową wartość nie miała skutków ubocznych, więc nie ma znaczenia, czy zostanie wywołana więcej razy. Jednym z ograniczeń atomu jest to, że synchronizuje zmiany do jednej wartości.
; ; atom-acocunts.clj ; (def bob (atom 200000)) (def joe (atom 300000)) (def inconsistencies (atom 0)) (defn transfer [source destination amount] (if (not= (+ @bob @joe) 500000) (swap! inconsistencies inc)) (swap! source - amount) (swap! destination + amount)) (defn first-transfer [] (dotimes [cnt 100000] (transfer bob joe 2))) (defn second-transfer [] (dotimes [cnt 100000] (transfer joe bob 1))) (def first-future (future (first-transfer))) (def second-future (future (second-transfer))) @first-future @second-future (println "Bob has in account: " @bob) (println "Joe has in account: " @joe) (println "Inconsistencies while transfer: " @inconsistencies)Kiedy wykonuję ten skrypt, otrzymuję:

Bob has in account: 100000 Joe has in account: 400000 Inconsistencies while transfer: 36525W tym przykładzie możemy zobaczyć, jak zmieniamy więcej atomów. W pewnym momencie może wystąpić niespójność. Suma dwóch kont w pewnym momencie nie jest taka sama. Jeśli musimy koordynować zmiany wielu wartości, istnieją dwa rozwiązania:
- Umieść więcej wartości w jednym atomie
- Użyj referencji i pamięci transakcyjnej oprogramowania, jak zobaczymy później
; ; atom-accounts-fixed.clj ; (def accounts (atom {:bob 200000, :joe 300000})) (def inconsistencies (atom 0)) (defn transfer [source destination amount] (let [deref-accounts @accounts] (if (not= (+ (get deref-accounts :bob) (get deref-accounts :joe)) 500000) (swap! inconsistencies inc)) (swap! accounts (fn [accs] (update (update accs source - amount) destination + amount))))) (defn first-transfer [] (dotimes [cnt 100000] (transfer :bob :joe 2))) (defn second-transfer [] (dotimes [cnt 100000] (transfer :joe :bob 1))) (def first-future (future (first-transfer))) (def second-future (future (second-transfer))) @first-future @second-future (println "Bob has in account: " (get @accounts :bob)) (println "Joe has in account: " (get @accounts :joe)) (println "Inconsistencies while transfer: " @inconsistencies)Kiedy uruchamiam ten skrypt na moim komputerze, otrzymuję:
Bob has in account: 100000 Joe has in account: 400000 Inconsistencies while transfer: 0W tym przykładzie koordynacja została rozwiązana tak, że przy użyciu mapy przypisujemy większą wartość. Kiedy przelewamy pieniądze z konta, zmieniamy wszystkie konta w tym czasie, aby nigdy nie zdarzyło się, że suma pieniędzy nie jest taka sama.
Kolejnym zmiennym typem danych jest agent. Agent zachowuje się jak atom tylko w tym sensie, że funkcja zmieniająca wartość jest wykonywana w innym wątku, więc zmiana staje się widoczna po pewnym czasie. Dlatego przy odczycie wartości agenta konieczne jest wywołanie funkcji, która czeka na wykonanie wszystkich funkcji zmieniających wartość agenta. W przeciwieństwie do funkcji atomów, która zmienia wartość, wartość jest wywoływana tylko raz i dlatego może mieć skutki uboczne. Ten typ może również synchronizować jedną wartość i nie może zakleszczać się.
; ; agent-counter.clj ; (def counter (agent 0)) (def attempts (atom 0)) (defn counter-increases[] (dotimes [cnt 500000] (send counter (fn [counter] (swap! attempts inc) (inc counter))))) (def first-future (future (counter-increases))) (def second-future (future (counter-increases))) ; wait for futures to complete @first-future @second-future ; wait for counter to be finished with updating (await counter) ; print the value of the counter (println "The counter is: " @counter) (println "Number of attempts: " @attempts)Kiedy uruchamiam ten skrypt na moim laptopie, otrzymuję:
The counter is: 1000000 Number of attempts: 1000000Ten przykład jest taki sam, jak implementacja licznika z atomem. Jedyną różnicą jest to, że czekamy na zakończenie wszystkich zmian agenta przed odczytaniem końcowej wartości za pomocą await.
Ostatnim typem danych zmiennej są referencje. W przeciwieństwie do atomów, odwołania mogą synchronizować zmiany wielu wartości. Każda operacja na odwołaniu powinna znajdować się w transakcji wykorzystującej dosync. Ten sposób zmiany danych nazywa się programową pamięcią transakcyjną lub w skrócie STM. Spójrzmy na przykład z przelewem na kontach.
; ; stm-accounts.clj ; (def bob (ref 200000)) (def joe (ref 300000)) (def inconsistencies (atom 0)) (def attempts (atom 0)) (def transfers (agent 0)) (defn transfer [source destination amount] (dosync (swap! attempts inc) ; side effect DO NOT DO THIS (send transfers inc) (when (not= (+ @bob @joe) 500000) (swap! inconsistencies inc)) ; side effect DO NOT DO THIS (alter source - amount) (alter destination + amount))) (defn first-transfer [] (dotimes [cnt 100000] (transfer bob joe 2))) (defn second-transfer [] (dotimes [cnt 100000] (transfer joe bob 1))) (def first-future (future (first-transfer))) (def second-future (future (second-transfer))) @first-future @second-future (await transfers) (println "Bob has in account: " @bob) (println "Joe has in account: " @joe) (println "Inconsistencies while transfer: " @inconsistencies) (println "Attempts: " @attempts) (println "Transfers: " @transfers)Kiedy uruchamiam ten skrypt, otrzymuję:
Bob has in account: 100000 Joe has in account: 400000 Inconsistencies while transfer: 0 Attempts: 330841 Transfers: 200000Co ciekawe, prób było więcej niż liczba dokonanych transakcji. Dzieje się tak, ponieważ STM nie używa blokad, więc w przypadku konfliktu (np. dwóch wątków próbujących zmienić tę samą wartość) transakcja zostanie ponownie wykonana. Z tego powodu transakcja nie powinna mieć skutków ubocznych. Widzimy, że agent, którego wartość zmienia się w ramach transakcji, zachowuje się przewidywalnie. Funkcja zmieniająca wartość agenta będzie oceniana tyle razy, ile jest transakcji. Powodem jest to, że agent jest świadomy transakcji. Jeśli transakcja musi mieć skutki uboczne, należy je wprowadzić do działania w ramach agenta. W ten sposób program będzie miał przewidywalne zachowanie. Prawdopodobnie myślisz, że zawsze powinieneś używać STM, ale doświadczeni programiści często będą używać atomów, ponieważ atomy są prostsze i szybsze niż STM. Oczywiście, jeśli można w ten sposób stworzyć program. Jeśli masz skutki uboczne, nie ma innego wyjścia, jak użyć STM i agentów.
Aktor Model
Poniższy model współbieżności jest modelem aktora. Zasada działania tego modelu jest zbliżona do rzeczywistego świata. Jeśli zawrzemy umowę, aby stworzyć coś z wieloma osobami, na przykład budynek, to każdy człowiek na budowie ma swoją rolę. Nad tłumem czuwa przełożony. Jeśli pracownik zostanie ranny w pracy, przełożony przydzieli pracę poszkodowanego innym, którzy są dostępni. W razie potrzeby może zaprowadzić na miejsce nowego człowieka. Na stronie mamy więcej osób, które wykonują pracę jednocześnie (równocześnie), ale też rozmawiają ze sobą w celu synchronizacji. Gdybyśmy włożyli do programu pracę na budowie, to każda osoba byłaby aktorem, który ma stan i wykonuje się w swoim własnym procesie, a mówienie zastąpione zostałoby komunikatami. Popularnym językiem programowania opartym na tym modelu jest Erlang. Ten interesujący język ma niezmienne typy danych i funkcje, które mają takie same właściwości jak inne typy danych. Funkcje mogą być tworzone podczas wykonywania programu i przekazywane jako argumenty do innej funkcji lub zwracane w wyniku wywołania funkcji. Podam przykłady w języku Elixir, który wykorzystuje maszynę wirtualną Erlang, więc będę miał ten sam model programowania co Erlang tylko inną składnię. Trzy najważniejsze prymitywy w Elixirze to spawn, send and receive. spawn wykonuje funkcję w nowym procesie, send wysyła wiadomość do procesu i odbiera odbiera wiadomości, które są wysyłane do bieżącego procesu.
Pierwszy przykład z modelem aktora zostanie jednocześnie zwiększony. Aby stworzyć program z tym modelem, konieczne jest, aby aktor miał wartość licznika i odebrał komunikat, aby ustawić i pobrać wartość licznika oraz mieć dwóch aktorów, którzy będą jednocześnie zwiększać wartość licznika.
# # Counting.exs # defmodule Counting do def counter(value) do receive do {:get, sender} -> send sender, {:counter, value} counter value {:set, new_value} -> counter(new_value) end end def counting(sender, counter, times) do if times > 0 do send counter, {:get, self} receive do {:counter, value} -> send counter, {:set, value + 1} end counting(sender, counter, times - 1) else send sender, {:done, self} end end end counter = spawn fn -> Counting.counter 0 end IO.puts "Starting counting processes" this = self counting1 = spawn fn -> IO.puts "Counting A started" Counting.counting this, counter, 500_000 IO.puts "Counting A finished" end counting2 = spawn fn -> IO.puts "Counting B started" Counting.counting this, counter, 500_000 IO.puts "Counting B finished" end IO.puts "Waiting for counting to be done" receive do {:done, ^counting1} -> nil end receive do {:done, ^counting2} -> nil end send counter, {:get, self} receive do {:counter, value} -> IO.puts "Counter is: #{value}" endKiedy wykonuję ten przykład, otrzymuję:
Starting counting processes Counting A started Waiting for counting to be done Counting B started Counting A finished Counting B finished Counter is: 516827Widzimy, że ostatecznie licznik wynosi 516827, a nie 1000000, jak oczekiwaliśmy. Kiedy następnym razem uruchomiłem skrypt, otrzymałem 511010. Powodem tego zachowania jest to, że licznik otrzymuje dwie wiadomości: pobierz bieżącą wartość i ustaw nową. Aby zwiększyć licznik, program musi uzyskać aktualną wartość, zwiększyć ją o 1 i ustawić podwyższoną wartość. Dwa procesy odczytują i zapisują wartość licznika w tym samym czasie za pomocą wiadomości wysyłanej do procesu licznika. Kolejność wiadomości, które otrzyma licznik, jest nieprzewidywalna i program nie może jej kontrolować. Możemy sobie wyobrazić taki scenariusz:
- Wartość licznika to 115
- Proces A odczytuje wartość licznika (115)
- Proces B odczytuje wartość licznika (115)
- Proces B lokalnie zwiększa wartość (116)
- Proces B ustawia podwyższoną wartość licznika (116)
- Proces A zwiększa wartość licznika (116)
- Proces A ustawia podwyższoną wartość licznika (116)
- Wartość licznika to 116
Jeśli spojrzymy na scenariusz, dwa procesy zwiększają licznik o 1, a licznik zwiększa się w końcu o 1, a nie o 2. Takie splatania mogą się zdarzyć nieprzewidywalną liczbę razy, a zatem wartość licznika jest nieprzewidywalna. Aby zapobiec takiemu zachowaniu, operacja zwiększania musi być wykonana przez jeden komunikat.
# # CountingFixed.exs # defmodule Counting do def counter(value) do receive do :increase -> counter(value + 1) {:get, sender} -> send sender, {:counter, value} counter value end end def counting(sender, counter, times) do if times > 0 do send counter, :increase counting(sender, counter, times - 1) else send sender, {:done, self} end end end counter = spawn fn -> Counting.counter 0 end IO.puts "Starting counting processes" this = self counting1 = spawn fn -> IO.puts "Counting A started" Counting.counting this, counter, 500_000 IO.puts "Counting A finished" end counting2 = spawn fn -> IO.puts "Counting B started" Counting.counting this, counter, 500_000 IO.puts "Counting B finished" end IO.puts "Waiting for counting to be done" receive do {:done, ^counting1} -> nil end receive do {:done, ^counting2} -> nil end send counter, {:get, self} receive do {:counter, value} -> IO.puts "Counter is: #{value}" endUruchamiając ten skrypt otrzymuję:
Starting counting processes Counting A started Waiting for counting to be done Counting B started Counting A finished Counting B finished Counter is: 1000000Widzimy, że licznik ma prawidłową wartość. Powodem przewidywalnego (deterministycznego) zachowania jest to, że wartość licznika wzrasta o jeden komunikat, tak że sekwencja komunikatów zwiększająca licznik nie wpłynie na jego końcową wartość. Pracując z modelem aktora, musimy zwracać uwagę na to, jak komunikaty mogą się przeplatać i starannie projektować komunikaty i działania na komunikatach, aby uniknąć przypadkowej nieprzewidywalności (niedeterminizmu).
Jak za pomocą tego modelu możemy przelać pieniądze między dwoma kontami?
# # Accounts.exs # defmodule Accounts do def accounts(state) do receive do {:transfer, source, destination, amount} -> accounts %{state | source => state[source] - amount , destination => state[destination] + amount} {:amounts, accounts, sender } -> send sender, {:amounts, for account <- accounts do {account, state[account]} end} accounts(state) end end def transfer(sender, accounts, source, destination, amount, times, inconsistencies) do if times > 0 do send accounts, {:amounts, [source, destination], self} receive do {:amounts, amounts} -> if amounts[source] + amounts[destination] != 500_000 do Agent.update(inconsistencies, fn value -> value + 1 end) end end send accounts, {:transfer, source, destination, amount} transfer(sender, accounts, source, destination, amount, times - 1, inconsistencies) else send sender, {:done, self} end end end accounts = spawn fn -> Accounts.accounts(%{bob: 200_000, joe: 300_000 }) end {:ok, inconsistencies} = Agent.start(fn -> 0 end) this = self transfer1 = spawn fn -> IO.puts "Transfer A started" Accounts.transfer(this, accounts, :bob, :joe, 2, 100_000, inconsistencies) IO.puts "Transfer A finished" end transfer2 = spawn fn -> IO.puts "Transfer B started" Accounts.transfer(this, accounts, :joe, :bob, 1, 100_000, inconsistencies) IO.puts "Transfer B finished" end IO.puts "Waiting for transfers to be done" receive do {:done, ^transfer1} -> nil end receive do {:done, ^transfer2} -> nil end send accounts, {:amounts, [:bob, :joe], self} receive do {:amounts, amounts} -> IO.puts "Bob has in account: #{amounts[:bob]}" IO.puts "Joe has in account: #{amounts[:joe]}" IO.puts "Inconsistencies while transfer: #{Agent.get(inconsistencies, fn x -> x end)}" endKiedy uruchamiam ten skrypt, otrzymuję:
Waiting for transfers to be done Transfer A started Transfer B started Transfer B finished Transfer A finished Bob has in account: 100000 Joe has in account: 400000 Inconsistencies while transfer: 0Widzimy, że transfer pieniędzy działa bez niespójności, ponieważ wybraliśmy transfer wiadomości do przelewu pieniędzy i kwoty wiadomości, aby uzyskać wartość rachunków, co daje przewidywalne zachowanie programu. Za każdym razem, gdy wykonujemy przelew pieniędzy, łączna kwota pieniędzy w dowolnym momencie powinna być taka sama.
Model aktora może powodować blokadę, a tym samym zakleszczenie, dlatego należy zachować ostrożność podczas projektowania programu. Poniższy skrypt pokazuje, jak można symulować scenariusz blokady i zakleszczenia.
# # Deadlock.exs # defmodule Lock do def loop(state) do receive do {:lock, sender} -> case state do [] -> send sender, :locked loop([sender]) _ -> loop(state ++ [sender]) end {:unlock, sender} -> case state do [] -> loop(state) [^sender | []] -> loop([]) [^sender | [next | tail]] -> send next, :locked loop([next | tail]) _ -> loop(state) end end end def lock(pid) do send pid, {:lock, self} receive do :locked -> nil # This will block until we receive message end end def unlock(pid) do send pid, {:unlock, self} end def locking(first, second, times) do if times > 0 do lock(first) lock(second) unlock(second) unlock(first) locking(first, second, times - 1) end end end a_lock = spawn fn -> Lock.loop([]) end b_lock = spawn fn -> Lock.loop([]) end this = self IO.puts "Locking A, B started" spawn fn -> Lock.locking(a_lock, b_lock, 1_000) IO.puts "Locking A, B finished" send this, :done end IO.puts "Locking B, A started" spawn fn -> Lock.locking(b_lock, a_lock, 1_000) IO.puts "Locking B, A finished" send this, :done end IO.puts "Waiting for locking to be done" receive do :done -> nil end receive do :done -> nil EndWhen I run this script on my laptop I get:
Locking A, B started Locking B, A started Waiting for locking to be doneFrom the output we can see that the processes that lock A and B are stuck. This happens because the first process waits for the second process to release B while second process waiting first process to release A. They are waiting for each other and are stuck forever. To avoid this locking, order should always be the same, or design a program so that it doesn't use lock (meaning that it doesn't wait for a specific message). The following listing always locks first A then B.
# # Deadlock fixed # defmodule Lock do def loop(state) do receive do {:lock, sender} -> case state do [] -> send sender, :locked loop([sender]) _ -> loop(state ++ [sender]) end {:unlock, sender} -> case state do [] -> loop(state) [^sender | []] -> loop([]) [^sender | [next | tail]] -> send next, :locked loop([next | tail]) _ -> loop(state) end end end def lock(pid) do send pid, {:lock, self} receive do :locked -> nil # This will block until we receive message end end def unlock(pid) do send pid, {:unlock, self} end def locking(first, second, times) do if times > 0 do lock(first) lock(second) unlock(second) unlock(first) locking(first, second, times - 1) end end end a_lock = spawn fn -> Lock.loop([]) end b_lock = spawn fn -> Lock.loop([]) end this = self IO.puts "Locking A, B started" spawn fn -> Lock.locking(a_lock, b_lock, 1_000) IO.puts "Locking A, B finished" send this, :done end IO.puts "Locking A, B started" spawn fn -> Lock.locking(a_lock, b_lock, 1_000) IO.puts "Locking A, B finished" send this, :done end IO.puts "Waiting for locking to be done" receive do :done -> nil end receive do :done -> nil EndWhen I run this script on my laptop I get:
Locking A, B started Locking A, B started Waiting for locking to be done Locking A, B finished Locking A, B finishedAnd now, there is no longer a deadlock.
Zakończyć
As an introduction to concurrent programming, we have covered a few concurrency models. We haven't covered all models, as this article would be too big. Just to name a few, channels and reactive streams are some of the other popularly used concurrency models. Channels and reactive streams have many similarities with the actor model. All of them transmit messages, but many threads can receive messages from one channel, and reactive streams transmit messages in one direction to form directed graph that receive messages from one end and send messages from the other end as a result of the processing.
Shared mutable state models can easily go wrong if we don't think ahead. It has problems of race condition and deadlock. If we have a choice between different concurrent programming models, it would be easier to implement and maintain but otherwise we have to be very careful what we do.
The functional way is a lot easier to reason about and implement. It cannot have deadlock. This model may have worse performance than shared mutable state model, but a program that works is always faster than one that does not work.
Actor model is a good choice for concurrent programming. Although there are problems of race condition and deadlock, they can happen less than in shared mutable state model since the only way for processes to communicate is via messages. With good message design between processes, that can be avoided. If a problem occurs it is then in the order or meaning of messages in communication between the processes and you know where to look.
I hope this article has given you some insight to what concurrent programming is and how it gives structure to the programs you write.
