并发和容错变得简单:带有示例的 Akka 教程
已发表: 2022-03-11挑战
编写并发程序很困难。 必须处理线程、锁、竞争条件等非常容易出错,并且可能导致代码难以阅读、测试和维护。
因此,许多人更愿意完全避免多线程。 相反,它们专门使用单线程进程,依靠外部服务(如数据库、队列等)来处理任何需要的并发或异步操作。 虽然这种方法在某些情况下是一种合法的选择,但在许多情况下它根本不是一个可行的选择。 许多实时系统——例如交易或银行应用程序,或实时游戏——没有等待单线程进程完成的奢侈(他们现在需要答案!)。 其他系统是如此的计算或资源密集型,以至于在不将并行化引入其代码的情况下,它们将花费大量时间(在某些情况下为数小时甚至数天)才能运行。
一种相当常见的单线程方法(例如,在 Node.js 世界中广泛使用)是使用基于事件的非阻塞范例。 虽然这确实通过避免上下文切换、锁定和阻塞来提高性能,但它仍然没有解决同时使用多个处理器的问题(这样做需要启动多个独立进程并在多个独立进程之间进行协调)。
那么这是否意味着您别无选择,只能深入了解线程、锁和竞争条件的内部以构建并发应用程序?
感谢 Akka 框架,答案是否定的。 本教程介绍了 Akka 示例,并探讨了它促进和简化并发分布式应用程序实现的方式。
什么是 Akka 框架?
Akka 是一个工具包和运行时,用于在 JVM 上构建高度并发、分布式和容错的应用程序。 Akka 是用 Scala 编写的,为 Scala 和 Java 提供了语言绑定。
Akka 处理并发的方法基于 Actor 模型。 在基于参与者的系统中,一切都是参与者,就像在面向对象设计中一切都是对象一样。 然而,一个关键的区别——与我们的讨论特别相关——是 Actor 模型是专门设计和架构为用作并发模型的,而面向对象模型则不是。 更具体地说,在 Scala 演员系统中,演员交互和共享信息,没有任何顺序性的预设。 参与者彼此共享信息和任务彼此共享的机制是消息传递。
Akka 在参与者和底层系统之间创建了一个层,以便参与者只需要处理消息。 创建和调度线程、接收和分派消息以及处理竞争条件和同步的所有复杂性都归于框架以透明地处理。
Akka 严格遵守反应式宣言。 反应式应用程序旨在用满足以下一项或多项要求的架构替换传统的多线程应用程序:
- 事件驱动。 使用 Actors,可以编写异步处理请求并专门使用非阻塞操作的代码。
- 可扩展。 在 Akka 中,由于消息传递和位置透明性,无需修改代码即可添加节点。
- 有弹性的。 任何应用程序都会在某个时间点遇到错误并失败。 Akka 提供“监督”(容错)策略来促进自我修复系统。
- 反应灵敏。 当今的许多高性能和快速响应应用程序都需要向用户提供快速反馈,因此需要以极其及时的方式对事件做出反应。 Akka 的非阻塞、基于消息的策略有助于实现这一点。
什么是 Akka 中的演员?
演员本质上只不过是一个接收消息并采取行动处理消息的对象。 它与消息的来源分离,它唯一的职责是正确识别它收到的消息类型并采取相应的措施。
收到消息后,参与者可能会采取以下一项或多项行动:
- 自己执行一些操作(例如执行计算、持久化数据、调用外部 Web 服务等)
- 将消息或派生消息转发给另一个参与者
- 实例化一个新的actor并将消息转发给它
或者,参与者可以选择完全忽略该消息(即,它可以选择不作为),如果它认为这样做是合适的。
要实现一个actor,需要扩展akka.actor.Actor trait 并实现receive 方法。 当一个消息被发送到那个actor 时,一个actor 的receive 方法被调用(由Akka)。 它的典型实现包括模式匹配,如下面的 Akka 示例所示,以识别消息类型并做出相应的反应:
import akka.actor.Actor import akka.actor.Props import akka.event.Logging class MyActor extends Actor { def receive = { case value: String => doSomething(value) case _ => println("received unknown message") } }
模式匹配是一种相对优雅的消息处理技术,与基于回调的类似实现相比,它倾向于生成“更干净”且更易于导航的代码。 例如,考虑一个简单的 HTTP 请求/响应实现。
首先,让我们在 JavaScript 中使用基于回调的范例来实现它:
route(url, function(request){ var query = buildQuery(request); dbCall(query, function(dbResponse){ var wsRequest = buildWebServiceRequest(dbResponse); wsCall(wsRequest, function(wsResponse) { sendReply(wsResponse); }); }); });
现在让我们将其与基于模式匹配的实现进行比较:
msg match { case HttpRequest(request) => { val query = buildQuery(request) dbCall(query) } case DbResponse(dbResponse) => { var wsRequest = buildWebServiceRequest(dbResponse); wsCall(dbResponse) } case WsResponse(wsResponse) => sendReply(wsResponse) }
虽然基于回调的 JavaScript 代码确实很紧凑,但它肯定更难阅读和导航。 相比之下,基于模式匹配的代码可以更直接地看出正在考虑哪些案例以及如何处理每个案例。
演员系统
将一个复杂的问题递归地拆分为更小的子问题通常是一种合理的问题解决技术。 这种方法在计算机科学中特别有用(与单一职责原则一致),因为它倾向于产生干净、模块化的代码,几乎没有冗余,而且相对容易维护。
在基于角色的设计中,使用此技术有助于将角色的逻辑组织成称为角色系统的层次结构。 参与者系统提供了参与者之间交互的基础设施。
在 Akka 中,与 Actor 通信的唯一方法是通过ActorRef
。 ActorRef
表示对一个actor的引用,它阻止其他对象直接访问或操作该actor的内部和状态。 可以使用以下语法协议之一通过ActorRef
将消息发送给参与者:
-
!
(“tell”) – 发送消息并立即返回 ?
(“ask”) – 发送消息并返回代表可能回复的 Future
每个参与者都有一个邮箱,它的传入消息被传递到该邮箱。 有多种邮箱实现可供选择,默认实现是 FIFO。
一个actor包含许多实例变量以在处理多个消息时保持状态。 Akka 确保参与者的每个实例都在其自己的轻量级线程中运行,并且一次处理一个消息。 通过这种方式,可以可靠地维护每个参与者的状态,而无需开发人员明确担心同步或竞争条件。
通过 Akka Actor API 为每个 Actor 提供了以下有用信息,用于执行其任务:
-
sender
: 当前正在处理的消息的发送者的ActorRef
-
context
:与actor正在运行的上下文相关的信息和方法(例如,包括用于实例化新actor的actorOf
方法) -
supervisionStrategy
策略:定义用于从错误中恢复的策略 self
: 演员本身的ActorRef
为了帮助将这些教程联系在一起,让我们考虑一个计算文本文件中单词数量的简单示例。
对于我们的 Akka 示例,我们将把问题分解为两个子任务; 即,(1)计算单行字数的“子”任务和(2)将每行字数相加以获得文件中总字数的“父”任务。
父actor将从文件中加载每一行,然后将计算该行中单词的任务委托给子actor。 当孩子完成后,它将向父母发送一条带有结果的消息。 父级将接收带有字数(每行)的消息,并为整个文件中的总字数保留一个计数器,然后在完成后将其返回给其调用者。
(请注意,下面提供的 Akka 教程代码示例仅用于教学,因此不一定关注所有边缘条件、性能优化等。此外,下面显示的代码示例的完整可编译版本可在这个要点。)
我们先来看一个StringCounterActor
子类的示例实现:
case class ProcessStringMsg(string: String) case class StringProcessedMsg(words: Integer) class StringCounterActor extends Actor { def receive = { case ProcessStringMsg(string) => { val wordsInLine = string.split(" ").length sender ! StringProcessedMsg(wordsInLine) } case _ => println("Error: message not recognized") } }
这个actor有一个非常简单的任务:消费ProcessStringMsg
消息(包含一行文本),计算指定行的单词数,并通过StringProcessedMsg
消息将结果返回给发送者。 请注意,我们已经实现了我们的类以使用!
(“tell”) 方法发送StringProcessedMsg
消息(即发送消息并立即返回)。
好的,现在让我们把注意力转向父WordCounterActor
类:
1. case class StartProcessFileMsg() 2. 3. class WordCounterActor(filename: String) extends Actor { 4. 5. private var running = false 6. private var totalLines = 0 7. private var linesProcessed = 0 8. private var result = 0 9. private var fileSender: Option[ActorRef] = None 10. 11. def receive = { 12. case StartProcessFileMsg() => { 13. if (running) { 14. // println just used for example purposes; 15. // Akka logger should be used instead 16. println("Warning: duplicate start message received") 17. } else { 18. running = true 19. fileSender = Some(sender) // save reference to process invoker 20. import scala.io.Source._ 21. fromFile(filename).getLines.foreach { line => 22. context.actorOf(Props[StringCounterActor]) ! ProcessStringMsg(line) 23. totalLines += 1 24. } 25. } 26. } 27. case StringProcessedMsg(words) => { 28. result += words 29. linesProcessed += 1 30. if (linesProcessed == totalLines) { 31. fileSender.map(_ ! result) // provide result to process invoker 32. } 33. } 34. case _ => println("message not recognized!") 35. } 36. }
这里发生了很多事情,所以让我们更详细地检查它们中的每一个(请注意,以下讨论中引用的行号基于上面的代码示例) ......

首先,请注意要处理的文件的名称被传递给WordCounterActor
构造函数(第 3 行)。 这表明actor仅用于处理单个文件。 这也简化了开发人员的编码工作,避免了在工作完成时重置状态变量( running
、 totalLines
、 linesProcessed
和result
)的需要,因为实例只使用一次(即处理单个文件)然后丢弃。
接下来,观察WordCounterActor
处理两种类型的消息:
-
StartProcessFileMsg
(第 12 行)- 从最初启动
WordCounterActor
的外部参与者接收。 - 收到后,
WordCounterActor
首先检查它是否没有收到冗余请求。 - 如果请求是多余的,
WordCounterActor
会生成一个警告并且什么都不做(第 16 行)。 - 如果请求不是多余的:
-
WordCounterActor
将对发送者的引用存储在fileSender
实例变量中(请注意,这是一个Option[ActorRef]
而不是Option[Actor]
- 参见第 9 行)。 需要此ActorRef
以便稍后在处理最终StringProcessedMsg
(从StringCounterActor
子级接收,如下所述)时访问和响应它。 - 然后
WordCounterActor
读取文件,并且在加载文件中的每一行时,创建一个StringCounterActor
子代,并将包含要处理的行的消息传递给它(第 21-24 行)。
-
- 从最初启动
-
StringProcessedMsg
(第 27 行)- 当子 StringCounterActor 完成处理分配给它的行时从子
StringCounterActor
接收。 - 收到后,
WordCounterActor
增加文件的行计数器,如果文件中的所有行都已处理(即,当totalLines
和linesProcessed
相等时),它将最终结果发送到原始fileSender
(第 28-31 行)。
- 当子 StringCounterActor 完成处理分配给它的行时从子
再次注意,在 Akka 中,actor 间通信的唯一机制是消息传递。 消息是参与者共享的唯一内容,由于参与者可能同时访问相同的消息,因此它们是不可变的很重要,以避免竞争条件和意外行为。
因此,通常以案例类的形式传递消息,因为它们在默认情况下是不可变的,并且它们与模式匹配的无缝集成。
让我们用运行整个应用程序的代码示例来结束该示例。
object Sample extends App { import akka.util.Timeout import scala.concurrent.duration._ import akka.pattern.ask import akka.dispatch.ExecutionContexts._ implicit val ec = global override def main(args: Array[String]) { val system = ActorSystem("System") val actor = system.actorOf(Props(new WordCounterActor(args(0)))) implicit val timeout = Timeout(25 seconds) val future = actor ? StartProcessFileMsg() future.map { result => println("Total number of words " + result) system.shutdown } } }
注意这次如何?
方法用于发送消息。 这样,调用者可以使用返回的 Future 在可用时打印最终结果,并通过关闭 ActorSystem 退出程序。
Akka 容错和主管策略
在演员系统中,每个演员都是其孩子的监督者。 如果一个actor无法处理一条消息,它会挂起自己和它的所有子节点,并通常以异常的形式向它的主管发送一条消息。
在 Akka 中,主管响应和处理从其子级渗透到它的异常的方式称为主管策略。 主管策略是定义系统容错行为的主要且直接的机制。
当表示失败的消息到达主管时,它可以采取以下操作之一:
- 恢复孩子(及其孩子),保持其内部状态。 当子状态没有被错误破坏并且可以继续正常运行时,可以应用此策略。
- 重新启动孩子(及其孩子),清除其内部状态。 此策略可用于与刚刚描述的相反的情况。 如果子状态已被错误破坏,则必须先重置其状态,然后才能在未来使用。
- 永久停止孩子(及其孩子)。 这种策略可以在错误条件被认为是不可纠正的情况下使用,但不会危及正在执行的其余操作,这可以在没有失败的孩子的情况下完成。
- 停止自身并升级错误。 当主管不知道如何处理故障并因此将其上报给自己的主管时受雇。
此外,Actor 可以决定仅将动作应用于失败的孩子或其兄弟姐妹。 为此有两种预定义的策略:
-
OneForOneStrategy
:仅将指定的操作应用于失败的孩子 AllForOneStrategy
:将指定的操作应用于其所有子项
这是一个使用OneForOneStrategy
的简单示例:
import akka.actor.OneForOneStrategy import akka.actor.SupervisorStrategy._ import scala.concurrent.duration._ override val supervisorStrategy = OneForOneStrategy() { case _: ArithmeticException => Resume case _: NullPointerException => Restart case _: IllegalArgumentException => Stop case _: Exception => Escalate }
如果未指定策略,则采用以下默认策略:
- 如果在初始化 Actor 时出错或者如果 Actor 被杀死,则 Actor 将停止。
- 如果有任何其他类型的异常,则简单地重新启动actor。
这个默认策略的 Akka 提供的实现如下:
final val defaultStrategy: SupervisorStrategy = { def defaultDecider: Decider = { case _: ActorInitializationException ⇒ Stop case _: ActorKilledException ⇒ Stop case _: Exception ⇒ Restart } OneForOneStrategy()(defaultDecider) }
Akka 允许实施自定义主管策略,但正如 Akka 文档所警告的那样,请谨慎执行,因为不正确的实施可能会导致诸如阻塞 Actor 系统(即永久暂停 Actor)之类的问题。
位置透明度
Akka 架构支持位置透明性,使参与者能够完全不知道他们收到的消息的来源。 消息的发送者可能与参与者驻留在同一 JVM 中,也可能驻留在单独的 JVM 中(运行在同一节点或不同节点上)。 Akka 能够以对参与者(以及开发人员)完全透明的方式处理这些案例中的每一个。 唯一需要注意的是,跨多个节点发送的消息必须是可序列化的。
Actor 系统被设计为在分布式环境中运行,而不需要任何专门的代码。 Akka 只需要存在一个配置文件( application.conf
)来指定要向其发送消息的节点。 下面是一个简单的配置文件示例:
akka { actor { provider = "akka.remote.RemoteActorRefProvider" } remote { transport = "akka.remote.netty.NettyRemoteTransport" netty { hostname = "127.0.0.1" port = 2552 } } }
一些离别技巧……
我们已经看到了 Akka 框架如何帮助实现并发和高性能。 然而,正如本教程所指出的,在设计和实现系统时要牢记以下几点,以便充分利用 Akka 的力量:
- 尽可能为每个参与者分配尽可能少的任务(如前所述,遵循单一职责原则)
Actor 应该异步处理事件(即处理消息)并且不应该阻塞,否则会发生上下文切换,这会对性能产生不利影响。 具体来说,最好在Future中执行阻塞操作(IO等),以免阻塞actor; IE:
case evt => blockingCall() // BAD case evt => Future { blockingCall() // GOOD }
- 确保您的消息都是不可变的,因为将它们传递给彼此的参与者都将在它们自己的线程中同时运行。 可变消息可能会导致意外行为。
- 由于节点之间发送的消息必须是可序列化的,因此重要的是要记住消息越大,序列化、发送和反序列化它们所需的时间就越长,这会对性能产生负面影响。
结论
用 Scala 编写的 Akka 简化并促进了高并发、分布式和容错应用程序的开发,对开发人员隐藏了大部分复杂性。 做 Akka 完全公正需要的远不止这个单一的教程,但希望这个介绍和它的例子足够迷人,让你想要阅读更多。
Amazon、VMWare 和 CSC 只是积极使用 Akka 的领先公司的几个例子。 访问 Akka 官方网站以了解更多信息并探索 Akka 是否也可以成为您项目的正确答案。