Einführung in die gleichzeitige Programmierung: Ein Leitfaden für Anfänger
Veröffentlicht: 2022-03-11Was ist gleichzeitige Programmierung? Einfach gesagt, wenn Sie mehr als eine Sache gleichzeitig tun. Nicht zu verwechseln mit Parallelität, Parallelität liegt vor, wenn mehrere Sequenzen von Vorgängen in überlappenden Zeiträumen ausgeführt werden. Im Bereich der Programmierung ist Nebenläufigkeit ein ziemlich komplexes Thema. Der Umgang mit Konstrukten wie Threads und Sperren und das Vermeiden von Problemen wie Race Conditions und Deadlocks kann ziemlich umständlich sein, was das Schreiben gleichzeitiger Programme erschwert. Durch Nebenläufigkeit können Programme als unabhängige Prozesse gestaltet werden, die in einer bestimmten Zusammensetzung zusammenarbeiten. Eine solche Struktur kann parallel gemacht werden oder nicht; Das Erreichen einer solchen Struktur in Ihrem Programm bietet jedoch zahlreiche Vorteile.
In diesem Artikel werfen wir einen Blick auf eine Reihe verschiedener Modelle der Parallelität und wie man sie in verschiedenen Programmiersprachen erreicht, die für Parallelität entwickelt wurden.
Shared Mutable State Model
Schauen wir uns ein einfaches Beispiel mit einem Zähler und zwei Threads an, die ihn erhöhen. Das Programm sollte nicht zu kompliziert sein. Wir haben ein Objekt, das einen Zähler enthält, der mit der Methode raise steigt, und ihn mit der Methode get und zwei Threads abruft, die ihn erhöhen.
// // Counting.java // public class Counting { public static void main(String[] args) throws InterruptedException { class Counter { int counter = 0; public void increment() { counter++; } public int get() { return counter; } } final Counter counter = new Counter(); class CountingThread extends Thread { public void run() { for (int x = 0; x < 500000; x++) { counter.increment(); } } } CountingThread t1 = new CountingThread(); CountingThread t2 = new CountingThread(); t1.start(); t2.start(); t1.join(); t2.join(); System.out.println(counter.get()); } }
Dieses naive Programm ist nicht so naiv, wie es auf den ersten Blick scheint. Wenn ich dieses Programm mehrmals ausführe, erhalte ich unterschiedliche Ergebnisse. Es gibt drei Werte nach drei Ausführungen auf meinem Laptop.
java Counting 553706 java Counting 547818 java Counting 613014
Was ist der Grund für dieses unvorhersehbare Verhalten? Das Programm erhöht den Zähler an einer Stelle, in der Methode raise, die den Befehl counter++ verwendet. Wenn wir uns den Befehlsbytecode ansehen, sehen wir, dass er aus mehreren Teilen besteht:
- Zählerwert aus dem Speicher lesen
- Wertsteigerung vor Ort
- Zählerwert im Speicher speichern
Jetzt können wir uns vorstellen, was in dieser Reihenfolge schief gehen kann. Wenn wir zwei Threads haben, die den Zähler unabhängig voneinander erhöhen, könnten wir dieses Szenario haben:
- Zählerwert ist 115
- Erster Thread liest den Wert des Zählers aus dem Speicher (115)
- Erster Thread erhöht den lokalen Zählerwert (116)
- Zweiter Thread liest den Wert des Zählers aus dem Speicher (115)
- Zweiter Thread erhöht den lokalen Zählerwert (116)
- Zweiter Thread speichert den lokalen Zählerwert im Speicher (116)
- Erster Thread speichert den lokalen Zählerwert im Speicher (116)
- Wert des Zählers ist 116
In diesem Szenario werden zwei Threads miteinander verflochten, sodass der Zählerwert um 1 erhöht wird, aber der Zählerwert sollte um 2 erhöht werden, weil jeder Thread ihn um 1 erhöht. Unterschiedliche Threadverflechtungen beeinflussen das Ergebnis des Programms. Der Grund für die Unvorhersehbarkeit des Programms liegt darin, dass das Programm keine Kontrolle über die Thread-Verflechtung hat, sondern das Betriebssystem. Bei jeder Ausführung des Programms können sich Threads anders verflechten. Auf diese Weise haben wir zufällige Unvorhersagbarkeit (Nichtdeterminismus) in das Programm eingeführt.
Um diese versehentliche Unvorhersagbarkeit (Nicht-Determinismus) zu beheben, muss das Programm die Kontrolle über die Fadenverflechtung haben. Wenn sich ein Thread in der Methode erhöht, darf ein anderer Thread nicht in derselben Methode sein, bis der erste aus ihm herauskommt. Auf diese Weise serialisieren wir den Zugriff auf die Methodenerhöhung.
// // CountingFixed.java // public class CountingFixed { public static main(String[] args) throws InterruptedException { class Counter { int counter = 0; public synchronized void increase() { counter++; } public synchronized int get() { return counter; } } final Counter counter = new Counter(); class CountingThread extends Thread { public void run() { for (int i = 0; i < 500000; i++) { counter.increment(); } } } CountingThread thread1 = new CountingThread(); CountingThread thread2 = new CountingThread(); thread1.start(); thread2.start(); thread1.join(); thread2.join(); System.out.println(counter.get()); } }
Eine andere Lösung besteht darin, einen Zähler zu verwenden, der sich atomar erhöhen kann, was bedeutet, dass die Operation nicht in mehrere Operationen aufgeteilt werden kann. Auf diese Weise brauchen wir keine Codeblöcke, die synchronisiert werden müssen. Java hat atomare Datentypen im Namespace java.util.concurrent.atomic, und wir verwenden AtomicInteger.
// // CountingBetter.java // import java.util.concurrent.atomic.AtomicInteger; class CountingBetter { public static void main(String[] args) throws InterruptedException { final AtomicInteger counter = new AtomicInteger(0); class CountingThread extends Thread { public viod run() { for (int i = 0; i < 500000; i++) { counter.incrementAndGet(); } } } CountingThread thread1 = new CountingThread(); CountingThread thread2 = new CoutningThread(); thread1.start(); thread2.start(); thread1.join(); thread2.join(); System.out.println(counter.get()); } }
Atomic Integer verfügt über die Operationen, die wir benötigen, sodass wir sie anstelle der Counter-Klasse verwenden können. Es ist interessant anzumerken, dass alle Methoden von Atomic Integer kein Locking verwenden, so dass es keine Möglichkeit von Deadlocks gibt, was das Design des Programms erleichtert.
Die Verwendung synchronisierter Schlüsselwörter zum Synchronisieren kritischer Methoden sollte alle Probleme lösen, richtig? Stellen wir uns vor, wir haben zwei Konten, die einzahlen, abheben und auf ein anderes Konto überweisen können. Was passiert, wenn wir gleichzeitig Geld von einem Konto auf ein anderes und umgekehrt überweisen wollen? Schauen wir uns ein Beispiel an.
// // Deadlock.java // public class Deadlock { public static void main(String[] args) throws InterruptedException { class Account { int balance = 100; public Account(int balance) { this.balance = balance; } public synchronized void deposit(int amount) { balance += amount; } public synchronized boolean withdraw(int amount) { if (balance >= amount) { balance -= amount; return true; } return false; } public synchronized boolean transfer(Account destination, int amount) { if (balance >= amount) { balance -= amount; synchronized(destination) { destination.balance += amount; }; return true; } return false; } public int getBalance() { return balance; } } final Account bob = new Account(200000); final Account joe = new Account(300000); class FirstTransfer extends Thread { public void run() { for (int i = 0; i < 100000; i++) { bob.transfer(joe, 2); } } } class SecondTransfer extends Thread { public void run() { for (int i = 0; i < 100000; i++) { joe.transfer(bob, 1); } } } FirstTransfer thread1 = new FirstTransfer(); SecondTransfer thread2 = new SecondTransfer(); thread1.start(); thread2.start(); thread1.join(); thread2.join(); System.out.println("Bob's balance: " + bob.getBalance()); System.out.println("Joe's balance: " + joe.getBalance()); } }
Wenn ich dieses Programm auf meinem Laptop ausführe, bleibt es normalerweise hängen. Warum passiert das? Wenn wir genau hinsehen, können wir sehen, dass wir beim Geldtransfer in die Transfermethode eintreten, die synchronisiert ist und den Zugriff auf alle synchronisierten Methoden auf dem Quellkonto sperrt, und dann das Zielkonto sperrt, das den Zugriff auf alle synchronisierten Methoden darauf sperrt.
Stellen Sie sich folgendes Szenario vor:
- Erste Thread-Anrufe werden von Bobs Konto auf Joes Konto übertragen
- Zweite Thread-Anrufe werden von Joes Konto auf Bobs Konto übertragen
- Der zweite Thread verringert den Betrag von Joes Konto
- Der zweite Thread überweist den Betrag auf Bobs Konto, wartet aber darauf, dass der erste Thread die Übertragung abschließt.
- Der erste Thread verringert den Betrag von Bobs Konto
- Der erste Thread überweist den Betrag auf Joes Konto, wartet aber darauf, dass der zweite Thread die Übertragung abschließt.
In diesem Szenario wartet ein Thread darauf, dass ein anderer Thread die Übertragung beendet und umgekehrt. Sie stecken fest und das Programm kann nicht fortgesetzt werden. Dies wird als Deadlock bezeichnet. Um Deadlocks zu vermeiden, müssen Konten in derselben Reihenfolge gesperrt werden. Um das Programm zu reparieren, geben wir jedem Konto eine eindeutige Nummer, damit wir Konten in derselben Reihenfolge sperren können, wenn wir das Geld überweisen.
// // DeadlockFixed.java // import java.util.concurrent.atomic.AtomicInteger; public class DeadlockFixed { public static void main(String[] args) throws InterruptedException { final AtomicInteger counter = new AtomicInteger(0); class Account { int balance = 100; int order; public Account(int balance) { this.balance = balance; this.order = counter.getAndIncrement(); } public synchronized void deposit(int amount) { balance += amount; } public synchronized boolean withdraw(int amount) { if (balance >= amount) { balance -= amount; return true; } return false; } public boolean transfer(Account destination, int amount) { Account first; Account second; if (this.order < destination.order) { first = this; second = destination; } else { first = destination; second = this; } synchronized(first) { synchronized(second) { if (balance >= amount) { balance -= amount; destination.balance += amount; return true; } return false; } } } public synchronized int getBalance() { return balance; } } final Account bob = new Account(200000); final Account joe = new Account(300000); class FirstTransfer extends Thread { public void run() { for (int i = 0; i < 100000; i++) { bob.transfer(joe, 2); } } } class SecondTransfer extends Thread { public void run() { for (int i = 0; i < 100000; i++) { joe.transfer(bob, 1); } } } FirstTransfer thread1 = new FirstTransfer(); SecondTransfer thread2 = new SecondTransfer(); thread1.start(); thread2.start(); thread1.join(); thread2.join(); System.out.println("Bob's balance: " + bob.getBalance()); System.out.println("Joe's balance: " + joe.getBalance()); } }
Aufgrund der Unvorhersehbarkeit solcher Fehler passieren sie manchmal, aber nicht immer, und sie sind schwer zu reproduzieren. Wenn sich das Programm unvorhersehbar verhält, wird dies normalerweise durch Nebenläufigkeit verursacht, die einen zufälligen Nichtdeterminismus einführt. Um einen zufälligen Nichtdeterminismus zu vermeiden, sollten wir im Voraus ein Programm entwerfen, das alle Verflechtungen berücksichtigt.
Ein Beispiel für ein Programm, das einen zufälligen Nichtdeterminismus hat.
// // NonDeteminism.java // public class NonDeterminism { public static void main(String[] args) throws InterruptedException { class Container { public String value = "Empty"; } final Container container = new Container(); class FastThread extends Thread { public void run() { container.value = "Fast"; } } class SlowThread extends Thread { public void run() { try { Thread.sleep(50); } catch(Exception e) {} container.value = "Slow"; } } FastThread fast = new FastThread(); SlowThread slow = new SlowThread(); fast.start(); slow.start(); fast.join(); slow.join(); System.out.println(container.value); } }
Dieses Programm enthält einen zufälligen Nichtdeterminismus. Der zuletzt in den Container eingegebene Wert wird angezeigt.
java NonDeterminism Slow
Langsamere Threads geben den Wert später ein und dieser Wert wird gedruckt (Langsam). Aber dies muss nicht der Fall sein. Was ist, wenn der Computer gleichzeitig ein anderes Programm ausführt, das viele CPU-Ressourcen benötigt? Wir haben keine Garantie dafür, dass der langsamere Thread den Wert zuletzt eingibt, da er vom Betriebssystem und nicht vom Programm gesteuert wird. Es kann Situationen geben, in denen das Programm auf einem Computer funktioniert und sich auf dem anderen anders verhält. Solche Fehler sind schwer zu finden und bereiten Entwicklern Kopfzerbrechen. Aus all diesen Gründen ist es sehr schwierig, dieses Nebenläufigkeitsmodell richtig zu machen.
Funktionaler Weg
Parallelität
Schauen wir uns ein anderes Modell an, das funktionale Sprachen verwenden. Zum Beispiel verwenden wir Clojure, das mit dem Tool Leiningen interpretiert werden kann. Clojure ist eine sehr interessante Sprache mit guter Unterstützung für Nebenläufigkeit. Das vorherige Nebenläufigkeitsmodell hatte einen gemeinsamen veränderlichen Zustand. Klassen, die wir verwenden, können auch einen versteckten Zustand haben, der mutiert, von dem wir nichts wissen, weil er aus ihrer API nicht ersichtlich ist. Wie wir gesehen haben, kann dieses Modell unbeabsichtigten Nichtdeterminismus und Deadlocks verursachen, wenn wir nicht aufpassen. Funktionale Sprachen haben Datentypen, die nicht mutieren, sodass sie sicher geteilt werden können, ohne das Risiko, dass sie sich ändern. Funktionen haben Eigenschaften sowie andere Datentypen. Funktionen können während der Programmausführung erstellt und als Parameter an eine andere Funktion übergeben oder als Ergebnis des Funktionsaufrufs zurückgegeben werden.
Grundlegende Primitive für die gleichzeitige Programmierung sind zukunftsträchtig und vielversprechend. Future führt einen Codeblock in einem anderen Thread aus und gibt ein Objekt für den Future-Wert zurück, der eingegeben wird, wenn der Block ausgeführt wird.
; ; future.clj ; (let [a (future (println "Started A") (Thread/sleep 1000) (println "Finished A") (+ 1 2)) b (future (println "Started B") (Thread/sleep 2000) (println "Finished B") (+ 3 4))] (println "Waiting for futures") (+ @a @b))
Wenn ich dieses Skript ausführe, ist die Ausgabe:
Started A Started B Waiting for futures Finished A Finished B 10
In diesem Beispiel haben wir zwei Future-Blöcke, die unabhängig voneinander ausgeführt werden. Programmieren Sie nur Blöcke, wenn Sie den noch nicht verfügbaren Wert aus dem zukünftigen Objekt lesen. In unserem Fall wird darauf gewartet, dass beide Ergebnisse zukünftiger Blöcke summiert werden. Das Verhalten ist vorhersagbar (deterministisch) und liefert immer das gleiche Ergebnis, da es keinen gemeinsamen veränderlichen Zustand gibt.
Ein weiteres Grundelement, das für Parallelität verwendet wird, ist ein Versprechen. Promise ist ein Container, in den man einmalig einen Wert stecken kann. Beim Lesen von Versprechen wartet der Thread, bis der Wert des Versprechens gefüllt ist.
; ; promise.clj ; (def result (promise)) (future (println "The result is: " @result)) (Thread/sleep 2000) (deliver result 42)
In diesem Beispiel wird die Zukunft warten, bis das Ergebnis gedruckt wird, solange das Versprechen nicht gespeichert werden soll. Nach zwei Sekunden wird im Versprechen der Wert 42 gespeichert, der im zukünftigen Thread gedruckt wird. Die Verwendung von Promises kann im Gegensatz zur Zukunft zu einem Stillstand führen, seien Sie also vorsichtig, wenn Sie Promises verwenden.
; ; promise-deadlock.clj ; (def promise-result (promise)) (def future-result (future (println "The result is: " + @promise-result) 13)) (println "Future result is: " @future-result) (deliver result 42)
In diesem Beispiel verwenden wir das Ergebnis der Zukunft und das Ergebnis des Versprechens. Die Reihenfolge des Setzens und Lesens von Werten ist, dass der Haupt-Thread auf einen Wert vom Future-Thread wartet und der Future-Thread auf einen Wert vom Haupt-Thread wartet. Dieses Verhalten ist vorhersagbar (deterministisch) und wird bei jeder Ausführung des Programms abgespielt, was das Auffinden und Entfernen von Fehlern erleichtert.
Die Verwendung der Zukunft ermöglicht es dem Programm, mit der Übung fortzufahren, bis es das Ergebnis der Ausführung der Zukunft benötigt. Dies führt zu einer schnelleren Programmausführung. Wenn Sie mehrere Prozessoren mit der Zukunft haben, können Sie Programme parallel ausführen, die ein vorhersagbares (deterministisches) Verhalten haben (jedes Mal das gleiche Ergebnis liefert). So nutzen wir die Leistungsfähigkeit des Computers besser aus.
; ; fibonacci.clj ; (defn fibonacci[a] (if (<= a 2) 1 (+ (fibonacci (- a 1)) (fibonacci (- a 2))))) (println "Start serial calculation") (time (println "The result is: " (+ (fibonacci 36) (fibonacci 36)))) (println "Start parallel calculation") (defn parallel-fibonacci[] (def result-1 (future (fibonacci 36))) (def result-2 (future (fibonacci 36))) (+ @result-1 @result-2)) (time (println "The result is: " (parallel-fibonacci)))
In diesem Beispiel sehen Sie, wie die Verwendung von future die Geschwindigkeit eines Computers besser ausnutzen kann. Wir haben zwei Fibonacci-Zahlen, die sich addieren. Wir können sehen, dass das Programm das Ergebnis zweimal berechnet, das erste Mal nacheinander in einem einzelnen Thread und das zweite Mal parallel in zwei Threads. Da mein Laptop über einen Multicore-Prozessor verfügt, arbeitet die parallele Ausführung doppelt so schnell wie die sequentielle Berechnung.
Das Ergebnis der Ausführung dieses Skripts auf meinem Laptop:
Start serial calculation The result is: 29860704 "Elapsed time: 2568.816524 msecs" Start parallel calculation The result is: 29860704 "Elapsed time: 1216.991448 msecs"
Parallelität
Um Parallelität und Unvorhersehbarkeit in der Programmiersprache Clojure zu unterstützen, müssen wir einen variablen Datentyp verwenden, damit andere Threads die Änderungen sehen können. Der einfachste variable Datentyp ist Atom. Atom ist ein Container, der immer den Wert hat, der durch einen anderen Wert ersetzt werden kann. Der Wert kann ersetzt werden, indem ein neuer Wert eingegeben wird oder eine Funktion aufgerufen wird, die den alten Wert übernimmt und einen neuen Wert zurückgibt, der häufiger verwendet wird. Interessant ist, dass Atom ohne Sperren implementiert ist und sicher in Threads verwendet werden kann, was bedeutet, dass es unmöglich ist, Deadlocks zu erreichen. Intern verwendet Atom die java.util.concurrent.AtomicReference-Bibliothek. Schauen wir uns ein mit atom implementiertes Gegenbeispiel an.
; ; atom-counter.clj ; (def counter (atom 0)) (def attempts (atom 0)) (defn counter-increases[] (dotimes [cnt 500000] (swap! counter (fn [counter] (swap! attempts inc) ; side effect DO NOT DO THIS (inc counter))))) (def first-future (future (counter-increases))) (def second-future (future (counter-increases))) ; Wait for futures to complete @first-future @second-future ; Print value of the counter (println "The counter is: " @counter) (println "Number of attempts: " @attempts)
Das Ergebnis der Skriptausführung auf meinem Laptop:
The counter is: 1000000 Number of attempts: 1680212
In diesem Beispiel verwenden wir ein Atom, das den Wert des Zählers enthält. Der Zähler erhöht sich mit (swap! counter inc). Die Swap-Funktion funktioniert wie folgt: 1. Nehmen Sie den Zählerwert und bewahren Sie ihn auf. 2. Rufen Sie für diesen Wert die angegebene Funktion auf, die den neuen Wert berechnet. 3. Um den neuen Wert zu speichern, wird eine atomare Operation verwendet, die überprüft, ob sich der alte Wert geändert hat. 3a. wenn sich der Wert nicht geändert hat, trägt es einen neuen Wert ein 3b. wenn der Wert zwischenzeitlich geändert wird, dann gehe zu Schritt 1 Wir sehen, dass die Funktion erneut aufgerufen werden kann, wenn der Wert zwischenzeitlich geändert wird. Der Wert kann nur von einem anderen Thread geändert werden. Daher ist es wichtig, dass die Funktion, die einen neuen Wert berechnet, keine Seiteneffekte hat, damit es keine Rolle spielt, ob sie öfter aufgerufen wird. Eine Einschränkung von Atom besteht darin, dass Änderungen auf einen Wert synchronisiert werden.
; ; atom-acocunts.clj ; (def bob (atom 200000)) (def joe (atom 300000)) (def inconsistencies (atom 0)) (defn transfer [source destination amount] (if (not= (+ @bob @joe) 500000) (swap! inconsistencies inc)) (swap! source - amount) (swap! destination + amount)) (defn first-transfer [] (dotimes [cnt 100000] (transfer bob joe 2))) (defn second-transfer [] (dotimes [cnt 100000] (transfer joe bob 1))) (def first-future (future (first-transfer))) (def second-future (future (second-transfer))) @first-future @second-future (println "Bob has in account: " @bob) (println "Joe has in account: " @joe) (println "Inconsistencies while transfer: " @inconsistencies)
Wenn ich dieses Skript ausführe, bekomme ich:

Bob has in account: 100000 Joe has in account: 400000 Inconsistencies while transfer: 36525
In diesem Beispiel können wir sehen, wie wir mehr Atome verändern. An einem Punkt kann es zu Inkonsistenzen kommen. Die Summe zweier Konten zu einem bestimmten Zeitpunkt ist nicht gleich. Wenn wir Änderungen mehrerer Werte koordinieren müssen, gibt es zwei Lösungen:
- Platziere mehr Werte in einem Atom
- Verwenden Sie Referenzen und Software-Transaktionsspeicher, wie wir später sehen werden
; ; atom-accounts-fixed.clj ; (def accounts (atom {:bob 200000, :joe 300000})) (def inconsistencies (atom 0)) (defn transfer [source destination amount] (let [deref-accounts @accounts] (if (not= (+ (get deref-accounts :bob) (get deref-accounts :joe)) 500000) (swap! inconsistencies inc)) (swap! accounts (fn [accs] (update (update accs source - amount) destination + amount))))) (defn first-transfer [] (dotimes [cnt 100000] (transfer :bob :joe 2))) (defn second-transfer [] (dotimes [cnt 100000] (transfer :joe :bob 1))) (def first-future (future (first-transfer))) (def second-future (future (second-transfer))) @first-future @second-future (println "Bob has in account: " (get @accounts :bob)) (println "Joe has in account: " (get @accounts :joe)) (println "Inconsistencies while transfer: " @inconsistencies)
Wenn ich dieses Skript auf meinem Computer ausführe, erhalte ich:
Bob has in account: 100000 Joe has in account: 400000 Inconsistencies while transfer: 0
Im Beispiel wurde die Koordination so gelöst, dass wir mehr Wert auf eine Karte legen. Wenn wir Geld vom Konto überweisen, ändern wir alle Konten zu diesem Zeitpunkt, damit es nie vorkommt, dass der Geldbetrag nicht gleich ist.
Der nächste variable Datentyp ist Agent. Der Agent verhält sich wie ein Atom, nur dass die Funktion, die den Wert ändert, in einem anderen Thread ausgeführt wird, sodass es einige Zeit dauert, bis die Änderung sichtbar wird. Daher muss beim Lesen des Wertes des Agenten eine Funktion aufgerufen werden, die wartet, bis alle Funktionen ausgeführt sind, die den Wert des Agenten ändern. Im Gegensatz zu Atomen wird eine Funktion, die den Wert ändert, nur einmal aufgerufen und kann daher Nebenwirkungen haben. Dieser Typ kann auch einen Wert synchronisieren und kann keine Deadlocks ausführen.
; ; agent-counter.clj ; (def counter (agent 0)) (def attempts (atom 0)) (defn counter-increases[] (dotimes [cnt 500000] (send counter (fn [counter] (swap! attempts inc) (inc counter))))) (def first-future (future (counter-increases))) (def second-future (future (counter-increases))) ; wait for futures to complete @first-future @second-future ; wait for counter to be finished with updating (await counter) ; print the value of the counter (println "The counter is: " @counter) (println "Number of attempts: " @attempts)
Wenn ich dieses Skript auf meinem Laptop ausführe, erhalte ich:
The counter is: 1000000 Number of attempts: 1000000
Dieses Beispiel ist dasselbe wie die Implementierung des Zählers mit dem Atom. Der einzige Unterschied besteht darin, dass wir hier warten, bis alle Agentenänderungen abgeschlossen sind, bevor der endgültige Wert mit await gelesen wird.
Der letzte variable Datentyp sind Referenzen. Im Gegensatz zu Atomen können Referenzen Änderungen an mehreren Werten synchronisieren. Jede Referenzoperation sollte sich in einer Transaktion befinden, die dosync verwendet. Diese Art der Datenänderung wird Software Transactional Memory oder kurz STM genannt. Schauen wir uns ein Beispiel mit der Geldüberweisung auf den Konten an.
; ; stm-accounts.clj ; (def bob (ref 200000)) (def joe (ref 300000)) (def inconsistencies (atom 0)) (def attempts (atom 0)) (def transfers (agent 0)) (defn transfer [source destination amount] (dosync (swap! attempts inc) ; side effect DO NOT DO THIS (send transfers inc) (when (not= (+ @bob @joe) 500000) (swap! inconsistencies inc)) ; side effect DO NOT DO THIS (alter source - amount) (alter destination + amount))) (defn first-transfer [] (dotimes [cnt 100000] (transfer bob joe 2))) (defn second-transfer [] (dotimes [cnt 100000] (transfer joe bob 1))) (def first-future (future (first-transfer))) (def second-future (future (second-transfer))) @first-future @second-future (await transfers) (println "Bob has in account: " @bob) (println "Joe has in account: " @joe) (println "Inconsistencies while transfer: " @inconsistencies) (println "Attempts: " @attempts) (println "Transfers: " @transfers)
Wenn ich dieses Skript ausführe, erhalte ich:
Bob has in account: 100000 Joe has in account: 400000 Inconsistencies while transfer: 0 Attempts: 330841 Transfers: 200000
Interessanterweise gab es mehr Versuche als Transaktionen getätigt wurden. Dies liegt daran, dass der STM keine Sperren verwendet, sodass bei einem Konflikt (z. B. wenn zwei Threads versuchen, denselben Wert zu ändern) die Transaktion erneut ausgeführt wird. Aus diesem Grund sollte die Transaktion keine Nebenwirkungen haben. Wir können sehen, dass sich der Agent, dessen Wert sich innerhalb der Transaktion ändert, vorhersehbar verhält. Eine Funktion, die den Wert des Agenten ändert, wird so oft ausgewertet, wie es Transaktionen gibt. Der Grund dafür ist, dass der Agent transaktionsbewusst ist. Wenn Transaktionen Nebenwirkungen haben müssen, sollten sie innerhalb des Agenten in Funktion gesetzt werden. Auf diese Weise wird das Programm ein vorhersagbares Verhalten haben. Sie denken wahrscheinlich, dass Sie immer STM verwenden sollten, aber erfahrene Programmierer werden oft Atome verwenden, weil Atome einfacher und schneller als STM sind. Natürlich nur, wenn es möglich ist, ein Programm auf diese Weise zu erstellen. Wenn Sie Nebenwirkungen haben, dann gibt es keine andere Wahl, als STM und Mittel zu verwenden.
Schauspielermodell
Das folgende Parallelitätsmodell ist ein Akteursmodell. Das Prinzip dieses Modells ähnelt der realen Welt. Wenn wir einen Deal machen, um mit vielen Leuten etwas zu schaffen, zum Beispiel ein Gebäude, dann hat jeder Mann auf der Baustelle seine eigene Rolle. Eine Menschenmenge wird vom Vorgesetzten überwacht. Wenn ein Arbeitnehmer bei der Arbeit verletzt wird, weist der Vorgesetzte die Arbeit des Verletzten den anderen verfügbaren Personen zu. Bei Bedarf kann er einen neuen Mann auf die Baustelle führen. Auf der Website haben wir mehr Leute, die die Arbeit gleichzeitig (gleichzeitig) erledigen, aber auch miteinander sprechen, um sich zu synchronisieren. Wenn wir die Arbeit auf der Baustelle ins Programm stellen, dann wäre jeder Mensch ein Akteur, der einen Zustand hat und in seinem eigenen Prozess ausführt, und das Reden würde durch Botschaften ersetzt. Die populäre Programmiersprache, die auf diesem Modell basiert, ist Erlang. Diese interessante Sprache hat unveränderliche Datentypen und Funktionen, die die gleichen Eigenschaften wie andere Datentypen haben. Funktionen können während der Programmausführung erstellt und als Argumente an eine andere Funktion übergeben oder als Ergebnis eines Funktionsaufrufs zurückgegeben werden. Ich werde Beispiele in der Elixir-Sprache geben, die die virtuelle Erlang-Maschine verwendet, also habe ich das gleiche Programmiermodell wie Erlang, nur eine andere Syntax. Die drei wichtigsten Primitiven in Elixir sind Spawn, Send und Receive. spawn führt die Funktion im neuen Prozess aus, send sendet die Nachricht an den Prozess und Receive empfängt Nachrichten, die an den aktuellen Prozess gesendet werden.
Das erste Beispiel mit dem Akteurmodell wird gleichzeitig gegenläufig erhöht. Um ein Programm mit diesem Modell zu erstellen, ist es notwendig, einen Akteur dazu zu bringen, den Wert des Zählers zu haben und eine Nachricht zu empfangen, um den Wert des Zählers einzustellen und abzurufen, und zwei Akteure zu haben, die gleichzeitig den Wert des Zählers erhöhen.
# # Counting.exs # defmodule Counting do def counter(value) do receive do {:get, sender} -> send sender, {:counter, value} counter value {:set, new_value} -> counter(new_value) end end def counting(sender, counter, times) do if times > 0 do send counter, {:get, self} receive do {:counter, value} -> send counter, {:set, value + 1} end counting(sender, counter, times - 1) else send sender, {:done, self} end end end counter = spawn fn -> Counting.counter 0 end IO.puts "Starting counting processes" this = self counting1 = spawn fn -> IO.puts "Counting A started" Counting.counting this, counter, 500_000 IO.puts "Counting A finished" end counting2 = spawn fn -> IO.puts "Counting B started" Counting.counting this, counter, 500_000 IO.puts "Counting B finished" end IO.puts "Waiting for counting to be done" receive do {:done, ^counting1} -> nil end receive do {:done, ^counting2} -> nil end send counter, {:get, self} receive do {:counter, value} -> IO.puts "Counter is: #{value}" end
Wenn ich dieses Beispiel ausführe, bekomme ich:
Starting counting processes Counting A started Waiting for counting to be done Counting B started Counting A finished Counting B finished Counter is: 516827
Wir können sehen, dass der Zähler am Ende 516827 und nicht 1000000 ist, wie wir erwartet haben. Als ich das Skript das nächste Mal ausführte, erhielt ich 511010. Der Grund für dieses Verhalten ist, dass der Zähler zwei Nachrichten erhält: den aktuellen Wert abrufen und den neuen Wert festlegen. Um den Zähler zu erhöhen, muss das Programm den aktuellen Wert abrufen, um 1 erhöhen und den erhöhten Wert einstellen. Zwei Prozesse lesen und schreiben den Wert des Zählers gleichzeitig, indem sie Nachrichten verwenden, die an den Zählerprozess gesendet werden. Die Reihenfolge der Nachrichten, die der Zähler erhält, ist unvorhersehbar und kann vom Programm nicht gesteuert werden. Wir können uns dieses Szenario vorstellen:
- Zählerwert ist 115
- Prozess A liest den Wert des Zählers (115)
- Prozess B liest den Wert des Zählers (115)
- Prozess B erhöht den Wert lokal (116)
- Prozess B setzt erhöhten Wert auf den Zähler (116)
- Prozess A erhöht den Wert des Zählers (116)
- Prozess A setzt den erhöhten Wert auf den Zähler (116)
- Zählerwert ist 116
Wenn wir uns das Szenario ansehen, zwei Prozesse erhöhen den Zähler um 1, und der Zähler wird am Ende um 1 und nicht um 2 erhöht. Solche Verflechtungen können unvorhersehbar oft vorkommen und daher ist der Wert des Zählers unvorhersehbar. Um dieses Verhalten zu verhindern, muss der Vergrößerungsvorgang von einer Nachricht ausgeführt werden.
# # CountingFixed.exs # defmodule Counting do def counter(value) do receive do :increase -> counter(value + 1) {:get, sender} -> send sender, {:counter, value} counter value end end def counting(sender, counter, times) do if times > 0 do send counter, :increase counting(sender, counter, times - 1) else send sender, {:done, self} end end end counter = spawn fn -> Counting.counter 0 end IO.puts "Starting counting processes" this = self counting1 = spawn fn -> IO.puts "Counting A started" Counting.counting this, counter, 500_000 IO.puts "Counting A finished" end counting2 = spawn fn -> IO.puts "Counting B started" Counting.counting this, counter, 500_000 IO.puts "Counting B finished" end IO.puts "Waiting for counting to be done" receive do {:done, ^counting1} -> nil end receive do {:done, ^counting2} -> nil end send counter, {:get, self} receive do {:counter, value} -> IO.puts "Counter is: #{value}" end
Wenn ich dieses Skript ausführe, bekomme ich:
Starting counting processes Counting A started Waiting for counting to be done Counting B started Counting A finished Counting B finished Counter is: 1000000
Wir können sehen, dass der Zähler den richtigen Wert hat. Der Grund für vorhersagbares (deterministisches) Verhalten besteht darin, dass der Wert des Zählers um eine Nachricht erhöht wird, sodass die Folge von Nachrichten zum Erhöhen des Zählers seinen endgültigen Wert nicht beeinflusst. Bei der Arbeit mit dem Akteursmodell müssen wir darauf achten, wie sich Nachrichten verflechten können, und Nachrichten und Aktionen auf Nachrichten sorgfältig entwerfen, um zufällige Unvorhersehbarkeit (Nichtdeterminismus) zu vermeiden.
Wie können wir mit diesem Modell Geld zwischen zwei Konten überweisen?
# # Accounts.exs # defmodule Accounts do def accounts(state) do receive do {:transfer, source, destination, amount} -> accounts %{state | source => state[source] - amount , destination => state[destination] + amount} {:amounts, accounts, sender } -> send sender, {:amounts, for account <- accounts do {account, state[account]} end} accounts(state) end end def transfer(sender, accounts, source, destination, amount, times, inconsistencies) do if times > 0 do send accounts, {:amounts, [source, destination], self} receive do {:amounts, amounts} -> if amounts[source] + amounts[destination] != 500_000 do Agent.update(inconsistencies, fn value -> value + 1 end) end end send accounts, {:transfer, source, destination, amount} transfer(sender, accounts, source, destination, amount, times - 1, inconsistencies) else send sender, {:done, self} end end end accounts = spawn fn -> Accounts.accounts(%{bob: 200_000, joe: 300_000 }) end {:ok, inconsistencies} = Agent.start(fn -> 0 end) this = self transfer1 = spawn fn -> IO.puts "Transfer A started" Accounts.transfer(this, accounts, :bob, :joe, 2, 100_000, inconsistencies) IO.puts "Transfer A finished" end transfer2 = spawn fn -> IO.puts "Transfer B started" Accounts.transfer(this, accounts, :joe, :bob, 1, 100_000, inconsistencies) IO.puts "Transfer B finished" end IO.puts "Waiting for transfers to be done" receive do {:done, ^transfer1} -> nil end receive do {:done, ^transfer2} -> nil end send accounts, {:amounts, [:bob, :joe], self} receive do {:amounts, amounts} -> IO.puts "Bob has in account: #{amounts[:bob]}" IO.puts "Joe has in account: #{amounts[:joe]}" IO.puts "Inconsistencies while transfer: #{Agent.get(inconsistencies, fn x -> x end)}" end
Wenn ich dieses Skript ausführe, bekomme ich:
Waiting for transfers to be done Transfer A started Transfer B started Transfer B finished Transfer A finished Bob has in account: 100000 Joe has in account: 400000 Inconsistencies while transfer: 0
Wir können sehen, dass die Geldüberweisung ohne Inkonsistenzen funktioniert, weil wir die Nachrichtenüberweisung gewählt haben, um Geld und Nachrichtenbeträge zu überweisen, um den Wert von Konten zu erhalten, was uns ein vorhersehbares Verhalten des Programms gibt. Wann immer wir Geld überweisen, sollte der Gesamtgeldbetrag zu jeder Zeit gleich sein.
Das Akteurmodell kann eine Sperre und damit einen Deadlock verursachen. Seien Sie also beim Entwerfen des Programms vorsichtig. Das folgende Skript zeigt, wie Sie das Sperr- und Deadlock-Szenario simulieren können.
# # Deadlock.exs # defmodule Lock do def loop(state) do receive do {:lock, sender} -> case state do [] -> send sender, :locked loop([sender]) _ -> loop(state ++ [sender]) end {:unlock, sender} -> case state do [] -> loop(state) [^sender | []] -> loop([]) [^sender | [next | tail]] -> send next, :locked loop([next | tail]) _ -> loop(state) end end end def lock(pid) do send pid, {:lock, self} receive do :locked -> nil # This will block until we receive message end end def unlock(pid) do send pid, {:unlock, self} end def locking(first, second, times) do if times > 0 do lock(first) lock(second) unlock(second) unlock(first) locking(first, second, times - 1) end end end a_lock = spawn fn -> Lock.loop([]) end b_lock = spawn fn -> Lock.loop([]) end this = self IO.puts "Locking A, B started" spawn fn -> Lock.locking(a_lock, b_lock, 1_000) IO.puts "Locking A, B finished" send this, :done end IO.puts "Locking B, A started" spawn fn -> Lock.locking(b_lock, a_lock, 1_000) IO.puts "Locking B, A finished" send this, :done end IO.puts "Waiting for locking to be done" receive do :done -> nil end receive do :done -> nil End
When I run this script on my laptop I get:
Locking A, B started Locking B, A started Waiting for locking to be done
From the output we can see that the processes that lock A and B are stuck. This happens because the first process waits for the second process to release B while second process waiting first process to release A. They are waiting for each other and are stuck forever. To avoid this locking, order should always be the same, or design a program so that it doesn't use lock (meaning that it doesn't wait for a specific message). The following listing always locks first A then B.
# # Deadlock fixed # defmodule Lock do def loop(state) do receive do {:lock, sender} -> case state do [] -> send sender, :locked loop([sender]) _ -> loop(state ++ [sender]) end {:unlock, sender} -> case state do [] -> loop(state) [^sender | []] -> loop([]) [^sender | [next | tail]] -> send next, :locked loop([next | tail]) _ -> loop(state) end end end def lock(pid) do send pid, {:lock, self} receive do :locked -> nil # This will block until we receive message end end def unlock(pid) do send pid, {:unlock, self} end def locking(first, second, times) do if times > 0 do lock(first) lock(second) unlock(second) unlock(first) locking(first, second, times - 1) end end end a_lock = spawn fn -> Lock.loop([]) end b_lock = spawn fn -> Lock.loop([]) end this = self IO.puts "Locking A, B started" spawn fn -> Lock.locking(a_lock, b_lock, 1_000) IO.puts "Locking A, B finished" send this, :done end IO.puts "Locking A, B started" spawn fn -> Lock.locking(a_lock, b_lock, 1_000) IO.puts "Locking A, B finished" send this, :done end IO.puts "Waiting for locking to be done" receive do :done -> nil end receive do :done -> nil End
When I run this script on my laptop I get:
Locking A, B started Locking A, B started Waiting for locking to be done Locking A, B finished Locking A, B finished
And now, there is no longer a deadlock.
Einpacken
As an introduction to concurrent programming, we have covered a few concurrency models. We haven't covered all models, as this article would be too big. Just to name a few, channels and reactive streams are some of the other popularly used concurrency models. Channels and reactive streams have many similarities with the actor model. All of them transmit messages, but many threads can receive messages from one channel, and reactive streams transmit messages in one direction to form directed graph that receive messages from one end and send messages from the other end as a result of the processing.
Shared mutable state models can easily go wrong if we don't think ahead. It has problems of race condition and deadlock. If we have a choice between different concurrent programming models, it would be easier to implement and maintain but otherwise we have to be very careful what we do.
The functional way is a lot easier to reason about and implement. It cannot have deadlock. This model may have worse performance than shared mutable state model, but a program that works is always faster than one that does not work.
Actor model is a good choice for concurrent programming. Although there are problems of race condition and deadlock, they can happen less than in shared mutable state model since the only way for processes to communicate is via messages. With good message design between processes, that can be avoided. If a problem occurs it is then in the order or meaning of messages in communication between the processes and you know where to look.
I hope this article has given you some insight to what concurrent programming is and how it gives structure to the programs you write.