Introduction à la programmation simultanée : guide du débutant
Publié: 2022-03-11Qu'est-ce que la programmation concurrente ? Simplement décrit, c'est quand vous faites plus d'une chose en même temps. À ne pas confondre avec le parallélisme, la simultanéité se produit lorsque plusieurs séquences d'opérations sont exécutées dans des périodes de temps qui se chevauchent. Dans le domaine de la programmation, la concurrence est un sujet assez complexe. Traiter des constructions telles que les threads et les verrous et éviter des problèmes tels que les conditions de concurrence et les blocages peut être assez fastidieux, ce qui rend les programmes concurrents difficiles à écrire. Grâce à la concurrence, les programmes peuvent être conçus comme des processus indépendants travaillant ensemble dans une composition spécifique. Une telle structure peut ou non être rendue parallèle ; cependant, la réalisation d'une telle structure dans votre programme offre de nombreux avantages.
Dans cet article, nous examinerons un certain nombre de modèles différents de concurrence, comment les réaliser dans divers langages de programmation conçus pour la concurrence.
Modèle d'état mutable partagé
Prenons un exemple simple avec un compteur et deux threads qui l'augmentent. Le programme ne devrait pas être trop compliqué. Nous avons un objet qui contient un compteur qui augmente avec l'augmentation de la méthode, et le récupère avec la méthode get et deux threads qui l'augmentent.
// // Counting.java // public class Counting { public static void main(String[] args) throws InterruptedException { class Counter { int counter = 0; public void increment() { counter++; } public int get() { return counter; } } final Counter counter = new Counter(); class CountingThread extends Thread { public void run() { for (int x = 0; x < 500000; x++) { counter.increment(); } } } CountingThread t1 = new CountingThread(); CountingThread t2 = new CountingThread(); t1.start(); t2.start(); t1.join(); t2.join(); System.out.println(counter.get()); } }
Ce programme naïf n'est pas aussi naïf qu'il y paraît à première vue. Lorsque j'exécute ce programme plusieurs fois, j'obtiens des résultats différents. Il y a trois valeurs après trois exécutions sur mon portable.
java Counting 553706 java Counting 547818 java Counting 613014
Quelle est la raison de ce comportement imprévisible ? Le programme augmente le compteur à un endroit, dans la méthode d'augmentation qui utilise la commande counter++. Si nous regardons le byte code de la commande, nous verrions qu'il se compose de plusieurs parties :
- Lire la valeur du compteur de la mémoire
- Augmenter la valeur localement
- Stocker la valeur du compteur en mémoire
Maintenant, nous pouvons imaginer ce qui peut mal tourner dans cette séquence. Si nous avons deux threads qui augmentent indépendamment le compteur, nous pourrions avoir ce scénario :
- La valeur du compteur est 115
- Le premier thread lit la valeur du compteur dans la mémoire (115)
- Le premier thread augmente la valeur du compteur local (116)
- Le deuxième thread lit la valeur du compteur dans la mémoire (115)
- Le deuxième thread augmente la valeur du compteur local (116)
- Le deuxième thread enregistre la valeur du compteur local dans la mémoire (116)
- Le premier thread enregistre la valeur du compteur local dans la mémoire (116)
- La valeur du compteur est 116
Dans ce scénario, deux threads sont entrelacés de sorte que la valeur du compteur est augmentée de 1, mais la valeur du compteur doit être augmentée de 2 car chaque thread l'augmente de 1. L'entrelacement de différents threads influence le résultat du programme. La raison de l'imprévisibilité du programme est que le programme n'a aucun contrôle sur l'entrelacement des threads mais sur le système d'exploitation. Chaque fois que le programme est exécuté, les threads peuvent s'entrelacer différemment. De cette manière, nous avons introduit une imprévisibilité accidentelle (non-déterminisme) dans le programme.
Pour corriger cette imprévisibilité accidentelle (non-déterminisme), le programme doit avoir le contrôle de l'entrelacement des threads. Lorsqu'un thread est dans la méthode, un autre thread ne doit pas être dans la même méthode tant que le premier n'en sort pas. De cette façon, nous sérialisons l'accès à la méthode d'augmentation.
// // CountingFixed.java // public class CountingFixed { public static main(String[] args) throws InterruptedException { class Counter { int counter = 0; public synchronized void increase() { counter++; } public synchronized int get() { return counter; } } final Counter counter = new Counter(); class CountingThread extends Thread { public void run() { for (int i = 0; i < 500000; i++) { counter.increment(); } } } CountingThread thread1 = new CountingThread(); CountingThread thread2 = new CountingThread(); thread1.start(); thread2.start(); thread1.join(); thread2.join(); System.out.println(counter.get()); } }
Une autre solution consiste à utiliser un compteur qui peut augmenter de manière atomique, ce qui signifie que l'opération ne peut pas être séparée en plusieurs opérations. De cette façon, nous n'avons pas besoin d'avoir des blocs de code qui doivent être synchronisés. Java a des types de données atomiques dans l'espace de noms java.util.concurrent.atomic, et nous utiliserons AtomicInteger.
// // CountingBetter.java // import java.util.concurrent.atomic.AtomicInteger; class CountingBetter { public static void main(String[] args) throws InterruptedException { final AtomicInteger counter = new AtomicInteger(0); class CountingThread extends Thread { public viod run() { for (int i = 0; i < 500000; i++) { counter.incrementAndGet(); } } } CountingThread thread1 = new CountingThread(); CountingThread thread2 = new CoutningThread(); thread1.start(); thread2.start(); thread1.join(); thread2.join(); System.out.println(counter.get()); } }
L'entier atomique a les opérations dont nous avons besoin, nous pouvons donc l'utiliser à la place de la classe Counter. Il est intéressant de noter que toutes les méthodes d'atomicinteger n'utilisent pas de verrouillage, de sorte qu'il n'y a aucune possibilité de blocages, ce qui facilite la conception du programme.
L'utilisation de mots-clés synchronisés pour synchroniser les méthodes critiques devrait résoudre tous les problèmes, n'est-ce pas ? Imaginons que nous ayons deux comptes qui peuvent déposer, retirer et transférer vers un autre compte. Que se passe-t-il si en même temps nous voulons transférer de l'argent d'un compte à un autre et vice versa ? Prenons un exemple.
// // Deadlock.java // public class Deadlock { public static void main(String[] args) throws InterruptedException { class Account { int balance = 100; public Account(int balance) { this.balance = balance; } public synchronized void deposit(int amount) { balance += amount; } public synchronized boolean withdraw(int amount) { if (balance >= amount) { balance -= amount; return true; } return false; } public synchronized boolean transfer(Account destination, int amount) { if (balance >= amount) { balance -= amount; synchronized(destination) { destination.balance += amount; }; return true; } return false; } public int getBalance() { return balance; } } final Account bob = new Account(200000); final Account joe = new Account(300000); class FirstTransfer extends Thread { public void run() { for (int i = 0; i < 100000; i++) { bob.transfer(joe, 2); } } } class SecondTransfer extends Thread { public void run() { for (int i = 0; i < 100000; i++) { joe.transfer(bob, 1); } } } FirstTransfer thread1 = new FirstTransfer(); SecondTransfer thread2 = new SecondTransfer(); thread1.start(); thread2.start(); thread1.join(); thread2.join(); System.out.println("Bob's balance: " + bob.getBalance()); System.out.println("Joe's balance: " + joe.getBalance()); } }
Lorsque j'exécute ce programme sur mon ordinateur portable, il se bloque généralement. Pourquoi cela arrive-t-il? Si nous regardons attentivement, nous pouvons voir que lorsque nous transférons de l'argent, nous entrons dans la méthode de transfert qui est synchronisée et verrouille l'accès à toutes les méthodes synchronisées sur le compte source, puis verrouille le compte de destination qui verrouille l'accès à toutes les méthodes synchronisées.
Imaginez le scénario suivant :
- Le premier thread appelle le transfert du compte de Bob vers le compte de Joe
- Le deuxième fil appelle le transfert du compte de Joe vers le compte de Bob
- Le deuxième fil diminue le montant du compte de Joe
- Le deuxième thread va déposer le montant sur le compte de Bob mais attend que le premier thread termine le transfert.
- Le premier fil diminue le montant du compte de Bob
- Le premier thread va déposer le montant sur le compte de Joe mais attend que le deuxième thread termine le transfert.
Dans ce scénario, un thread attend qu'un autre thread termine le transfert et vice versa. Ils sont coincés les uns avec les autres et le programme ne peut pas continuer. C'est ce qu'on appelle l'impasse. Pour éviter les impasses, il est nécessaire de verrouiller les comptes dans le même ordre. Pour corriger le programme, nous attribuerons à chaque compte un numéro unique afin que nous puissions verrouiller les comptes dans le même ordre lors du transfert de l'argent.
// // DeadlockFixed.java // import java.util.concurrent.atomic.AtomicInteger; public class DeadlockFixed { public static void main(String[] args) throws InterruptedException { final AtomicInteger counter = new AtomicInteger(0); class Account { int balance = 100; int order; public Account(int balance) { this.balance = balance; this.order = counter.getAndIncrement(); } public synchronized void deposit(int amount) { balance += amount; } public synchronized boolean withdraw(int amount) { if (balance >= amount) { balance -= amount; return true; } return false; } public boolean transfer(Account destination, int amount) { Account first; Account second; if (this.order < destination.order) { first = this; second = destination; } else { first = destination; second = this; } synchronized(first) { synchronized(second) { if (balance >= amount) { balance -= amount; destination.balance += amount; return true; } return false; } } } public synchronized int getBalance() { return balance; } } final Account bob = new Account(200000); final Account joe = new Account(300000); class FirstTransfer extends Thread { public void run() { for (int i = 0; i < 100000; i++) { bob.transfer(joe, 2); } } } class SecondTransfer extends Thread { public void run() { for (int i = 0; i < 100000; i++) { joe.transfer(bob, 1); } } } FirstTransfer thread1 = new FirstTransfer(); SecondTransfer thread2 = new SecondTransfer(); thread1.start(); thread2.start(); thread1.join(); thread2.join(); System.out.println("Bob's balance: " + bob.getBalance()); System.out.println("Joe's balance: " + joe.getBalance()); } }
En raison de l'imprévisibilité de telles erreurs, elles se produisent parfois, mais pas toujours et elles sont difficiles à reproduire. Si le programme se comporte de manière imprévisible, cela est généralement causé par la concurrence qui introduit un non-déterminisme accidentel. Pour éviter un non-déterminisme accidentel, nous devrions au préalable concevoir le programme pour prendre en compte tous les entrelacs.
Un exemple de programme qui a un non-déterminisme accidentel.
// // NonDeteminism.java // public class NonDeterminism { public static void main(String[] args) throws InterruptedException { class Container { public String value = "Empty"; } final Container container = new Container(); class FastThread extends Thread { public void run() { container.value = "Fast"; } } class SlowThread extends Thread { public void run() { try { Thread.sleep(50); } catch(Exception e) {} container.value = "Slow"; } } FastThread fast = new FastThread(); SlowThread slow = new SlowThread(); fast.start(); slow.start(); fast.join(); slow.join(); System.out.println(container.value); } }
Ce programme contient un non-déterminisme accidentel. La dernière valeur entrée dans le conteneur sera affichée.
java NonDeterminism Slow
Les threads plus lents entreront la valeur plus tard, et cette valeur sera imprimée (Lente). Mais cela ne doit pas être le cas. Que se passe-t-il si l'ordinateur exécute simultanément un autre programme nécessitant beaucoup de ressources CPU ? Nous n'avons aucune garantie que ce sera le thread le plus lent qui entrera la valeur en dernier car il est contrôlé par le système d'exploitation et non par le programme. Nous pouvons avoir des situations où le programme fonctionne sur un ordinateur et sur l'autre se comporte différemment. De telles erreurs sont difficiles à trouver et causent des maux de tête aux développeurs. Pour toutes ces raisons, ce modèle de concurrence est très difficile à faire correctement.
Manière fonctionnelle
Parallélisme
Regardons un autre modèle utilisé par les langages fonctionnels. Par exemple, nous utiliserons Clojure, qui peut être interprété à l'aide de l'outil Leiningen. Clojure est un langage très intéressant avec un bon support pour la concurrence. Le modèle de concurrence précédent était avec un état mutable partagé. Les classes que nous utilisons peuvent également avoir un état caché qui mute et que nous ne connaissons pas, car cela n'est pas évident à partir de leur API. Comme nous l'avons vu, ce modèle peut provoquer des non-déterminismes accidentels et des blocages si nous n'y prenons garde. Les langages fonctionnels ont des types de données qui ne mutent pas et peuvent donc être partagés en toute sécurité sans risquer qu'ils changent. Les fonctions ont des propriétés ainsi que d'autres types de données. Les fonctions peuvent être créées pendant l'exécution du programme et transmises en tant que paramètre à une autre fonction ou renvoyées à la suite de l'appel de la fonction.
Les primitives de base pour la programmation concurrente sont futures et prometteuses. Future exécute un bloc de code dans un autre thread et renvoie un objet pour la valeur future qui sera saisie lors de l'exécution du bloc.
; ; future.clj ; (let [a (future (println "Started A") (Thread/sleep 1000) (println "Finished A") (+ 1 2)) b (future (println "Started B") (Thread/sleep 2000) (println "Finished B") (+ 3 4))] (println "Waiting for futures") (+ @a @b))
Lorsque j'exécute ce script, le résultat est :
Started A Started B Waiting for futures Finished A Finished B 10
Dans cet exemple, nous avons deux blocs futurs qui sont exécutés indépendamment. Le programme ne se bloque que lors de la lecture de la valeur de l'objet futur qui n'est pas encore disponible. Dans notre cas, en attendant que les deux résultats des futurs blocs soient additionnés. Le comportement est prévisible (déterministe) et donnera toujours le même résultat car il n'y a pas d'état mutable partagé.
Une autre primitive utilisée pour la concurrence est une promesse. La promesse est un conteneur dans lequel on peut mettre une valeur une fois. Lors de la lecture des promesses, le thread attendra que la valeur de la promesse soit remplie.
; ; promise.clj ; (def result (promise)) (future (println "The result is: " @result)) (Thread/sleep 2000) (deliver result 42)
Dans cet exemple, le futur attendra pour imprimer le résultat tant que la promesse de ne pas être enregistrée vaut. Après deux secondes, dans la promesse sera stockée la valeur 42 à imprimer dans le futur thread. L'utilisation de promesses peut conduire à une impasse par opposition à l'avenir, alors soyez prudent lorsque vous utilisez promesse.
; ; promise-deadlock.clj ; (def promise-result (promise)) (def future-result (future (println "The result is: " + @promise-result) 13)) (println "Future result is: " @future-result) (deliver result 42)
Dans cet exemple, nous utilisons le résultat du futur et le résultat de la promesse. L'ordre de définition et de lecture des valeurs est que le thread principal attend une valeur du futur thread et que le futur thread attend une valeur du thread principal. Ce comportement sera prévisible (déterministe) et se reproduira à chaque exécution du programme, ce qui facilitera la recherche et la suppression des erreurs.
L'utilisation du futur permet au programme de poursuivre l'exercice jusqu'à ce qu'il ait besoin du résultat de l'exécution du futur. Il en résulte une exécution plus rapide du programme. Si vous avez plusieurs processeurs avec le futur, vous pouvez effectuer une exécution parallèle de programmes qui ont un comportement prévisible (déterministe) (chaque fois donne le même résultat). De cette façon, nous exploitons mieux la puissance de l'ordinateur.
; ; fibonacci.clj ; (defn fibonacci[a] (if (<= a 2) 1 (+ (fibonacci (- a 1)) (fibonacci (- a 2))))) (println "Start serial calculation") (time (println "The result is: " (+ (fibonacci 36) (fibonacci 36)))) (println "Start parallel calculation") (defn parallel-fibonacci[] (def result-1 (future (fibonacci 36))) (def result-2 (future (fibonacci 36))) (+ @result-1 @result-2)) (time (println "The result is: " (parallel-fibonacci)))
Dans cet exemple, vous pouvez voir comment l'utilisation de future peut faire un meilleur usage de la vitesse d'un ordinateur. Nous avons deux nombres de Fibonacci qui s'additionnent. Nous pouvons voir que le programme calcule le résultat deux fois, la première fois séquentiellement dans un seul thread, et la deuxième fois en parallèle dans deux threads. Comme mon ordinateur portable a un processeur multicœur, l'exécution parallèle fonctionne deux fois plus vite que le calcul séquentiel.
Le résultat de l'exécution de ce script sur mon ordinateur portable :
Start serial calculation The result is: 29860704 "Elapsed time: 2568.816524 msecs" Start parallel calculation The result is: 29860704 "Elapsed time: 1216.991448 msecs"
Concurrence
Pour prendre en charge la concurrence et l'imprévisibilité dans le langage de programmation Clojure, nous devons utiliser un type de données variable afin que les autres threads puissent voir les modifications. Le type de données variable le plus simple est atom. Atom est un conteneur qui a toujours la valeur qui peut être remplacée par une autre valeur. La valeur peut être remplacée en entrant une nouvelle valeur ou en appelant une fonction qui prend l'ancienne valeur et renvoie la nouvelle valeur qui est plus fréquemment utilisée. Il est intéressant de noter qu'atom est implémenté sans verrouillage et qu'il peut être utilisé en toute sécurité dans les threads, ce qui signifie qu'il est impossible d'atteindre un blocage. En interne, atom utilise la bibliothèque java.util.concurrent.AtomicReference. Regardons un contre-exemple implémenté avec atom.
; ; atom-counter.clj ; (def counter (atom 0)) (def attempts (atom 0)) (defn counter-increases[] (dotimes [cnt 500000] (swap! counter (fn [counter] (swap! attempts inc) ; side effect DO NOT DO THIS (inc counter))))) (def first-future (future (counter-increases))) (def second-future (future (counter-increases))) ; Wait for futures to complete @first-future @second-future ; Print value of the counter (println "The counter is: " @counter) (println "Number of attempts: " @attempts)
Le résultat de l'exécution du script sur mon portable :
The counter is: 1000000 Number of attempts: 1680212
Dans cet exemple, nous utilisons un atome qui contient la valeur du compteur. Le compteur augmente de (swap! counter inc). La fonction Swap fonctionne comme ceci : 1. prend la valeur du compteur et la conserve 2. pour cette valeur appelle la fonction donnée qui calcule la nouvelle valeur 3. pour enregistrer la nouvelle valeur, elle utilise une opération atomique qui vérifie si l'ancienne valeur a changé 3a. si la valeur n'a pas changé, il entre une nouvelle valeur 3b. si la valeur est changée entre-temps, alors passez à l'étape 1 On voit que la fonction peut être appelée à nouveau si la valeur est changée entre-temps. La valeur ne peut être modifiée qu'à partir d'un autre thread. Par conséquent, il est essentiel que la fonction qui calcule une nouvelle valeur n'ait pas d'effets secondaires afin que peu importe si elle est appelée plusieurs fois. Une limitation d'atom est qu'il synchronise les modifications sur une seule valeur.
; ; atom-acocunts.clj ; (def bob (atom 200000)) (def joe (atom 300000)) (def inconsistencies (atom 0)) (defn transfer [source destination amount] (if (not= (+ @bob @joe) 500000) (swap! inconsistencies inc)) (swap! source - amount) (swap! destination + amount)) (defn first-transfer [] (dotimes [cnt 100000] (transfer bob joe 2))) (defn second-transfer [] (dotimes [cnt 100000] (transfer joe bob 1))) (def first-future (future (first-transfer))) (def second-future (future (second-transfer))) @first-future @second-future (println "Bob has in account: " @bob) (println "Joe has in account: " @joe) (println "Inconsistencies while transfer: " @inconsistencies)
Quand j'exécute ce script j'obtiens :

Bob has in account: 100000 Joe has in account: 400000 Inconsistencies while transfer: 36525
Dans cet exemple, nous pouvons voir comment nous changeons plus d'atomes. À un moment donné, une incohérence peut arriver. La somme de deux comptes à un moment donné n'est pas la même. Si nous devons coordonner les changements de plusieurs valeurs, il existe deux solutions :
- Placer plus de valeurs dans un atome
- Utiliser les références et la mémoire transactionnelle logicielle, comme nous le verrons plus loin
; ; atom-accounts-fixed.clj ; (def accounts (atom {:bob 200000, :joe 300000})) (def inconsistencies (atom 0)) (defn transfer [source destination amount] (let [deref-accounts @accounts] (if (not= (+ (get deref-accounts :bob) (get deref-accounts :joe)) 500000) (swap! inconsistencies inc)) (swap! accounts (fn [accs] (update (update accs source - amount) destination + amount))))) (defn first-transfer [] (dotimes [cnt 100000] (transfer :bob :joe 2))) (defn second-transfer [] (dotimes [cnt 100000] (transfer :joe :bob 1))) (def first-future (future (first-transfer))) (def second-future (future (second-transfer))) @first-future @second-future (println "Bob has in account: " (get @accounts :bob)) (println "Joe has in account: " (get @accounts :joe)) (println "Inconsistencies while transfer: " @inconsistencies)
Lorsque j'exécute ce script sur mon ordinateur, j'obtiens:
Bob has in account: 100000 Joe has in account: 400000 Inconsistencies while transfer: 0
Dans l'exemple, la coordination a été résolue afin que nous mettions plus de valeur à l'aide d'une carte. Lorsque nous transférons de l'argent du compte, nous changeons tous les comptes à la fois afin qu'il n'arrive jamais que la somme d'argent ne soit pas la même.
Le type de données variable suivant est agent. L'agent se comporte comme un atome uniquement dans la mesure où la fonction qui modifie la valeur est exécutée dans un thread différent, de sorte qu'il faut un certain temps pour que le changement devienne visible. Par conséquent, lors de la lecture de la valeur de l'agent, il est nécessaire d'appeler une fonction qui attend que toutes les fonctions qui modifient la valeur de l'agent soient exécutées. Contrairement aux atomes, la fonction qui modifie la valeur n'est appelée qu'une seule fois et peut donc avoir des effets secondaires. Ce type peut également synchroniser une valeur et ne peut pas bloquer.
; ; agent-counter.clj ; (def counter (agent 0)) (def attempts (atom 0)) (defn counter-increases[] (dotimes [cnt 500000] (send counter (fn [counter] (swap! attempts inc) (inc counter))))) (def first-future (future (counter-increases))) (def second-future (future (counter-increases))) ; wait for futures to complete @first-future @second-future ; wait for counter to be finished with updating (await counter) ; print the value of the counter (println "The counter is: " @counter) (println "Number of attempts: " @attempts)
Lorsque j'exécute ce script sur mon ordinateur portable, j'obtiens:
The counter is: 1000000 Number of attempts: 1000000
Cet exemple est le même que l'implémentation du compteur avec l'atome. La seule différence est qu'ici, nous attendons que tous les changements d'agent soient terminés avant de lire la valeur finale à l'aide de await.
Le dernier type de données variable sont des références. Contrairement aux atomes, les références peuvent synchroniser les modifications apportées à plusieurs valeurs. Chaque opération sur référence doit être dans une transaction utilisant dosync. Cette façon de modifier les données est appelée mémoire transactionnelle logicielle ou en abrégé STM. Prenons un exemple avec le transfert d'argent dans les comptes.
; ; stm-accounts.clj ; (def bob (ref 200000)) (def joe (ref 300000)) (def inconsistencies (atom 0)) (def attempts (atom 0)) (def transfers (agent 0)) (defn transfer [source destination amount] (dosync (swap! attempts inc) ; side effect DO NOT DO THIS (send transfers inc) (when (not= (+ @bob @joe) 500000) (swap! inconsistencies inc)) ; side effect DO NOT DO THIS (alter source - amount) (alter destination + amount))) (defn first-transfer [] (dotimes [cnt 100000] (transfer bob joe 2))) (defn second-transfer [] (dotimes [cnt 100000] (transfer joe bob 1))) (def first-future (future (first-transfer))) (def second-future (future (second-transfer))) @first-future @second-future (await transfers) (println "Bob has in account: " @bob) (println "Joe has in account: " @joe) (println "Inconsistencies while transfer: " @inconsistencies) (println "Attempts: " @attempts) (println "Transfers: " @transfers)
Lorsque j'exécute ce script, j'obtiens :
Bob has in account: 100000 Joe has in account: 400000 Inconsistencies while transfer: 0 Attempts: 330841 Transfers: 200000
Fait intéressant, il y a eu plus de tentatives que le nombre de transactions effectuées. C'est parce que le STM n'utilise pas de verrous, donc s'il y a un conflit (comme deux threads essayant de changer la même valeur), la transaction sera réexécutée. Pour cette raison, la transaction ne devrait pas avoir d'effets secondaires. Nous pouvons voir que l'agent dont la valeur change dans la transaction se comporte de manière prévisible. Une fonction qui change la valeur de l'agent sera évaluée autant de fois qu'il y a de transactions. La raison en est que l'agent est conscient des transactions. Si la transaction doit avoir des effets secondaires, ils doivent être mis en fonction au sein de l'agent. De cette façon, le programme aura un comportement prévisible. Vous pensez probablement que vous devriez toujours utiliser STM, mais les programmeurs expérimentés utiliseront souvent des atomes car les atomes sont plus simples et plus rapides que STM. Bien sûr, c'est s'il est possible de faire un programme de cette façon. Si vous avez des effets secondaires, il n'y a pas d'autre choix que d'utiliser STM et des agents.
Modèle d'acteur
Le modèle de concurrence suivant est un modèle d'acteur. Le principe de ce modèle est similaire au monde réel. Si nous concluons un accord pour créer quelque chose avec plusieurs personnes, par exemple un bâtiment, alors chaque homme sur le chantier a son propre rôle. Une foule de personnes est supervisée par le superviseur. Si un travailleur est blessé au travail, le superviseur assignera le travail de l'homme blessé aux autres qui sont disponibles. Si nécessaire, il peut conduire sur le site un nouvel homme. Sur le site nous avons plus de personnes qui font le travail simultanément (concurrently), mais aussi qui se parlent pour se synchroniser. Si nous mettions le travail sur le chantier de construction dans le programme, alors chaque personne serait un acteur qui a un état et exécute dans son propre processus, et les paroles seraient remplacées par des messages. Le langage de programmation populaire basé sur ce modèle est Erlang. Ce langage intéressant a des types de données immuables et des fonctions qui ont les mêmes propriétés que les autres types de données. Les fonctions peuvent être créées pendant l'exécution du programme et transmises en tant qu'arguments à une autre fonction ou renvoyées à la suite d'un appel de fonction. Je vais donner des exemples dans le langage Elixir qui utilise la machine virtuelle Erlang, donc j'aurai le même modèle de programmation qu'Erlang juste une syntaxe différente. Les trois primitives les plus importantes d'Elixir sont le spawn, l'envoi et la réception. spawn exécute la fonction dans le nouveau processus, send envoie le message au processus et receive reçoit les messages qui sont envoyés au processus en cours.
Le premier exemple avec le modèle d'acteur sera augmenté de compteurs simultanément. Pour faire un programme avec ce modèle, il est nécessaire de faire en sorte qu'un acteur ait la valeur du compteur et reçoive un message pour définir et récupérer la valeur du compteur, et avoir deux acteurs qui augmenteront simultanément la valeur du compteur.
# # Counting.exs # defmodule Counting do def counter(value) do receive do {:get, sender} -> send sender, {:counter, value} counter value {:set, new_value} -> counter(new_value) end end def counting(sender, counter, times) do if times > 0 do send counter, {:get, self} receive do {:counter, value} -> send counter, {:set, value + 1} end counting(sender, counter, times - 1) else send sender, {:done, self} end end end counter = spawn fn -> Counting.counter 0 end IO.puts "Starting counting processes" this = self counting1 = spawn fn -> IO.puts "Counting A started" Counting.counting this, counter, 500_000 IO.puts "Counting A finished" end counting2 = spawn fn -> IO.puts "Counting B started" Counting.counting this, counter, 500_000 IO.puts "Counting B finished" end IO.puts "Waiting for counting to be done" receive do {:done, ^counting1} -> nil end receive do {:done, ^counting2} -> nil end send counter, {:get, self} receive do {:counter, value} -> IO.puts "Counter is: #{value}" end
Lorsque j'exécute cet exemple, j'obtiens :
Starting counting processes Counting A started Waiting for counting to be done Counting B started Counting A finished Counting B finished Counter is: 516827
On voit qu'au final le compteur est 516827 et non 1000000 comme on s'y attendait. Lorsque j'ai exécuté le script la prochaine fois, j'ai reçu 511010. La raison de ce comportement est que le compteur reçoit deux messages : récupérer la valeur actuelle et définir la nouvelle valeur. Pour augmenter le compteur, le programme doit obtenir la valeur actuelle, l'augmenter de 1 et définir la valeur augmentée. Deux processus lisent et écrivent la valeur du compteur en même temps en utilisant des messages envoyés au processus de compteur. L'ordre des messages que le compteur recevra est imprévisible et le programme ne peut pas le contrôler. On peut imaginer ce scénario :
- La valeur du compteur est 115
- Le processus A lit la valeur du compteur (115)
- Le processus B lit la valeur du compteur (115)
- Le processus B augmente la valeur localement (116)
- Le processus B définit la valeur augmentée sur le compteur (116)
- Le processus A augmente la valeur du compteur (116)
- Le processus A définit la valeur augmentée sur le compteur (116)
- La valeur du compteur est 116
Si nous regardons le scénario, deux processus augmentent le compteur de 1, et le compteur est augmenté à la fin de 1 et non de 2. De tels entrelacs peuvent se produire un nombre imprévisible de fois et donc la valeur du compteur est imprévisible. Pour éviter ce comportement, l'opération d'augmentation doit être effectuée par un message.
# # CountingFixed.exs # defmodule Counting do def counter(value) do receive do :increase -> counter(value + 1) {:get, sender} -> send sender, {:counter, value} counter value end end def counting(sender, counter, times) do if times > 0 do send counter, :increase counting(sender, counter, times - 1) else send sender, {:done, self} end end end counter = spawn fn -> Counting.counter 0 end IO.puts "Starting counting processes" this = self counting1 = spawn fn -> IO.puts "Counting A started" Counting.counting this, counter, 500_000 IO.puts "Counting A finished" end counting2 = spawn fn -> IO.puts "Counting B started" Counting.counting this, counter, 500_000 IO.puts "Counting B finished" end IO.puts "Waiting for counting to be done" receive do {:done, ^counting1} -> nil end receive do {:done, ^counting2} -> nil end send counter, {:get, self} receive do {:counter, value} -> IO.puts "Counter is: #{value}" end
En exécutant ce script j'obtiens :
Starting counting processes Counting A started Waiting for counting to be done Counting B started Counting A finished Counting B finished Counter is: 1000000
Nous pouvons voir que le compteur a la bonne valeur. La raison du comportement prévisible (déterministe) est que la valeur du compteur augmente d'un message de sorte que la séquence de messages pour augmenter le compteur n'affecte pas sa valeur finale. En travaillant avec le modèle d'acteur, nous devons faire attention à la façon dont les messages peuvent s'entrelacer et à une conception soignée des messages et des actions sur les messages pour éviter l'imprévisibilité accidentelle (non-déterminisme).
Comment transférer de l'argent entre deux comptes avec ce modèle ?
# # Accounts.exs # defmodule Accounts do def accounts(state) do receive do {:transfer, source, destination, amount} -> accounts %{state | source => state[source] - amount , destination => state[destination] + amount} {:amounts, accounts, sender } -> send sender, {:amounts, for account <- accounts do {account, state[account]} end} accounts(state) end end def transfer(sender, accounts, source, destination, amount, times, inconsistencies) do if times > 0 do send accounts, {:amounts, [source, destination], self} receive do {:amounts, amounts} -> if amounts[source] + amounts[destination] != 500_000 do Agent.update(inconsistencies, fn value -> value + 1 end) end end send accounts, {:transfer, source, destination, amount} transfer(sender, accounts, source, destination, amount, times - 1, inconsistencies) else send sender, {:done, self} end end end accounts = spawn fn -> Accounts.accounts(%{bob: 200_000, joe: 300_000 }) end {:ok, inconsistencies} = Agent.start(fn -> 0 end) this = self transfer1 = spawn fn -> IO.puts "Transfer A started" Accounts.transfer(this, accounts, :bob, :joe, 2, 100_000, inconsistencies) IO.puts "Transfer A finished" end transfer2 = spawn fn -> IO.puts "Transfer B started" Accounts.transfer(this, accounts, :joe, :bob, 1, 100_000, inconsistencies) IO.puts "Transfer B finished" end IO.puts "Waiting for transfers to be done" receive do {:done, ^transfer1} -> nil end receive do {:done, ^transfer2} -> nil end send accounts, {:amounts, [:bob, :joe], self} receive do {:amounts, amounts} -> IO.puts "Bob has in account: #{amounts[:bob]}" IO.puts "Joe has in account: #{amounts[:joe]}" IO.puts "Inconsistencies while transfer: #{Agent.get(inconsistencies, fn x -> x end)}" end
Quand j'exécute ce script j'obtiens :
Waiting for transfers to be done Transfer A started Transfer B started Transfer B finished Transfer A finished Bob has in account: 100000 Joe has in account: 400000 Inconsistencies while transfer: 0
Nous pouvons voir que le transfert d'argent fonctionne sans incohérences, car nous avons choisi le transfert de message pour transférer de l'argent et des montants de message pour obtenir la valeur des comptes, ce qui nous donne un comportement prévisible du programme. Chaque fois que nous effectuons un transfert d'argent, le montant total d'argent à tout moment devrait être le même.
Le modèle d'acteur peut provoquer un blocage et donc un interblocage, soyez donc prudent lors de la conception du programme. Le script suivant montre comment vous pouvez simuler le scénario de verrouillage et d'interblocage.
# # Deadlock.exs # defmodule Lock do def loop(state) do receive do {:lock, sender} -> case state do [] -> send sender, :locked loop([sender]) _ -> loop(state ++ [sender]) end {:unlock, sender} -> case state do [] -> loop(state) [^sender | []] -> loop([]) [^sender | [next | tail]] -> send next, :locked loop([next | tail]) _ -> loop(state) end end end def lock(pid) do send pid, {:lock, self} receive do :locked -> nil # This will block until we receive message end end def unlock(pid) do send pid, {:unlock, self} end def locking(first, second, times) do if times > 0 do lock(first) lock(second) unlock(second) unlock(first) locking(first, second, times - 1) end end end a_lock = spawn fn -> Lock.loop([]) end b_lock = spawn fn -> Lock.loop([]) end this = self IO.puts "Locking A, B started" spawn fn -> Lock.locking(a_lock, b_lock, 1_000) IO.puts "Locking A, B finished" send this, :done end IO.puts "Locking B, A started" spawn fn -> Lock.locking(b_lock, a_lock, 1_000) IO.puts "Locking B, A finished" send this, :done end IO.puts "Waiting for locking to be done" receive do :done -> nil end receive do :done -> nil End
When I run this script on my laptop I get:
Locking A, B started Locking B, A started Waiting for locking to be done
From the output we can see that the processes that lock A and B are stuck. This happens because the first process waits for the second process to release B while second process waiting first process to release A. They are waiting for each other and are stuck forever. To avoid this locking, order should always be the same, or design a program so that it doesn't use lock (meaning that it doesn't wait for a specific message). The following listing always locks first A then B.
# # Deadlock fixed # defmodule Lock do def loop(state) do receive do {:lock, sender} -> case state do [] -> send sender, :locked loop([sender]) _ -> loop(state ++ [sender]) end {:unlock, sender} -> case state do [] -> loop(state) [^sender | []] -> loop([]) [^sender | [next | tail]] -> send next, :locked loop([next | tail]) _ -> loop(state) end end end def lock(pid) do send pid, {:lock, self} receive do :locked -> nil # This will block until we receive message end end def unlock(pid) do send pid, {:unlock, self} end def locking(first, second, times) do if times > 0 do lock(first) lock(second) unlock(second) unlock(first) locking(first, second, times - 1) end end end a_lock = spawn fn -> Lock.loop([]) end b_lock = spawn fn -> Lock.loop([]) end this = self IO.puts "Locking A, B started" spawn fn -> Lock.locking(a_lock, b_lock, 1_000) IO.puts "Locking A, B finished" send this, :done end IO.puts "Locking A, B started" spawn fn -> Lock.locking(a_lock, b_lock, 1_000) IO.puts "Locking A, B finished" send this, :done end IO.puts "Waiting for locking to be done" receive do :done -> nil end receive do :done -> nil End
When I run this script on my laptop I get:
Locking A, B started Locking A, B started Waiting for locking to be done Locking A, B finished Locking A, B finished
And now, there is no longer a deadlock.
Emballer
As an introduction to concurrent programming, we have covered a few concurrency models. We haven't covered all models, as this article would be too big. Just to name a few, channels and reactive streams are some of the other popularly used concurrency models. Channels and reactive streams have many similarities with the actor model. All of them transmit messages, but many threads can receive messages from one channel, and reactive streams transmit messages in one direction to form directed graph that receive messages from one end and send messages from the other end as a result of the processing.
Shared mutable state models can easily go wrong if we don't think ahead. It has problems of race condition and deadlock. If we have a choice between different concurrent programming models, it would be easier to implement and maintain but otherwise we have to be very careful what we do.
The functional way is a lot easier to reason about and implement. It cannot have deadlock. This model may have worse performance than shared mutable state model, but a program that works is always faster than one that does not work.
Actor model is a good choice for concurrent programming. Although there are problems of race condition and deadlock, they can happen less than in shared mutable state model since the only way for processes to communicate is via messages. With good message design between processes, that can be avoided. If a problem occurs it is then in the order or meaning of messages in communication between the processes and you know where to look.
I hope this article has given you some insight to what concurrent programming is and how it gives structure to the programs you write.