Concorrenza e tolleranza ai guasti semplificate: un tutorial di Akka con esempi
Pubblicato: 2022-03-11La sfida
Scrivere programmi simultanei è difficile. Avere a che fare con thread, lock, race condition e così via è altamente soggetto a errori e può portare a codice difficile da leggere, testare e mantenere.
Molti quindi preferiscono evitare del tutto il multithreading. Al contrario, utilizzano esclusivamente processi a thread singolo, basandosi su servizi esterni (come database, code, ecc.) per gestire eventuali operazioni simultanee o asincrone necessarie. Sebbene questo approccio sia in alcuni casi un'alternativa legittima, ci sono molti scenari in cui semplicemente non è un'opzione praticabile. Molti sistemi in tempo reale, come applicazioni di trading o bancarie o giochi in tempo reale, non hanno il lusso di attendere il completamento di un processo a thread singolo (hanno bisogno della risposta ora!). Altri sistemi sono così dispendiosi in termini di calcolo o risorse da richiedere una quantità eccessiva di tempo (ore o addirittura giorni in alcuni casi) per essere eseguiti senza introdurre la parallelizzazione nel codice.
Un approccio a thread singolo abbastanza comune (ampiamente utilizzato nel mondo Node.js, ad esempio) consiste nell'utilizzare un paradigma basato su eventi e non bloccante. Sebbene ciò aiuti le prestazioni evitando cambi di contesto, blocchi e blocchi, non risolve comunque i problemi dell'utilizzo di più processori contemporaneamente (farlo richiederebbe l'avvio e il coordinamento tra più processi indipendenti).
Quindi questo significa che non hai altra scelta che viaggiare in profondità nelle viscere di thread, blocchi e condizioni di gara per creare un'applicazione simultanea?
Grazie al framework Akka, la risposta è no. Questo tutorial introduce esempi di Akka ed esplora i modi in cui facilita e semplifica l'implementazione di applicazioni distribuite simultanee.
Cos'è l'Akka Framework?
Akka è un toolkit e runtime per la creazione di applicazioni altamente simultanee, distribuite e tolleranti ai guasti sulla JVM. Akka è scritto in Scala, con collegamenti linguistici forniti sia per Scala che per Java.
L'approccio di Akka alla gestione della concorrenza si basa sull'Actor Model. In un sistema basato sugli attori, tutto è un attore, più o meno allo stesso modo in cui tutto è un oggetto nel design orientato agli oggetti. Una differenza fondamentale, tuttavia, particolarmente rilevante per la nostra discussione, è che l'Actor Model è stato specificamente progettato e progettato per fungere da modello simultaneo mentre il modello orientato agli oggetti non lo è. In particolare, in un sistema attore scala, gli attori interagiscono e condividono informazioni, senza alcun presupposto di sequenzialità. Il meccanismo attraverso il quale gli attori condividono le informazioni tra loro e si incaricano l'un l'altro è il passaggio di messaggi.
Akka crea uno strato tra gli attori e il sistema sottostante in modo tale che gli attori debbano semplicemente elaborare i messaggi. Tutta la complessità della creazione e della pianificazione dei thread, della ricezione e dell'invio di messaggi e della gestione delle condizioni di gara e della sincronizzazione è relegata al framework per una gestione trasparente.
Akka aderisce rigorosamente al The Reactive Manifesto. Le applicazioni reattive mirano a sostituire le tradizionali applicazioni multithread con un'architettura che soddisfi uno o più dei seguenti requisiti:
- Evento guidato. Usando gli attori, è possibile scrivere codice che gestisce le richieste in modo asincrono e utilizza esclusivamente operazioni non bloccanti.
- scalabile. In Akka è possibile aggiungere nodi senza dover modificare il codice, grazie sia al passaggio dei messaggi che alla trasparenza della posizione.
- Resiliente. Qualsiasi applicazione incontrerà errori e non riuscirà a un certo punto. Akka fornisce strategie di "supervisione" (tolleranza ai guasti) per facilitare un sistema di autoguarigione.
- reattivo. Molte delle odierne applicazioni ad alte prestazioni e risposta rapida devono fornire un feedback rapido all'utente e quindi devono reagire agli eventi in modo estremamente tempestivo. La strategia non bloccante e basata sui messaggi di Akka aiuta a raggiungere questo obiettivo.
Cos'è un attore in Akka?
Un attore non è essenzialmente altro che un oggetto che riceve messaggi e intraprende azioni per gestirli. È disaccoppiato dalla fonte del messaggio e la sua unica responsabilità è riconoscere correttamente il tipo di messaggio che ha ricevuto e agire di conseguenza.
Dopo aver ricevuto un messaggio, un attore può intraprendere una o più delle seguenti azioni:
- Eseguire alcune operazioni (come l'esecuzione di calcoli, la persistenza dei dati, la chiamata a un servizio Web esterno e così via)
- Inoltra il messaggio, o un messaggio derivato, a un altro attore
- Istanziare un nuovo attore e inoltrargli il messaggio
In alternativa, l'attore può scegliere di ignorare completamente il messaggio (ad esempio, può scegliere di non agire) se lo ritiene opportuno.
Per implementare un attore, è necessario estendere il tratto akka.actor.Actor e implementare il metodo di ricezione. Il metodo di ricezione di un attore viene invocato (da Akka) quando un messaggio viene inviato a quell'attore. La sua implementazione tipica consiste nel pattern matching, come mostrato nel seguente esempio Akka, per identificare il tipo di messaggio e reagire di conseguenza:
import akka.actor.Actor import akka.actor.Props import akka.event.Logging class MyActor extends Actor { def receive = { case value: String => doSomething(value) case _ => println("received unknown message") } }
Il pattern matching è una tecnica relativamente elegante per la gestione dei messaggi, che tende a produrre codice più "pulito" e più facile da navigare rispetto a un'implementazione comparabile basata sui callback. Si consideri, ad esempio, un'implementazione di richiesta/risposta HTTP semplicistica.
Innanzitutto, implementiamo questo utilizzando un paradigma basato su callback in JavaScript:
route(url, function(request){ var query = buildQuery(request); dbCall(query, function(dbResponse){ var wsRequest = buildWebServiceRequest(dbResponse); wsCall(wsRequest, function(wsResponse) { sendReply(wsResponse); }); }); });
Ora confrontiamo questo con un'implementazione basata sul pattern-matching:
msg match { case HttpRequest(request) => { val query = buildQuery(request) dbCall(query) } case DbResponse(dbResponse) => { var wsRequest = buildWebServiceRequest(dbResponse); wsCall(dbResponse) } case WsResponse(wsResponse) => sendReply(wsResponse) }
Sebbene il codice JavaScript basato su callback sia certamente compatto, è sicuramente più difficile da leggere e navigare. In confronto, il codice basato sul pattern-matching rende più immediatamente evidente quali casi vengono presi in considerazione e come ciascuno viene gestito.
Il sistema dell'attore
Prendere un problema complesso e dividerlo ricorsivamente in sottoproblemi più piccoli è una buona tecnica di risoluzione dei problemi in generale. Questo approccio può essere particolarmente vantaggioso in informatica (coerente con il principio di responsabilità unica), poiché tende a produrre codice pulito, modulare, con poca o nessuna ridondanza, che è relativamente facile da mantenere.
In una progettazione basata sugli attori, l'uso di questa tecnica facilita l'organizzazione logica degli attori in una struttura gerarchica nota come sistema attore. Il sistema attore fornisce l'infrastruttura attraverso la quale gli attori interagiscono tra loro.
In Akka, l'unico modo per comunicare con un attore è attraverso un ActorRef
. Un ActorRef
rappresenta un riferimento a un attore che impedisce ad altri oggetti di accedere o manipolare direttamente gli interni e lo stato di quell'attore. I messaggi possono essere inviati a un attore tramite un ActorRef
utilizzando uno dei seguenti protocolli di sintassi:
-
!
(“tell”) – invia il messaggio e ritorna immediatamente -
?
(“ask”) – invia il messaggio e restituisce un Future che rappresenta una possibile risposta
Ogni attore ha una casella di posta in cui vengono consegnati i messaggi in arrivo. Sono disponibili più implementazioni di cassette postali tra cui scegliere, con l'implementazione predefinita FIFO.
Un attore contiene molte variabili di istanza per mantenere lo stato durante l'elaborazione di più messaggi. Akka garantisce che ogni istanza di un attore venga eseguita nel proprio thread leggero e che i messaggi vengano elaborati uno alla volta. In questo modo, lo stato di ogni attore può essere mantenuto in modo affidabile senza che lo sviluppatore debba preoccuparsi esplicitamente della sincronizzazione o delle condizioni di gara.
A ciascun attore vengono fornite le seguenti informazioni utili per svolgere i propri compiti tramite l'API di Akka Actor:
-
sender
: unActorRef
al mittente del messaggio attualmente in elaborazione -
context
: informazioni e metodi relativi al contesto all'interno del quale è in esecuzione l'attore (include, ad esempio, un metodoactorOf
per creare un'istanza di un nuovo attore) -
supervisionStrategy
: definisce la strategia da utilizzare per il recupero dagli errori -
self
: l'ActorRef
per l'attore stesso
Per aiutare a collegare insieme questi tutorial, consideriamo un semplice esempio di conteggio del numero di parole in un file di testo.
Ai fini del nostro esempio di Akka, scomporremo il problema in due sottoattività; vale a dire, (1) un'attività "figlio" di contare il numero di parole su una singola riga e (2) un'attività "genitore" di sommare quei conteggi di parole per riga per ottenere il numero totale di parole nel file.
L'attore genitore caricherà ogni riga dal file e quindi delegherà a un attore figlio il compito di contare le parole in quella riga. Quando il bambino ha finito, invierà un messaggio al genitore con il risultato. Il genitore riceverà i messaggi con il conteggio delle parole (per ogni riga) e manterrà un contatore per il numero totale di parole nell'intero file, che poi restituirà al suo invocatore al termine.
(Si noti che gli esempi di codice del tutorial di Akka forniti di seguito sono da intendersi esclusivamente didattici e quindi non riguardano necessariamente tutte le condizioni marginali, le ottimizzazioni delle prestazioni e così via. Inoltre, è disponibile una versione compilabile completa degli esempi di codice mostrati di seguito in questo succo.)
Diamo prima un'occhiata a un'implementazione di esempio della classe figlio StringCounterActor
:
case class ProcessStringMsg(string: String) case class StringProcessedMsg(words: Integer) class StringCounterActor extends Actor { def receive = { case ProcessStringMsg(string) => { val wordsInLine = string.split(" ").length sender ! StringProcessedMsg(wordsInLine) } case _ => println("Error: message not recognized") } }
Questo attore ha un compito molto semplice: consumare messaggi ProcessStringMsg
(contenenti una riga di testo), contare il numero di parole sulla riga specificata e restituire il risultato al mittente tramite un messaggio StringProcessedMsg
. Nota che abbiamo implementato la nostra classe per usare !
("tell") per inviare il messaggio StringProcessedMsg
(ovvero, per inviare il messaggio e restituire immediatamente).

OK, ora rivolgiamo la nostra attenzione alla classe padre WordCounterActor
:
1. case class StartProcessFileMsg() 2. 3. class WordCounterActor(filename: String) extends Actor { 4. 5. private var running = false 6. private var totalLines = 0 7. private var linesProcessed = 0 8. private var result = 0 9. private var fileSender: Option[ActorRef] = None 10. 11. def receive = { 12. case StartProcessFileMsg() => { 13. if (running) { 14. // println just used for example purposes; 15. // Akka logger should be used instead 16. println("Warning: duplicate start message received") 17. } else { 18. running = true 19. fileSender = Some(sender) // save reference to process invoker 20. import scala.io.Source._ 21. fromFile(filename).getLines.foreach { line => 22. context.actorOf(Props[StringCounterActor]) ! ProcessStringMsg(line) 23. totalLines += 1 24. } 25. } 26. } 27. case StringProcessedMsg(words) => { 28. result += words 29. linesProcessed += 1 30. if (linesProcessed == totalLines) { 31. fileSender.map(_ ! result) // provide result to process invoker 32. } 33. } 34. case _ => println("message not recognized!") 35. } 36. }
Molte cose stanno succedendo qui, quindi esaminiamo ciascuna di esse in modo più dettagliato (notare che i numeri di riga a cui si fa riferimento nella discussione che segue si basano sull'esempio di codice sopra) ...
Innanzitutto, si noti che il nome del file da elaborare viene passato al costruttore di WordCounterActor
(riga 3). Ciò indica che l'attore deve essere utilizzato solo per elaborare un singolo file. Ciò semplifica anche il lavoro di codifica per lo sviluppatore, evitando la necessità di reimpostare le variabili di stato ( running
, totalLines
, linesProcessed
e result
) al termine del lavoro, poiché l'istanza viene utilizzata solo una volta (ad esempio, per elaborare un singolo file) e poi scartato.
Quindi, osserva che WordCounterActor
gestisce due tipi di messaggi:
-
StartProcessFileMsg
(riga 12)- Ricevuto dall'attore esterno che inizialmente avvia
WordCounterActor
. - Una volta ricevuto,
WordCounterActor
verifica innanzitutto di non ricevere una richiesta ridondante. - Se la richiesta è ridondante,
WordCounterActor
genera un avviso e non viene fatto altro (riga 16). - Se la richiesta non è ridondante:
-
WordCounterActor
memorizza un riferimento al mittente nella variabile di istanzafileSender
(notare che questo è unOption[ActorRef]
anziché unOption[Actor]
- vedere la riga 9). QuestoActorRef
è necessario per accedere in seguito e rispondere ad esso durante l'elaborazione delStringProcessedMsg
finale (che viene ricevuto da un figlioStringCounterActor
, come descritto di seguito). -
WordCounterActor
quindi legge il file e, quando ogni riga del file viene caricata, viene creato un figlioStringCounterActor
e gli viene passato un messaggio contenente la riga da elaborare (righe 21-24).
-
- Ricevuto dall'attore esterno che inizialmente avvia
-
StringProcessedMsg
(riga 27)- Ricevuto da un figlio
StringCounterActor
quando completa l'elaborazione della riga assegnata. - Una volta ricevuto, il
WordCounterActor
incrementa il contatore di righe del file e, se tutte le righe del file sono state elaborate (cioè, quandototalLines
elinesProcessed
sono uguali), invia il risultato finale alfileSender
originale (righe 28-31).
- Ricevuto da un figlio
Ancora una volta, nota che in Akka, l'unico meccanismo per la comunicazione tra attori è il passaggio di messaggi. I messaggi sono l'unica cosa che gli attori condividono e, poiché gli attori possono potenzialmente accedere agli stessi messaggi contemporaneamente, è importante che siano immutabili, al fine di evitare condizioni di gara e comportamenti imprevisti.
È quindi comune passare messaggi sotto forma di classi case poiché sono immutabili per impostazione predefinita e per la perfetta integrazione con il pattern matching.
Concludiamo l'esempio con l'esempio di codice per eseguire l'intera app.
object Sample extends App { import akka.util.Timeout import scala.concurrent.duration._ import akka.pattern.ask import akka.dispatch.ExecutionContexts._ implicit val ec = global override def main(args: Array[String]) { val system = ActorSystem("System") val actor = system.actorOf(Props(new WordCounterActor(args(0)))) implicit val timeout = Timeout(25 seconds) val future = actor ? StartProcessFileMsg() future.map { result => println("Total number of words " + result) system.shutdown } } }
Nota come questa volta il ?
il metodo viene utilizzato per inviare un messaggio. In questo modo, il chiamante può utilizzare il Future restituito per stampare il risultato finale quando questo è disponibile e per uscire dal programma chiudendo l'ActorSystem.
Tolleranza agli errori di Akka e strategie di supervisione
In un sistema attore, ogni attore è il supervisore dei suoi figli. Se un attore non riesce a gestire un messaggio, sospende se stesso e tutti i suoi figli e invia un messaggio, di solito sotto forma di eccezione, al suo supervisore.
In Akka, il modo in cui un supervisore reagisce e gestisce le eccezioni che gli arrivano dai suoi figli viene definito strategia del supervisore. Le strategie del supervisore sono il meccanismo principale e diretto mediante il quale si definisce il comportamento tollerante ai guasti del proprio sistema.
Quando un messaggio che indica un errore raggiunge un supervisore, può intraprendere una delle seguenti azioni:
- Riprendere il bambino (ei suoi figli), mantenendo il suo stato interno. Questa strategia può essere applicata quando lo stato figlio non è stato danneggiato dall'errore e può continuare a funzionare correttamente.
- Riavvia il bambino (ei suoi figli), ripulendo il suo stato interno. Questa strategia può essere utilizzata nello scenario opposto a quello appena descritto. Se lo stato figlio è stato danneggiato dall'errore, è necessario ripristinarne lo stato prima che possa essere utilizzato in futuro.
- Ferma il bambino (ei suoi figli) in modo permanente. Questa strategia può essere utilizzata nei casi in cui si ritiene che la condizione di errore non sia rettificabile, ma non comprometta il resto dell'operazione in corso, che può essere completata in assenza del figlio fallito.
- Arrestarsi e intensificare l'errore. Impiegato quando il supervisore non sa come gestire l'errore e quindi lo inoltra al proprio supervisore.
Inoltre, un attore può decidere di applicare l'azione solo ai figli falliti o anche ai suoi fratelli. Ci sono due strategie predefinite per questo:
-
OneForOneStrategy
: applica l'azione specificata solo al figlio non riuscito -
AllForOneStrategy
: applica l'azione specificata a tutti i suoi figli
Ecco un semplice esempio, utilizzando OneForOneStrategy
:
import akka.actor.OneForOneStrategy import akka.actor.SupervisorStrategy._ import scala.concurrent.duration._ override val supervisorStrategy = OneForOneStrategy() { case _: ArithmeticException => Resume case _: NullPointerException => Restart case _: IllegalArgumentException => Stop case _: Exception => Escalate }
Se non viene specificata alcuna strategia, viene utilizzata la seguente strategia predefinita:
- Se si è verificato un errore durante l'inizializzazione dell'attore o se l'attore è stato ucciso, l'attore viene interrotto.
- Se c'è stato un altro tipo di eccezione, l'attore viene semplicemente riavviato.
L'implementazione fornita da Akka di questa strategia predefinita è la seguente:
final val defaultStrategy: SupervisorStrategy = { def defaultDecider: Decider = { case _: ActorInitializationException ⇒ Stop case _: ActorKilledException ⇒ Stop case _: Exception ⇒ Restart } OneForOneStrategy()(defaultDecider) }
Akka consente l'implementazione di strategie di supervisione personalizzate, ma come avverte la documentazione di Akka, farlo con cautela poiché implementazioni errate possono portare a problemi come sistemi di attori bloccati (cioè attori permanentemente sospesi).
Trasparenza della posizione
L'architettura Akka supporta la trasparenza della posizione, consentendo agli attori di essere completamente indipendenti da dove hanno avuto origine i messaggi che ricevono. Il mittente del messaggio può risiedere nella stessa JVM dell'attore o in una JVM separata (in esecuzione sullo stesso nodo o su un nodo diverso). Akka consente di gestire ciascuno di questi casi in modo completamente trasparente per l'attore (e quindi per lo sviluppatore). L'unico avvertimento è che i messaggi inviati su più nodi devono essere serializzabili.
I sistemi attore sono progettati per essere eseguiti in un ambiente distribuito senza richiedere alcun codice specializzato. Akka richiede solo la presenza di un file di configurazione ( application.conf
) che specifica i nodi a cui inviare i messaggi. Ecco un semplice esempio di file di configurazione:
akka { actor { provider = "akka.remote.RemoteActorRefProvider" } remote { transport = "akka.remote.netty.NettyRemoteTransport" netty { hostname = "127.0.0.1" port = 2552 } } }
Alcuni consigli per l'addio...
Abbiamo visto come il framework Akka aiuta a ottenere concorrenza e prestazioni elevate. Tuttavia, come sottolineato in questo tutorial, ci sono alcuni punti da tenere a mente quando si progetta e si implementa il proprio sistema per sfruttare al massimo la potenza di Akka:
- Nella misura più ampia possibile, a ciascun attore dovrebbe essere assegnato il compito più piccolo possibile (come discusso in precedenza, seguendo il principio della responsabilità unica)
Gli attori dovrebbero gestire gli eventi (cioè elaborare i messaggi) in modo asincrono e non dovrebbero bloccarsi, altrimenti si verificheranno cambi di contesto che possono influire negativamente sulle prestazioni. Nello specifico, è meglio eseguire operazioni di blocco (IO, ecc.) in un Futuro per non bloccare l'attore; cioè:
case evt => blockingCall() // BAD case evt => Future { blockingCall() // GOOD }
- Assicurati che i tuoi messaggi siano tutti immutabili, poiché gli attori che li passano l'un l'altro funzioneranno tutti contemporaneamente nei propri thread. È probabile che i messaggi mutabili determinino un comportamento imprevisto.
- Poiché i messaggi inviati tra nodi devono essere serializzabili, è importante tenere presente che più grandi sono i messaggi, più tempo sarà necessario per serializzarli, inviarli e deserializzarli, il che può influire negativamente sulle prestazioni.
Conclusione
Akka, scritto in Scala, semplifica e facilita lo sviluppo di applicazioni altamente simultanee, distribuite e tolleranti ai guasti, nascondendo gran parte della complessità allo sviluppatore. Rendere giustizia ad Akka richiederebbe molto di più di questo singolo tutorial, ma si spera che questa introduzione e i suoi esempi siano stati sufficientemente accattivanti da farti desiderare di saperne di più.
Amazon, VMWare e CSC sono solo alcuni esempi di aziende leader che utilizzano attivamente Akka. Visita il sito web ufficiale di Akka per saperne di più ed esplorare se Akka potrebbe essere la risposta giusta anche per il tuo progetto.