Параллелизм и отказоустойчивость стали проще: учебник по Akka с примерами
Опубликовано: 2022-03-11Соревнование
Написание параллельных программ сложно. Необходимость иметь дело с потоками, блокировками, условиями гонки и т. д. очень подвержена ошибкам и может привести к тому, что код будет трудно читать, тестировать и поддерживать.
Поэтому многие предпочитают вообще избегать многопоточности. Вместо этого они используют исключительно однопоточные процессы, полагаясь на внешние службы (такие как базы данных, очереди и т. д.) для обработки любых необходимых параллельных или асинхронных операций. Хотя этот подход в некоторых случаях является допустимой альтернативой, во многих случаях он просто неприемлем. Многие системы реального времени, такие как торговые или банковские приложения или игры в реальном времени, не могут позволить себе роскошь ожидания завершения однопоточного процесса (им нужен ответ прямо сейчас!). Другие системы настолько требовательны к вычислениям или ресурсам, что им потребовалось бы чрезмерное количество времени (часы или даже дни в некоторых случаях) для запуска без внедрения параллелизма в их код.
Один довольно распространенный однопоточный подход (широко используемый, например, в мире Node.js) заключается в использовании неблокирующей парадигмы, основанной на событиях. Хотя это действительно повышает производительность за счет исключения переключения контекста, блокировок и блокировок, оно по-прежнему не решает проблемы одновременного использования нескольких процессоров (для этого потребуется запуск и координация между несколькими независимыми процессами).
Значит ли это, что у вас нет другого выбора, кроме как углубиться в недра потоков, блокировок и условий гонки, чтобы создать параллельное приложение?
Благодаря фреймворку Akka ответ — нет. В этом учебном пособии представлены примеры Akka и рассмотрены способы облегчения и упрощения реализации параллельных распределенных приложений.
Что такое Акка Фреймворк?
Akka — это набор инструментов и среда выполнения для создания высокопараллельных, распределенных и отказоустойчивых приложений на JVM. Akka написана на Scala с языковыми привязками как для Scala, так и для Java.
Подход Akka к обработке параллелизма основан на акторной модели. В акторной системе все является актором, почти так же, как все является объектом в объектно-ориентированном дизайне. Однако ключевое отличие, особенно важное для нашего обсуждения, заключается в том, что акторная модель была специально разработана и спроектирована для использования в качестве параллельной модели, а объектно-ориентированная модель — нет. В частности, в системе акторов Scala акторы взаимодействуют и обмениваются информацией без каких-либо предположений о последовательности. Механизм, с помощью которого акторы обмениваются информацией друг с другом и ставят друг другу задачи, — это передача сообщений.
Akka создает слой между акторами и базовой системой, так что акторам просто нужно обрабатывать сообщения. Вся сложность создания и планирования потоков, получения и отправки сообщений, а также обработки условий гонки и синхронизации переносится на платформу для прозрачной обработки.
Акка строго придерживается Реактивного манифеста. Реактивные приложения нацелены на замену традиционных многопоточных приложений архитектурой, удовлетворяющей одному или нескольким из следующих требований:
- Событийный. Используя Актеры, можно написать код, который обрабатывает запросы асинхронно и использует исключительно неблокирующие операции.
- Масштабируемость. В Akka возможно добавление узлов без изменения кода благодаря передаче сообщений и прозрачности местоположения.
- Устойчивый. Любое приложение будет сталкиваться с ошибками и в какой-то момент выйдет из строя. Akka предоставляет стратегии «надзора» (отказоустойчивости) для облегчения самовосстановления системы.
- Отзывчивый. Многие из современных высокопроизводительных и быстродействующих приложений должны обеспечивать быструю обратную связь с пользователем и, следовательно, должны реагировать на события чрезвычайно своевременно. Неблокирующая стратегия Akka, основанная на сообщениях, помогает достичь этого.
Что такое Актер в Акке?
Актер — это не что иное, как объект, который получает сообщения и выполняет действия по их обработке. Он отделен от источника сообщения, и его единственная обязанность — правильно распознать тип полученного сообщения и принять соответствующие меры.
Получив сообщение, актор может предпринять одно или несколько из следующих действий:
- Самостоятельно выполнять некоторые операции (такие как выполнение вычислений, сохранение данных, вызов внешней веб-службы и т. д.)
- Переслать сообщение или производное сообщение другому субъекту
- Создайте экземпляр нового актера и перешлите ему сообщение.
В качестве альтернативы актор может решить полностью игнорировать сообщение (т. е. он может выбрать бездействие), если сочтет это целесообразным.
Для реализации актора необходимо расширить трейт akka.actor.Actor и реализовать метод получения. Метод получения актора вызывается (Akka) при отправке сообщения этому актеру. Его типичная реализация состоит из сопоставления с образцом, как показано в следующем примере Akka, для определения типа сообщения и соответствующей реакции:
import akka.actor.Actor import akka.actor.Props import akka.event.Logging class MyActor extends Actor { def receive = { case value: String => doSomething(value) case _ => println("received unknown message") } }
Сопоставление с образцом — это относительно элегантный метод обработки сообщений, который имеет тенденцию создавать более «чистый» и удобный для навигации код, чем аналогичная реализация, основанная на обратных вызовах. Рассмотрим, например, упрощенную реализацию HTTP-запроса/ответа.
Во-первых, давайте реализуем это, используя парадигму обратного вызова в JavaScript:
route(url, function(request){ var query = buildQuery(request); dbCall(query, function(dbResponse){ var wsRequest = buildWebServiceRequest(dbResponse); wsCall(wsRequest, function(wsResponse) { sendReply(wsResponse); }); }); });
Теперь давайте сравним это с реализацией на основе сопоставления с образцом:
msg match { case HttpRequest(request) => { val query = buildQuery(request) dbCall(query) } case DbResponse(dbResponse) => { var wsRequest = buildWebServiceRequest(dbResponse); wsCall(dbResponse) } case WsResponse(wsResponse) => sendReply(wsResponse) }
Хотя код JavaScript на основе обратных вызовов, по общему признанию, компактен, его, безусловно, сложнее читать и ориентироваться. Для сравнения, код, основанный на сопоставлении с образцом, делает более очевидным, какие случаи рассматриваются и как каждый из них обрабатывается.
Актерская система
Взять сложную проблему и рекурсивно разбить ее на более мелкие подзадачи — это в целом хороший метод решения проблем. Этот подход может быть особенно полезен в компьютерных науках (в соответствии с принципом единой ответственности), поскольку он имеет тенденцию давать чистый, модульный код с небольшой избыточностью или без нее, который относительно легко поддерживать.
В дизайне на основе акторов использование этой техники облегчает логическую организацию акторов в иерархическую структуру, известную как акторная система. Система акторов обеспечивает инфраструктуру, посредством которой акторы взаимодействуют друг с другом.
В Akka единственный способ связи с актером — через ActorRef
. ActorRef
представляет собой ссылку на актор, которая не позволяет другим объектам напрямую обращаться или манипулировать внутренними элементами и состоянием этого актора. Сообщения могут быть отправлены актору через ActorRef
с использованием одного из следующих протоколов синтаксиса:
-
!
(«рассказать») – отправляет сообщение и немедленно возвращается -
?
(«спросить») — отправляет сообщение и возвращает Future, представляющий возможный ответ
У каждого актора есть почтовый ящик, в который доставляются входящие сообщения. Существует несколько реализаций почтовых ящиков, из которых можно выбрать, по умолчанию используется FIFO.
Актер содержит множество переменных экземпляра для сохранения состояния при обработке нескольких сообщений. Akka гарантирует, что каждый экземпляр актора запускается в своем собственном облегченном потоке, а сообщения обрабатываются по одному. Таким образом, состояние каждого актора может надежно поддерживаться без необходимости явного беспокойства разработчика о синхронизации или условиях гонки.
Каждому актору предоставляется следующая полезная информация для выполнения его задач через Akka Actor API:
-
sender
:ActorRef
для отправителя сообщения, которое обрабатывается в данный момент. -
context
: информация и методы, относящиеся к контексту, в котором работает актор (включает, например, методactorOf
для создания экземпляра нового актора) -
supervisionStrategy
: определяет стратегию, которая будет использоваться для восстановления после ошибок. -
self
:ActorRef
для самого актера
Чтобы помочь связать эти руководства вместе, давайте рассмотрим простой пример подсчета количества слов в текстовом файле.
Для нашего примера с Akka мы разобьем проблему на две подзадачи; а именно (1) «дочерняя» задача подсчета количества слов в одной строке и (2) «родительская» задача суммирования этих подсчетов слов в строке для получения общего количества слов в файле.
Родительский актор загружает каждую строку из файла, а затем делегирует дочернему актору задачу подсчета слов в этой строке. Когда дочерний процесс будет завершен, он отправит сообщение обратно родителю с результатом. Родитель будет получать сообщения с количеством слов (для каждой строки) и вести счетчик общего количества слов во всем файле, который он затем вернет вызывающей стороне после завершения.
(Обратите внимание, что приведенные ниже примеры кода учебного пособия Akka предназначены только для учебных целей и поэтому не обязательно касаются всех граничных условий, оптимизации производительности и т. д. Кроме того, полная компилируемая версия примеров кода, показанных ниже, доступна в это суть.)
Давайте сначала рассмотрим пример реализации дочернего класса StringCounterActor
:
case class ProcessStringMsg(string: String) case class StringProcessedMsg(words: Integer) class StringCounterActor extends Actor { def receive = { case ProcessStringMsg(string) => { val wordsInLine = string.split(" ").length sender ! StringProcessedMsg(wordsInLine) } case _ => println("Error: message not recognized") } }
У этого актора очень простая задача: потреблять сообщения ProcessStringMsg
(содержащие строку текста), подсчитывать количество слов в указанной строке и возвращать результат отправителю через сообщение StringProcessedMsg
. Обратите внимание, что мы реализовали наш класс для использования !
(«сообщить») для отправки сообщения StringProcessedMsg
(т. е. для отправки сообщения и немедленного возврата).
Хорошо, теперь давайте обратим внимание на родительский класс WordCounterActor
:

1. case class StartProcessFileMsg() 2. 3. class WordCounterActor(filename: String) extends Actor { 4. 5. private var running = false 6. private var totalLines = 0 7. private var linesProcessed = 0 8. private var result = 0 9. private var fileSender: Option[ActorRef] = None 10. 11. def receive = { 12. case StartProcessFileMsg() => { 13. if (running) { 14. // println just used for example purposes; 15. // Akka logger should be used instead 16. println("Warning: duplicate start message received") 17. } else { 18. running = true 19. fileSender = Some(sender) // save reference to process invoker 20. import scala.io.Source._ 21. fromFile(filename).getLines.foreach { line => 22. context.actorOf(Props[StringCounterActor]) ! ProcessStringMsg(line) 23. totalLines += 1 24. } 25. } 26. } 27. case StringProcessedMsg(words) => { 28. result += words 29. linesProcessed += 1 30. if (linesProcessed == totalLines) { 31. fileSender.map(_ ! result) // provide result to process invoker 32. } 33. } 34. case _ => println("message not recognized!") 35. } 36. }
Здесь происходит много вещей, поэтому давайте рассмотрим каждую из них более подробно (обратите внимание, что номера строк, упомянутые в последующем обсуждении, основаны на приведенном выше примере кода) …
Во-первых, обратите внимание, что имя обрабатываемого файла передается конструктору WordCounterActor
(строка 3). Это указывает на то, что актор должен использоваться только для обработки одного файла. Это также упрощает работу по написанию кода для разработчика, избегая необходимости сбрасывать переменные состояния ( running
, totalLines
, linesProcessed
и result
) по завершении работы, поскольку экземпляр используется только один раз (т. е. для обработки одного файла). а потом выбросили.
Далее обратите внимание, что WordCounterActor
обрабатывает два типа сообщений:
-
StartProcessFileMsg
(строка 12)- Получено от внешнего субъекта, который изначально инициирует
WordCounterActor
. - При получении
WordCounterActor
сначала проверяет, не получает ли он избыточный запрос. - Если запрос избыточен,
WordCounterActor
генерирует предупреждение и больше ничего не делает (строка 16). - Если запрос не является избыточным:
-
WordCounterActor
сохраняет ссылку на отправителя в переменной экземпляраfileSender
(обратите внимание, что этоOption[ActorRef]
, а неOption[Actor]
— см. строку 9). ЭтотActorRef
необходим для последующего доступа к нему и ответа на него при обработке окончательногоStringProcessedMsg
(который получен от дочернего объектаStringCounterActor
, как описано ниже). -
WordCounterActor
читает файл, и по мере загрузки каждой строки в файле создается дочерний объектStringCounterActor
, которому передается сообщение, содержащее строку для обработки (строки 21–24).
-
- Получено от внешнего субъекта, который изначально инициирует
-
StringProcessedMsg
(строка 27)- Получается от дочернего
StringCounterActor
, когда он завершает обработку назначенной ему строки. - При получении
WordCounterActor
увеличивает счетчик строк для файла и, если все строки в файле были обработаны (т. е. когдаtotalLines
иlinesProcessed
равны), отправляет окончательный результат исходномуfileSender
(строки 28–31).
- Получается от дочернего
Еще раз обратите внимание, что в Akka единственным механизмом межакторного взаимодействия является передача сообщений. Сообщения — это единственное, чем обмениваются субъекты, и, поскольку субъекты потенциально могут одновременно получать доступ к одним и тем же сообщениям, важно, чтобы они были неизменяемыми, чтобы избежать условий гонки и неожиданного поведения.
Поэтому сообщения обычно передаются в виде классов case, поскольку они неизменяемы по умолчанию и легко интегрируются с сопоставлением с образцом.
Давайте завершим пример примером кода для запуска всего приложения.
object Sample extends App { import akka.util.Timeout import scala.concurrent.duration._ import akka.pattern.ask import akka.dispatch.ExecutionContexts._ implicit val ec = global override def main(args: Array[String]) { val system = ActorSystem("System") val actor = system.actorOf(Props(new WordCounterActor(args(0)))) implicit val timeout = Timeout(25 seconds) val future = actor ? StartProcessFileMsg() future.map { result => println("Total number of words " + result) system.shutdown } } }
Обратите внимание, как на этот раз ?
Метод используется для отправки сообщения. Таким образом, вызывающий объект может использовать возвращенное Future для печати окончательного результата, когда он будет доступен, и для выхода из программы, выключив ActorSystem.
Отказоустойчивость Akka и стратегии супервизора
В акторной системе каждый актор является наблюдателем своих потомков. Если актор не может обработать сообщение, он приостанавливает себя и всех своих дочерних элементов и отправляет сообщение, обычно в виде исключения, своему супервизору.
В Akka способ, которым супервизор реагирует и обрабатывает исключения, которые просачиваются к нему от его дочерних элементов, называется стратегией супервизора. Стратегии супервизора — это основной и простой механизм, с помощью которого вы определяете отказоустойчивое поведение вашей системы.
Когда сообщение о сбое достигает диспетчера, он может предпринять одно из следующих действий:
- Возобновить дочерний элемент (и его дочерние элементы), сохраняя его внутреннее состояние. Эту стратегию можно применять, когда дочернее состояние не было повреждено ошибкой и может продолжать функционировать корректно.
- Перезапустите дочерний элемент (и его дочерние элементы), очистив его внутреннее состояние. Эту стратегию можно использовать в сценарии, противоположном только что описанному. Если дочернее состояние было повреждено ошибкой, необходимо сбросить его состояние, прежде чем его можно будет использовать в будущем.
- Остановите дочерний элемент (и его дочерние элементы) навсегда. Эту стратегию можно использовать в тех случаях, когда считается, что состояние ошибки нельзя исправить, но оно не ставит под угрозу остальную часть выполняемой операции, которая может быть завершена в отсутствие отказавшего дочернего элемента.
- Остановите себя и увеличьте ошибку. Используется, когда супервизор не знает, как справиться с ошибкой, и поэтому передает ее своему супервизору.
Кроме того, Актер может принять решение применить действие только к неудавшимся детям или к своим братьям и сестрам. Для этого есть две заранее определенные стратегии:
-
OneForOneStrategy
: применяет указанное действие только к неудавшемуся дочернему элементу. -
AllForOneStrategy
: применяет указанное действие ко всем своим дочерним элементам.
Вот простой пример с использованием OneForOneStrategy
:
import akka.actor.OneForOneStrategy import akka.actor.SupervisorStrategy._ import scala.concurrent.duration._ override val supervisorStrategy = OneForOneStrategy() { case _: ArithmeticException => Resume case _: NullPointerException => Restart case _: IllegalArgumentException => Stop case _: Exception => Escalate }
Если стратегия не указана, используется следующая стратегия по умолчанию:
- Если произошла ошибка при инициализации актора или если актор был убит, актор останавливается.
- Если возникло какое-либо другое исключение, актор просто перезапускается.
Поставляемая Akka реализация этой стратегии по умолчанию выглядит следующим образом:
final val defaultStrategy: SupervisorStrategy = { def defaultDecider: Decider = { case _: ActorInitializationException ⇒ Stop case _: ActorKilledException ⇒ Stop case _: Exception ⇒ Restart } OneForOneStrategy()(defaultDecider) }
Akka позволяет реализовать пользовательские стратегии супервизора, но, как предупреждает документация Akka, делайте это с осторожностью, так как неправильная реализация может привести к таким проблемам, как заблокированные системы акторов (т.е. перманентно приостановленные акторы).
Прозрачность местоположения
Архитектура Akka поддерживает прозрачность местоположения, что позволяет субъектам быть полностью независимыми от того, откуда поступили сообщения, которые они получают. Отправитель сообщения может находиться в той же JVM, что и действующее лицо, или в отдельной JVM (либо работающей на том же узле, либо на другом узле). Akka позволяет обрабатывать каждый из этих случаев способом, который полностью прозрачен для актора (и, следовательно, для разработчика). Единственное предостережение заключается в том, что сообщения, отправляемые через несколько узлов, должны быть сериализуемыми.
Акторные системы предназначены для работы в распределенной среде, не требуя специального кода. Akka требует только наличие конфигурационного файла ( application.conf
), в котором указаны узлы для отправки сообщений. Вот простой пример файла конфигурации:
akka { actor { provider = "akka.remote.RemoteActorRefProvider" } remote { transport = "akka.remote.netty.NettyRemoteTransport" netty { hostname = "127.0.0.1" port = 2552 } } }
Несколько советов на прощание…
Мы видели, как среда Akka помогает достичь параллелизма и высокой производительности. Однако, как указано в этом руководстве, есть несколько моментов, которые следует учитывать при проектировании и реализации вашей системы, чтобы максимально использовать возможности Akka:
- В максимально возможной степени каждому участнику должна быть назначена наименьшая возможная задача (как обсуждалось ранее, в соответствии с принципом единой ответственности).
Актеры должны обрабатывать события (т. е. обрабатывать сообщения) асинхронно и не должны блокироваться, иначе произойдет переключение контекста, что может отрицательно сказаться на производительности. В частности, операции блокировки (IO и т. д.) лучше всего выполнять в Future, чтобы не блокировать актора; то есть:
case evt => blockingCall() // BAD case evt => Future { blockingCall() // GOOD }
- Убедитесь, что все ваши сообщения неизменяемы, так как акторы, передающие их друг другу, будут работать одновременно в своих собственных потоках. Изменяемые сообщения могут привести к неожиданному поведению.
- Поскольку сообщения, отправляемые между узлами, должны быть сериализуемыми, важно помнить, что чем больше сообщения, тем больше времени потребуется на их сериализацию, отправку и десериализацию, что может негативно сказаться на производительности.
Заключение
Akka, написанная на Scala, упрощает и облегчает разработку высокопараллельных, распределенных и отказоустойчивых приложений, скрывая большую часть сложности от разработчика. Чтобы отдать должное Akka, потребуется гораздо больше, чем это единственное руководство, но мы надеемся, что это введение и его примеры были достаточно увлекательными, чтобы заставить вас хотеть читать больше.
Amazon, VMWare и CSC — это лишь несколько примеров ведущих компаний, активно использующих Akka. Посетите официальный веб-сайт Akka, чтобы узнать больше и выяснить, может ли Akka быть правильным решением для вашего проекта.