Introdução à programação concorrente: um guia para iniciantes

Publicados: 2022-03-11

O que é programação concorrente? Simplesmente descrito, é quando você está fazendo mais de uma coisa ao mesmo tempo. Não deve ser confundido com paralelismo, simultaneidade é quando várias sequências de operações são executadas em períodos de tempo sobrepostos. No domínio da programação, a simultaneidade é um assunto bastante complexo. Lidar com construções como threads e bloqueios e evitar problemas como condições de corrida e impasses pode ser bastante complicado, dificultando a escrita de programas simultâneos. Por meio da simultaneidade, os programas podem ser projetados como processos independentes trabalhando juntos em uma composição específica. Tal estrutura pode ou não ser paralela; no entanto, alcançar essa estrutura em seu programa oferece inúmeras vantagens.

Introdução à programação concorrente

Neste artigo, veremos vários modelos diferentes de simultaneidade, como alcançá-los em várias linguagens de programação projetadas para simultaneidade.

Modelo de estado mutável compartilhado

Vejamos um exemplo simples com um contador e duas threads que o aumentam. O programa não deve ser muito complicado. Temos um objeto que contém um contador que aumenta com o aumento do método, e o recupera com o método get e duas threads que o aumentam.

 // // Counting.java // public class Counting { public static void main(String[] args) throws InterruptedException { class Counter { int counter = 0; public void increment() { counter++; } public int get() { return counter; } } final Counter counter = new Counter(); class CountingThread extends Thread { public void run() { for (int x = 0; x < 500000; x++) { counter.increment(); } } } CountingThread t1 = new CountingThread(); CountingThread t2 = new CountingThread(); t1.start(); t2.start(); t1.join(); t2.join(); System.out.println(counter.get()); } }

Este programa ingênuo não é tão ingênuo quanto parece à primeira vista. Quando executo este programa mais vezes, obtenho resultados diferentes. Existem três valores após três execuções no meu laptop.

 java Counting 553706 java Counting 547818 java Counting 613014

Qual é a razão para este comportamento imprevisível? O programa aumenta o contador em um só lugar, no método raise que utiliza o comando counter++. Se olharmos para o código de byte de comando, veremos que ele consiste em várias partes:

  1. Ler o valor do contador da memória
  2. Aumente o valor localmente
  3. Armazenar o valor do contador na memória

Agora podemos imaginar o que pode dar errado nessa sequência. Se tivermos dois threads que aumentam independentemente o contador, poderíamos ter este cenário:

  1. O valor do contador é 115
  2. A primeira thread lê o valor do contador da memória (115)
  3. O primeiro encadeamento aumenta o valor do contador local (116)
  4. A segunda thread lê o valor do contador da memória (115)
  5. O segundo encadeamento aumenta o valor do contador local (116)
  6. A segunda thread salva o valor do contador local na memória (116)
  7. A primeira thread salva o valor do contador local na memória (116)
  8. O valor do contador é 116

Nesse cenário, dois threads são entrelaçados de forma que o valor do contador seja aumentado em 1, mas o valor do contador deve ser aumentado em 2, pois cada thread o aumenta em 1. O entrelaçamento de threads diferentes influencia o resultado do programa. A razão da imprevisibilidade do programa é que o programa não tem controle do entrelaçamento de threads, mas do sistema operacional. Cada vez que o programa é executado, as threads podem se entrelaçar de forma diferente. Desta forma, introduzimos imprevisibilidade acidental (não-determinismo) ao programa.

Para corrigir essa imprevisibilidade acidental (não-determinismo), o programa deve ter controle do entrelaçamento de threads. Quando um thread está no método aumentar outro thread não deve estar no mesmo método até que o primeiro saia dele. Dessa forma, serializamos o acesso ao aumento do método.

 // // CountingFixed.java // public class CountingFixed { public static main(String[] args) throws InterruptedException { class Counter { int counter = 0; public synchronized void increase() { counter++; } public synchronized int get() { return counter; } } final Counter counter = new Counter(); class CountingThread extends Thread { public void run() { for (int i = 0; i < 500000; i++) { counter.increment(); } } } CountingThread thread1 = new CountingThread(); CountingThread thread2 = new CountingThread(); thread1.start(); thread2.start(); thread1.join(); thread2.join(); System.out.println(counter.get()); } }

Outra solução é usar um contador que pode aumentar atomicamente, o que significa que a operação não pode ser separada em várias operações. Dessa forma, não precisamos ter blocos de código que precisam ser sincronizados. Java tem tipos de dados atômicos no namespace java.util.concurrent.atomic e usaremos AtomicInteger.

 // // CountingBetter.java // import java.util.concurrent.atomic.AtomicInteger; class CountingBetter { public static void main(String[] args) throws InterruptedException { final AtomicInteger counter = new AtomicInteger(0); class CountingThread extends Thread { public viod run() { for (int i = 0; i < 500000; i++) { counter.incrementAndGet(); } } } CountingThread thread1 = new CountingThread(); CountingThread thread2 = new CoutningThread(); thread1.start(); thread2.start(); thread1.join(); thread2.join(); System.out.println(counter.get()); } }

O inteiro atômico tem as operações que precisamos, então podemos usá-lo em vez da classe Counter. É interessante notar que todos os métodos de atomicinteger não utilizam bloqueio, de modo que não há possibilidade de deadlocks, o que facilita o projeto do programa.

Usar palavras-chave sincronizadas para sincronizar métodos críticos deve resolver todos os problemas, certo? Vamos imaginar que temos duas contas que podem depositar, sacar e transferir para outra conta. O que acontece se ao mesmo tempo quisermos transferir dinheiro de uma conta para outra e vice-versa? Vejamos um exemplo.

 // // Deadlock.java // public class Deadlock { public static void main(String[] args) throws InterruptedException { class Account { int balance = 100; public Account(int balance) { this.balance = balance; } public synchronized void deposit(int amount) { balance += amount; } public synchronized boolean withdraw(int amount) { if (balance >= amount) { balance -= amount; return true; } return false; } public synchronized boolean transfer(Account destination, int amount) { if (balance >= amount) { balance -= amount; synchronized(destination) { destination.balance += amount; }; return true; } return false; } public int getBalance() { return balance; } } final Account bob = new Account(200000); final Account joe = new Account(300000); class FirstTransfer extends Thread { public void run() { for (int i = 0; i < 100000; i++) { bob.transfer(joe, 2); } } } class SecondTransfer extends Thread { public void run() { for (int i = 0; i < 100000; i++) { joe.transfer(bob, 1); } } } FirstTransfer thread1 = new FirstTransfer(); SecondTransfer thread2 = new SecondTransfer(); thread1.start(); thread2.start(); thread1.join(); thread2.join(); System.out.println("Bob's balance: " + bob.getBalance()); System.out.println("Joe's balance: " + joe.getBalance()); } }

Quando executo este programa no meu laptop, geralmente fica travado. Por que isso acontece? Se olharmos de perto, podemos ver que quando transferimos dinheiro estamos entrando no método de transferência que é sincronizado e bloqueia o acesso a todos os métodos sincronizados na conta de origem e, em seguida, bloqueia a conta de destino que bloqueia o acesso a todos os métodos sincronizados.

Imagine o seguinte cenário:

  1. O primeiro encadeamento de chamadas é transferido da conta de Bob para a conta de Joe
  2. O segundo encadeamento chama a transferência da conta de Joe para a conta de Bob
  3. O segundo segmento diminui o valor da conta de Joe
  4. O segundo thread vai para depositar o valor na conta de Bob, mas aguarda o primeiro thread concluir a transferência.
  5. O primeiro encadeamento diminui o valor da conta de Bob
  6. O primeiro segmento vai depositar o valor na conta de Joe, mas aguarda o segundo segmento concluir a transferência.

Nesse cenário, um encadeamento está aguardando outro encadeamento concluir a transferência e vice-versa. Eles estão presos um ao outro e o programa não pode continuar. Isso é chamado de impasse. Para evitar o impasse é necessário bloquear as contas na mesma ordem. Para corrigir o programa, daremos a cada conta um número único para que possamos bloquear as contas na mesma ordem ao transferir o dinheiro.

 // // DeadlockFixed.java // import java.util.concurrent.atomic.AtomicInteger; public class DeadlockFixed { public static void main(String[] args) throws InterruptedException { final AtomicInteger counter = new AtomicInteger(0); class Account { int balance = 100; int order; public Account(int balance) { this.balance = balance; this.order = counter.getAndIncrement(); } public synchronized void deposit(int amount) { balance += amount; } public synchronized boolean withdraw(int amount) { if (balance >= amount) { balance -= amount; return true; } return false; } public boolean transfer(Account destination, int amount) { Account first; Account second; if (this.order < destination.order) { first = this; second = destination; } else { first = destination; second = this; } synchronized(first) { synchronized(second) { if (balance >= amount) { balance -= amount; destination.balance += amount; return true; } return false; } } } public synchronized int getBalance() { return balance; } } final Account bob = new Account(200000); final Account joe = new Account(300000); class FirstTransfer extends Thread { public void run() { for (int i = 0; i < 100000; i++) { bob.transfer(joe, 2); } } } class SecondTransfer extends Thread { public void run() { for (int i = 0; i < 100000; i++) { joe.transfer(bob, 1); } } } FirstTransfer thread1 = new FirstTransfer(); SecondTransfer thread2 = new SecondTransfer(); thread1.start(); thread2.start(); thread1.join(); thread2.join(); System.out.println("Bob's balance: " + bob.getBalance()); System.out.println("Joe's balance: " + joe.getBalance()); } }

Devido à imprevisibilidade de tais erros, eles às vezes acontecem, mas nem sempre e são difíceis de reproduzir. Se o programa se comporta de forma imprevisível, geralmente é causado por simultaneidade que introduz não determinismo acidental. Para evitar o não-determinismo acidental, devemos projetar com antecedência o programa para levar em conta todos os entrelaçamentos.

Um exemplo de um programa que tem um não-determinismo acidental.

 // // NonDeteminism.java // public class NonDeterminism { public static void main(String[] args) throws InterruptedException { class Container { public String value = "Empty"; } final Container container = new Container(); class FastThread extends Thread { public void run() { container.value = "Fast"; } } class SlowThread extends Thread { public void run() { try { Thread.sleep(50); } catch(Exception e) {} container.value = "Slow"; } } FastThread fast = new FastThread(); SlowThread slow = new SlowThread(); fast.start(); slow.start(); fast.join(); slow.join(); System.out.println(container.value); } }

Este programa contém um não-determinismo acidental. O último valor inserido no contêiner será exibido.

 java NonDeterminism Slow

As roscas mais lentas inserirão o valor posteriormente e esse valor será impresso (Lento). Mas este não precisa ser o caso. E se o computador executar simultaneamente outro programa que precise de muitos recursos da CPU? Não temos garantia de que será o thread mais lento que inserirá o valor por último porque é controlado pelo sistema operacional, não pelo programa. Podemos ter situações em que o programa funciona em um computador e no outro se comporta de forma diferente. Tais erros são difíceis de encontrar e causam dores de cabeça para os desenvolvedores. Por todas essas razões, esse modelo de simultaneidade é muito difícil de ser feito corretamente.

Maneira Funcional

Paralelismo

Vejamos outro modelo que as linguagens funcionais estão usando. Por exemplo, usaremos Clojure, que pode ser interpretado usando a ferramenta Leiningen. Clojure é uma linguagem muito interessante com bom suporte para simultaneidade. O modelo de simultaneidade anterior era com estado mutável compartilhado. As classes que usamos também podem ter um estado oculto que sofre mutações que não conhecemos, porque não é evidente em sua API. Como vimos, esse modelo pode causar não-determinismo acidental e impasses se não formos cuidadosos. As linguagens funcionais têm tipos de dados que não sofrem mutação para que possam ser compartilhados com segurança sem o risco de serem alterados. As funções têm propriedades, bem como outros tipos de dados. As funções podem ser criadas durante a execução do programa e passadas como parâmetro para outra função ou retornar como resultado da chamada da função.

Primitivas básicas para programação concorrente são futuras e promissoras. Future executa um bloco de código em outra thread e retorna um objeto para o valor futuro que será inserido quando o bloco for executado.

 ; ; future.clj ; (let [a (future (println "Started A") (Thread/sleep 1000) (println "Finished A") (+ 1 2)) b (future (println "Started B") (Thread/sleep 2000) (println "Finished B") (+ 3 4))] (println "Waiting for futures") (+ @a @b))

Quando executo este script a saída é:

 Started A Started B Waiting for futures Finished A Finished B 10

Neste exemplo temos dois blocos futuros que são executados independentemente. Programa apenas bloqueia ao ler o valor do objeto futuro que ainda não está disponível. No nosso caso, aguardando ambos os resultados dos blocos futuros para serem somados. O comportamento é previsível (determinístico) e sempre dará o mesmo resultado porque não há estado mutável compartilhado.

Outra primitiva que é usada para simultaneidade é uma promessa. Promise é um recipiente no qual se pode colocar um valor uma vez. Ao ler as promessas, o encadeamento aguardará até que o valor da promessa seja preenchido.

 ; ; promise.clj ; (def result (promise)) (future (println "The result is: " @result)) (Thread/sleep 2000) (deliver result 42)

Neste exemplo, o futuro aguardará para imprimir o resultado enquanto a promessa não ser salva. Após dois segundos, na promessa será armazenado o valor 42 para ser impresso na thread futura. O uso de promessas pode levar a um impasse em oposição ao futuro, portanto, tenha cuidado ao usar promessa.

 ; ; promise-deadlock.clj ; (def promise-result (promise)) (def future-result (future (println "The result is: " + @promise-result) 13)) (println "Future result is: " @future-result) (deliver result 42)

Neste exemplo, estamos usando o resultado do futuro e o resultado da promessa. A ordem de configuração e leitura de valores é que o encadeamento principal está aguardando um valor do encadeamento futuro e o encadeamento futuro está aguardando um valor do encadeamento principal. Esse comportamento será previsível (determinístico) e será reproduzido toda vez que o programa for executado, o que facilita a localização e remoção de erros.

Usar o futuro permite que o programa continue com o exercício até precisar do resultado da execução do futuro. Isso resulta em uma execução mais rápida do programa. Se você tiver vários processadores com o futuro, você pode fazer a execução paralela de programas que tenham comportamento previsível (determinístico) (cada vez dá o mesmo resultado). Dessa forma, exploramos melhor o poder do computador.

 ; ; fibonacci.clj ; (defn fibonacci[a] (if (<= a 2) 1 (+ (fibonacci (- a 1)) (fibonacci (- a 2))))) (println "Start serial calculation") (time (println "The result is: " (+ (fibonacci 36) (fibonacci 36)))) (println "Start parallel calculation") (defn parallel-fibonacci[] (def result-1 (future (fibonacci 36))) (def result-2 (future (fibonacci 36))) (+ @result-1 @result-2)) (time (println "The result is: " (parallel-fibonacci)))

Neste exemplo você pode ver como o uso de future pode fazer melhor uso da velocidade de um computador. Temos dois números de Fibonacci que se somam. Podemos ver que o programa calcula o resultado duas vezes, a primeira vez sequencialmente em uma única thread e a segunda vez em paralelo em duas threads. Como meu laptop tem um processador multicore, a execução paralela funciona duas vezes mais rápido que o cálculo sequencial.

O resultado da execução deste script no meu laptop:

 Start serial calculation The result is: 29860704 "Elapsed time: 2568.816524 msecs" Start parallel calculation The result is: 29860704 "Elapsed time: 1216.991448 msecs"

Simultaneidade

Para dar suporte à simultaneidade e imprevisibilidade na linguagem de programação Clojure, devemos usar um tipo de dados variável para que outros threads possam ver as alterações. O tipo de dados variável mais simples é o atom. Atom é um container que sempre tem o valor que pode ser substituído por outro valor. O valor pode ser substituído inserindo um novo valor ou chamando uma função que recebe o valor antigo e retorna um novo valor que é usado com mais frequência. É interessante que o atom seja implementado sem travamento e seja seguro para uso em threads, o que significa que é impossível atingir o deadlock. Internamente, o atom usa a biblioteca java.util.concurrent.AtomicReference. Vejamos um contra-exemplo implementado com o atom.

 ; ; atom-counter.clj ; (def counter (atom 0)) (def attempts (atom 0)) (defn counter-increases[] (dotimes [cnt 500000] (swap! counter (fn [counter] (swap! attempts inc) ; side effect DO NOT DO THIS (inc counter))))) (def first-future (future (counter-increases))) (def second-future (future (counter-increases))) ; Wait for futures to complete @first-future @second-future ; Print value of the counter (println "The counter is: " @counter) (println "Number of attempts: " @attempts)

O resultado da execução do script no meu laptop:

 The counter is: 1000000 Number of attempts: 1680212

Neste exemplo usamos um átomo que contém o valor do contador. O contador aumenta com (troca! contador inc). A função de troca funciona assim: 1. pega o valor do contador e preserva-o 2. para este valor chama determinada função que calcula o novo valor 3. para salvar o novo valor, usa operação atômica que verifica se o valor antigo foi alterado 3a. se o valor não foi alterado, ele insere um novo valor 3b. se o valor for alterado nesse meio tempo, vá para a etapa 1. Vemos que a função pode ser chamada novamente se o valor for alterado nesse meio tempo. O valor só pode ser alterado a partir de outro thread. Portanto, é essencial que a função que calcula um novo valor não tenha efeitos colaterais para que não importe se for chamada mais vezes. Uma limitação do átomo é que ele sincroniza as alterações em um valor.

 ; ; atom-acocunts.clj ; (def bob (atom 200000)) (def joe (atom 300000)) (def inconsistencies (atom 0)) (defn transfer [source destination amount] (if (not= (+ @bob @joe) 500000) (swap! inconsistencies inc)) (swap! source - amount) (swap! destination + amount)) (defn first-transfer [] (dotimes [cnt 100000] (transfer bob joe 2))) (defn second-transfer [] (dotimes [cnt 100000] (transfer joe bob 1))) (def first-future (future (first-transfer))) (def second-future (future (second-transfer))) @first-future @second-future (println "Bob has in account: " @bob) (println "Joe has in account: " @joe) (println "Inconsistencies while transfer: " @inconsistencies)

Quando executo este script recebo:

 Bob has in account: 100000 Joe has in account: 400000 Inconsistencies while transfer: 36525

Neste exemplo podemos ver como mudamos mais átomos. Em um ponto, a inconsistência pode acontecer. A soma de duas contas em algum momento não é a mesma. Se tivermos que coordenar mudanças de vários valores, existem duas soluções:

  1. Coloque mais valores em um átomo
  2. Use referências e memória transacional de software, como veremos mais adiante
 ; ; atom-accounts-fixed.clj ; (def accounts (atom {:bob 200000, :joe 300000})) (def inconsistencies (atom 0)) (defn transfer [source destination amount] (let [deref-accounts @accounts] (if (not= (+ (get deref-accounts :bob) (get deref-accounts :joe)) 500000) (swap! inconsistencies inc)) (swap! accounts (fn [accs] (update (update accs source - amount) destination + amount))))) (defn first-transfer [] (dotimes [cnt 100000] (transfer :bob :joe 2))) (defn second-transfer [] (dotimes [cnt 100000] (transfer :joe :bob 1))) (def first-future (future (first-transfer))) (def second-future (future (second-transfer))) @first-future @second-future (println "Bob has in account: " (get @accounts :bob)) (println "Joe has in account: " (get @accounts :joe)) (println "Inconsistencies while transfer: " @inconsistencies)

Quando executo este script no meu computador, recebo:

 Bob has in account: 100000 Joe has in account: 400000 Inconsistencies while transfer: 0

No exemplo, a coordenação foi resolvida para que possamos dar mais valor ao uso de um mapa. Quando transferimos dinheiro da conta, alteramos todas as contas no momento para que nunca aconteça que a soma de dinheiro não seja a mesma.

O próximo tipo de dados de variável é agente. O agente se comporta como um átomo apenas porque a função que altera o valor é executada em um thread diferente, de modo que leva algum tempo para que a alteração se torne visível. Portanto, ao ler o valor do agente é necessário chamar uma função que espera até que todas as funções que alteram o valor do agente sejam executadas. Ao contrário da função de átomos que altera o valor é chamado apenas uma vez e, portanto, pode ter efeitos colaterais. Esse tipo também pode sincronizar um valor e não pode travar.

 ; ; agent-counter.clj ; (def counter (agent 0)) (def attempts (atom 0)) (defn counter-increases[] (dotimes [cnt 500000] (send counter (fn [counter] (swap! attempts inc) (inc counter))))) (def first-future (future (counter-increases))) (def second-future (future (counter-increases))) ; wait for futures to complete @first-future @second-future ; wait for counter to be finished with updating (await counter) ; print the value of the counter (println "The counter is: " @counter) (println "Number of attempts: " @attempts)

Quando executo este script no meu laptop, recebo:

 The counter is: 1000000 Number of attempts: 1000000

Este exemplo é o mesmo que a implementação do contador com o átomo. A única diferença é que aqui estamos esperando que todas as alterações do agente sejam concluídas antes de ler o valor final usando await.

O último tipo de dados de variável são referências. Ao contrário dos átomos, as referências podem sincronizar alterações em vários valores. Cada operação em referência deve estar em uma transação usando dosync. Essa maneira de alterar os dados é chamada de memória transacional de software ou STM abreviada. Vejamos um exemplo com a transferência de dinheiro nas contas.

 ; ; stm-accounts.clj ; (def bob (ref 200000)) (def joe (ref 300000)) (def inconsistencies (atom 0)) (def attempts (atom 0)) (def transfers (agent 0)) (defn transfer [source destination amount] (dosync (swap! attempts inc) ; side effect DO NOT DO THIS (send transfers inc) (when (not= (+ @bob @joe) 500000) (swap! inconsistencies inc)) ; side effect DO NOT DO THIS (alter source - amount) (alter destination + amount))) (defn first-transfer [] (dotimes [cnt 100000] (transfer bob joe 2))) (defn second-transfer [] (dotimes [cnt 100000] (transfer joe bob 1))) (def first-future (future (first-transfer))) (def second-future (future (second-transfer))) @first-future @second-future (await transfers) (println "Bob has in account: " @bob) (println "Joe has in account: " @joe) (println "Inconsistencies while transfer: " @inconsistencies) (println "Attempts: " @attempts) (println "Transfers: " @transfers)

Quando executo este script, recebo:

 Bob has in account: 100000 Joe has in account: 400000 Inconsistencies while transfer: 0 Attempts: 330841 Transfers: 200000

Curiosamente, houve mais tentativas do que o número de transações realizadas. Isso ocorre porque o STM não usa bloqueios, portanto, se houver um conflito (como duas threads tentando alterar o mesmo valor), a transação será executada novamente. Por esse motivo, a transação não deve ter efeitos colaterais. Podemos ver que o agente cujo valor muda dentro da transação se comporta de maneira previsível. Uma função que altera o valor do agente será avaliada quantas vezes houver transações. A razão é que o agente está ciente da transação. Se a transação deve ter efeitos colaterais, eles devem ser colocados em funcionamento dentro do agente. Desta forma, o programa terá um comportamento previsível. Você provavelmente pensa que deve sempre usar o STM, mas programadores experientes costumam usar átomos porque os átomos são mais simples e rápidos que o STM. Claro, isso se for possível fazer um programa dessa maneira. Se você tiver efeitos colaterais, não há outra opção a não ser usar STM e agentes.

Modelo de ator

O seguinte modelo de simultaneidade é um modelo de ator. O princípio deste modelo é semelhante ao do mundo real. Se fizermos um acordo para criar algo com muitas pessoas, por exemplo, um prédio, então cada homem no canteiro de obras tem seu próprio papel. Uma multidão de pessoas é supervisionada pelo supervisor. Se um trabalhador se ferir no trabalho, o supervisor atribuirá o trabalho do ferido aos demais que estiverem disponíveis. Se necessário, ele pode levar ao local um novo homem. No site temos mais pessoas que fazem o trabalho simultaneamente (simultaneamente), mas também conversando entre si para sincronizar. Se colocarmos o trabalho no canteiro de obras no programa, cada pessoa seria um ator que tem um estado e executa em seu próprio processo, e a fala seria substituída por mensagens. A linguagem de programação popular baseada neste modelo é Erlang. Essa linguagem interessante possui tipos de dados imutáveis ​​e funções que possuem as mesmas propriedades de outros tipos de dados. As funções podem ser criadas durante a execução do programa e passadas como argumentos para outra função ou retornadas como resultado de uma chamada de função. Vou dar exemplos na linguagem Elixir que usa a máquina virtual Erlang, então terei o mesmo modelo de programação do Erlang apenas com sintaxe diferente. As três primitivas mais importantes no Elixir são spawn, send e receive. spawn executa a função no novo processo, send envia a mensagem para o processo e recebe recebe mensagens que são enviadas para o processo atual.

O primeiro exemplo com o modelo de ator será incrementado simultaneamente. Para fazer um programa com este modelo, é necessário fazer um ator ter o valor do contador e receber mensagem para definir e recuperar o valor do contador, e ter dois atores que irão simultaneamente aumentar o valor do contador.

 # # Counting.exs # defmodule Counting do def counter(value) do receive do {:get, sender} -> send sender, {:counter, value} counter value {:set, new_value} -> counter(new_value) end end def counting(sender, counter, times) do if times > 0 do send counter, {:get, self} receive do {:counter, value} -> send counter, {:set, value + 1} end counting(sender, counter, times - 1) else send sender, {:done, self} end end end counter = spawn fn -> Counting.counter 0 end IO.puts "Starting counting processes" this = self counting1 = spawn fn -> IO.puts "Counting A started" Counting.counting this, counter, 500_000 IO.puts "Counting A finished" end counting2 = spawn fn -> IO.puts "Counting B started" Counting.counting this, counter, 500_000 IO.puts "Counting B finished" end IO.puts "Waiting for counting to be done" receive do {:done, ^counting1} -> nil end receive do {:done, ^counting2} -> nil end send counter, {:get, self} receive do {:counter, value} -> IO.puts "Counter is: #{value}" end

Quando executo este exemplo recebo:

 Starting counting processes Counting A started Waiting for counting to be done Counting B started Counting A finished Counting B finished Counter is: 516827

Podemos ver que no final o contador é 516827 e não 1000000 como esperávamos. Quando executei o script da próxima vez, recebi 511010. O motivo desse comportamento é que o contador recebe duas mensagens: recuperar o valor atual e definir o novo valor. Para aumentar o contador, o programa precisa obter o valor atual, aumentá-lo em 1 e definir o valor aumentado. Dois processos lêem e escrevem o valor do contador ao mesmo tempo usando mensagens que são enviadas ao processo contador. A ordem das mensagens que o contador receberá é imprevisível e o programa não pode controlá-la. Podemos imaginar este cenário:

  1. O valor do contador é 115
  2. O processo A lê o valor do contador (115)
  3. O processo B lê o valor do contador (115)
  4. O processo B aumenta o valor localmente (116)
  5. O processo B define o valor aumentado para o contador (116)
  6. O processo A aumenta o valor do contador (116)
  7. O processo A define o valor aumentado para o contador (116)
  8. O valor do contador é 116

Se olharmos para o cenário, dois processos aumentam o contador em 1, e o contador é aumentado no final em 1 e não em 2. Esses entrelaçamentos podem acontecer um número imprevisível de vezes e, portanto, o valor do contador é imprevisível. Para evitar esse comportamento, a operação de aumento deve ser feita por uma mensagem.

 # # CountingFixed.exs # defmodule Counting do def counter(value) do receive do :increase -> counter(value + 1) {:get, sender} -> send sender, {:counter, value} counter value end end def counting(sender, counter, times) do if times > 0 do send counter, :increase counting(sender, counter, times - 1) else send sender, {:done, self} end end end counter = spawn fn -> Counting.counter 0 end IO.puts "Starting counting processes" this = self counting1 = spawn fn -> IO.puts "Counting A started" Counting.counting this, counter, 500_000 IO.puts "Counting A finished" end counting2 = spawn fn -> IO.puts "Counting B started" Counting.counting this, counter, 500_000 IO.puts "Counting B finished" end IO.puts "Waiting for counting to be done" receive do {:done, ^counting1} -> nil end receive do {:done, ^counting2} -> nil end send counter, {:get, self} receive do {:counter, value} -> IO.puts "Counter is: #{value}" end

Ao executar este script, recebo:

 Starting counting processes Counting A started Waiting for counting to be done Counting B started Counting A finished Counting B finished Counter is: 1000000

Podemos ver que o contador tem o valor correto. A razão para o comportamento previsível (determinístico) é que o valor do contador aumenta em uma mensagem para que a sequência de mensagens para aumentar o contador não afete seu valor final. Trabalhando com modelo de ator, temos que prestar atenção em como as mensagens podem se entrelaçar e um design cuidadoso das mensagens e ações nas mensagens para evitar imprevisibilidades acidentais (não determinismo).

Como podemos transferir dinheiro entre duas contas com este modelo?

 # # Accounts.exs # defmodule Accounts do def accounts(state) do receive do {:transfer, source, destination, amount} -> accounts %{state | source => state[source] - amount , destination => state[destination] + amount} {:amounts, accounts, sender } -> send sender, {:amounts, for account <- accounts do {account, state[account]} end} accounts(state) end end def transfer(sender, accounts, source, destination, amount, times, inconsistencies) do if times > 0 do send accounts, {:amounts, [source, destination], self} receive do {:amounts, amounts} -> if amounts[source] + amounts[destination] != 500_000 do Agent.update(inconsistencies, fn value -> value + 1 end) end end send accounts, {:transfer, source, destination, amount} transfer(sender, accounts, source, destination, amount, times - 1, inconsistencies) else send sender, {:done, self} end end end accounts = spawn fn -> Accounts.accounts(%{bob: 200_000, joe: 300_000 }) end {:ok, inconsistencies} = Agent.start(fn -> 0 end) this = self transfer1 = spawn fn -> IO.puts "Transfer A started" Accounts.transfer(this, accounts, :bob, :joe, 2, 100_000, inconsistencies) IO.puts "Transfer A finished" end transfer2 = spawn fn -> IO.puts "Transfer B started" Accounts.transfer(this, accounts, :joe, :bob, 1, 100_000, inconsistencies) IO.puts "Transfer B finished" end IO.puts "Waiting for transfers to be done" receive do {:done, ^transfer1} -> nil end receive do {:done, ^transfer2} -> nil end send accounts, {:amounts, [:bob, :joe], self} receive do {:amounts, amounts} -> IO.puts "Bob has in account: #{amounts[:bob]}" IO.puts "Joe has in account: #{amounts[:joe]}" IO.puts "Inconsistencies while transfer: #{Agent.get(inconsistencies, fn x -> x end)}" end

Quando executo este script, recebo:

 Waiting for transfers to be done Transfer A started Transfer B started Transfer B finished Transfer A finished Bob has in account: 100000 Joe has in account: 400000 Inconsistencies while transfer: 0

Podemos ver que a transferência de dinheiro funciona sem inconsistências, pois escolhemos a transferência de mensagens para transferir dinheiro e valores de mensagens para obter o valor das contas o que nos dá um comportamento previsível do programa. Sempre que fazemos uma transferência de dinheiro, a quantia total de dinheiro a qualquer momento deve ser a mesma.

O modelo de ator pode causar bloqueio e, portanto, impasse, portanto, tenha cuidado ao projetar o programa. O script a seguir mostra como você pode simular o cenário de bloqueio e conflito.

 # # Deadlock.exs # defmodule Lock do def loop(state) do receive do {:lock, sender} -> case state do [] -> send sender, :locked loop([sender]) _ -> loop(state ++ [sender]) end {:unlock, sender} -> case state do [] -> loop(state) [^sender | []] -> loop([]) [^sender | [next | tail]] -> send next, :locked loop([next | tail]) _ -> loop(state) end end end def lock(pid) do send pid, {:lock, self} receive do :locked -> nil # This will block until we receive message end end def unlock(pid) do send pid, {:unlock, self} end def locking(first, second, times) do if times > 0 do lock(first) lock(second) unlock(second) unlock(first) locking(first, second, times - 1) end end end a_lock = spawn fn -> Lock.loop([]) end b_lock = spawn fn -> Lock.loop([]) end this = self IO.puts "Locking A, B started" spawn fn -> Lock.locking(a_lock, b_lock, 1_000) IO.puts "Locking A, B finished" send this, :done end IO.puts "Locking B, A started" spawn fn -> Lock.locking(b_lock, a_lock, 1_000) IO.puts "Locking B, A finished" send this, :done end IO.puts "Waiting for locking to be done" receive do :done -> nil end receive do :done -> nil End

When I run this script on my laptop I get:

 Locking A, B started Locking B, A started Waiting for locking to be done

From the output we can see that the processes that lock A and B are stuck. This happens because the first process waits for the second process to release B while second process waiting first process to release A. They are waiting for each other and are stuck forever. To avoid this locking, order should always be the same, or design a program so that it doesn't use lock (meaning that it doesn't wait for a specific message). The following listing always locks first A then B.

 # # Deadlock fixed # defmodule Lock do def loop(state) do receive do {:lock, sender} -> case state do [] -> send sender, :locked loop([sender]) _ -> loop(state ++ [sender]) end {:unlock, sender} -> case state do [] -> loop(state) [^sender | []] -> loop([]) [^sender | [next | tail]] -> send next, :locked loop([next | tail]) _ -> loop(state) end end end def lock(pid) do send pid, {:lock, self} receive do :locked -> nil # This will block until we receive message end end def unlock(pid) do send pid, {:unlock, self} end def locking(first, second, times) do if times > 0 do lock(first) lock(second) unlock(second) unlock(first) locking(first, second, times - 1) end end end a_lock = spawn fn -> Lock.loop([]) end b_lock = spawn fn -> Lock.loop([]) end this = self IO.puts "Locking A, B started" spawn fn -> Lock.locking(a_lock, b_lock, 1_000) IO.puts "Locking A, B finished" send this, :done end IO.puts "Locking A, B started" spawn fn -> Lock.locking(a_lock, b_lock, 1_000) IO.puts "Locking A, B finished" send this, :done end IO.puts "Waiting for locking to be done" receive do :done -> nil end receive do :done -> nil End

When I run this script on my laptop I get:

 Locking A, B started Locking A, B started Waiting for locking to be done Locking A, B finished Locking A, B finished

And now, there is no longer a deadlock.

Embrulhar

As an introduction to concurrent programming, we have covered a few concurrency models. We haven't covered all models, as this article would be too big. Just to name a few, channels and reactive streams are some of the other popularly used concurrency models. Channels and reactive streams have many similarities with the actor model. All of them transmit messages, but many threads can receive messages from one channel, and reactive streams transmit messages in one direction to form directed graph that receive messages from one end and send messages from the other end as a result of the processing.

Shared mutable state models can easily go wrong if we don't think ahead. It has problems of race condition and deadlock. If we have a choice between different concurrent programming models, it would be easier to implement and maintain but otherwise we have to be very careful what we do.

The functional way is a lot easier to reason about and implement. It cannot have deadlock. This model may have worse performance than shared mutable state model, but a program that works is always faster than one that does not work.

Actor model is a good choice for concurrent programming. Although there are problems of race condition and deadlock, they can happen less than in shared mutable state model since the only way for processes to communicate is via messages. With good message design between processes, that can be avoided. If a problem occurs it is then in the order or meaning of messages in communication between the processes and you know where to look.

I hope this article has given you some insight to what concurrent programming is and how it gives structure to the programs you write.

Related: Ruby Concurrency and Parallelism: A Practical Tutorial