Introduzione alla programmazione simultanea: una guida per principianti

Pubblicato: 2022-03-11

Che cos'è la programmazione simultanea? Descritto semplicemente, è quando stai facendo più di una cosa allo stesso tempo. Da non confondere con il parallelismo, la concorrenza è quando più sequenze di operazioni vengono eseguite in periodi di tempo sovrapposti. Nel regno della programmazione, la concorrenza è un argomento piuttosto complesso. Gestire costrutti come thread e lock ed evitare problemi come race condition e deadlock può essere piuttosto ingombrante, rendendo difficile la scrittura di programmi simultanei. Attraverso la concorrenza, i programmi possono essere progettati come processi indipendenti che lavorano insieme in una composizione specifica. Tale struttura può o non può essere resa parallela; tuttavia, ottenere una tale struttura nel tuo programma offre numerosi vantaggi.

Introduzione alla programmazione simultanea

In questo articolo, daremo un'occhiata a diversi modelli di concorrenza, come ottenerli in vari linguaggi di programmazione progettati per la concorrenza.

Modello a stato mutevole condiviso

Diamo un'occhiata a un semplice esempio con un contatore e due fili che lo aumentano. Il programma non dovrebbe essere troppo complicato. Abbiamo un oggetto che contiene un contatore che aumenta con il metodo aumento e lo recupera con il metodo get e due thread che lo aumentano.

 // // Counting.java // public class Counting { public static void main(String[] args) throws InterruptedException { class Counter { int counter = 0; public void increment() { counter++; } public int get() { return counter; } } final Counter counter = new Counter(); class CountingThread extends Thread { public void run() { for (int x = 0; x < 500000; x++) { counter.increment(); } } } CountingThread t1 = new CountingThread(); CountingThread t2 = new CountingThread(); t1.start(); t2.start(); t1.join(); t2.join(); System.out.println(counter.get()); } }

Questo programma ingenuo non è così ingenuo come sembra a prima vista. Quando eseguo questo programma più volte, ottengo risultati diversi. Ci sono tre valori dopo tre esecuzioni sul mio laptop.

 java Counting 553706 java Counting 547818 java Counting 613014

Qual è il motivo di questo comportamento imprevedibile? Il programma aumenta il contatore in un punto, nel metodo aumento che utilizza il comando counter++. Se osserviamo il codice del byte di comando vedremmo che è composto da più parti:

  1. Legge il valore del contatore dalla memoria
  2. Aumenta il valore a livello locale
  3. Memorizza il valore del contatore in memoria

Ora possiamo immaginare cosa può andare storto in questa sequenza. Se abbiamo due thread che aumentano indipendentemente il contatore, potremmo avere questo scenario:

  1. Il valore del contatore è 115
  2. Il primo thread legge il valore del contatore dalla memoria (115)
  3. Il primo thread aumenta il valore del contatore locale (116)
  4. Il secondo thread legge il valore del contatore dalla memoria (115)
  5. Il secondo thread aumenta il valore del contatore locale (116)
  6. Il secondo thread salva il valore del contatore locale nella memoria (116)
  7. Il primo thread salva il valore del contatore locale nella memoria (116)
  8. Il valore del contatore è 116

In questo scenario, due thread vengono intrecciati in modo che il valore del contatore venga aumentato di 1, ma il valore del contatore dovrebbe essere aumentato di 2 poiché ogni thread lo aumenta di 1. L'intreccio di thread diversi influenza il risultato del programma. Il motivo dell'imprevedibilità del programma è che il programma non ha il controllo dell'intreccio dei thread ma del sistema operativo. Ogni volta che il programma viene eseguito, i thread possono intrecciarsi in modo diverso. In questo modo abbiamo introdotto nel programma l'imprevedibilità accidentale (non determinismo).

Per correggere questa imprevedibilità accidentale (non determinismo), il programma deve avere il controllo dell'intreccio dei fili. Quando un thread è nel metodo aumenta un altro thread non deve essere nello stesso metodo finché il primo non ne esce. In questo modo serializziamo l'accesso al metodo aumento.

 // // CountingFixed.java // public class CountingFixed { public static main(String[] args) throws InterruptedException { class Counter { int counter = 0; public synchronized void increase() { counter++; } public synchronized int get() { return counter; } } final Counter counter = new Counter(); class CountingThread extends Thread { public void run() { for (int i = 0; i < 500000; i++) { counter.increment(); } } } CountingThread thread1 = new CountingThread(); CountingThread thread2 = new CountingThread(); thread1.start(); thread2.start(); thread1.join(); thread2.join(); System.out.println(counter.get()); } }

Un'altra soluzione è utilizzare un contatore che può aumentare atomicamente, il che significa che l'operazione non può essere separata in più operazioni. In questo modo, non abbiamo bisogno di blocchi di codice da sincronizzare. Java ha tipi di dati atomici nello spazio dei nomi java.util.concurrent.atomic e useremo AtomicInteger.

 // // CountingBetter.java // import java.util.concurrent.atomic.AtomicInteger; class CountingBetter { public static void main(String[] args) throws InterruptedException { final AtomicInteger counter = new AtomicInteger(0); class CountingThread extends Thread { public viod run() { for (int i = 0; i < 500000; i++) { counter.incrementAndGet(); } } } CountingThread thread1 = new CountingThread(); CountingThread thread2 = new CoutningThread(); thread1.start(); thread2.start(); thread1.join(); thread2.join(); System.out.println(counter.get()); } }

Atomic intero ha le operazioni di cui abbiamo bisogno, quindi possiamo usarlo al posto della classe Counter. È interessante notare che tutti i metodi di atomicinteger non utilizzano il lock, quindi non c'è possibilità di deadlock, il che facilita la progettazione del programma.

L'utilizzo di parole chiave sincronizzate per sincronizzare i metodi critici dovrebbe risolvere tutti i problemi, giusto? Immaginiamo di avere due conti che possono depositare, prelevare e trasferire su un altro conto. Cosa succede se allo stesso tempo vogliamo trasferire denaro da un conto all'altro e viceversa? Diamo un'occhiata a un esempio.

 // // Deadlock.java // public class Deadlock { public static void main(String[] args) throws InterruptedException { class Account { int balance = 100; public Account(int balance) { this.balance = balance; } public synchronized void deposit(int amount) { balance += amount; } public synchronized boolean withdraw(int amount) { if (balance >= amount) { balance -= amount; return true; } return false; } public synchronized boolean transfer(Account destination, int amount) { if (balance >= amount) { balance -= amount; synchronized(destination) { destination.balance += amount; }; return true; } return false; } public int getBalance() { return balance; } } final Account bob = new Account(200000); final Account joe = new Account(300000); class FirstTransfer extends Thread { public void run() { for (int i = 0; i < 100000; i++) { bob.transfer(joe, 2); } } } class SecondTransfer extends Thread { public void run() { for (int i = 0; i < 100000; i++) { joe.transfer(bob, 1); } } } FirstTransfer thread1 = new FirstTransfer(); SecondTransfer thread2 = new SecondTransfer(); thread1.start(); thread2.start(); thread1.join(); thread2.join(); System.out.println("Bob's balance: " + bob.getBalance()); System.out.println("Joe's balance: " + joe.getBalance()); } }

Quando eseguo questo programma sul mio laptop di solito si blocca. Perché questo accade? Se osserviamo da vicino, possiamo vedere che quando trasferiamo denaro entriamo nel metodo di trasferimento che è sincronizzato e blocca l'accesso a tutti i metodi sincronizzati sull'account di origine, quindi blocca l'account di destinazione che blocca l'accesso a tutti i metodi sincronizzati su di esso.

Immagina il seguente scenario:

  1. Le chiamate del primo thread vengono trasferite dall'account di Bob all'account di Joe
  2. Le chiamate del secondo thread vengono trasferite dall'account di Joe all'account di Bob
  3. Il secondo thread riduce l'importo dall'account di Joe
  4. Il secondo thread va a depositare l'importo sull'account di Bob ma attende che il primo thread completi il ​​trasferimento.
  5. Il primo thread riduce l'importo dall'account di Bob
  6. Il primo thread va a depositare l'importo sull'account di Joe ma attende che il secondo thread completi il ​​trasferimento.

In questo scenario, un thread attende che un altro thread termini il trasferimento e viceversa. Sono bloccati l'uno con l'altro e il programma non può continuare. Questo è chiamato deadlock. Per evitare deadlock è necessario bloccare gli account nello stesso ordine. Per risolvere il programma, assegneremo a ciascun conto un numero univoco in modo da poter bloccare i conti nello stesso ordine durante il trasferimento del denaro.

 // // DeadlockFixed.java // import java.util.concurrent.atomic.AtomicInteger; public class DeadlockFixed { public static void main(String[] args) throws InterruptedException { final AtomicInteger counter = new AtomicInteger(0); class Account { int balance = 100; int order; public Account(int balance) { this.balance = balance; this.order = counter.getAndIncrement(); } public synchronized void deposit(int amount) { balance += amount; } public synchronized boolean withdraw(int amount) { if (balance >= amount) { balance -= amount; return true; } return false; } public boolean transfer(Account destination, int amount) { Account first; Account second; if (this.order < destination.order) { first = this; second = destination; } else { first = destination; second = this; } synchronized(first) { synchronized(second) { if (balance >= amount) { balance -= amount; destination.balance += amount; return true; } return false; } } } public synchronized int getBalance() { return balance; } } final Account bob = new Account(200000); final Account joe = new Account(300000); class FirstTransfer extends Thread { public void run() { for (int i = 0; i < 100000; i++) { bob.transfer(joe, 2); } } } class SecondTransfer extends Thread { public void run() { for (int i = 0; i < 100000; i++) { joe.transfer(bob, 1); } } } FirstTransfer thread1 = new FirstTransfer(); SecondTransfer thread2 = new SecondTransfer(); thread1.start(); thread2.start(); thread1.join(); thread2.join(); System.out.println("Bob's balance: " + bob.getBalance()); System.out.println("Joe's balance: " + joe.getBalance()); } }

A causa dell'imprevedibilità di tali errori, a volte si verificano, ma non sempre e sono difficili da riprodurre. Se il programma si comporta in modo imprevedibile, di solito è causato dalla concorrenza che introduce il non determinismo accidentale. Per evitare un non determinismo accidentale, dovremmo progettare in anticipo un programma che tenga conto di tutti gli intrecci.

Un esempio di programma che ha un non determinismo accidentale.

 // // NonDeteminism.java // public class NonDeterminism { public static void main(String[] args) throws InterruptedException { class Container { public String value = "Empty"; } final Container container = new Container(); class FastThread extends Thread { public void run() { container.value = "Fast"; } } class SlowThread extends Thread { public void run() { try { Thread.sleep(50); } catch(Exception e) {} container.value = "Slow"; } } FastThread fast = new FastThread(); SlowThread slow = new SlowThread(); fast.start(); slow.start(); fast.join(); slow.join(); System.out.println(container.value); } }

Questo programma contiene un non determinismo accidentale. Verrà visualizzato l'ultimo valore inserito nel contenitore.

 java NonDeterminism Slow

I thread più lenti inseriranno il valore in un secondo momento e questo valore verrà stampato (lento). Ma questo non deve essere il caso. Cosa succede se il computer esegue contemporaneamente un altro programma che richiede molte risorse della CPU? Non abbiamo alcuna garanzia che sarà il thread più lento a immettere il valore per ultimo perché è controllato dal sistema operativo, non dal programma. Possiamo avere situazioni in cui il programma funziona su un computer e sull'altro si comporta in modo diverso. Tali errori sono difficili da trovare e causano mal di testa agli sviluppatori. Per tutti questi motivi questo modello di concorrenza è molto difficile da eseguire correttamente.

Modo Funzionale

Parallelismo

Diamo un'occhiata a un altro modello utilizzato dai linguaggi funzionali. Ad esempio utilizzeremo Clojure, che può essere interpretato utilizzando lo strumento Leiningen. Clojure è un linguaggio molto interessante con un buon supporto per la concorrenza. Il precedente modello di concorrenza era con stato mutevole condiviso. Le classi che usiamo possono anche avere uno stato nascosto che muta di cui non siamo a conoscenza, perché non è evidente dalla loro API. Come abbiamo visto, questo modello può causare non determinismo accidentale e deadlock se non stiamo attenti. I linguaggi funzionali hanno tipi di dati che non mutano in modo che possano essere condivisi in sicurezza senza il rischio che cambino. Le funzioni hanno proprietà e altri tipi di dati. Le funzioni possono essere create durante l'esecuzione del programma e passate come parametro a un'altra funzione o restituite come risultato della chiamata della funzione.

Le primitive di base per la programmazione simultanea sono future e promesse. Future esegue un blocco di codice in un altro thread e restituisce un oggetto per il valore futuro che verrà immesso quando il blocco verrà eseguito.

 ; ; future.clj ; (let [a (future (println "Started A") (Thread/sleep 1000) (println "Finished A") (+ 1 2)) b (future (println "Started B") (Thread/sleep 2000) (println "Finished B") (+ 3 4))] (println "Waiting for futures") (+ @a @b))

Quando eseguo questo script l'output è:

 Started A Started B Waiting for futures Finished A Finished B 10

In questo esempio abbiamo due blocchi futuri che vengono eseguiti indipendentemente. Il programma si blocca solo durante la lettura del valore dall'oggetto futuro che non è ancora disponibile. Nel nostro caso, in attesa di sommare entrambi i risultati dei blocchi futuri. Il comportamento è prevedibile (deterministico) e darà sempre lo stesso risultato perché non esiste uno stato mutevole condiviso.

Un'altra primitiva usata per la concorrenza è una promessa. Promise è un contenitore in cui si può mettere un valore una volta. Durante la lettura delle promesse, il thread attende fino a quando il valore della promessa viene riempito.

 ; ; promise.clj ; (def result (promise)) (future (println "The result is: " @result)) (Thread/sleep 2000) (deliver result 42)

In questo esempio, il futuro attenderà di stampare il risultato fintanto che il valore della promessa di non essere salvato. Dopo due secondi, nella promessa verrà memorizzato il valore 42 da stampare nel thread futuro. L'uso delle promesse può portare a uno stallo rispetto al futuro, quindi fai attenzione quando usi la promessa.

 ; ; promise-deadlock.clj ; (def promise-result (promise)) (def future-result (future (println "The result is: " + @promise-result) 13)) (println "Future result is: " @future-result) (deliver result 42)

In questo esempio, stiamo usando il risultato del futuro e il risultato della promessa. L'ordine di impostazione e lettura dei valori è che il thread principale attende un valore dal thread futuro e il thread futuro attende un valore dal thread principale. Questo comportamento sarà prevedibile (deterministico) e verrà riprodotto ogni volta che viene eseguito il programma, il che rende più facile trovare e rimuovere l'errore.

L'uso del futuro consente al programma di continuare con l'esercizio fino a quando non necessita del risultato dell'esecuzione del futuro. Ciò si traduce in un'esecuzione più rapida del programma. Se hai più processori con il futuro, puoi eseguire l'esecuzione parallela di programmi che hanno un comportamento prevedibile (deterministico) (ogni volta dà lo stesso risultato). In questo modo sfruttiamo meglio la potenza del computer.

 ; ; fibonacci.clj ; (defn fibonacci[a] (if (<= a 2) 1 (+ (fibonacci (- a 1)) (fibonacci (- a 2))))) (println "Start serial calculation") (time (println "The result is: " (+ (fibonacci 36) (fibonacci 36)))) (println "Start parallel calculation") (defn parallel-fibonacci[] (def result-1 (future (fibonacci 36))) (def result-2 (future (fibonacci 36))) (+ @result-1 @result-2)) (time (println "The result is: " (parallel-fibonacci)))

In questo esempio puoi vedere come l'uso di future può sfruttare meglio la velocità di un computer. Abbiamo due numeri di Fibonacci che si sommano. Possiamo vedere che il programma calcola il risultato due volte, la prima volta in sequenza in un singolo thread e la seconda volta in parallelo in due thread. Poiché il mio laptop ha un processore multicore, l'esecuzione parallela funziona due volte più velocemente del calcolo sequenziale.

Il risultato dell'esecuzione di questo script sul mio laptop:

 Start serial calculation The result is: 29860704 "Elapsed time: 2568.816524 msecs" Start parallel calculation The result is: 29860704 "Elapsed time: 1216.991448 msecs"

Concorrenza

Per supportare la concorrenza e l'imprevedibilità nel linguaggio di programmazione Clojure, dobbiamo utilizzare un tipo di dati variabile in modo che altri thread possano vedere le modifiche. Il tipo di dati variabile più semplice è atom. Atom è un contenitore che ha sempre il valore che può essere sostituito da un altro valore. Il valore può essere sostituito immettendo un nuovo valore o chiamando una funzione che prende il vecchio valore e restituisce il nuovo valore che viene utilizzato più frequentemente. È interessante notare che atom è implementato senza lock ed è sicuro da usare nei thread, il che significa che è impossibile raggiungere il deadlock. Internamente, atom utilizza la libreria java.util.concurrent.AtomicReference. Diamo un'occhiata a un controesempio implementato con atom.

 ; ; atom-counter.clj ; (def counter (atom 0)) (def attempts (atom 0)) (defn counter-increases[] (dotimes [cnt 500000] (swap! counter (fn [counter] (swap! attempts inc) ; side effect DO NOT DO THIS (inc counter))))) (def first-future (future (counter-increases))) (def second-future (future (counter-increases))) ; Wait for futures to complete @first-future @second-future ; Print value of the counter (println "The counter is: " @counter) (println "Number of attempts: " @attempts)

Il risultato dell'esecuzione dello script sul mio laptop:

 The counter is: 1000000 Number of attempts: 1680212

In questo esempio utilizziamo un atomo che contiene il valore del contatore. Il contatore aumenta con (scambia! contatore inc). La funzione di scambio funziona in questo modo: 1. prendi il valore del contatore e conservalo 2. per questo valore chiama la funzione data che calcola il nuovo valore 3. per salvare il nuovo valore, usa l'operazione atomica che controlla se il vecchio valore è cambiato 3a. se il valore non è cambiato entra in un nuovo valore 3b. se nel frattempo il valore viene modificato, andare al passaggio 1 Vediamo che la funzione può essere richiamata di nuovo se nel frattempo il valore viene modificato. Il valore può essere modificato solo da un altro thread. Pertanto, è essenziale che la funzione che calcola un nuovo valore non abbia effetti collaterali, quindi non importa se viene chiamata più volte. Una limitazione dell'atomo è che sincronizza le modifiche su un valore.

 ; ; atom-acocunts.clj ; (def bob (atom 200000)) (def joe (atom 300000)) (def inconsistencies (atom 0)) (defn transfer [source destination amount] (if (not= (+ @bob @joe) 500000) (swap! inconsistencies inc)) (swap! source - amount) (swap! destination + amount)) (defn first-transfer [] (dotimes [cnt 100000] (transfer bob joe 2))) (defn second-transfer [] (dotimes [cnt 100000] (transfer joe bob 1))) (def first-future (future (first-transfer))) (def second-future (future (second-transfer))) @first-future @second-future (println "Bob has in account: " @bob) (println "Joe has in account: " @joe) (println "Inconsistencies while transfer: " @inconsistencies)

Quando eseguo questo script ottengo:

 Bob has in account: 100000 Joe has in account: 400000 Inconsistencies while transfer: 36525

In questo esempio possiamo vedere come cambiamo più atomi. A un certo punto, può verificarsi l'incoerenza. La somma di due conti in un dato momento non è la stessa. Se dobbiamo coordinare modifiche di più valori ci sono due soluzioni:

  1. Metti più valori in un atomo
  2. Utilizzare i riferimenti e la memoria transazionale del software, come vedremo in seguito
 ; ; atom-accounts-fixed.clj ; (def accounts (atom {:bob 200000, :joe 300000})) (def inconsistencies (atom 0)) (defn transfer [source destination amount] (let [deref-accounts @accounts] (if (not= (+ (get deref-accounts :bob) (get deref-accounts :joe)) 500000) (swap! inconsistencies inc)) (swap! accounts (fn [accs] (update (update accs source - amount) destination + amount))))) (defn first-transfer [] (dotimes [cnt 100000] (transfer :bob :joe 2))) (defn second-transfer [] (dotimes [cnt 100000] (transfer :joe :bob 1))) (def first-future (future (first-transfer))) (def second-future (future (second-transfer))) @first-future @second-future (println "Bob has in account: " (get @accounts :bob)) (println "Joe has in account: " (get @accounts :joe)) (println "Inconsistencies while transfer: " @inconsistencies)

Quando eseguo questo script sul mio computer ottengo:

 Bob has in account: 100000 Joe has in account: 400000 Inconsistencies while transfer: 0

Nell'esempio, il coordinamento è stato risolto in modo da dare più valore utilizzando una mappa. Quando trasferiamo denaro dal conto, cambiamo tutti i conti in quel momento in modo che non accada mai che la somma di denaro non sia la stessa.

Il successivo tipo di dati variabile è l'agente. L'agente si comporta come un atomo solo in quanto la funzione che modifica il valore viene eseguita in un thread diverso, quindi è necessario del tempo prima che la modifica diventi visibile. Pertanto, durante la lettura del valore dell'agente è necessario chiamare una funzione che attende l'esecuzione di tutte le funzioni che modificano il valore dell'agente. A differenza degli atomi, la funzione che cambia il valore viene chiamata una sola volta e quindi può avere effetti collaterali. Questo tipo può anche sincronizzare un valore e non può eseguire il deadlock.

 ; ; agent-counter.clj ; (def counter (agent 0)) (def attempts (atom 0)) (defn counter-increases[] (dotimes [cnt 500000] (send counter (fn [counter] (swap! attempts inc) (inc counter))))) (def first-future (future (counter-increases))) (def second-future (future (counter-increases))) ; wait for futures to complete @first-future @second-future ; wait for counter to be finished with updating (await counter) ; print the value of the counter (println "The counter is: " @counter) (println "Number of attempts: " @attempts)

Quando eseguo questo script sul mio laptop ottengo:

 The counter is: 1000000 Number of attempts: 1000000

Questo esempio è lo stesso dell'implementazione del contatore con l'atomo. L'unica differenza è che qui stiamo aspettando il completamento di tutte le modifiche dell'agente prima di leggere il valore finale utilizzando await.

L'ultimo tipo di dati variabili sono riferimenti. A differenza degli atomi, i riferimenti possono sincronizzare le modifiche su più valori. Ogni operazione su riferimento dovrebbe essere in una transazione che utilizza dosync. Questo modo di modificare i dati è chiamato memoria transazionale software o STM abbreviato. Diamo un'occhiata a un esempio con il trasferimento di denaro nei conti.

 ; ; stm-accounts.clj ; (def bob (ref 200000)) (def joe (ref 300000)) (def inconsistencies (atom 0)) (def attempts (atom 0)) (def transfers (agent 0)) (defn transfer [source destination amount] (dosync (swap! attempts inc) ; side effect DO NOT DO THIS (send transfers inc) (when (not= (+ @bob @joe) 500000) (swap! inconsistencies inc)) ; side effect DO NOT DO THIS (alter source - amount) (alter destination + amount))) (defn first-transfer [] (dotimes [cnt 100000] (transfer bob joe 2))) (defn second-transfer [] (dotimes [cnt 100000] (transfer joe bob 1))) (def first-future (future (first-transfer))) (def second-future (future (second-transfer))) @first-future @second-future (await transfers) (println "Bob has in account: " @bob) (println "Joe has in account: " @joe) (println "Inconsistencies while transfer: " @inconsistencies) (println "Attempts: " @attempts) (println "Transfers: " @transfers)

Quando eseguo questo script, ottengo:

 Bob has in account: 100000 Joe has in account: 400000 Inconsistencies while transfer: 0 Attempts: 330841 Transfers: 200000

È interessante notare che ci sono stati più tentativi rispetto al numero di transazioni effettuate. Questo perché l'STM non utilizza i blocchi, quindi se c'è un conflitto (come due thread che cercano di modificare lo stesso valore) la transazione verrà rieseguita. Per questo motivo, l'operazione non dovrebbe avere effetti collaterali. Possiamo vedere che l'agente il cui valore cambia all'interno della transazione si comporta in modo prevedibile. Una funzione che modifica il valore dell'agente verrà valutata tante volte quante sono le transazioni. Il motivo è che l'agente è a conoscenza della transazione. Se la transazione deve avere effetti collaterali, questi dovrebbero essere messi in funzione all'interno dell'agente. In questo modo, il programma avrà un comportamento prevedibile. Probabilmente pensi che dovresti sempre usare STM, ma i programmatori esperti useranno spesso gli atomi perché gli atomi sono più semplici e veloci di STM. Naturalmente, questo è se è possibile creare un programma in quel modo. Se hai effetti collaterali, non c'è altra scelta che usare STM e agenti.

Modello attore

Il seguente modello di concorrenza è un modello attore. Il principio di questo modello è simile al mondo reale. Se facciamo un accordo per creare qualcosa con molte persone, ad esempio un edificio, allora ogni uomo in cantiere ha il proprio ruolo. Una folla di persone è sorvegliata dal supervisore. Se un lavoratore è infortunato sul lavoro, il supervisore assegnerà l'incarico dell'infortunato agli altri che sono disponibili. Se necessario può condurre al sito un uomo nuovo. Sul sito abbiamo più persone che fanno il lavoro contemporaneamente (contemporaneamente), ma parlano anche tra loro per sincronizzarsi. Se inserissimo nel programma il lavoro sul cantiere, allora ogni persona sarebbe un attore che ha uno stato ed esegue il proprio processo, e il discorso verrebbe sostituito da messaggi. Il popolare linguaggio di programmazione basato su questo modello è Erlang. Questo interessante linguaggio ha tipi di dati immutabili e funzioni che hanno le stesse proprietà di altri tipi di dati. Le funzioni possono essere create durante l'esecuzione del programma e passate come argomenti a un'altra funzione o restituite come risultato della chiamata di funzione. Darò esempi nel linguaggio Elixir che utilizza la macchina virtuale Erlang, quindi avrò lo stesso modello di programmazione di Erlang solo una sintassi diversa. Le tre primitive più importanti in Elixir sono spawn, send e ricevi. spawn esegue la funzione nel nuovo processo, send invia il messaggio al processo e riceve riceve i messaggi inviati al processo corrente.

Il primo esempio con il modello attore verrà incrementato contemporaneamente. Per fare un programma con questo modello, è necessario che un attore abbia il valore del contatore e riceva un messaggio per impostare e recuperare il valore del contatore, e avere due attori che aumenteranno contemporaneamente il valore del contatore.

 # # Counting.exs # defmodule Counting do def counter(value) do receive do {:get, sender} -> send sender, {:counter, value} counter value {:set, new_value} -> counter(new_value) end end def counting(sender, counter, times) do if times > 0 do send counter, {:get, self} receive do {:counter, value} -> send counter, {:set, value + 1} end counting(sender, counter, times - 1) else send sender, {:done, self} end end end counter = spawn fn -> Counting.counter 0 end IO.puts "Starting counting processes" this = self counting1 = spawn fn -> IO.puts "Counting A started" Counting.counting this, counter, 500_000 IO.puts "Counting A finished" end counting2 = spawn fn -> IO.puts "Counting B started" Counting.counting this, counter, 500_000 IO.puts "Counting B finished" end IO.puts "Waiting for counting to be done" receive do {:done, ^counting1} -> nil end receive do {:done, ^counting2} -> nil end send counter, {:get, self} receive do {:counter, value} -> IO.puts "Counter is: #{value}" end

Quando eseguo questo esempio ottengo:

 Starting counting processes Counting A started Waiting for counting to be done Counting B started Counting A finished Counting B finished Counter is: 516827

Possiamo vedere che alla fine il contatore è 516827 e non 1000000 come ci aspettavamo. Quando ho eseguito lo script la volta successiva, ho ricevuto 511010. Il motivo di questo comportamento è che il contatore riceve due messaggi: recuperare il valore corrente e impostare il nuovo valore. Per aumentare il contatore, il programma deve ottenere il valore corrente, aumentarlo di 1 e impostare il valore aumentato. Due processi leggono e scrivono contemporaneamente il valore del contatore utilizzando i messaggi inviati al processo del contatore. L'ordine dei messaggi che il contatore riceverà è imprevedibile e il programma non può controllarlo. Possiamo immaginare questo scenario:

  1. Il valore del contatore è 115
  2. Il processo A legge il valore del contatore (115)
  3. Il processo B legge il valore del contatore (115)
  4. Il processo B aumenta il valore localmente (116)
  5. Il processo B imposta il valore aumentato sul contatore (116)
  6. Il processo A aumenta il valore del contatore (116)
  7. Il processo A imposta il valore aumentato sul contatore (116)
  8. Il valore del contatore è 116

Se osserviamo lo scenario, due processi aumentano il contatore di 1 e il contatore alla fine viene aumentato di 1 e non di 2. Tali intrecci possono verificarsi un numero imprevedibile di volte e quindi il valore del contatore è imprevedibile. Per evitare questo comportamento, l'operazione di aumento deve essere eseguita da un messaggio.

 # # CountingFixed.exs # defmodule Counting do def counter(value) do receive do :increase -> counter(value + 1) {:get, sender} -> send sender, {:counter, value} counter value end end def counting(sender, counter, times) do if times > 0 do send counter, :increase counting(sender, counter, times - 1) else send sender, {:done, self} end end end counter = spawn fn -> Counting.counter 0 end IO.puts "Starting counting processes" this = self counting1 = spawn fn -> IO.puts "Counting A started" Counting.counting this, counter, 500_000 IO.puts "Counting A finished" end counting2 = spawn fn -> IO.puts "Counting B started" Counting.counting this, counter, 500_000 IO.puts "Counting B finished" end IO.puts "Waiting for counting to be done" receive do {:done, ^counting1} -> nil end receive do {:done, ^counting2} -> nil end send counter, {:get, self} receive do {:counter, value} -> IO.puts "Counter is: #{value}" end

Eseguendo questo script ottengo:

 Starting counting processes Counting A started Waiting for counting to be done Counting B started Counting A finished Counting B finished Counter is: 1000000

Possiamo vedere che il contatore ha il valore corretto. Il motivo del comportamento prevedibile (deterministico) è che il valore del contatore aumenta di un messaggio in modo che la sequenza di messaggi per aumentare il contatore non influisca sul suo valore finale. Lavorando con il modello attore, dobbiamo prestare attenzione a come i messaggi possono intrecciarsi e un'attenta progettazione di messaggi e azioni sui messaggi per evitare imprevedibilità accidentale (non determinismo).

Come possiamo trasferire denaro tra due conti con questo modello?

 # # Accounts.exs # defmodule Accounts do def accounts(state) do receive do {:transfer, source, destination, amount} -> accounts %{state | source => state[source] - amount , destination => state[destination] + amount} {:amounts, accounts, sender } -> send sender, {:amounts, for account <- accounts do {account, state[account]} end} accounts(state) end end def transfer(sender, accounts, source, destination, amount, times, inconsistencies) do if times > 0 do send accounts, {:amounts, [source, destination], self} receive do {:amounts, amounts} -> if amounts[source] + amounts[destination] != 500_000 do Agent.update(inconsistencies, fn value -> value + 1 end) end end send accounts, {:transfer, source, destination, amount} transfer(sender, accounts, source, destination, amount, times - 1, inconsistencies) else send sender, {:done, self} end end end accounts = spawn fn -> Accounts.accounts(%{bob: 200_000, joe: 300_000 }) end {:ok, inconsistencies} = Agent.start(fn -> 0 end) this = self transfer1 = spawn fn -> IO.puts "Transfer A started" Accounts.transfer(this, accounts, :bob, :joe, 2, 100_000, inconsistencies) IO.puts "Transfer A finished" end transfer2 = spawn fn -> IO.puts "Transfer B started" Accounts.transfer(this, accounts, :joe, :bob, 1, 100_000, inconsistencies) IO.puts "Transfer B finished" end IO.puts "Waiting for transfers to be done" receive do {:done, ^transfer1} -> nil end receive do {:done, ^transfer2} -> nil end send accounts, {:amounts, [:bob, :joe], self} receive do {:amounts, amounts} -> IO.puts "Bob has in account: #{amounts[:bob]}" IO.puts "Joe has in account: #{amounts[:joe]}" IO.puts "Inconsistencies while transfer: #{Agent.get(inconsistencies, fn x -> x end)}" end

Quando eseguo questo script ottengo:

 Waiting for transfers to be done Transfer A started Transfer B started Transfer B finished Transfer A finished Bob has in account: 100000 Joe has in account: 400000 Inconsistencies while transfer: 0

Possiamo vedere che il trasferimento di denaro funziona senza incongruenze, perché abbiamo scelto il trasferimento di messaggi per trasferire denaro e importi di messaggi per ottenere il valore dei conti che ci dà un comportamento prevedibile del programma. Ogni volta che eseguiamo un trasferimento di denaro, l'importo totale di denaro in qualsiasi momento dovrebbe essere lo stesso.

Il modello dell'attore può causare un blocco e quindi un deadlock, quindi prestare attenzione durante la progettazione del programma. Lo script seguente mostra come simulare lo scenario di blocco e deadlock.

 # # Deadlock.exs # defmodule Lock do def loop(state) do receive do {:lock, sender} -> case state do [] -> send sender, :locked loop([sender]) _ -> loop(state ++ [sender]) end {:unlock, sender} -> case state do [] -> loop(state) [^sender | []] -> loop([]) [^sender | [next | tail]] -> send next, :locked loop([next | tail]) _ -> loop(state) end end end def lock(pid) do send pid, {:lock, self} receive do :locked -> nil # This will block until we receive message end end def unlock(pid) do send pid, {:unlock, self} end def locking(first, second, times) do if times > 0 do lock(first) lock(second) unlock(second) unlock(first) locking(first, second, times - 1) end end end a_lock = spawn fn -> Lock.loop([]) end b_lock = spawn fn -> Lock.loop([]) end this = self IO.puts "Locking A, B started" spawn fn -> Lock.locking(a_lock, b_lock, 1_000) IO.puts "Locking A, B finished" send this, :done end IO.puts "Locking B, A started" spawn fn -> Lock.locking(b_lock, a_lock, 1_000) IO.puts "Locking B, A finished" send this, :done end IO.puts "Waiting for locking to be done" receive do :done -> nil end receive do :done -> nil End

When I run this script on my laptop I get:

 Locking A, B started Locking B, A started Waiting for locking to be done

From the output we can see that the processes that lock A and B are stuck. This happens because the first process waits for the second process to release B while second process waiting first process to release A. They are waiting for each other and are stuck forever. To avoid this locking, order should always be the same, or design a program so that it doesn't use lock (meaning that it doesn't wait for a specific message). The following listing always locks first A then B.

 # # Deadlock fixed # defmodule Lock do def loop(state) do receive do {:lock, sender} -> case state do [] -> send sender, :locked loop([sender]) _ -> loop(state ++ [sender]) end {:unlock, sender} -> case state do [] -> loop(state) [^sender | []] -> loop([]) [^sender | [next | tail]] -> send next, :locked loop([next | tail]) _ -> loop(state) end end end def lock(pid) do send pid, {:lock, self} receive do :locked -> nil # This will block until we receive message end end def unlock(pid) do send pid, {:unlock, self} end def locking(first, second, times) do if times > 0 do lock(first) lock(second) unlock(second) unlock(first) locking(first, second, times - 1) end end end a_lock = spawn fn -> Lock.loop([]) end b_lock = spawn fn -> Lock.loop([]) end this = self IO.puts "Locking A, B started" spawn fn -> Lock.locking(a_lock, b_lock, 1_000) IO.puts "Locking A, B finished" send this, :done end IO.puts "Locking A, B started" spawn fn -> Lock.locking(a_lock, b_lock, 1_000) IO.puts "Locking A, B finished" send this, :done end IO.puts "Waiting for locking to be done" receive do :done -> nil end receive do :done -> nil End

When I run this script on my laptop I get:

 Locking A, B started Locking A, B started Waiting for locking to be done Locking A, B finished Locking A, B finished

And now, there is no longer a deadlock.

Incartare

As an introduction to concurrent programming, we have covered a few concurrency models. We haven't covered all models, as this article would be too big. Just to name a few, channels and reactive streams are some of the other popularly used concurrency models. Channels and reactive streams have many similarities with the actor model. All of them transmit messages, but many threads can receive messages from one channel, and reactive streams transmit messages in one direction to form directed graph that receive messages from one end and send messages from the other end as a result of the processing.

Shared mutable state models can easily go wrong if we don't think ahead. It has problems of race condition and deadlock. If we have a choice between different concurrent programming models, it would be easier to implement and maintain but otherwise we have to be very careful what we do.

The functional way is a lot easier to reason about and implement. It cannot have deadlock. This model may have worse performance than shared mutable state model, but a program that works is always faster than one that does not work.

Actor model is a good choice for concurrent programming. Although there are problems of race condition and deadlock, they can happen less than in shared mutable state model since the only way for processes to communicate is via messages. With good message design between processes, that can be avoided. If a problem occurs it is then in the order or meaning of messages in communication between the processes and you know where to look.

I hope this article has given you some insight to what concurrent programming is and how it gives structure to the programs you write.

Related: Ruby Concurrency and Parallelism: A Practical Tutorial