Introducción a la programación concurrente: una guía para principiantes
Publicado: 2022-03-11¿Qué es la programación concurrente? Descrito simplemente, es cuando estás haciendo más de una cosa al mismo tiempo. No debe confundirse con el paralelismo, la concurrencia es cuando se ejecutan varias secuencias de operaciones en períodos de tiempo superpuestos. En el ámbito de la programación, la concurrencia es un tema bastante complejo. Tratar con construcciones como subprocesos y bloqueos y evitar problemas como condiciones de carrera y puntos muertos puede ser bastante engorroso, lo que dificulta la escritura de programas concurrentes. A través de la concurrencia, los programas pueden diseñarse como procesos independientes que trabajan juntos en una composición específica. Tal estructura puede o no hacerse paralela; sin embargo, lograr una estructura de este tipo en su programa ofrece numerosas ventajas.
En este artículo, veremos varios modelos diferentes de concurrencia, cómo lograrlos en varios lenguajes de programación diseñados para la concurrencia.
Modelo de estado mutable compartido
Veamos un ejemplo simple con un contador y dos hilos que lo aumentan. El programa no debería ser demasiado complicado. Tenemos un objeto que contiene un contador que aumenta con el método de aumento y lo recupera con el método de obtención y dos hilos que lo aumentan.
// // Counting.java // public class Counting { public static void main(String[] args) throws InterruptedException { class Counter { int counter = 0; public void increment() { counter++; } public int get() { return counter; } } final Counter counter = new Counter(); class CountingThread extends Thread { public void run() { for (int x = 0; x < 500000; x++) { counter.increment(); } } } CountingThread t1 = new CountingThread(); CountingThread t2 = new CountingThread(); t1.start(); t2.start(); t1.join(); t2.join(); System.out.println(counter.get()); } }Este programa ingenuo no es tan ingenuo como parece a primera vista. Cuando ejecuto este programa más veces, obtengo resultados diferentes. Hay tres valores después de tres ejecuciones en mi computadora portátil.
java Counting 553706 java Counting 547818 java Counting 613014¿Cuál es la razón de este comportamiento impredecible? El programa aumenta el contador en un lugar, en el método de aumento que utiliza el comando contador++. Si nos fijamos en el código de bytes del comando, veríamos que consta de varias partes:
- Leer el valor del contador de la memoria
- Aumente el valor localmente
- Almacenar el valor del contador en la memoria
Ahora podemos imaginar lo que puede salir mal en esta secuencia. Si tenemos dos subprocesos que aumentan el contador de forma independiente, entonces podríamos tener este escenario:
- El valor del contador es 115
- El primer hilo lee el valor del contador de la memoria (115)
- El primer subproceso aumenta el valor del contador local (116)
- El segundo hilo lee el valor del contador de la memoria (115)
- El segundo hilo aumenta el valor del contador local (116)
- El segundo hilo guarda el valor del contador local en la memoria (116)
- El primer hilo guarda el valor del contador local en la memoria (116)
- El valor del contador es 116
En este escenario, dos subprocesos se entrelazan para que el valor del contador aumente en 1, pero el valor del contador debe aumentarse en 2 porque cada subproceso lo aumenta en 1. El entrelazamiento de diferentes subprocesos influye en el resultado del programa. La razón de la imprevisibilidad del programa es que el programa no tiene control sobre el entrelazado de hilos sino sobre el sistema operativo. Cada vez que se ejecuta el programa, los hilos pueden entrelazarse de manera diferente. De esta forma, introdujimos la imprevisibilidad accidental (no determinismo) en el programa.
Para corregir esta imprevisibilidad accidental (no determinismo), el programa debe tener el control del entrelazado de hilos. Cuando un hilo está en el método aumentar otro hilo no debe estar en el mismo método hasta que salga el primero. De esa manera serializamos el acceso al aumento del método.
// // CountingFixed.java // public class CountingFixed { public static main(String[] args) throws InterruptedException { class Counter { int counter = 0; public synchronized void increase() { counter++; } public synchronized int get() { return counter; } } final Counter counter = new Counter(); class CountingThread extends Thread { public void run() { for (int i = 0; i < 500000; i++) { counter.increment(); } } } CountingThread thread1 = new CountingThread(); CountingThread thread2 = new CountingThread(); thread1.start(); thread2.start(); thread1.join(); thread2.join(); System.out.println(counter.get()); } }Otra solución es usar un contador que pueda aumentar atómicamente, lo que significa que la operación no se puede separar en múltiples operaciones. De esta forma, no necesitamos tener bloques de código que necesiten sincronizarse. Java tiene tipos de datos atómicos en el espacio de nombres java.util.concurrent.atomic, y usaremos AtomicInteger.
// // CountingBetter.java // import java.util.concurrent.atomic.AtomicInteger; class CountingBetter { public static void main(String[] args) throws InterruptedException { final AtomicInteger counter = new AtomicInteger(0); class CountingThread extends Thread { public viod run() { for (int i = 0; i < 500000; i++) { counter.incrementAndGet(); } } } CountingThread thread1 = new CountingThread(); CountingThread thread2 = new CoutningThread(); thread1.start(); thread2.start(); thread1.join(); thread2.join(); System.out.println(counter.get()); } }El entero atómico tiene las operaciones que necesitamos, por lo que podemos usarlo en lugar de la clase Counter. Es interesante notar que todos los métodos de atomicinteger no usan bloqueo, por lo que no hay posibilidad de puntos muertos, lo que facilita el diseño del programa.
El uso de palabras clave sincronizadas para sincronizar métodos críticos debería resolver todos los problemas, ¿verdad? Imaginemos que tenemos dos cuentas que pueden depositar, retirar y transferir a otra cuenta. ¿Qué pasa si al mismo tiempo queremos transferir dinero de una cuenta a otra y viceversa? Veamos un ejemplo.
// // Deadlock.java // public class Deadlock { public static void main(String[] args) throws InterruptedException { class Account { int balance = 100; public Account(int balance) { this.balance = balance; } public synchronized void deposit(int amount) { balance += amount; } public synchronized boolean withdraw(int amount) { if (balance >= amount) { balance -= amount; return true; } return false; } public synchronized boolean transfer(Account destination, int amount) { if (balance >= amount) { balance -= amount; synchronized(destination) { destination.balance += amount; }; return true; } return false; } public int getBalance() { return balance; } } final Account bob = new Account(200000); final Account joe = new Account(300000); class FirstTransfer extends Thread { public void run() { for (int i = 0; i < 100000; i++) { bob.transfer(joe, 2); } } } class SecondTransfer extends Thread { public void run() { for (int i = 0; i < 100000; i++) { joe.transfer(bob, 1); } } } FirstTransfer thread1 = new FirstTransfer(); SecondTransfer thread2 = new SecondTransfer(); thread1.start(); thread2.start(); thread1.join(); thread2.join(); System.out.println("Bob's balance: " + bob.getBalance()); System.out.println("Joe's balance: " + joe.getBalance()); } }Cuando ejecuto este programa en mi computadora portátil, generalmente se atasca. ¿Por qué pasó esto? Si miramos de cerca, podemos ver que cuando transferimos dinero estamos ingresando al método de transferencia que está sincronizado y bloquea el acceso a todos los métodos sincronizados en la cuenta de origen, y luego bloquea la cuenta de destino que bloquea el acceso a todos los métodos sincronizados en ella.
Imagina el siguiente escenario:
- Transferencia de llamadas de primer hilo en la cuenta de Bob a la cuenta de Joe
- Transferencia de llamadas de segundo hilo en la cuenta de Joe a la cuenta de Bob
- El segundo hilo disminuye la cantidad de la cuenta de Joe
- El segundo subproceso va a depositar el monto en la cuenta de Bob, pero espera a que el primer subproceso complete la transferencia.
- El primer hilo disminuye la cantidad de la cuenta de Bob
- El primer subproceso va a depositar el monto en la cuenta de Joe, pero espera a que el segundo subproceso complete la transferencia.
En este escenario, un subproceso está esperando que otro subproceso termine la transferencia y viceversa. Están atascados entre sí y el programa no puede continuar. Esto se llama interbloqueo. Para evitar el punto muerto, es necesario bloquear las cuentas en el mismo orden. Para arreglar el programa, le daremos a cada cuenta un número único para que podamos bloquear las cuentas en el mismo orden al transferir el dinero.
// // DeadlockFixed.java // import java.util.concurrent.atomic.AtomicInteger; public class DeadlockFixed { public static void main(String[] args) throws InterruptedException { final AtomicInteger counter = new AtomicInteger(0); class Account { int balance = 100; int order; public Account(int balance) { this.balance = balance; this.order = counter.getAndIncrement(); } public synchronized void deposit(int amount) { balance += amount; } public synchronized boolean withdraw(int amount) { if (balance >= amount) { balance -= amount; return true; } return false; } public boolean transfer(Account destination, int amount) { Account first; Account second; if (this.order < destination.order) { first = this; second = destination; } else { first = destination; second = this; } synchronized(first) { synchronized(second) { if (balance >= amount) { balance -= amount; destination.balance += amount; return true; } return false; } } } public synchronized int getBalance() { return balance; } } final Account bob = new Account(200000); final Account joe = new Account(300000); class FirstTransfer extends Thread { public void run() { for (int i = 0; i < 100000; i++) { bob.transfer(joe, 2); } } } class SecondTransfer extends Thread { public void run() { for (int i = 0; i < 100000; i++) { joe.transfer(bob, 1); } } } FirstTransfer thread1 = new FirstTransfer(); SecondTransfer thread2 = new SecondTransfer(); thread1.start(); thread2.start(); thread1.join(); thread2.join(); System.out.println("Bob's balance: " + bob.getBalance()); System.out.println("Joe's balance: " + joe.getBalance()); } }Debido a la imprevisibilidad de tales errores, a veces ocurren, pero no siempre y son difíciles de reproducir. Si el programa se comporta de manera impredecible, generalmente se debe a la concurrencia que introduce un no determinismo accidental. Para evitar el no determinismo accidental, debemos diseñar de antemano un programa que tenga en cuenta todos los entrelazamientos.
Un ejemplo de un programa que tiene un no determinismo accidental.
// // NonDeteminism.java // public class NonDeterminism { public static void main(String[] args) throws InterruptedException { class Container { public String value = "Empty"; } final Container container = new Container(); class FastThread extends Thread { public void run() { container.value = "Fast"; } } class SlowThread extends Thread { public void run() { try { Thread.sleep(50); } catch(Exception e) {} container.value = "Slow"; } } FastThread fast = new FastThread(); SlowThread slow = new SlowThread(); fast.start(); slow.start(); fast.join(); slow.join(); System.out.println(container.value); } }Este programa tiene un no determinismo accidental. Se mostrará el último valor ingresado en el contenedor.
java NonDeterminism SlowLos subprocesos más lentos ingresarán el valor más tarde, y este valor se imprimirá (Lento). Pero este no tiene por qué ser el caso. ¿Qué pasa si la computadora ejecuta simultáneamente otro programa que necesita muchos recursos de CPU? No tenemos ninguna garantía de que sea el subproceso más lento el que ingrese el valor en último lugar porque está controlado por el sistema operativo, no por el programa. Podemos tener situaciones en las que el programa funciona en un ordenador y en el otro se comporta de forma diferente. Dichos errores son difíciles de encontrar y causan dolores de cabeza a los desarrolladores. Por todas estas razones, este modelo de concurrencia es muy difícil de hacer bien.
Manera Funcional
Paralelismo
Veamos otro modelo que están usando los lenguajes funcionales. Por ejemplo, usaremos Clojure, que se puede interpretar con la herramienta Leiningen. Clojure es un lenguaje muy interesante con buen soporte para concurrencia. El modelo de concurrencia anterior era con estado mutable compartido. Las clases que usamos también pueden tener un estado oculto que muta y no lo conocemos, porque no es evidente en su API. Como hemos visto, este modelo puede causar indeterminismo accidental y puntos muertos si no tenemos cuidado. Los lenguajes funcionales tienen tipos de datos que no mutan, por lo que se pueden compartir de forma segura sin riesgo de que cambien. Las funciones tienen propiedades, así como otros tipos de datos. Las funciones se pueden crear durante la ejecución del programa y pasar como parámetro a otra función o regresar como resultado de la llamada a la función.
Las primitivas básicas para la programación concurrente son futuro y promesa. Future ejecuta un bloque de código en otro subproceso y devuelve un objeto para el valor futuro que se ingresará cuando se ejecute el bloque.
; ; future.clj ; (let [a (future (println "Started A") (Thread/sleep 1000) (println "Finished A") (+ 1 2)) b (future (println "Started B") (Thread/sleep 2000) (println "Finished B") (+ 3 4))] (println "Waiting for futures") (+ @a @b))Cuando ejecuto este script, el resultado es:
Started A Started B Waiting for futures Finished A Finished B 10En este ejemplo tenemos dos bloques futuros que se ejecutan de forma independiente. El programa solo bloquea al leer el valor del objeto futuro que aún no está disponible. En nuestro caso, a la espera de que se sumen ambos resultados de futuros bloques. El comportamiento es predecible (determinista) y siempre dará el mismo resultado porque no hay un estado mutable compartido.
Otra primitiva que se usa para la concurrencia es una promesa. Promise es un contenedor en el que se puede poner un valor una vez. Al leer promesas, el hilo esperará hasta que se llene el valor de la promesa.
; ; promise.clj ; (def result (promise)) (future (println "The result is: " @result)) (Thread/sleep 2000) (deliver result 42)En este ejemplo, el futuro esperará para imprimir el resultado siempre que la promesa no se guarde. Después de dos segundos, en la promesa se almacenará el valor 42 para imprimirse en el hilo futuro. El uso de promesas puede llevar a un punto muerto en lugar del futuro, así que tenga cuidado al usar promesas.
; ; promise-deadlock.clj ; (def promise-result (promise)) (def future-result (future (println "The result is: " + @promise-result) 13)) (println "Future result is: " @future-result) (deliver result 42)En este ejemplo, estamos usando el resultado del futuro y el resultado de la promesa. El orden de configuración y lectura de valores es que el subproceso principal está esperando un valor del subproceso futuro y el subproceso futuro está esperando un valor del subproceso principal. Este comportamiento será predecible (determinista) y se reproducirá cada vez que se ejecute el programa, lo que facilita la búsqueda y eliminación de errores.
Usar el futuro permite que el programa continúe con el ejercicio hasta que necesite el resultado de la ejecución del futuro. Esto da como resultado una ejecución más rápida del programa. Si tiene varios procesadores con el futuro, puede realizar la ejecución paralela del programa que tiene un comportamiento predecible (determinista) (cada vez da el mismo resultado). Así aprovechamos mejor el poder de la computadora.
; ; fibonacci.clj ; (defn fibonacci[a] (if (<= a 2) 1 (+ (fibonacci (- a 1)) (fibonacci (- a 2))))) (println "Start serial calculation") (time (println "The result is: " (+ (fibonacci 36) (fibonacci 36)))) (println "Start parallel calculation") (defn parallel-fibonacci[] (def result-1 (future (fibonacci 36))) (def result-2 (future (fibonacci 36))) (+ @result-1 @result-2)) (time (println "The result is: " (parallel-fibonacci)))En este ejemplo puedes ver cómo el uso del futuro puede hacer un mejor uso de la velocidad de una computadora. Tenemos dos números de Fibonacci que se suman. Podemos ver que el programa calcula el resultado dos veces, la primera vez secuencialmente en un solo hilo y la segunda vez en paralelo en dos hilos. Como mi portátil tiene un procesador multinúcleo, la ejecución en paralelo funciona el doble de rápido que el cálculo secuencial.
El resultado de ejecutar este script en mi computadora portátil:
Start serial calculation The result is: 29860704 "Elapsed time: 2568.816524 msecs" Start parallel calculation The result is: 29860704 "Elapsed time: 1216.991448 msecs"concurrencia
Para admitir la concurrencia y la imprevisibilidad en el lenguaje de programación Clojure, debemos usar un tipo de datos que sea variable para que otros subprocesos puedan ver los cambios. El tipo de dato variable más simple es átomo. Atom es un contenedor que siempre tiene el valor que puede ser reemplazado por otro valor. El valor se puede reemplazar ingresando un valor nuevo o llamando a una función que toma el valor anterior y devuelve un valor nuevo que se usa con más frecuencia. Es interesante que atom se implemente sin bloqueo y sea seguro de usar en subprocesos, lo que significa que es imposible llegar a un punto muerto. Internamente, atom usa la biblioteca java.util.concurrent.AtomicReference. Veamos un contraejemplo implementado con atom.
; ; atom-counter.clj ; (def counter (atom 0)) (def attempts (atom 0)) (defn counter-increases[] (dotimes [cnt 500000] (swap! counter (fn [counter] (swap! attempts inc) ; side effect DO NOT DO THIS (inc counter))))) (def first-future (future (counter-increases))) (def second-future (future (counter-increases))) ; Wait for futures to complete @first-future @second-future ; Print value of the counter (println "The counter is: " @counter) (println "Number of attempts: " @attempts)El resultado de la ejecución del script en mi computadora portátil:
The counter is: 1000000 Number of attempts: 1680212En este ejemplo usamos un átomo que contiene el valor del contador. El contador aumenta con (swap! counter inc). La función de intercambio funciona así: 1. toma el valor del contador y lo conserva 2. para este valor llama a la función dada que calcula el nuevo valor 3. para guardar el nuevo valor, usa una operación atómica que verifica si el valor anterior ha cambiado 3a. si el valor no ha cambiado, ingresa un nuevo valor 3b. si el valor se cambia mientras tanto, vaya al paso 1 Vemos que la función se puede llamar de nuevo si el valor se cambia mientras tanto. El valor solo se puede cambiar desde otro hilo. Por lo tanto, es esencial que la función que calcula un nuevo valor no tenga efectos secundarios para que no importe si se llama más veces. Una limitación de atom es que sincroniza los cambios a un valor.
; ; atom-acocunts.clj ; (def bob (atom 200000)) (def joe (atom 300000)) (def inconsistencies (atom 0)) (defn transfer [source destination amount] (if (not= (+ @bob @joe) 500000) (swap! inconsistencies inc)) (swap! source - amount) (swap! destination + amount)) (defn first-transfer [] (dotimes [cnt 100000] (transfer bob joe 2))) (defn second-transfer [] (dotimes [cnt 100000] (transfer joe bob 1))) (def first-future (future (first-transfer))) (def second-future (future (second-transfer))) @first-future @second-future (println "Bob has in account: " @bob) (println "Joe has in account: " @joe) (println "Inconsistencies while transfer: " @inconsistencies)Cuando ejecuto este script me sale:

Bob has in account: 100000 Joe has in account: 400000 Inconsistencies while transfer: 36525En este ejemplo podemos ver como cambiamos más átomos. En un punto, la inconsistencia puede ocurrir. La suma de dos cuentas en algún momento no es lo mismo. Si tenemos que coordinar cambios de múltiples valores hay dos soluciones:
- Coloque más valores en un átomo
- Utilizar referencias y memoria transaccional de software, como veremos más adelante
; ; atom-accounts-fixed.clj ; (def accounts (atom {:bob 200000, :joe 300000})) (def inconsistencies (atom 0)) (defn transfer [source destination amount] (let [deref-accounts @accounts] (if (not= (+ (get deref-accounts :bob) (get deref-accounts :joe)) 500000) (swap! inconsistencies inc)) (swap! accounts (fn [accs] (update (update accs source - amount) destination + amount))))) (defn first-transfer [] (dotimes [cnt 100000] (transfer :bob :joe 2))) (defn second-transfer [] (dotimes [cnt 100000] (transfer :joe :bob 1))) (def first-future (future (first-transfer))) (def second-future (future (second-transfer))) @first-future @second-future (println "Bob has in account: " (get @accounts :bob)) (println "Joe has in account: " (get @accounts :joe)) (println "Inconsistencies while transfer: " @inconsistencies)Cuando ejecuto este script en mi computadora, obtengo:
Bob has in account: 100000 Joe has in account: 400000 Inconsistencies while transfer: 0En el ejemplo se ha resuelto la coordinación para que le demos más valor usando un mapa. Cuando transferimos dinero de la cuenta, cambiamos todas las cuentas en ese momento para que nunca suceda que la suma de dinero no sea la misma.
El siguiente tipo de dato variable es agente. El agente se comporta como un átomo solo en el sentido de que la función que cambia el valor se ejecuta en un subproceso diferente, por lo que el cambio tarda un tiempo en hacerse visible. Por lo tanto, al leer el valor del agente es necesario llamar a una función que espera hasta que se ejecuten todas las funciones que cambian el valor del agente. A diferencia de los átomos, la función que cambia el valor se llama solo una vez y, por lo tanto, puede tener efectos secundarios. Este tipo también puede sincronizar un valor y no puede interbloquearse.
; ; agent-counter.clj ; (def counter (agent 0)) (def attempts (atom 0)) (defn counter-increases[] (dotimes [cnt 500000] (send counter (fn [counter] (swap! attempts inc) (inc counter))))) (def first-future (future (counter-increases))) (def second-future (future (counter-increases))) ; wait for futures to complete @first-future @second-future ; wait for counter to be finished with updating (await counter) ; print the value of the counter (println "The counter is: " @counter) (println "Number of attempts: " @attempts)Cuando ejecuto este script en mi computadora portátil, obtengo:
The counter is: 1000000 Number of attempts: 1000000Este ejemplo es el mismo que la implementación del contador con el átomo. La única diferencia es que aquí estamos esperando que se completen todos los cambios del agente antes de leer el valor final usando await.
El último tipo de datos variables son las referencias. A diferencia de los átomos, las referencias pueden sincronizar cambios en múltiples valores. Cada operación en referencia debe estar en una transacción usando dosincronización. Esta forma de cambiar los datos se denomina memoria transaccional de software o STM abreviado. Veamos un ejemplo con la transferencia de dinero en las cuentas.
; ; stm-accounts.clj ; (def bob (ref 200000)) (def joe (ref 300000)) (def inconsistencies (atom 0)) (def attempts (atom 0)) (def transfers (agent 0)) (defn transfer [source destination amount] (dosync (swap! attempts inc) ; side effect DO NOT DO THIS (send transfers inc) (when (not= (+ @bob @joe) 500000) (swap! inconsistencies inc)) ; side effect DO NOT DO THIS (alter source - amount) (alter destination + amount))) (defn first-transfer [] (dotimes [cnt 100000] (transfer bob joe 2))) (defn second-transfer [] (dotimes [cnt 100000] (transfer joe bob 1))) (def first-future (future (first-transfer))) (def second-future (future (second-transfer))) @first-future @second-future (await transfers) (println "Bob has in account: " @bob) (println "Joe has in account: " @joe) (println "Inconsistencies while transfer: " @inconsistencies) (println "Attempts: " @attempts) (println "Transfers: " @transfers)Cuando ejecuto este script, obtengo:
Bob has in account: 100000 Joe has in account: 400000 Inconsistencies while transfer: 0 Attempts: 330841 Transfers: 200000Curiosamente, hubo más intentos que el número de transacciones realizadas. Esto se debe a que STM no usa bloqueos, por lo que si hay un conflicto (como dos subprocesos que intentan cambiar el mismo valor), la transacción se volverá a ejecutar. Por esta razón, la transacción no debería tener efectos secundarios. Podemos ver que el agente cuyo valor cambia dentro de la transacción se comporta de manera predecible. Una función que cambie el valor del agente será evaluada tantas veces como transacciones haya. La razón es que el agente es consciente de la transacción. Si la transacción debe tener efectos secundarios, deben ponerse en funcionamiento dentro del agente. De esta forma, el programa tendrá un comportamiento predecible. Probablemente piense que siempre debe usar STM, pero los programadores experimentados a menudo usarán átomos porque los átomos son más simples y rápidos que STM. Por supuesto, eso es si es posible hacer un programa de esa manera. Si tiene efectos secundarios, entonces no hay otra opción que usar STM y agentes.
Modelo actor
El siguiente modelo de concurrencia es un modelo actor. El principio de este modelo es similar al del mundo real. Si hacemos un trato para crear algo con muchas personas, por ejemplo, un edificio, entonces cada hombre en el sitio de construcción tiene su propio papel. Una multitud de personas es supervisada por el supervisor. Si un trabajador se lesiona en el trabajo, el supervisor asignará el trabajo del lesionado a los demás que estén disponibles. Si es necesario, puede llevar al sitio a un hombre nuevo. En el sitio tenemos más personas que hacen el trabajo simultáneamente (concurrentemente), pero también hablando entre sí para sincronizar. Si ponemos el trabajo en el sitio de construcción en el programa, entonces cada persona sería un actor que tiene un estado y ejecuta en su propio proceso, y la conversación sería reemplazada por mensajes. El lenguaje de programación popular basado en este modelo es Erlang. Este interesante lenguaje tiene tipos de datos inmutables y funciones que tienen las mismas propiedades que otros tipos de datos. Las funciones pueden crearse durante la ejecución del programa y pasarse como argumentos a otra función o devolverse como resultado de la llamada a la función. Daré ejemplos en el lenguaje Elixir que usa la máquina virtual Erlang, por lo que tendré el mismo modelo de programación que Erlang, solo una sintaxis diferente. Las tres primitivas más importantes en Elixir son generar, enviar y recibir. spawn ejecuta la función en el nuevo proceso, send envía el mensaje al proceso y receive recibe los mensajes que se envían al proceso actual.
El primer ejemplo con el modelo de actor se incrementará al mismo tiempo. Para hacer un programa con este modelo, es necesario hacer que un actor tenga el valor del contador y reciba un mensaje para establecer y recuperar el valor del contador, y tener dos actores que aumentarán simultáneamente el valor del contador.
# # Counting.exs # defmodule Counting do def counter(value) do receive do {:get, sender} -> send sender, {:counter, value} counter value {:set, new_value} -> counter(new_value) end end def counting(sender, counter, times) do if times > 0 do send counter, {:get, self} receive do {:counter, value} -> send counter, {:set, value + 1} end counting(sender, counter, times - 1) else send sender, {:done, self} end end end counter = spawn fn -> Counting.counter 0 end IO.puts "Starting counting processes" this = self counting1 = spawn fn -> IO.puts "Counting A started" Counting.counting this, counter, 500_000 IO.puts "Counting A finished" end counting2 = spawn fn -> IO.puts "Counting B started" Counting.counting this, counter, 500_000 IO.puts "Counting B finished" end IO.puts "Waiting for counting to be done" receive do {:done, ^counting1} -> nil end receive do {:done, ^counting2} -> nil end send counter, {:get, self} receive do {:counter, value} -> IO.puts "Counter is: #{value}" endCuando ejecuto este ejemplo me sale:
Starting counting processes Counting A started Waiting for counting to be done Counting B started Counting A finished Counting B finished Counter is: 516827Podemos ver que al final el contador es 516827 y no 1000000 como esperábamos. La próxima vez que ejecuté el script, recibí 511010. El motivo de este comportamiento es que el contador recibe dos mensajes: recuperar el valor actual y establecer el nuevo valor. Para aumentar el contador, el programa necesita obtener el valor actual, aumentarlo en 1 y establecer el valor aumentado. Dos procesos leen y escriben el valor del contador al mismo tiempo mediante el uso de mensajes que se envían al proceso de contador. El orden de los mensajes que recibirá el contador es impredecible y el programa no puede controlarlo. Podemos imaginar este escenario:
- El valor del contador es 115
- El proceso A lee el valor del contador (115)
- El proceso B lee el valor del contador (115)
- El proceso B aumenta el valor localmente (116)
- El proceso B establece un valor aumentado en el contador (116)
- El proceso A aumenta el valor del contador (116)
- El proceso A establece un valor aumentado en el contador (116)
- El valor del contador es 116
Si observamos el escenario, dos procesos aumentan el contador en 1, y el contador aumenta al final en 1 y no en 2. Dichos entrelazamientos pueden ocurrir un número impredecible de veces y, por lo tanto, el valor del contador es impredecible. Para evitar este comportamiento, la operación de aumento debe realizarse mediante un mensaje.
# # CountingFixed.exs # defmodule Counting do def counter(value) do receive do :increase -> counter(value + 1) {:get, sender} -> send sender, {:counter, value} counter value end end def counting(sender, counter, times) do if times > 0 do send counter, :increase counting(sender, counter, times - 1) else send sender, {:done, self} end end end counter = spawn fn -> Counting.counter 0 end IO.puts "Starting counting processes" this = self counting1 = spawn fn -> IO.puts "Counting A started" Counting.counting this, counter, 500_000 IO.puts "Counting A finished" end counting2 = spawn fn -> IO.puts "Counting B started" Counting.counting this, counter, 500_000 IO.puts "Counting B finished" end IO.puts "Waiting for counting to be done" receive do {:done, ^counting1} -> nil end receive do {:done, ^counting2} -> nil end send counter, {:get, self} receive do {:counter, value} -> IO.puts "Counter is: #{value}" endAl ejecutar este script obtengo:
Starting counting processes Counting A started Waiting for counting to be done Counting B started Counting A finished Counting B finished Counter is: 1000000Podemos ver que el contador tiene el valor correcto. El motivo del comportamiento predecible (determinista) es que el valor del contador aumenta en un mensaje, de modo que la secuencia de mensajes para aumentar el contador no afectará su valor final. Al trabajar con el modelo de actor, debemos prestar atención a cómo los mensajes pueden entrelazarse y un diseño cuidadoso de los mensajes y las acciones en los mensajes para evitar la imprevisibilidad accidental (no determinismo).
¿Cómo podemos transferir dinero entre dos cuentas con este modelo?
# # Accounts.exs # defmodule Accounts do def accounts(state) do receive do {:transfer, source, destination, amount} -> accounts %{state | source => state[source] - amount , destination => state[destination] + amount} {:amounts, accounts, sender } -> send sender, {:amounts, for account <- accounts do {account, state[account]} end} accounts(state) end end def transfer(sender, accounts, source, destination, amount, times, inconsistencies) do if times > 0 do send accounts, {:amounts, [source, destination], self} receive do {:amounts, amounts} -> if amounts[source] + amounts[destination] != 500_000 do Agent.update(inconsistencies, fn value -> value + 1 end) end end send accounts, {:transfer, source, destination, amount} transfer(sender, accounts, source, destination, amount, times - 1, inconsistencies) else send sender, {:done, self} end end end accounts = spawn fn -> Accounts.accounts(%{bob: 200_000, joe: 300_000 }) end {:ok, inconsistencies} = Agent.start(fn -> 0 end) this = self transfer1 = spawn fn -> IO.puts "Transfer A started" Accounts.transfer(this, accounts, :bob, :joe, 2, 100_000, inconsistencies) IO.puts "Transfer A finished" end transfer2 = spawn fn -> IO.puts "Transfer B started" Accounts.transfer(this, accounts, :joe, :bob, 1, 100_000, inconsistencies) IO.puts "Transfer B finished" end IO.puts "Waiting for transfers to be done" receive do {:done, ^transfer1} -> nil end receive do {:done, ^transfer2} -> nil end send accounts, {:amounts, [:bob, :joe], self} receive do {:amounts, amounts} -> IO.puts "Bob has in account: #{amounts[:bob]}" IO.puts "Joe has in account: #{amounts[:joe]}" IO.puts "Inconsistencies while transfer: #{Agent.get(inconsistencies, fn x -> x end)}" endCuando ejecuto este script me sale:
Waiting for transfers to be done Transfer A started Transfer B started Transfer B finished Transfer A finished Bob has in account: 100000 Joe has in account: 400000 Inconsistencies while transfer: 0Podemos ver que la transferencia de dinero funciona sin inconsistencias, porque hemos elegido la transferencia de mensajes para transferir dinero y los montos de mensajes para obtener el valor de las cuentas, lo que nos brinda un comportamiento predecible del programa. Siempre que hagamos una transferencia de dinero, la cantidad total de dinero en cada momento debe ser la misma.
El modelo de actor puede provocar un bloqueo y, por lo tanto, un punto muerto, así que tenga cuidado al diseñar el programa. El siguiente script muestra cómo puede simular el escenario de bloqueo e interbloqueo.
# # Deadlock.exs # defmodule Lock do def loop(state) do receive do {:lock, sender} -> case state do [] -> send sender, :locked loop([sender]) _ -> loop(state ++ [sender]) end {:unlock, sender} -> case state do [] -> loop(state) [^sender | []] -> loop([]) [^sender | [next | tail]] -> send next, :locked loop([next | tail]) _ -> loop(state) end end end def lock(pid) do send pid, {:lock, self} receive do :locked -> nil # This will block until we receive message end end def unlock(pid) do send pid, {:unlock, self} end def locking(first, second, times) do if times > 0 do lock(first) lock(second) unlock(second) unlock(first) locking(first, second, times - 1) end end end a_lock = spawn fn -> Lock.loop([]) end b_lock = spawn fn -> Lock.loop([]) end this = self IO.puts "Locking A, B started" spawn fn -> Lock.locking(a_lock, b_lock, 1_000) IO.puts "Locking A, B finished" send this, :done end IO.puts "Locking B, A started" spawn fn -> Lock.locking(b_lock, a_lock, 1_000) IO.puts "Locking B, A finished" send this, :done end IO.puts "Waiting for locking to be done" receive do :done -> nil end receive do :done -> nil EndWhen I run this script on my laptop I get:
Locking A, B started Locking B, A started Waiting for locking to be doneFrom the output we can see that the processes that lock A and B are stuck. This happens because the first process waits for the second process to release B while second process waiting first process to release A. They are waiting for each other and are stuck forever. To avoid this locking, order should always be the same, or design a program so that it doesn't use lock (meaning that it doesn't wait for a specific message). The following listing always locks first A then B.
# # Deadlock fixed # defmodule Lock do def loop(state) do receive do {:lock, sender} -> case state do [] -> send sender, :locked loop([sender]) _ -> loop(state ++ [sender]) end {:unlock, sender} -> case state do [] -> loop(state) [^sender | []] -> loop([]) [^sender | [next | tail]] -> send next, :locked loop([next | tail]) _ -> loop(state) end end end def lock(pid) do send pid, {:lock, self} receive do :locked -> nil # This will block until we receive message end end def unlock(pid) do send pid, {:unlock, self} end def locking(first, second, times) do if times > 0 do lock(first) lock(second) unlock(second) unlock(first) locking(first, second, times - 1) end end end a_lock = spawn fn -> Lock.loop([]) end b_lock = spawn fn -> Lock.loop([]) end this = self IO.puts "Locking A, B started" spawn fn -> Lock.locking(a_lock, b_lock, 1_000) IO.puts "Locking A, B finished" send this, :done end IO.puts "Locking A, B started" spawn fn -> Lock.locking(a_lock, b_lock, 1_000) IO.puts "Locking A, B finished" send this, :done end IO.puts "Waiting for locking to be done" receive do :done -> nil end receive do :done -> nil EndWhen I run this script on my laptop I get:
Locking A, B started Locking A, B started Waiting for locking to be done Locking A, B finished Locking A, B finishedAnd now, there is no longer a deadlock.
Envolver
As an introduction to concurrent programming, we have covered a few concurrency models. We haven't covered all models, as this article would be too big. Just to name a few, channels and reactive streams are some of the other popularly used concurrency models. Channels and reactive streams have many similarities with the actor model. All of them transmit messages, but many threads can receive messages from one channel, and reactive streams transmit messages in one direction to form directed graph that receive messages from one end and send messages from the other end as a result of the processing.
Shared mutable state models can easily go wrong if we don't think ahead. It has problems of race condition and deadlock. If we have a choice between different concurrent programming models, it would be easier to implement and maintain but otherwise we have to be very careful what we do.
The functional way is a lot easier to reason about and implement. It cannot have deadlock. This model may have worse performance than shared mutable state model, but a program that works is always faster than one that does not work.
Actor model is a good choice for concurrent programming. Although there are problems of race condition and deadlock, they can happen less than in shared mutable state model since the only way for processes to communicate is via messages. With good message design between processes, that can be avoided. If a problem occurs it is then in the order or meaning of messages in communication between the processes and you know where to look.
I hope this article has given you some insight to what concurrent programming is and how it gives structure to the programs you write.
