Introducere în programarea concomitentă: un ghid pentru începători

Publicat: 2022-03-11

Ce este programarea simultană? Descris simplu, este atunci când faci mai multe lucruri în același timp. A nu se confunda cu paralelismul, concurența este atunci când mai multe secvențe de operații sunt executate în perioade de timp suprapuse. În domeniul programării, concurența este un subiect destul de complex. Abordarea constructelor precum firele de execuție și blocarea și evitarea problemelor precum condițiile de cursă și blocajele pot fi destul de greoaie, făcând programele concurente dificil de scris. Prin concurență, programele pot fi proiectate ca procese independente care lucrează împreună într-o compoziție specifică. O astfel de structură poate fi paralelă sau nu; cu toate acestea, realizarea unei astfel de structuri în programul dumneavoastră oferă numeroase avantaje.

Introducere în programarea concomitentă

În acest articol, vom arunca o privire asupra unui număr de modele diferite de concurență, cum să le realizăm în diferite limbaje de programare concepute pentru concurență.

Model de stat mutabil partajat

Să ne uităm la un exemplu simplu cu un contor și două fire care îl măresc. Programul nu ar trebui să fie prea complicat. Avem un obiect care conține un contor care crește odată cu creșterea metodei și îl recuperează cu metoda get și două fire care îl măresc.

 // // Counting.java // public class Counting { public static void main(String[] args) throws InterruptedException { class Counter { int counter = 0; public void increment() { counter++; } public int get() { return counter; } } final Counter counter = new Counter(); class CountingThread extends Thread { public void run() { for (int x = 0; x < 500000; x++) { counter.increment(); } } } CountingThread t1 = new CountingThread(); CountingThread t2 = new CountingThread(); t1.start(); t2.start(); t1.join(); t2.join(); System.out.println(counter.get()); } }

Acest program naiv nu este atât de naiv pe cât pare la prima vedere. Când rulez acest program de mai multe ori, obțin rezultate diferite. Există trei valori după trei execuții pe laptopul meu.

 java Counting 553706 java Counting 547818 java Counting 613014

Care este motivul acestui comportament imprevizibil? Programul mărește contorul într-un singur loc, în metoda de creștere care folosește comanda counter++. Dacă ne uităm la codul octet al comenzii, am vedea că acesta constă din mai multe părți:

  1. Citiți valoarea contorului din memorie
  2. Creșteți valoarea la nivel local
  3. Stocați valoarea contorului în memorie

Acum ne putem imagina ce poate merge prost în această secvență. Dacă avem două fire care măresc independent contorul, atunci am putea avea acest scenariu:

  1. Valoarea contorului este 115
  2. Primul fir citește valoarea contorului din memorie (115)
  3. Primul fir crește valoarea contorului local (116)
  4. Al doilea fir citește valoarea contorului din memorie (115)
  5. Al doilea fir crește valoarea contorului local (116)
  6. Al doilea fir salvează valoarea contorului local în memorie (116)
  7. Primul fir salvează valoarea contorului local în memorie (116)
  8. Valoarea contorului este 116

În acest scenariu, două fire sunt împletite astfel încât valoarea contorului să crească cu 1, dar valoarea contorului ar trebui să fie mărită cu 2, deoarece fiecare fir o mărește cu 1. Împășirea diferitelor fire influențează rezultatul programului. Motivul impredictibilității programului este că programul nu are control asupra împletirii firelor, ci asupra sistemului de operare. De fiecare dată când programul este executat, firele se pot întrepătrunde diferit. În acest fel am introdus în program impredictibilitatea accidentală (non-determinism).

Pentru a remedia această imprevizibilitate accidentală (non-determinism), programul trebuie să aibă controlul întrepătrunderii firului. Când un fir este în metoda de creștere, un alt fir nu trebuie să fie în aceeași metodă până când primul iese din el. În acest fel, serializăm accesul la creșterea metodei.

 // // CountingFixed.java // public class CountingFixed { public static main(String[] args) throws InterruptedException { class Counter { int counter = 0; public synchronized void increase() { counter++; } public synchronized int get() { return counter; } } final Counter counter = new Counter(); class CountingThread extends Thread { public void run() { for (int i = 0; i < 500000; i++) { counter.increment(); } } } CountingThread thread1 = new CountingThread(); CountingThread thread2 = new CountingThread(); thread1.start(); thread2.start(); thread1.join(); thread2.join(); System.out.println(counter.get()); } }

O altă soluție este utilizarea unui contor care poate crește atomic, ceea ce înseamnă că operația nu poate fi separată în mai multe operații. În acest fel, nu trebuie să avem blocuri de cod care trebuie sincronizate. Java are tipuri de date atomice în spațiul de nume java.util.concurrent.atomic și vom folosi AtomicInteger.

 // // CountingBetter.java // import java.util.concurrent.atomic.AtomicInteger; class CountingBetter { public static void main(String[] args) throws InterruptedException { final AtomicInteger counter = new AtomicInteger(0); class CountingThread extends Thread { public viod run() { for (int i = 0; i < 500000; i++) { counter.incrementAndGet(); } } } CountingThread thread1 = new CountingThread(); CountingThread thread2 = new CoutningThread(); thread1.start(); thread2.start(); thread1.join(); thread2.join(); System.out.println(counter.get()); } }

Numărul întreg atomic are operațiile de care avem nevoie, așa că îl putem folosi în locul clasei Counter. Este interesant de remarcat faptul că toate metodele de atominteger nu folosesc blocarea, astfel încât nu există posibilitatea de blocaje, ceea ce facilitează proiectarea programului.

Folosirea cuvintelor cheie sincronizate pentru a sincroniza metodele critice ar trebui să rezolve toate problemele, nu? Să ne imaginăm că avem două conturi care pot depune, retrage și transfera în alt cont. Ce se întâmplă dacă în același timp dorim să transferăm bani dintr-un cont în altul și invers? Să ne uităm la un exemplu.

 // // Deadlock.java // public class Deadlock { public static void main(String[] args) throws InterruptedException { class Account { int balance = 100; public Account(int balance) { this.balance = balance; } public synchronized void deposit(int amount) { balance += amount; } public synchronized boolean withdraw(int amount) { if (balance >= amount) { balance -= amount; return true; } return false; } public synchronized boolean transfer(Account destination, int amount) { if (balance >= amount) { balance -= amount; synchronized(destination) { destination.balance += amount; }; return true; } return false; } public int getBalance() { return balance; } } final Account bob = new Account(200000); final Account joe = new Account(300000); class FirstTransfer extends Thread { public void run() { for (int i = 0; i < 100000; i++) { bob.transfer(joe, 2); } } } class SecondTransfer extends Thread { public void run() { for (int i = 0; i < 100000; i++) { joe.transfer(bob, 1); } } } FirstTransfer thread1 = new FirstTransfer(); SecondTransfer thread2 = new SecondTransfer(); thread1.start(); thread2.start(); thread1.join(); thread2.join(); System.out.println("Bob's balance: " + bob.getBalance()); System.out.println("Joe's balance: " + joe.getBalance()); } }

Când rulez acest program pe laptop, de obicei se blochează. De ce se întâmplă asta? Dacă ne uităm atent, putem vedea că atunci când transferăm bani intrăm în metoda de transfer care este sincronizată și blochează accesul la toate metodele sincronizate din contul sursă, apoi blochează contul de destinație care blochează accesul la toate metodele sincronizate de pe acesta.

Imaginează-ți următorul scenariu:

  1. Primul thread solicită transferul din contul lui Bob în contul lui Joe
  2. Al doilea thread solicită transferul din contul lui Joe în contul lui Bob
  3. Al doilea thread scade suma din contul lui Joe
  4. Al doilea thread merge la depunerea sumei în contul lui Bob, dar așteaptă ca primul fir să finalizeze transferul.
  5. Primul fir scade suma din contul lui Bob
  6. Primul fir merge pentru a depune suma în contul lui Joe, dar așteaptă ca al doilea fir să finalizeze transferul.

În acest scenariu, un fir așteaptă ca un alt fir să termine transferul și invers. Sunt blocați unul cu celălalt și programul nu poate continua. Acest lucru se numește blocaj. Pentru a evita blocajul, este necesar să blocați conturile în aceeași ordine. Pentru a repara programul, vom acorda fiecărui cont un număr unic, astfel încât să putem bloca conturile în aceeași ordine la transferul banilor.

 // // DeadlockFixed.java // import java.util.concurrent.atomic.AtomicInteger; public class DeadlockFixed { public static void main(String[] args) throws InterruptedException { final AtomicInteger counter = new AtomicInteger(0); class Account { int balance = 100; int order; public Account(int balance) { this.balance = balance; this.order = counter.getAndIncrement(); } public synchronized void deposit(int amount) { balance += amount; } public synchronized boolean withdraw(int amount) { if (balance >= amount) { balance -= amount; return true; } return false; } public boolean transfer(Account destination, int amount) { Account first; Account second; if (this.order < destination.order) { first = this; second = destination; } else { first = destination; second = this; } synchronized(first) { synchronized(second) { if (balance >= amount) { balance -= amount; destination.balance += amount; return true; } return false; } } } public synchronized int getBalance() { return balance; } } final Account bob = new Account(200000); final Account joe = new Account(300000); class FirstTransfer extends Thread { public void run() { for (int i = 0; i < 100000; i++) { bob.transfer(joe, 2); } } } class SecondTransfer extends Thread { public void run() { for (int i = 0; i < 100000; i++) { joe.transfer(bob, 1); } } } FirstTransfer thread1 = new FirstTransfer(); SecondTransfer thread2 = new SecondTransfer(); thread1.start(); thread2.start(); thread1.join(); thread2.join(); System.out.println("Bob's balance: " + bob.getBalance()); System.out.println("Joe's balance: " + joe.getBalance()); } }

Din cauza impredictibilității unor astfel de greșeli, acestea se întâmplă uneori, dar nu întotdeauna și sunt greu de reprodus. Dacă programul se comportă imprevizibil, este de obicei cauzat de concurență care introduce non-determinism accidental. Pentru a evita non-determinismul accidental, ar trebui în prealabil un program de proiectare pentru a ține cont de toate împletirile.

Un exemplu de program care are un non-determinism accidental.

 // // NonDeteminism.java // public class NonDeterminism { public static void main(String[] args) throws InterruptedException { class Container { public String value = "Empty"; } final Container container = new Container(); class FastThread extends Thread { public void run() { container.value = "Fast"; } } class SlowThread extends Thread { public void run() { try { Thread.sleep(50); } catch(Exception e) {} container.value = "Slow"; } } FastThread fast = new FastThread(); SlowThread slow = new SlowThread(); fast.start(); slow.start(); fast.join(); slow.join(); System.out.println(container.value); } }

Acest program are non-determinism accidental în el. Va fi afișată ultima valoare introdusă în container.

 java NonDeterminism Slow

Firele mai lente vor introduce valoarea mai târziu, iar această valoare va fi tipărită (Slow). Dar acest lucru nu trebuie să fie cazul. Ce se întâmplă dacă computerul execută simultan un alt program care are nevoie de multe resurse CPU? Nu avem nicio garanție că va fi firul mai lent care va intra în valoare ultimul, deoarece este controlat de sistemul de operare, nu de program. Putem avea situații în care programul funcționează pe un computer și pe celălalt se comportă diferit. Astfel de erori sunt greu de găsit și provoacă dureri de cap pentru dezvoltatori. Din toate aceste motive, acest model de concurență este foarte greu de realizat corect.

Calea funcțională

Paralelism

Să ne uităm la un alt model pe care îl folosesc limbajele funcționale. De exemplu, vom folosi Clojure, care poate fi interpretat folosind instrumentul Leiningen. Clojure este un limbaj foarte interesant, cu suport bun pentru concurență. Modelul de concurență anterior a fost cu stare mutabilă partajată. Clasele pe care le folosim pot avea, de asemenea, o stare ascunsă care modifică despre care nu știm, deoarece nu este evidentă din API-ul lor. După cum am văzut, acest model poate provoca non-determinism accidental și blocaje dacă nu suntem atenți. Limbile funcționale au tipuri de date care nu se modifică, astfel încât acestea pot fi partajate în siguranță, fără riscul ca acestea să se schimbe. Funcțiile au proprietăți, precum și alte tipuri de date. Funcțiile pot fi create în timpul execuției programului și transmise ca parametru unei alte funcții sau returnate ca rezultat al apelului funcției.

Primitivele de bază pentru programarea concomitentă sunt viitoare și promițătoare. Future execută un bloc de cod într-un alt fir și returnează un obiect pentru valoarea viitoare care va fi introdusă atunci când blocul este executat.

 ; ; future.clj ; (let [a (future (println "Started A") (Thread/sleep 1000) (println "Finished A") (+ 1 2)) b (future (println "Started B") (Thread/sleep 2000) (println "Finished B") (+ 3 4))] (println "Waiting for futures") (+ @a @b))

Când execut acest script, rezultatul este:

 Started A Started B Waiting for futures Finished A Finished B 10

În acest exemplu avem două blocuri viitoare care sunt executate independent. Programează numai blocuri atunci când citești valoarea de la obiectul viitor care nu este încă disponibil. În cazul nostru, așteptăm ca ambele rezultate ale blocurilor viitoare să fie însumate. Comportamentul este previzibil (determinist) și va da întotdeauna același rezultat, deoarece nu există o stare mutabilă comună.

O altă primitivă care este folosită pentru concurență este o promisiune. Promisiunea este un container în care se poate pune o valoare o dată. Când citiți promisiuni, firul va aștepta până când valoarea promisiunii este completată.

 ; ; promise.clj ; (def result (promise)) (future (println "The result is: " @result)) (Thread/sleep 2000) (deliver result 42)

În acest exemplu, viitorul va aștepta să imprime rezultatul atâta timp cât promisiunea nu va fi salvată. După două secunde, în promisiunea va fi stocată valoarea 42 pentru a fi tipărită în viitorul thread. Folosirea promisiunilor poate duce la blocaj, spre deosebire de viitor, așa că aveți grijă când utilizați promisiunea.

 ; ; promise-deadlock.clj ; (def promise-result (promise)) (def future-result (future (println "The result is: " + @promise-result) 13)) (println "Future result is: " @future-result) (deliver result 42)

În acest exemplu, folosim rezultatul viitorului și rezultatul promisiunii. Ordinea de setare și citire a valorilor este aceea că firul principal așteaptă o valoare din firul viitor și firul viitor așteaptă o valoare din firul principal. Acest comportament va fi previzibil (determinist) și va fi redat de fiecare dată când programul se execută, ceea ce face mai ușor să găsiți și să eliminați erorile.

Utilizarea viitorului permite programului să continue cu exercițiul până când are nevoie de rezultatul execuției viitorului. Acest lucru are ca rezultat o execuție mai rapidă a programului. Dacă aveți mai multe procesoare cu viitor, puteți face execuția paralelă a programelor care au un comportament previzibil (determinist) (de fiecare dată dă același rezultat). Astfel vom exploata mai bine puterea computerului.

 ; ; fibonacci.clj ; (defn fibonacci[a] (if (<= a 2) 1 (+ (fibonacci (- a 1)) (fibonacci (- a 2))))) (println "Start serial calculation") (time (println "The result is: " (+ (fibonacci 36) (fibonacci 36)))) (println "Start parallel calculation") (defn parallel-fibonacci[] (def result-1 (future (fibonacci 36))) (def result-2 (future (fibonacci 36))) (+ @result-1 @result-2)) (time (println "The result is: " (parallel-fibonacci)))

În acest exemplu, puteți vedea cum utilizarea viitorului poate folosi mai bine viteza unui computer. Avem două numere Fibonacci care se adună. Putem vedea că programul calculează rezultatul de două ori, prima dată secvenţial într-un singur fir, iar a doua oară în paralel în două fire. Deoarece laptopul meu are un procesor multicore, execuția paralelă funcționează de două ori mai rapid decât calculul secvenţial.

Rezultatul executării acestui script pe laptopul meu:

 Start serial calculation The result is: 29860704 "Elapsed time: 2568.816524 msecs" Start parallel calculation The result is: 29860704 "Elapsed time: 1216.991448 msecs"

Concurență

Pentru a sprijini concurența și imprevizibilitatea în limbajul de programare Clojure, trebuie să folosim un tip de date care este variabil, astfel încât alte fire să poată vedea modificările. Cel mai simplu tip de date variabile este atom. Atom este un container care are întotdeauna valoarea care poate fi înlocuită cu o altă valoare. Valoarea poate fi înlocuită prin introducerea unei noi valori sau prin apelarea unei funcții care preia valoarea veche și returnează o nouă valoare care este folosită mai frecvent. Este interesant că atomul este implementat fără blocare și este sigur de utilizat în fire, ceea ce înseamnă că este imposibil să ajungi la blocaj. Intern, atom folosește biblioteca java.util.concurrent.AtomicReference. Să ne uităm la un contraexemplu implementat cu atom.

 ; ; atom-counter.clj ; (def counter (atom 0)) (def attempts (atom 0)) (defn counter-increases[] (dotimes [cnt 500000] (swap! counter (fn [counter] (swap! attempts inc) ; side effect DO NOT DO THIS (inc counter))))) (def first-future (future (counter-increases))) (def second-future (future (counter-increases))) ; Wait for futures to complete @first-future @second-future ; Print value of the counter (println "The counter is: " @counter) (println "Number of attempts: " @attempts)

Rezultatul execuției scriptului pe laptopul meu:

 The counter is: 1000000 Number of attempts: 1680212

În acest exemplu folosim un atom care conține valoarea contorului. Contorul crește cu (swap! counter inc). Funcția Swap funcționează astfel: 1. luați valoarea contorului și păstrați-o 2. pentru această valoare apelează funcția dată care calculează noua valoare 3. pentru a salva o nouă valoare, folosește operația atomică care verifică dacă vechea valoare sa schimbat 3a. dacă valoarea nu s-a schimbat se introduce o nouă valoare 3b. dacă între timp se modifică valoarea, atunci treceți la pasul 1. Vedem că funcția poate fi apelată din nou dacă valoarea este schimbată între timp. Valoarea poate fi schimbată doar dintr-un alt fir. Prin urmare, este esențial ca funcția care calculează o nouă valoare să nu aibă efecte secundare, astfel încât să nu conteze dacă este apelată de mai multe ori. O limitare a atomului este că sincronizează modificările la o singură valoare.

 ; ; atom-acocunts.clj ; (def bob (atom 200000)) (def joe (atom 300000)) (def inconsistencies (atom 0)) (defn transfer [source destination amount] (if (not= (+ @bob @joe) 500000) (swap! inconsistencies inc)) (swap! source - amount) (swap! destination + amount)) (defn first-transfer [] (dotimes [cnt 100000] (transfer bob joe 2))) (defn second-transfer [] (dotimes [cnt 100000] (transfer joe bob 1))) (def first-future (future (first-transfer))) (def second-future (future (second-transfer))) @first-future @second-future (println "Bob has in account: " @bob) (println "Joe has in account: " @joe) (println "Inconsistencies while transfer: " @inconsistencies)

Când execut acest script primesc:

 Bob has in account: 100000 Joe has in account: 400000 Inconsistencies while transfer: 36525

În acest exemplu putem vedea cum schimbăm mai mulți atomi. La un moment dat, poate apărea o inconsecvență. Suma a două conturi la un moment dat nu este aceeași. Dacă trebuie să coordonăm modificări ale mai multor valori, există două soluții:

  1. Plasați mai multe valori într-un atom
  2. Utilizați referințe și memoria tranzacțională software, așa cum vom vedea mai târziu
 ; ; atom-accounts-fixed.clj ; (def accounts (atom {:bob 200000, :joe 300000})) (def inconsistencies (atom 0)) (defn transfer [source destination amount] (let [deref-accounts @accounts] (if (not= (+ (get deref-accounts :bob) (get deref-accounts :joe)) 500000) (swap! inconsistencies inc)) (swap! accounts (fn [accs] (update (update accs source - amount) destination + amount))))) (defn first-transfer [] (dotimes [cnt 100000] (transfer :bob :joe 2))) (defn second-transfer [] (dotimes [cnt 100000] (transfer :joe :bob 1))) (def first-future (future (first-transfer))) (def second-future (future (second-transfer))) @first-future @second-future (println "Bob has in account: " (get @accounts :bob)) (println "Joe has in account: " (get @accounts :joe)) (println "Inconsistencies while transfer: " @inconsistencies)

Când rulez acest script pe computer, primesc:

 Bob has in account: 100000 Joe has in account: 400000 Inconsistencies while transfer: 0

În exemplu, coordonarea a fost rezolvată astfel încât să punem mai multă valoare folosind o hartă. Când transferăm bani din cont, schimbăm toate conturile la momentul respectiv, astfel încât să nu se întâmple niciodată ca suma de bani să nu fie aceeași.

Următorul tip de date variabile este agent. Agentul se comportă ca un atom doar prin aceea că funcția care modifică valoarea este executată într-un fir diferit, astfel încât este nevoie de ceva timp pentru ca schimbarea să devină vizibilă. Prin urmare, atunci când citiți valoarea agentului este necesar să apelați o funcție care așteaptă până când toate funcțiile care modifică valoarea agentului sunt executate. Spre deosebire de funcția atomilor care modifică valoarea este numită o singură dată și, prin urmare, poate avea efecte secundare. Acest tip poate, de asemenea, sincroniza o valoare și nu poate bloca.

 ; ; agent-counter.clj ; (def counter (agent 0)) (def attempts (atom 0)) (defn counter-increases[] (dotimes [cnt 500000] (send counter (fn [counter] (swap! attempts inc) (inc counter))))) (def first-future (future (counter-increases))) (def second-future (future (counter-increases))) ; wait for futures to complete @first-future @second-future ; wait for counter to be finished with updating (await counter) ; print the value of the counter (println "The counter is: " @counter) (println "Number of attempts: " @attempts)

Când rulez acest script pe laptop, primesc:

 The counter is: 1000000 Number of attempts: 1000000

Acest exemplu este același cu implementarea contorului cu atomul. Singura diferență este că aici așteptăm finalizarea tuturor modificărilor agentului înainte de a citi valoarea finală folosind await.

Ultimul tip de date variabile sunt referințele. Spre deosebire de atomi, referințele pot sincroniza modificări la mai multe valori. Fiecare operațiune de referință ar trebui să fie într-o tranzacție folosind dosync. Acest mod de modificare a datelor se numește memorie tranzacțională software sau abreviat STM. Să ne uităm la un exemplu cu transferul de bani în conturi.

 ; ; stm-accounts.clj ; (def bob (ref 200000)) (def joe (ref 300000)) (def inconsistencies (atom 0)) (def attempts (atom 0)) (def transfers (agent 0)) (defn transfer [source destination amount] (dosync (swap! attempts inc) ; side effect DO NOT DO THIS (send transfers inc) (when (not= (+ @bob @joe) 500000) (swap! inconsistencies inc)) ; side effect DO NOT DO THIS (alter source - amount) (alter destination + amount))) (defn first-transfer [] (dotimes [cnt 100000] (transfer bob joe 2))) (defn second-transfer [] (dotimes [cnt 100000] (transfer joe bob 1))) (def first-future (future (first-transfer))) (def second-future (future (second-transfer))) @first-future @second-future (await transfers) (println "Bob has in account: " @bob) (println "Joe has in account: " @joe) (println "Inconsistencies while transfer: " @inconsistencies) (println "Attempts: " @attempts) (println "Transfers: " @transfers)

Când rulez acest script, primesc:

 Bob has in account: 100000 Joe has in account: 400000 Inconsistencies while transfer: 0 Attempts: 330841 Transfers: 200000

Interesant este că au fost mai multe încercări decât numărul de tranzacții efectuate. Acest lucru se datorează faptului că STM-ul nu folosește blocări, așa că dacă există un conflict, (cum ar fi două fire care încearcă să schimbe aceeași valoare) tranzacția va fi re-execută. Din acest motiv, tranzacția nu ar trebui să aibă efecte secundare. Putem vedea că agentul a cărui valoare se modifică în cadrul tranzacției se comportă previzibil. O funcție care modifică valoarea agentului va fi evaluată de câte ori există tranzacții. Motivul este că agentul este conștient de tranzacție. Dacă tranzacția trebuie să aibă efecte secundare, acestea ar trebui puse în funcțiune în cadrul agentului. În acest fel, programul va avea un comportament previzibil. Probabil crezi că ar trebui să folosești întotdeauna STM, dar programatorii experimentați vor folosi adesea atomi, deoarece atomii sunt mai simpli și mai rapidi decât STM. Desigur, asta dacă este posibil să faci un program în acest fel. Dacă aveți reacții adverse, atunci nu există altă opțiune decât să utilizați STM și agenți.

Model de actor

Următorul model de concurență este un model actor. Principiul acestui model este similar cu lumea reală. Dacă facem o înțelegere pentru a crea ceva cu mulți oameni, de exemplu o clădire, atunci fiecare om de la șantier are rolul lui. O mulțime de oameni este supravegheată de supraveghetor. Dacă un lucrător este accidentat la locul de muncă, supraveghetorul va atribui locul de muncă al bărbatului accidentat celorlalți care sunt disponibili. Dacă este necesar, el poate conduce la site un om nou. Pe site avem mai mulți oameni care fac munca simultan (concurente), dar și vorbesc între ei pentru a se sincroniza. Dacă am pune munca pe șantier în program, atunci fiecare persoană ar fi un actor care are un stat și execută în propriul proces, iar vorbirea ar fi înlocuită cu mesaje. Limbajul de programare popular bazat pe acest model este Erlang. Acest limbaj interesant are tipuri de date imuabile și funcții care au aceleași proprietăți ca și alte tipuri de date. Funcțiile pot fi create în timpul execuției programului și transmise ca argumente unei alte funcții sau returnate ca rezultat al apelului funcției. Voi da exemple în limbajul Elixir care utilizează mașina virtuală Erlang, așa că voi avea același model de programare ca și Erlang, doar o sintaxă diferită. Cele mai importante trei primitive din Elixir sunt apariția, trimiterea și primirea. spawn execută funcția în noul proces, trimite trimite mesajul către proces și primește primește mesaje care sunt trimise procesului curent.

Primul exemplu cu modelul actorului va fi concomitent. Pentru a realiza un program cu acest model, este necesar ca un actor să aibă valoarea contorului și să primească mesaj pentru a seta și a prelua valoarea contorului și să aibă doi actori care vor crește simultan valoarea contorului.

 # # Counting.exs # defmodule Counting do def counter(value) do receive do {:get, sender} -> send sender, {:counter, value} counter value {:set, new_value} -> counter(new_value) end end def counting(sender, counter, times) do if times > 0 do send counter, {:get, self} receive do {:counter, value} -> send counter, {:set, value + 1} end counting(sender, counter, times - 1) else send sender, {:done, self} end end end counter = spawn fn -> Counting.counter 0 end IO.puts "Starting counting processes" this = self counting1 = spawn fn -> IO.puts "Counting A started" Counting.counting this, counter, 500_000 IO.puts "Counting A finished" end counting2 = spawn fn -> IO.puts "Counting B started" Counting.counting this, counter, 500_000 IO.puts "Counting B finished" end IO.puts "Waiting for counting to be done" receive do {:done, ^counting1} -> nil end receive do {:done, ^counting2} -> nil end send counter, {:get, self} receive do {:counter, value} -> IO.puts "Counter is: #{value}" end

Când execut acest exemplu, primesc:

 Starting counting processes Counting A started Waiting for counting to be done Counting B started Counting A finished Counting B finished Counter is: 516827

Putem vedea că până la urmă contorul este 516827 și nu 1000000 așa cum ne așteptam. Când am rulat scriptul data viitoare, am primit 511010. Motivul acestui comportament este că contorul primește două mesaje: preluați valoarea curentă și setați noua valoare. Pentru a crește contorul, programul trebuie să obțină valoarea curentă, să o crească cu 1 și să seteze valoarea crescută. Două procese citesc și scriu valoarea contorului în același timp, folosind mesajele care sunt trimise procesului contor. Ordinea mesajelor pe care contorul le va primi este imprevizibilă, iar programul nu o poate controla. Ne putem imagina acest scenariu:

  1. Valoarea contorului este 115
  2. Procesul A citește valoarea contorului (115)
  3. Procesul B citește valoarea contorului (115)
  4. Procesul B crește valoarea local (116)
  5. Procesul B setează o valoare crescută la contor (116)
  6. Procesul A crește valoarea contorului (116)
  7. Procesul A setează o valoare crescută la contor (116)
  8. Valoarea contorului este 116

Dacă ne uităm la scenariu, două procese măresc contorul cu 1, iar contorul crește în final cu 1 și nu cu 2. Astfel de împletiri se pot întâmpla de un număr imprevizibil de ori și, prin urmare, valoarea contorului este imprevizibilă. Pentru a preveni acest comportament, operația de creștere trebuie făcută printr-un singur mesaj.

 # # CountingFixed.exs # defmodule Counting do def counter(value) do receive do :increase -> counter(value + 1) {:get, sender} -> send sender, {:counter, value} counter value end end def counting(sender, counter, times) do if times > 0 do send counter, :increase counting(sender, counter, times - 1) else send sender, {:done, self} end end end counter = spawn fn -> Counting.counter 0 end IO.puts "Starting counting processes" this = self counting1 = spawn fn -> IO.puts "Counting A started" Counting.counting this, counter, 500_000 IO.puts "Counting A finished" end counting2 = spawn fn -> IO.puts "Counting B started" Counting.counting this, counter, 500_000 IO.puts "Counting B finished" end IO.puts "Waiting for counting to be done" receive do {:done, ^counting1} -> nil end receive do {:done, ^counting2} -> nil end send counter, {:get, self} receive do {:counter, value} -> IO.puts "Counter is: #{value}" end

Rulând acest script primesc:

 Starting counting processes Counting A started Waiting for counting to be done Counting B started Counting A finished Counting B finished Counter is: 1000000

Putem vedea că contorul are valoarea corectă. Motivul comportamentului predictibil (determinist) este că valoarea contorului crește cu un mesaj, astfel încât succesiunea de mesaje pentru creșterea contorului nu va afecta valoarea finală. Lucrând cu modelul de actor, trebuie să acordăm atenție modului în care mesajele se pot întrepătrunde și proiectarea atentă a mesajelor și a acțiunilor asupra mesajelor pentru a evita imprevizibilitatea accidentală (non-determinism).

Cum putem transfera bani între două conturi cu acest model?

 # # Accounts.exs # defmodule Accounts do def accounts(state) do receive do {:transfer, source, destination, amount} -> accounts %{state | source => state[source] - amount , destination => state[destination] + amount} {:amounts, accounts, sender } -> send sender, {:amounts, for account <- accounts do {account, state[account]} end} accounts(state) end end def transfer(sender, accounts, source, destination, amount, times, inconsistencies) do if times > 0 do send accounts, {:amounts, [source, destination], self} receive do {:amounts, amounts} -> if amounts[source] + amounts[destination] != 500_000 do Agent.update(inconsistencies, fn value -> value + 1 end) end end send accounts, {:transfer, source, destination, amount} transfer(sender, accounts, source, destination, amount, times - 1, inconsistencies) else send sender, {:done, self} end end end accounts = spawn fn -> Accounts.accounts(%{bob: 200_000, joe: 300_000 }) end {:ok, inconsistencies} = Agent.start(fn -> 0 end) this = self transfer1 = spawn fn -> IO.puts "Transfer A started" Accounts.transfer(this, accounts, :bob, :joe, 2, 100_000, inconsistencies) IO.puts "Transfer A finished" end transfer2 = spawn fn -> IO.puts "Transfer B started" Accounts.transfer(this, accounts, :joe, :bob, 1, 100_000, inconsistencies) IO.puts "Transfer B finished" end IO.puts "Waiting for transfers to be done" receive do {:done, ^transfer1} -> nil end receive do {:done, ^transfer2} -> nil end send accounts, {:amounts, [:bob, :joe], self} receive do {:amounts, amounts} -> IO.puts "Bob has in account: #{amounts[:bob]}" IO.puts "Joe has in account: #{amounts[:joe]}" IO.puts "Inconsistencies while transfer: #{Agent.get(inconsistencies, fn x -> x end)}" end

Când rulez acest script primesc:

 Waiting for transfers to be done Transfer A started Transfer B started Transfer B finished Transfer A finished Bob has in account: 100000 Joe has in account: 400000 Inconsistencies while transfer: 0

Putem vedea că transferul de bani funcționează fără inconsecvențe, deoarece am ales transferul de mesaje pentru a transfera bani și sume de mesaje pentru a obține valoarea conturilor ceea ce ne oferă un comportament previzibil al programului. Ori de câte ori facem un transfer de bani, suma totală de bani în orice moment ar trebui să fie aceeași.

Modelul de actor poate provoca blocare și, prin urmare, blocaj, așa că aveți grijă când proiectați programul. Următorul script arată cum puteți simula scenariul de blocare și blocare.

 # # Deadlock.exs # defmodule Lock do def loop(state) do receive do {:lock, sender} -> case state do [] -> send sender, :locked loop([sender]) _ -> loop(state ++ [sender]) end {:unlock, sender} -> case state do [] -> loop(state) [^sender | []] -> loop([]) [^sender | [next | tail]] -> send next, :locked loop([next | tail]) _ -> loop(state) end end end def lock(pid) do send pid, {:lock, self} receive do :locked -> nil # This will block until we receive message end end def unlock(pid) do send pid, {:unlock, self} end def locking(first, second, times) do if times > 0 do lock(first) lock(second) unlock(second) unlock(first) locking(first, second, times - 1) end end end a_lock = spawn fn -> Lock.loop([]) end b_lock = spawn fn -> Lock.loop([]) end this = self IO.puts "Locking A, B started" spawn fn -> Lock.locking(a_lock, b_lock, 1_000) IO.puts "Locking A, B finished" send this, :done end IO.puts "Locking B, A started" spawn fn -> Lock.locking(b_lock, a_lock, 1_000) IO.puts "Locking B, A finished" send this, :done end IO.puts "Waiting for locking to be done" receive do :done -> nil end receive do :done -> nil End

When I run this script on my laptop I get:

 Locking A, B started Locking B, A started Waiting for locking to be done

From the output we can see that the processes that lock A and B are stuck. This happens because the first process waits for the second process to release B while second process waiting first process to release A. They are waiting for each other and are stuck forever. To avoid this locking, order should always be the same, or design a program so that it doesn't use lock (meaning that it doesn't wait for a specific message). The following listing always locks first A then B.

 # # Deadlock fixed # defmodule Lock do def loop(state) do receive do {:lock, sender} -> case state do [] -> send sender, :locked loop([sender]) _ -> loop(state ++ [sender]) end {:unlock, sender} -> case state do [] -> loop(state) [^sender | []] -> loop([]) [^sender | [next | tail]] -> send next, :locked loop([next | tail]) _ -> loop(state) end end end def lock(pid) do send pid, {:lock, self} receive do :locked -> nil # This will block until we receive message end end def unlock(pid) do send pid, {:unlock, self} end def locking(first, second, times) do if times > 0 do lock(first) lock(second) unlock(second) unlock(first) locking(first, second, times - 1) end end end a_lock = spawn fn -> Lock.loop([]) end b_lock = spawn fn -> Lock.loop([]) end this = self IO.puts "Locking A, B started" spawn fn -> Lock.locking(a_lock, b_lock, 1_000) IO.puts "Locking A, B finished" send this, :done end IO.puts "Locking A, B started" spawn fn -> Lock.locking(a_lock, b_lock, 1_000) IO.puts "Locking A, B finished" send this, :done end IO.puts "Waiting for locking to be done" receive do :done -> nil end receive do :done -> nil End

When I run this script on my laptop I get:

 Locking A, B started Locking A, B started Waiting for locking to be done Locking A, B finished Locking A, B finished

And now, there is no longer a deadlock.

Învelire

As an introduction to concurrent programming, we have covered a few concurrency models. We haven't covered all models, as this article would be too big. Just to name a few, channels and reactive streams are some of the other popularly used concurrency models. Channels and reactive streams have many similarities with the actor model. All of them transmit messages, but many threads can receive messages from one channel, and reactive streams transmit messages in one direction to form directed graph that receive messages from one end and send messages from the other end as a result of the processing.

Shared mutable state models can easily go wrong if we don't think ahead. It has problems of race condition and deadlock. If we have a choice between different concurrent programming models, it would be easier to implement and maintain but otherwise we have to be very careful what we do.

The functional way is a lot easier to reason about and implement. It cannot have deadlock. This model may have worse performance than shared mutable state model, but a program that works is always faster than one that does not work.

Actor model is a good choice for concurrent programming. Although there are problems of race condition and deadlock, they can happen less than in shared mutable state model since the only way for processes to communicate is via messages. With good message design between processes, that can be avoided. If a problem occurs it is then in the order or meaning of messages in communication between the processes and you know where to look.

I hope this article has given you some insight to what concurrent programming is and how it gives structure to the programs you write.

Related: Ruby Concurrency and Parallelism: A Practical Tutorial