Łatwa tolerancja współbieżności i błędów: samouczek Akka z przykładami
Opublikowany: 2022-03-11Wyzwanie
Pisanie programów współbieżnych jest trudne. Konieczność radzenia sobie z wątkami, blokadami, wyścigami itp. jest wysoce podatna na błędy i może prowadzić do kodu trudnego do odczytania, przetestowania i utrzymania.
Dlatego wielu woli całkowicie unikać wielowątkowości. Zamiast tego wykorzystują wyłącznie procesy jednowątkowe, polegając na usługach zewnętrznych (takich jak bazy danych, kolejki itp.) do obsługi wszelkich niezbędnych operacji współbieżnych lub asynchronicznych. Chociaż takie podejście jest w niektórych przypadkach uzasadnioną alternatywą, istnieje wiele scenariuszy, w których po prostu nie jest to opłacalna opcja. Wiele systemów czasu rzeczywistego – takich jak aplikacje handlowe, bankowe lub gry czasu rzeczywistego – nie ma luksusu czekania na zakończenie procesu jednowątkowego (potrzebują odpowiedzi już teraz!). Inne systemy wymagają tak dużej mocy obliczeniowej lub zasobów, że ich uruchomienie zajęłoby nadmierną ilość czasu (w niektórych przypadkach godziny lub nawet dni) bez wprowadzania zrównoleglania do ich kodu.
Jednym dość powszechnym podejściem jednowątkowym (szeroko stosowanym na przykład w świecie Node.js) jest użycie opartego na zdarzeniach, nieblokującego paradygmatu. Chociaż poprawia to wydajność, unikając przełączania kontekstu, blokowania i blokowania, nadal nie rozwiązuje problemów związanych z równoczesnym używaniem wielu procesorów (wymagałoby to uruchamiania i koordynowania wielu niezależnych procesów).
Czy to oznacza, że nie masz innego wyboru, jak tylko zagłębić się w wątki, blokady i warunki wyścigu, aby zbudować jednoczesną aplikację?
Dzięki frameworkowi Akka odpowiedź brzmi nie. Ten samouczek przedstawia przykłady Akka i odkrywa, w jaki sposób ułatwia i upraszcza implementację współbieżnych, rozproszonych aplikacji.
Czym jest struktura Akka?
Akka to zestaw narzędzi i środowisko wykonawcze do tworzenia wysoce współbieżnych, rozproszonych i odpornych na błędy aplikacji na JVM. Akka jest napisana w Scali, a powiązania językowe dostępne są zarówno dla Scali, jak i dla Javy.
Podejście Akki do obsługi współbieżności opiera się na Modelu Aktora. W systemie opartym na aktorze wszystko jest aktorem, podobnie jak wszystko jest obiektem w projektowaniu zorientowanym obiektowo. Jednak kluczowa różnica – szczególnie istotna dla naszej dyskusji – polega na tym, że model aktora został specjalnie zaprojektowany i zaprojektowany, aby służyć jako model współbieżny, podczas gdy model obiektowy nie jest. Dokładniej, w systemie aktorów Scala aktorzy wchodzą w interakcję i dzielą się informacjami, bez żadnych założeń dotyczących sekwencyjności. Mechanizmem, dzięki któremu aktorzy dzielą się informacjami i zadają sobie nawzajem zadania, jest przekazywanie wiadomości.
Akka tworzy warstwę między aktorami a podstawowym systemem, tak że aktorzy muszą po prostu przetwarzać wiadomości. Cała złożoność tworzenia i planowania wątków, odbierania i wysyłania wiadomości oraz obsługi warunków wyścigu i synchronizacji jest sprowadzona do struktury, aby obsługiwać ją w sposób przejrzysty.
Akka ściśle przestrzega Manifestu Reaktywnego. Aplikacje reaktywne mają na celu zastąpienie tradycyjnych aplikacji wielowątkowych architekturą, która spełnia co najmniej jedno z następujących wymagań:
- Oparte na wydarzeniach. Za pomocą aktorów można napisać kod, który obsługuje żądania asynchronicznie i wykorzystuje wyłącznie operacje nieblokujące.
- Skalowalny. W Akka dodawanie węzłów bez konieczności modyfikacji kodu jest możliwe, zarówno dzięki przekazywaniu wiadomości, jak i przejrzystości lokalizacji.
- Odporny. Każda aplikacja napotka błędy i w pewnym momencie zakończy się niepowodzeniem. Akka zapewnia strategie „nadzoru” (tolerancji błędów), aby ułatwić system samoleczenia.
- Czuły. Wiele współczesnych aplikacji o wysokiej wydajności i szybkim reagowaniu wymaga szybkiego przekazywania użytkownikowi informacji zwrotnych, a zatem musi reagować na zdarzenia w niezwykle szybkim czasie. Nieblokująca, oparta na wiadomościach strategia Akka pomaga to osiągnąć.
Kim jest aktor w Akce?
Aktor to w zasadzie nic innego jak obiekt, który odbiera wiadomości i podejmuje działania, aby je obsłużyć. Jest oddzielony od źródła wiadomości, a jego jedynym obowiązkiem jest prawidłowe rozpoznanie rodzaju otrzymanej wiadomości i podjęcie odpowiednich działań.
Po otrzymaniu wiadomości aktor może wykonać jedną lub więcej z następujących akcji:
- Samo wykonywanie niektórych operacji (takich jak wykonywanie obliczeń, utrwalanie danych, wywoływanie zewnętrznej usługi sieciowej itd.)
- Przekaż wiadomość lub wiadomość pochodną innemu aktorowi
- Utwórz instancję nowego aktora i przekaż mu wiadomość
Alternatywnie, aktor może zdecydować się na całkowite zignorowanie wiadomości (tj. może wybrać bezczynność), jeśli uzna to za stosowne.
Aby zaimplementować aktora, należy rozszerzyć cechę akka.actor.Actor i zaimplementować metodę receive. Metoda odbierania aktora jest wywoływana (przez Akka), gdy wiadomość jest wysyłana do tego aktora. Jego typowa implementacja polega na dopasowaniu wzorców, jak pokazano w poniższym przykładzie Akka, w celu zidentyfikowania typu wiadomości i odpowiedniej reakcji:
import akka.actor.Actor import akka.actor.Props import akka.event.Logging class MyActor extends Actor { def receive = { case value: String => doSomething(value) case _ => println("received unknown message") } }
Dopasowywanie wzorców to stosunkowo elegancka technika obsługi komunikatów, która ma tendencję do tworzenia „czystszego” i łatwiejszego w nawigacji kodu niż porównywalna implementacja oparta na wywołaniach zwrotnych. Rozważmy na przykład uproszczoną implementację żądania/odpowiedzi HTTP.
Najpierw zaimplementujmy to za pomocą paradygmatu opartego na wywołaniach zwrotnych w JavaScript:
route(url, function(request){ var query = buildQuery(request); dbCall(query, function(dbResponse){ var wsRequest = buildWebServiceRequest(dbResponse); wsCall(wsRequest, function(wsResponse) { sendReply(wsResponse); }); }); });
Teraz porównajmy to do implementacji opartej na dopasowaniu wzorców:
msg match { case HttpRequest(request) => { val query = buildQuery(request) dbCall(query) } case DbResponse(dbResponse) => { var wsRequest = buildWebServiceRequest(dbResponse); wsCall(dbResponse) } case WsResponse(wsResponse) => sendReply(wsResponse) }
Chociaż kod JavaScript oparty na wywołaniu zwrotnym jest wprawdzie zwarty, z pewnością jest trudniejszy do odczytania i nawigacji. Dla porównania, kod oparty na dopasowywaniu wzorców sprawia, że od razu widać, jakie przypadki są brane pod uwagę i jak każdy z nich jest obsługiwany.
System aktorski
Branie złożonego problemu i rekurencyjne dzielenie go na mniejsze podproblemy jest ogólnie dobrą techniką rozwiązywania problemów. Takie podejście może być szczególnie korzystne w informatyce (zgodne z zasadą pojedynczej odpowiedzialności), ponieważ zwykle daje czysty, zmodularyzowany kod, z niewielką lub żadną redundancją, który jest stosunkowo łatwy w utrzymaniu.
W projekcie opartym na aktorach użycie tej techniki ułatwia logiczną organizację aktorów w hierarchiczną strukturę znaną jako system aktorów. System aktora zapewnia infrastrukturę, za pośrednictwem której aktorzy wchodzą ze sobą w interakcje.
W Akka jedynym sposobem komunikowania się z aktorem jest ActorRef
. ActorRef
reprezentuje odwołanie do aktora, który uniemożliwia innym obiektom bezpośredni dostęp lub manipulowanie elementami wewnętrznymi i stanem tego aktora. Wiadomości mogą być wysyłane do aktora za pośrednictwem ActorRef
przy użyciu jednego z następujących protokołów składniowych:
-
!
(„tell”) – wysyła wiadomość i wraca natychmiast -
?
(„ask”) – wysyła wiadomość i zwraca Future reprezentującą możliwą odpowiedź
Każdy aktor ma skrzynkę pocztową, do której dostarczane są jego wiadomości przychodzące. Istnieje wiele implementacji skrzynek pocztowych do wyboru, przy czym domyślną implementacją jest FIFO.
Aktor zawiera wiele zmiennych instancji, aby utrzymać stan podczas przetwarzania wielu komunikatów. Akka zapewnia, że każda instancja aktora działa we własnym lekkim wątku i że wiadomości są przetwarzane pojedynczo. W ten sposób stan każdego aktora może być niezawodnie utrzymywany bez konieczności wyraźnego martwienia się dewelopera o synchronizację lub warunki wyścigu.
Każdy aktor otrzymuje następujące przydatne informacje do wykonywania swoich zadań za pośrednictwem interfejsu API Akka Actor:
-
sender
:ActorRef
do nadawcy aktualnie przetwarzanej wiadomości -
context
: informacje i metody odnoszące się do kontekstu, w którym działa aktor (obejmuje na przykład metodęactorOf
do tworzenia instancji nowego aktora) -
supervisionStrategy
: określa strategię, która ma być użyta do naprawy po błędach -
self
:ActorRef
dla samego aktora
Aby pomóc w powiązaniu tych samouczków, rozważmy prosty przykład liczenia słów w pliku tekstowym.
Na potrzeby naszego przykładu Akka podzielimy problem na dwa podzadania; mianowicie (1) zadanie „dziecko” polegające na zliczeniu liczby słów w jednym wierszu i (2) zadanie „nadrzędne” polegające na zsumowaniu liczby słów w wierszu, aby uzyskać całkowitą liczbę słów w pliku.
Aktor nadrzędny załaduje każdy wiersz z pliku, a następnie przekaże aktorowi podrzędnemu zadanie liczenia słów w tym wierszu. Kiedy dziecko skończy, odeśle wiadomość z powrotem do rodzica z wynikiem. Rodzic otrzyma wiadomości z liczbą słów (dla każdej linii) i będzie utrzymywał licznik całkowitej liczby słów w całym pliku, który następnie zwróci do swojego wywołującego po zakończeniu.
(Zauważ, że przykłady kodu samouczka Akka przedstawione poniżej są przeznaczone wyłącznie do celów dydaktycznych i dlatego niekoniecznie dotyczą wszystkich warunków brzegowych, optymalizacji wydajności itd. Ponadto pełna wersja przykładów kodu pokazanych poniżej jest dostępna w ten sedno.)
Przyjrzyjmy się najpierw przykładowej implementacji podrzędnej klasy StringCounterActor
:
case class ProcessStringMsg(string: String) case class StringProcessedMsg(words: Integer) class StringCounterActor extends Actor { def receive = { case ProcessStringMsg(string) => { val wordsInLine = string.split(" ").length sender ! StringProcessedMsg(wordsInLine) } case _ => println("Error: message not recognized") } }
Ten aktor ma bardzo proste zadanie: konsumować wiadomości ProcessStringMsg
(zawierające wiersz tekstu), policzyć liczbę słów w określonym wierszu i zwrócić wynik do nadawcy za pomocą wiadomości StringProcessedMsg
. Zwróć uwagę, że zaimplementowaliśmy naszą klasę do używania !
(„tell”) metoda wysyłania wiadomości StringProcessedMsg
(tj. wysyłanie wiadomości i natychmiastowy powrót).
OK, teraz zwróćmy naszą uwagę na nadrzędną klasę WordCounterActor
:

1. case class StartProcessFileMsg() 2. 3. class WordCounterActor(filename: String) extends Actor { 4. 5. private var running = false 6. private var totalLines = 0 7. private var linesProcessed = 0 8. private var result = 0 9. private var fileSender: Option[ActorRef] = None 10. 11. def receive = { 12. case StartProcessFileMsg() => { 13. if (running) { 14. // println just used for example purposes; 15. // Akka logger should be used instead 16. println("Warning: duplicate start message received") 17. } else { 18. running = true 19. fileSender = Some(sender) // save reference to process invoker 20. import scala.io.Source._ 21. fromFile(filename).getLines.foreach { line => 22. context.actorOf(Props[StringCounterActor]) ! ProcessStringMsg(line) 23. totalLines += 1 24. } 25. } 26. } 27. case StringProcessedMsg(words) => { 28. result += words 29. linesProcessed += 1 30. if (linesProcessed == totalLines) { 31. fileSender.map(_ ! result) // provide result to process invoker 32. } 33. } 34. case _ => println("message not recognized!") 35. } 36. }
Wiele rzeczy się tutaj dzieje, więc przyjrzyjmy się każdemu z nich bardziej szczegółowo (zauważ, że numery wierszy, do których odwołujemy się w poniższej dyskusji, są oparte na powyższym przykładzie kodu) …
Po pierwsze, zauważ, że nazwa pliku do przetworzenia jest przekazywana do konstruktora WordCounterActor
(wiersz 3). Oznacza to, że aktor ma być używany tylko do przetwarzania pojedynczego pliku. Upraszcza to również programistom zadanie kodowania, unikając konieczności resetowania zmiennych stanu ( running
, totalLines
, linesProcessed
i result
) po zakończeniu zadania, ponieważ instancja jest używana tylko raz (tj. do przetworzenia pojedynczego pliku) a następnie odrzucone.
Następnie zwróć uwagę, że WordCounterActor
obsługuje dwa typy wiadomości:
-
StartProcessFileMsg
(wiersz 12)- Otrzymane od zewnętrznego aktora, który początkowo inicjuje
WordCounterActor
. - Po otrzymaniu
WordCounterActor
najpierw sprawdza, czy nie otrzymuje nadmiarowego żądania. - Jeśli żądanie jest nadmiarowe,
WordCounterActor
generuje ostrzeżenie i nic więcej nie jest robione (wiersz 16). - Jeśli żądanie nie jest zbędne:
-
WordCounterActor
przechowuje odniesienie do nadawcy w zmiennej instancjifileSender
(zauważ, że jest toOption[ActorRef]
, a nieOption[Actor]
— patrz wiersz 9). TenActorRef
jest potrzebny, aby później uzyskać do niego dostęp i odpowiedzieć na niego podczas przetwarzania końcowegoStringProcessedMsg
(odebranego z elementu podrzędnegoStringCounterActor
, jak opisano poniżej). - Następnie
WordCounterActor
odczytuje plik i po załadowaniu każdego wiersza w pliku tworzony jest element potomnyStringCounterActor
, do którego przekazywany jest komunikat zawierający wiersz do przetworzenia (wiersze 21-24).
-
- Otrzymane od zewnętrznego aktora, który początkowo inicjuje
-
StringProcessedMsg
(wiersz 27)- Odebrane od podrzędnego
StringCounterActor
po zakończeniu przetwarzania przypisanego do niego wiersza. - Po otrzymaniu,
WordCounterActor
zwiększa licznik linii dla pliku i, jeśli wszystkie linie w pliku zostały przetworzone (tj. gdytotalLines
ilinesProcessed
są równe), wysyła końcowy wynik do oryginalnegofileSender
(linie 28-31).
- Odebrane od podrzędnego
Po raz kolejny zauważ, że w Akce jedynym mechanizmem komunikacji między aktorami jest przekazywanie wiadomości. Wiadomości są jedyną rzeczą, którą aktorzy dzielą, a ponieważ aktorzy mogą potencjalnie uzyskiwać dostęp do tych samych wiadomości jednocześnie, ważne jest, aby były niezmienne, aby uniknąć sytuacji wyścigowych i nieoczekiwanych zachowań.
Dlatego powszechne jest przekazywanie komunikatów w postaci klas przypadków, ponieważ są one domyślnie niezmienne i ze względu na to, jak bezproblemowo integrują się z dopasowaniem wzorców.
Zakończmy przykład przykładowym kodem, aby uruchomić całą aplikację.
object Sample extends App { import akka.util.Timeout import scala.concurrent.duration._ import akka.pattern.ask import akka.dispatch.ExecutionContexts._ implicit val ec = global override def main(args: Array[String]) { val system = ActorSystem("System") val actor = system.actorOf(Props(new WordCounterActor(args(0)))) implicit val timeout = Timeout(25 seconds) val future = actor ? StartProcessFileMsg() future.map { result => println("Total number of words " + result) system.shutdown } } }
Zwróć uwagę, jak tym razem ?
Metoda służy do wysłania wiadomości. W ten sposób wywołujący może użyć zwróconej przyszłości do wydrukowania końcowego wyniku, gdy jest on dostępny, i wyjścia z programu poprzez zamknięcie ActorSystem.
Akka tolerancja błędów i strategie nadzorcy
W systemie aktorskim każdy aktor jest nadzorcą swoich dzieci. Jeśli aktor nie poradzi sobie z wiadomością, zawiesza się i wszystkie swoje dzieci i wysyła wiadomość, zwykle w formie wyjątku, do swojego przełożonego.
W Akka sposób, w jaki superwizor reaguje i obsługuje wyjątki, które przesiąkają do niego przez jego dzieci, jest określany mianem strategii superwizora. Strategie nadzorcy są podstawowym i prostym mechanizmem, za pomocą którego definiujesz zachowanie systemu odporne na awarie.
Gdy wiadomość informująca o niepowodzeniu dotrze do przełożonego, może on podjąć jedną z następujących czynności:
- Wznów dziecko (i jego dzieci), zachowując jego stan wewnętrzny. Tę strategię można zastosować, gdy stan podrzędny nie został uszkodzony przez błąd i może nadal działać poprawnie.
- Zrestartuj dziecko (i jego dzieci), czyszcząc jego stan wewnętrzny. Ta strategia może być zastosowana w scenariuszu odwrotnym do opisanego powyżej. Jeśli stan podrzędny został uszkodzony przez błąd, konieczne jest zresetowanie jego stanu, zanim będzie można go użyć w przyszłości.
- Zatrzymaj dziecko (i jego dzieci) na stałe. Ta strategia może być zastosowana w przypadkach, w których warunek błędu nie jest uważany za możliwy do naprawienia, ale nie zagraża pozostałej części wykonywanej operacji, która może zostać zakończona pod nieobecność dziecka, które nie powiodło się.
- Zatrzymaj się i eskaluj błąd. Zatrudniany, gdy przełożony nie wie, jak poradzić sobie z awarią i eskaluje ją do własnego przełożonego.
Co więcej, Aktor może zdecydować, że zastosuje akcję tylko do dzieci, którym nie powiodło się, lub też do swojego rodzeństwa. Istnieją dwie wstępnie zdefiniowane strategie:
-
OneForOneStrategy
: Stosuje określone działanie tylko do nieudanego dziecka -
AllForOneStrategy
: Stosuje określoną akcję do wszystkich swoich dzieci
Oto prosty przykład, wykorzystujący OneForOneStrategy
:
import akka.actor.OneForOneStrategy import akka.actor.SupervisorStrategy._ import scala.concurrent.duration._ override val supervisorStrategy = OneForOneStrategy() { case _: ArithmeticException => Resume case _: NullPointerException => Restart case _: IllegalArgumentException => Stop case _: Exception => Escalate }
Jeśli nie określono żadnej strategii, stosowana jest następująca strategia domyślna:
- Jeśli wystąpił błąd podczas inicjowania aktora lub jeśli aktor został zabity, aktor zostanie zatrzymany.
- Jeśli był jakikolwiek inny wyjątek, aktor jest po prostu restartowany.
Implementacja tej domyślnej strategii dostarczona przez firmę Akka wygląda następująco:
final val defaultStrategy: SupervisorStrategy = { def defaultDecider: Decider = { case _: ActorInitializationException ⇒ Stop case _: ActorKilledException ⇒ Stop case _: Exception ⇒ Restart } OneForOneStrategy()(defaultDecider) }
Akka pozwala na implementację niestandardowych strategii nadzorczych, ale jak ostrzega dokumentacja Akka, należy to robić ostrożnie, ponieważ nieprawidłowe implementacje mogą prowadzić do problemów, takich jak zablokowane systemy aktorów (tj. trwale zawieszeni aktorzy).
Przejrzystość lokalizacji
Architektura Akka obsługuje przezroczystość lokalizacji, dzięki czemu aktorzy są całkowicie niezależni od tego, skąd pochodzą otrzymywane przez nich wiadomości. Nadawca komunikatu może znajdować się w tej samej wirtualnej maszynie wirtualnej co aktor lub w oddzielnej wirtualnej maszynie wirtualnej (działającej w tym samym lub innym węźle). Akka umożliwia obsługę każdego z tych przypadków w sposób całkowicie przejrzysty dla aktora (a tym samym dla programisty). Jedynym zastrzeżeniem jest to, że wiadomości wysyłane przez wiele węzłów muszą być możliwe do serializacji.
Systemy aktora są zaprojektowane do działania w środowisku rozproszonym bez konieczności stosowania specjalistycznego kodu. Akka wymaga jedynie obecności pliku konfiguracyjnego ( application.conf
), który określa węzły, do których mają być wysyłane wiadomości. Oto prosty przykład pliku konfiguracyjnego:
akka { actor { provider = "akka.remote.RemoteActorRefProvider" } remote { transport = "akka.remote.netty.NettyRemoteTransport" netty { hostname = "127.0.0.1" port = 2552 } } }
Kilka wskazówek na temat rozstania…
Widzieliśmy, jak framework Akka pomaga osiągnąć współbieżność i wysoką wydajność. Jednak, jak wskazano w tym samouczku, należy pamiętać o kilku punktach podczas projektowania i wdrażania systemu, aby w pełni wykorzystać moc Akka:
- W możliwie największym stopniu każdemu aktorowi należy przydzielić możliwie najmniejsze zadanie (jak omówiono wcześniej, zgodnie z zasadą pojedynczej odpowiedzialności)
Aktorzy powinni obsługiwać zdarzenia (tj. przetwarzać komunikaty) asynchronicznie i nie powinny blokować, w przeciwnym razie nastąpią zmiany kontekstu, co może niekorzystnie wpłynąć na wydajność. W szczególności najlepiej wykonywać operacje blokujące (IO, itp.) w przyszłości, aby nie blokować aktora; tj:
case evt => blockingCall() // BAD case evt => Future { blockingCall() // GOOD }
- Upewnij się, że wszystkie wiadomości są niezmienne, ponieważ aktorzy, którzy je do siebie przekazują, będą działać jednocześnie we własnych wątkach. Zmienne komunikaty mogą powodować nieoczekiwane zachowanie.
- Ponieważ komunikaty wysyłane między węzłami muszą być możliwe do serializacji, należy pamiętać, że im większe są komunikaty, tym dłużej zajmie ich serializacja, wysłanie i deserializacja, co może negatywnie wpłynąć na wydajność.
Wniosek
Akka, napisana w Scali, upraszcza i ułatwia tworzenie aplikacji o wysokiej współbieżności, rozproszonych i odpornych na błędy, ukrywając większość złożoności przed programistą. Oddanie Akka pełnej sprawiedliwości wymagałoby znacznie więcej niż ten pojedynczy samouczek, ale mam nadzieję, że ten wstęp i jego przykłady były wystarczająco wciągające, abyś chciał przeczytać więcej.
Amazon, VMWare i CSC to tylko kilka przykładów wiodących firm, które aktywnie korzystają z Akka. Odwiedź oficjalną stronę Akka, aby dowiedzieć się więcej i sprawdzić, czy Akka może być również właściwą odpowiedzią dla Twojego projektu.