Concurența și toleranța la erori simplificate: un tutorial Akka cu exemple

Publicat: 2022-03-11

Provocarea

Scrierea de programe concurente este dificilă. A avea de a face cu fire, blocări, condiții de cursă și așa mai departe este foarte predispus la erori și poate duce la un cod dificil de citit, testat și întreținut.

Mulți preferă, prin urmare, să evite cu totul multithreading. În schimb, folosesc procese cu un singur thread, bazându-se pe servicii externe (cum ar fi baze de date, cozi etc.) pentru a gestiona orice operațiuni concurente sau asincrone necesare. În timp ce această abordare este în unele cazuri o alternativă legitimă, există multe scenarii în care pur și simplu nu este o opțiune viabilă. Multe sisteme în timp real – cum ar fi aplicațiile de tranzacționare sau bancare, sau jocurile în timp real – nu au luxul de a aștepta finalizarea unui proces cu un singur thread (au nevoie de răspuns acum!). Alte sisteme sunt atât de intensive în calcul sau în resurse încât ar dura o cantitate excesivă de timp (ore sau chiar zile în unele cazuri) pentru a rula fără a introduce paralelizarea în codul lor.

O abordare destul de comună cu un singur thread (folosită pe scară largă în lumea Node.js, de exemplu) este utilizarea unei paradigme bazate pe evenimente, fără blocare. Deși acest lucru ajută la performanță prin evitarea comutărilor de context, a blocărilor și a blocării, tot nu abordează problemele de utilizare a mai multor procesoare concomitent (a face acest lucru ar necesita lansarea și coordonarea între mai multe procese independente).

Deci asta înseamnă că nu ai de ales decât să călătorești adânc în măruntaiele firelor, încuietorilor și condițiilor de cursă pentru a construi o aplicație concomitentă?

Datorită cadrului Akka, răspunsul este nu. Acest tutorial prezintă exemple Akka și explorează modalitățile în care facilitează și simplifică implementarea aplicațiilor concurente, distribuite.

Ce este Cadrul Akka?

Această postare prezintă Akka și explorează modalitățile prin care facilitează și simplifică implementarea aplicațiilor concurente, distribuite.

Akka este un set de instrumente și un timp de rulare pentru construirea de aplicații foarte concurente, distribuite și tolerante la erori pe JVM. Akka este scris în Scala, cu legături de limbă furnizate atât pentru Scala, cât și pentru Java.

Abordarea lui Akka de a gestiona concurența se bazează pe modelul actorului. Într-un sistem bazat pe actor, totul este un actor, în același mod în care totul este un obiect în designul orientat pe obiecte. O diferență cheie, totuși – deosebit de relevantă pentru discuția noastră – este că modelul actorului a fost special conceput și proiectat pentru a servi ca model concurent, în timp ce modelul orientat pe obiecte nu este. Mai precis, într-un sistem de actori Scala, actorii interacționează și împărtășesc informații, fără nicio presupoziție de secvențialitate. Mecanismul prin care actorii împărtășesc informații unii cu alții și sarcini unii altora este transmiterea de mesaje.

Toată complexitatea creării și programării firelor de execuție, a primirii și expedierii mesajelor și a gestionării condițiilor de cursă și a sincronizării, este retrogradată în cadrul cadru de gestionare transparent.

Akka creează un strat între actori și sistemul de bază, astfel încât actorii trebuie pur și simplu să proceseze mesajele. Toată complexitatea creării și programării firelor de execuție, a primirii și expedierii mesajelor și a gestionării condițiilor de cursă și a sincronizării, este retrogradată în cadrul cadru de gestionare transparent.

Akka aderă cu strictețe la Manifestul Reactiv. Aplicațiile reactive urmăresc înlocuirea aplicațiilor tradiționale multithreaded cu o arhitectură care satisface una sau mai multe dintre următoarele cerințe:

  • Condus de evenimente. Folosind Actors, se poate scrie cod care gestionează cererile în mod asincron și utilizează exclusiv operațiuni neblocante.
  • Scalabil. În Akka, adăugarea de noduri fără a fi nevoie să modificați codul este posibilă, datorită atât transmiterii mesajelor, cât și transparenței locației.
  • Rezistent. Orice aplicație va întâmpina erori și va eșua la un moment dat. Akka oferă strategii de „supraveghere” (toleranță la greșeală) pentru a facilita un sistem de auto-vindecare.
  • Receptiv. Multe dintre aplicațiile de înaltă performanță și răspuns rapid de astăzi trebuie să ofere feedback rapid utilizatorului și, prin urmare, trebuie să reacționeze la evenimente într-un mod extrem de oportun. Strategia de non-blocare a Akka, bazată pe mesaje, ajută la realizarea acestui lucru.

Ce este un actor în Akka?

Un actor nu este în esență altceva decât un obiect care primește mesaje și ia acțiuni pentru a le gestiona. Este decuplat de sursa mesajului și singura sa responsabilitate este să recunoască în mod corespunzător tipul de mesaj pe care l-a primit și să ia măsuri în consecință.

La primirea unui mesaj, un actor poate întreprinde una sau mai multe dintre următoarele acțiuni:

  • Executați unele operațiuni în sine (cum ar fi efectuarea de calcule, persistența datelor, apelarea unui serviciu web extern și așa mai departe)
  • Redirecționați mesajul sau un mesaj derivat către un alt actor
  • Instanciați un nou actor și transmiteți-i mesajul

Alternativ, actorul poate alege să ignore în întregime mesajul (adică, poate alege inacțiunea) dacă consideră că este adecvat să facă acest lucru.

Pentru a implementa un actor, este necesar să extindeți trăsătura akka.actor.Actor și să implementați metoda de primire. Metoda de primire a unui actor este invocată (de Akka) atunci când un mesaj este trimis acelui actor. Implementarea sa tipică constă în potrivirea modelului, așa cum se arată în următorul exemplu Akka, pentru a identifica tipul de mesaj și a reacționa în consecință:

 import akka.actor.Actor import akka.actor.Props import akka.event.Logging class MyActor extends Actor { def receive = { case value: String => doSomething(value) case _ => println("received unknown message") } }

Potrivirea modelelor este o tehnică relativ elegantă pentru gestionarea mesajelor, care tinde să producă un cod mai „curat” și mai ușor de navigat decât o implementare comparabilă bazată pe apeluri inverse. Luați în considerare, de exemplu, o implementare simplă de solicitare/răspuns HTTP.

Mai întâi, să implementăm acest lucru folosind o paradigmă bazată pe callback în JavaScript:

 route(url, function(request){ var query = buildQuery(request); dbCall(query, function(dbResponse){ var wsRequest = buildWebServiceRequest(dbResponse); wsCall(wsRequest, function(wsResponse) { sendReply(wsResponse); }); }); });

Acum să comparăm asta cu o implementare bazată pe potrivirea modelelor:

 msg match { case HttpRequest(request) => { val query = buildQuery(request) dbCall(query) } case DbResponse(dbResponse) => { var wsRequest = buildWebServiceRequest(dbResponse); wsCall(dbResponse) } case WsResponse(wsResponse) => sendReply(wsResponse) }

Deși codul JavaScript bazat pe callback este, desigur, compact, cu siguranță este mai greu de citit și de navigat. În comparație, codul bazat pe potrivirea modelelor face să fie mai evident ce cazuri sunt luate în considerare și cum este tratat fiecare.

Sistemul actorilor

A lua o problemă complexă și a o împărți recursiv în sub-probleme mai mici este o tehnică solidă de rezolvare a problemelor în general. Această abordare poate fi deosebit de benefică în informatică (conformă cu Principiul responsabilității unice), deoarece tinde să producă cod curat, modular, cu redundanță mică sau deloc, care este relativ ușor de întreținut.

Într-un proiect bazat pe actori, utilizarea acestei tehnici facilitează organizarea logică a actorilor într-o structură ierarhică cunoscută sub numele de Sistem de actori. Sistemul actorilor oferă infrastructura prin care actorii interacționează între ei.

Un exemplu al modului în care sistemul actorilor funcționează în cadrul Akka.

În Akka, singura modalitate de a comunica cu un actor este printr-un ActorRef . Un ActorRef reprezintă o referință la un actor care împiedică alte obiecte să acceseze direct sau să manipuleze elementele interne și starea acelui actor. Mesajele pot fi trimise unui actor printr-un ActorRef folosind unul dintre următoarele protocoale de sintaxă:

  • ! („spune”) – trimite mesajul și revine imediat
  • ? („întreaba”) – trimite mesajul și returnează un viitor reprezentând un posibil răspuns

Fiecare actor are o cutie poștală în care sunt livrate mesajele primite. Există mai multe implementări de cutie poștală din care să alegeți, implementarea implicită fiind FIFO.

Un actor conține multe variabile de instanță pentru a menține starea în timp ce procesează mai multe mesaje. Akka se asigură că fiecare instanță a unui actor rulează în propriul thread ușor și că mesajele sunt procesate pe rând. În acest fel, starea fiecărui actor poate fi menținută în mod fiabil, fără ca dezvoltatorul să fie nevoit să-și facă griji în mod explicit cu privire la sincronizare sau la condițiile de cursă.

Fiecare actor primește următoarele informații utile pentru a-și îndeplini sarcinile prin intermediul API-ului Akka Actor:

  • sender : un ActorRef către expeditorul mesajului în curs de procesare
  • context : informații și metode referitoare la contextul în care rulează actorul (include, de exemplu, o metodă actorOf pentru instanțierea unui nou actor)
  • supervisionStrategy : definește strategia care trebuie utilizată pentru recuperarea din erori
  • self : ActorRef pentru actorul însuși
Akka se asigură că fiecare instanță a unui actor rulează în propriul thread ușor și că mesajele sunt procesate pe rând. În acest fel, starea fiecărui actor poate fi menținută în mod fiabil, fără ca dezvoltatorul să fie nevoit să-și facă griji în mod explicit cu privire la sincronizare sau la condițiile de cursă.

Pentru a lega aceste tutoriale, să luăm în considerare un exemplu simplu de numărare a numărului de cuvinte dintr-un fișier text.

În scopul exemplului nostru Akka, vom descompune problema în două subsarcini; și anume, (1) o sarcină „copil” de numărare a numărului de cuvinte pe o singură linie și (2) o sarcină „părinte” de însumare a numărului de cuvinte pe linie pentru a obține numărul total de cuvinte din fișier.

Actorul părinte va încărca fiecare linie din fișier și apoi va delega unui actor copil sarcina de a număra cuvintele din acea linie. Când copilul termină, acesta va trimite un mesaj înapoi părintelui cu rezultatul. Părintele va primi mesajele cu numărul de cuvinte (pentru fiecare linie) și va păstra un contor pentru numărul total de cuvinte din întregul fișier, pe care apoi îl va returna invocatorului său la finalizare.

(Rețineți că exemplele de cod tutorial Akka furnizate mai jos sunt destinate doar didactice și, prin urmare, nu se preocupă neapărat de toate condițiile de margine, optimizările performanței și așa mai departe. De asemenea, o versiune completă compilabilă a exemplelor de cod prezentate mai jos este disponibilă în esența asta.)

Să ne uităm mai întâi la un exemplu de implementare a clasei StringCounterActor copil:

 case class ProcessStringMsg(string: String) case class StringProcessedMsg(words: Integer) class StringCounterActor extends Actor { def receive = { case ProcessStringMsg(string) => { val wordsInLine = string.split(" ").length sender ! StringProcessedMsg(wordsInLine) } case _ => println("Error: message not recognized") } }

Acest actor are o sarcină foarte simplă: consumă mesajele ProcessStringMsg (conținând o linie de text), numără numărul de cuvinte de pe linia specificată și returnează rezultatul expeditorului printr-un mesaj StringProcessedMsg . Rețineți că am implementat clasa noastră pentru a folosi ! („tell”) pentru a trimite mesajul StringProcessedMsg (adică pentru a trimite mesajul și a reveni imediat).

OK, acum să ne îndreptăm atenția către clasa părinte WordCounterActor :

 1. case class StartProcessFileMsg() 2. 3. class WordCounterActor(filename: String) extends Actor { 4. 5. private var running = false 6. private var totalLines = 0 7. private var linesProcessed = 0 8. private var result = 0 9. private var fileSender: Option[ActorRef] = None 10. 11. def receive = { 12. case StartProcessFileMsg() => { 13. if (running) { 14. // println just used for example purposes; 15. // Akka logger should be used instead 16. println("Warning: duplicate start message received") 17. } else { 18. running = true 19. fileSender = Some(sender) // save reference to process invoker 20. import scala.io.Source._ 21. fromFile(filename).getLines.foreach { line => 22. context.actorOf(Props[StringCounterActor]) ! ProcessStringMsg(line) 23. totalLines += 1 24. } 25. } 26. } 27. case StringProcessedMsg(words) => { 28. result += words 29. linesProcessed += 1 30. if (linesProcessed == totalLines) { 31. fileSender.map(_ ! result) // provide result to process invoker 32. } 33. } 34. case _ => println("message not recognized!") 35. } 36. }

Multe lucruri se întâmplă aici, așa că haideți să le examinăm pe fiecare dintre ele mai detaliat (rețineți că numerele de linie la care se face referire în discuția care urmează se bazează pe exemplul de cod de mai sus) ...

Mai întâi, observați că numele fișierului de procesat este transmis constructorului WordCounterActor (linia 3). Acest lucru indică faptul că actorul trebuie folosit doar pentru a procesa un singur fișier. Acest lucru simplifică, de asemenea, munca de codare pentru dezvoltator, evitând necesitatea de a reseta variabilele de stare ( running , totalLines , linesProcessed și result ) atunci când lucrarea este terminată, deoarece instanța este folosită o singură dată (adică, pentru a procesa un singur fișier) și apoi aruncate.

Apoi, observați că WordCounterActor gestionează două tipuri de mesaje:

  • StartProcessFileMsg (linia 12)
    • Primit de la actorul extern care inițial inițiază WordCounterActor .
    • Când este primit, WordCounterActor verifică mai întâi dacă nu primește o solicitare redundantă.
    • Dacă cererea este redundantă, WordCounterActor generează un avertisment și nu se mai face nimic (linia 16).
    • Dacă cererea nu este redundantă:
      • WordCounterActor stochează o referință la expeditor în variabila de instanță fileSender (rețineți că aceasta este o Option[ActorRef] mai degrabă decât o Option[Actor] - vezi linia 9). Acest ActorRef este necesar pentru a-l accesa ulterior și a răspunde la el atunci când se procesează StringProcessedMsg final (care este primit de la un copil StringCounterActor , așa cum este descris mai jos).
      • WordCounterActor citește apoi fișierul și, pe măsură ce fiecare linie din fișier este încărcată, este creat un copil StringCounterActor și îi este transmis un mesaj care conține linia de procesat (liniile 21-24).
  • StringProcessedMsg (linia 27)
    • Primit de la un StringCounterActor copil când termină procesarea liniei care i-a fost atribuită.
    • Când este primit, WordCounterActor incrementează contorul de linii pentru fișier și, dacă toate liniile din fișier au fost procesate (adică, când totalLines și linesProcessed sunt egale), trimite rezultatul final către fileSender original (liniile 28-31).

Încă o dată, observați că în Akka, singurul mecanism de comunicare între actori este transmiterea mesajelor. Mesajele sunt singurul lucru pe care actorii îl împărtășesc și, din moment ce actorii pot accesa aceleași mesaje concomitent, este important ca aceștia să fie imutabili, pentru a evita condițiile de rasă și comportamentele neașteptate.

Clasele de caz din Scala sunt clase obișnuite care oferă un mecanism recursiv de descompunere prin potrivirea modelelor.

Prin urmare, este obișnuit să se transmită mesaje sub formă de clase de caz, deoarece acestea sunt imuabile în mod implicit și din cauza cât de perfect se integrează cu potrivirea modelelor.

Să încheiem exemplul cu exemplul de cod pentru a rula întreaga aplicație.

 object Sample extends App { import akka.util.Timeout import scala.concurrent.duration._ import akka.pattern.ask import akka.dispatch.ExecutionContexts._ implicit val ec = global override def main(args: Array[String]) { val system = ActorSystem("System") val actor = system.actorOf(Props(new WordCounterActor(args(0)))) implicit val timeout = Timeout(25 seconds) val future = actor ? StartProcessFileMsg() future.map { result => println("Total number of words " + result) system.shutdown } } }
În programarea concomitentă, un „viitor” este în esență un obiect substituent pentru un rezultat care nu este încă cunoscut.

Observați cum de data aceasta ? metoda este folosită pentru a trimite un mesaj. În acest fel, apelantul poate folosi Viitorul returnat pentru a imprima rezultatul final atunci când acesta este disponibil și pentru a părăsi programul prin închiderea ActorSystem.

Akka toleranță la greșeală și strategii de supraveghere

Într-un sistem de actori, fiecare actor este supraveghetorul copiilor săi. Dacă un actor nu reușește să gestioneze un mesaj, se suspendă pe sine și pe toți copiii săi și trimite un mesaj, de obicei sub forma unei excepții, supraveghetorului său.

În Akka, strategiile de supraveghere sunt mecanismul principal și simplu pentru definirea comportamentului tolerant la erori al sistemului dumneavoastră.

În Akka, modul în care un supervizor reacționează și gestionează excepțiile care i se infiltrează de la copiii săi este denumit strategie de supervizor. Strategiile de supraveghere sunt mecanismul principal și simplu prin care definiți comportamentul tolerant la erori al sistemului dumneavoastră.

Când un mesaj care indică o eroare ajunge la un supervizor, acesta poate lua una dintre următoarele acțiuni:

  • Reluați copilul (și copiii lui), păstrându-i starea internă. Această strategie poate fi aplicată atunci când starea copil nu a fost coruptă de eroare și poate continua să funcționeze corect.
  • Reporniți copilul (și copiii săi), ștergându-i starea internă. Această strategie poate fi utilizată în scenariul opus celui descris. Dacă starea copil a fost coruptă de eroare, este necesară resetarea stării sale înainte de a putea fi utilizată în viitor.
  • Opriți copilul (și copiii săi) definitiv. Această strategie poate fi folosită în cazurile în care condiția de eroare nu se crede a fi remediabilă, dar nu pune în pericol restul operațiunii care se execută, care poate fi finalizată în absența copilului eșuat.
  • Oprește-te și escaladează eroarea. Angajat atunci când supervizorul nu știe cum să facă față eșecului și astfel îl transferă la propriul supervizor.

Mai mult, un actor poate decide să aplice acțiunea doar copiilor eșuați sau și fraților săi. Există două strategii predefinite pentru aceasta:

  • OneForOneStrategy : aplică acțiunea specificată numai copilului eșuat
  • AllForOneStrategy : aplică acțiunea specificată tuturor copiilor săi

Iată un exemplu simplu, folosind OneForOneStrategy :

 import akka.actor.OneForOneStrategy import akka.actor.SupervisorStrategy._ import scala.concurrent.duration._ override val supervisorStrategy = OneForOneStrategy() { case _: ArithmeticException => Resume case _: NullPointerException => Restart case _: IllegalArgumentException => Stop case _: Exception => Escalate }

Dacă nu este specificată nicio strategie, se utilizează următoarea strategie implicită:

  • Dacă a apărut o eroare la inițializarea actorului sau dacă actorul a fost ucis, actorul este oprit.
  • Dacă a existat orice alt fel de excepție, actorul este pur și simplu repornit.

Implementarea furnizată de Akka a acestei strategii implicite este următoarea:

 final val defaultStrategy: SupervisorStrategy = { def defaultDecider: Decider = { case _: ActorInitializationException ⇒ Stop case _: ActorKilledException ⇒ Stop case _: Exception ⇒ Restart } OneForOneStrategy()(defaultDecider) }

Akka permite implementarea strategiilor personalizate de supervizor, dar după cum avertizează documentația Akka, faceți acest lucru cu precauție, deoarece implementările incorecte pot duce la probleme precum sistemele de actori blocate (adică actori suspendați definitiv).

Transparența locației

Arhitectura Akka acceptă transparența locației, permițând actorilor să fie complet agnostici de unde provin mesajele pe care le primesc. Expeditorul mesajului poate locui în același JVM ca și actorul sau într-un JVM separat (fie care rulează pe același nod, fie pe un alt nod). Akka permite ca fiecare dintre aceste cazuri să fie tratat într-un mod care este complet transparent pentru actor (și, prin urmare, pentru dezvoltator). Singura avertizare este că mesajele trimise pe mai multe noduri trebuie să fie serializabile.

Arhitectura Akka acceptă transparența locației, permițând actorilor să fie complet agnostici de unde provin mesajele pe care le primesc.

Sistemele Actor sunt proiectate să ruleze într-un mediu distribuit fără a necesita niciun cod specializat. Akka necesită doar prezența unui fișier de configurare ( application.conf ) care specifică nodurile către care să trimită mesajele. Iată un exemplu simplu de fișier de configurare:

 akka { actor { provider = "akka.remote.RemoteActorRefProvider" } remote { transport = "akka.remote.netty.NettyRemoteTransport" netty { hostname = "127.0.0.1" port = 2552 } } }

Câteva sfaturi de despărțire...

Am văzut cum cadrul Akka ajută la obținerea concurenței și a performanței ridicate. Cu toate acestea, așa cum a subliniat acest tutorial, există câteva puncte de care trebuie să țineți cont atunci când proiectați și implementați sistemul dvs. pentru a exploata la maximum puterea lui Akka:

  • În cea mai mare măsură posibilă, fiecărui actor ar trebui să i se atribuie cea mai mică sarcină posibilă (după cum s-a discutat anterior, urmând principiul responsabilității unice)
  • Actorii ar trebui să gestioneze evenimentele (adică să proceseze mesajele) în mod asincron și nu ar trebui să blocheze, altfel se vor produce schimbări de context care pot afecta negativ performanța. Mai exact, cel mai bine este să efectuați operațiuni de blocare (IO, etc.) într-un Viitor pentru a nu bloca actorul; adică:

     case evt => blockingCall() // BAD case evt => Future { blockingCall() // GOOD }
  • Asigurați-vă că toate mesajele dvs. sunt imuabile, deoarece actorii care le transmit unul altuia vor rula concomitent în propriile lor fire. Este posibil ca mesajele modificabile să aibă ca rezultat un comportament neașteptat.
  • Deoarece mesajele trimise între noduri trebuie să fie serializabile, este important să rețineți că, cu cât mesajele sunt mai mari, cu atât va dura mai mult pentru a le serializa, trimite și deserializa, ceea ce poate avea un impact negativ asupra performanței.

Concluzie

Akka, scris în Scala, simplifică și facilitează dezvoltarea de aplicații foarte concurente, distribuite și tolerante la erori, ascunzând o mare parte din complexitate de la dezvoltator. A face dreptate deplină lui Akka ar necesita mult mai mult decât acest tutorial unic, dar sperăm că această introducere și exemplele sale au fost suficient de captivante pentru a vă face să doriți să citiți mai multe.

Amazon, VMWare și CSC sunt doar câteva exemple de companii lider care folosesc activ Akka. Vizitați site-ul web oficial Akka pentru a afla mai multe și pentru a explora dacă Akka ar putea fi răspunsul potrivit și pentru proiectul dvs.