並發和容錯變得簡單:帶有示例的 Akka 教程
已發表: 2022-03-11挑戰
編寫並發程序很困難。 必須處理線程、鎖、競爭條件等非常容易出錯,並且可能導致代碼難以閱讀、測試和維護。
因此,許多人更願意完全避免多線程。 相反,它們專門使用單線程進程,依靠外部服務(如數據庫、隊列等)來處理任何需要的並發或異步操作。 雖然這種方法在某些情況下是一種合法的選擇,但在許多情況下它根本不是一個可行的選擇。 許多實時系統——例如交易或銀行應用程序,或實時遊戲——沒有等待單線程進程完成的奢侈(他們現在需要答案!)。 其他系統是如此的計算或資源密集型,以至於在不將並行化引入其代碼的情況下,它們將花費大量時間(在某些情況下為數小時甚至數天)才能運行。
一種相當常見的單線程方法(例如,在 Node.js 世界中廣泛使用)是使用基於事件的非阻塞範例。 雖然這確實通過避免上下文切換、鎖定和阻塞來提高性能,但它仍然沒有解決同時使用多個處理器的問題(這樣做需要啟動多個獨立進程並在多個獨立進程之間進行協調)。
那麼這是否意味著您別無選擇,只能深入了解線程、鎖和競爭條件的內部以構建並發應用程序?
感謝 Akka 框架,答案是否定的。 本教程介紹了 Akka 示例,並探討了它促進和簡化並發分佈式應用程序實現的方式。
什麼是 Akka 框架?
Akka 是一個工具包和運行時,用於在 JVM 上構建高度並發、分佈式和容錯的應用程序。 Akka 是用 Scala 編寫的,為 Scala 和 Java 提供了語言綁定。
Akka 處理並發的方法基於 Actor 模型。 在基於參與者的系統中,一切都是參與者,就像在面向對象設計中一切都是對像一樣。 然而,一個關鍵的區別——與我們的討論特別相關——是 Actor 模型是專門設計和架構為用作並發模型的,而面向對像模型則不是。 更具體地說,在 Scala 演員系統中,演員交互和共享信息,沒有任何順序性的預設。 參與者彼此共享信息和任務彼此共享的機制是消息傳遞。
Akka 在參與者和底層系統之間創建了一個層,以便參與者只需要處理消息。 創建和調度線程、接收和分派消息以及處理競爭條件和同步的所有復雜性都歸於框架以透明地處理。
Akka 嚴格遵守反應式宣言。 反應式應用程序旨在用滿足以下一項或多項要求的架構替換傳統的多線程應用程序:
- 事件驅動。 使用 Actors,可以編寫異步處理請求並專門使用非阻塞操作的代碼。
- 可擴展。 在 Akka 中,由於消息傳遞和位置透明性,無需修改代碼即可添加節點。
- 有彈性的。 任何應用程序都會在某個時間點遇到錯誤並失敗。 Akka 提供“監督”(容錯)策略來促進自我修復系統。
- 反應靈敏。 當今的許多高性能和快速響應應用程序都需要向用戶提供快速反饋,因此需要以極其及時的方式對事件做出反應。 Akka 的非阻塞、基於消息的策略有助於實現這一點。
什麼是 Akka 中的演員?
演員本質上只不過是一個接收消息並採取行動處理消息的對象。 它與消息的來源分離,它唯一的職責是正確識別它收到的消息類型並採取相應的措施。
收到消息後,參與者可能會採取以下一項或多項行動:
- 自己執行一些操作(例如執行計算、持久化數據、調用外部 Web 服務等)
- 將消息或派生消息轉發給另一個參與者
- 實例化一個新的actor並將消息轉發給它
或者,參與者可以選擇完全忽略該消息(即,它可以選擇不作為),如果它認為這樣做是合適的。
要實現一個actor,需要擴展akka.actor.Actor trait 並實現receive 方法。 當一個消息被發送到那個actor 時,一個actor 的receive 方法被調用(由Akka)。 它的典型實現包括模式匹配,如下面的 Akka 示例所示,以識別消息類型並做出相應的反應:
import akka.actor.Actor import akka.actor.Props import akka.event.Logging class MyActor extends Actor { def receive = { case value: String => doSomething(value) case _ => println("received unknown message") } }
模式匹配是一種相對優雅的消息處理技術,與基於回調的類似實現相比,它傾向於生成“更乾淨”且更易於導航的代碼。 例如,考慮一個簡單的 HTTP 請求/響應實現。
首先,讓我們在 JavaScript 中使用基於回調的範例來實現它:
route(url, function(request){ var query = buildQuery(request); dbCall(query, function(dbResponse){ var wsRequest = buildWebServiceRequest(dbResponse); wsCall(wsRequest, function(wsResponse) { sendReply(wsResponse); }); }); });
現在讓我們將其與基於模式匹配的實現進行比較:
msg match { case HttpRequest(request) => { val query = buildQuery(request) dbCall(query) } case DbResponse(dbResponse) => { var wsRequest = buildWebServiceRequest(dbResponse); wsCall(dbResponse) } case WsResponse(wsResponse) => sendReply(wsResponse) }
雖然基於回調的 JavaScript 代碼確實很緊湊,但它肯定更難閱讀和導航。 相比之下,基於模式匹配的代碼可以更直接地看出正在考慮哪些案例以及如何處理每個案例。
演員系統
將一個複雜的問題遞歸地拆分為更小的子問題通常是一種合理的問題解決技術。 這種方法在計算機科學中特別有用(與單一職責原則一致),因為它傾向於產生乾淨、模塊化的代碼,幾乎沒有冗餘,而且相對容易維護。
在基於角色的設計中,使用此技術有助於將角色的邏輯組織成稱為角色系統的層次結構。 參與者係統提供了參與者之間交互的基礎設施。
在 Akka 中,與 Actor 通信的唯一方法是通過ActorRef
。 ActorRef
表示對一個actor的引用,它阻止其他對象直接訪問或操作該actor的內部和狀態。 可以使用以下語法協議之一通過ActorRef
將消息發送給參與者:
-
!
(“tell”) – 發送消息並立即返回 ?
(“ask”) – 發送消息並返回代表可能回复的 Future
每個參與者都有一個郵箱,它的傳入消息被傳遞到該郵箱。 有多種郵箱實現可供選擇,默認實現是 FIFO。
一個actor包含許多實例變量以在處理多個消息時保持狀態。 Akka 確保參與者的每個實例都在其自己的輕量級線程中運行,並且一次處理一個消息。 通過這種方式,可以可靠地維護每個參與者的狀態,而無需開發人員明確擔心同步或競爭條件。
通過 Akka Actor API 為每個 Actor 提供了以下有用信息,用於執行其任務:
-
sender
: 當前正在處理的消息的發送者的ActorRef
-
context
:與actor正在運行的上下文相關的信息和方法(例如,包括用於實例化新actor的actorOf
方法) -
supervisionStrategy
策略:定義用於從錯誤中恢復的策略 self
: 演員本身的ActorRef
為了幫助將這些教程聯繫在一起,讓我們考慮一個計算文本文件中單詞數量的簡單示例。
對於我們的 Akka 示例,我們將把問題分解為兩個子任務; 即,(1)計算單行字數的“子”任務和(2)將每行字數相加以獲得文件中總字數的“父”任務。
父actor將從文件中加載每一行,然後將計算該行中單詞的任務委託給子actor。 當孩子完成後,它將向父母發送一條帶有結果的消息。 父級將接收帶有字數(每行)的消息,並為整個文件中的總字數保留一個計數器,然後在完成後將其返回給其調用者。
(請注意,下面提供的 Akka 教程代碼示例僅用於教學,因此不一定關注所有邊緣條件、性能優化等。此外,下面顯示的代碼示例的完整可編譯版本可在這個要點。)
我們先來看一個StringCounterActor
子類的示例實現:
case class ProcessStringMsg(string: String) case class StringProcessedMsg(words: Integer) class StringCounterActor extends Actor { def receive = { case ProcessStringMsg(string) => { val wordsInLine = string.split(" ").length sender ! StringProcessedMsg(wordsInLine) } case _ => println("Error: message not recognized") } }
這個actor有一個非常簡單的任務:消費ProcessStringMsg
消息(包含一行文本),計算指定行的單詞數,並通過StringProcessedMsg
消息將結果返回給發送者。 請注意,我們已經實現了我們的類以使用!
(“tell”) 方法發送StringProcessedMsg
消息(即發送消息並立即返回)。
好的,現在讓我們把注意力轉向父WordCounterActor
類:
1. case class StartProcessFileMsg() 2. 3. class WordCounterActor(filename: String) extends Actor { 4. 5. private var running = false 6. private var totalLines = 0 7. private var linesProcessed = 0 8. private var result = 0 9. private var fileSender: Option[ActorRef] = None 10. 11. def receive = { 12. case StartProcessFileMsg() => { 13. if (running) { 14. // println just used for example purposes; 15. // Akka logger should be used instead 16. println("Warning: duplicate start message received") 17. } else { 18. running = true 19. fileSender = Some(sender) // save reference to process invoker 20. import scala.io.Source._ 21. fromFile(filename).getLines.foreach { line => 22. context.actorOf(Props[StringCounterActor]) ! ProcessStringMsg(line) 23. totalLines += 1 24. } 25. } 26. } 27. case StringProcessedMsg(words) => { 28. result += words 29. linesProcessed += 1 30. if (linesProcessed == totalLines) { 31. fileSender.map(_ ! result) // provide result to process invoker 32. } 33. } 34. case _ => println("message not recognized!") 35. } 36. }
這裡發生了很多事情,所以讓我們更詳細地檢查它們中的每一個(請注意,以下討論中引用的行號基於上面的代碼示例) ......

首先,請注意要處理的文件的名稱被傳遞給WordCounterActor
構造函數(第 3 行)。 這表明actor僅用於處理單個文件。 這也簡化了開發人員的編碼工作,避免了在工作完成時重置狀態變量( running
、 totalLines
、 linesProcessed
和result
)的需要,因為實例只使用一次(即處理單個文件)然後丟棄。
接下來,觀察WordCounterActor
處理兩種類型的消息:
-
StartProcessFileMsg
(第 12 行)- 從最初啟動
WordCounterActor
的外部參與者接收。 - 收到後,
WordCounterActor
首先檢查它是否沒有收到冗餘請求。 - 如果請求是多餘的,
WordCounterActor
會生成一個警告並且什麼都不做(第 16 行)。 - 如果請求不是多餘的:
-
WordCounterActor
將對發送者的引用存儲在fileSender
實例變量中(請注意,這是一個Option[ActorRef]
而不是Option[Actor]
- 參見第 9 行)。 需要此ActorRef
以便稍後在處理最終StringProcessedMsg
(從StringCounterActor
子級接收,如下所述)時訪問和響應它。 - 然後
WordCounterActor
讀取文件,並且在加載文件中的每一行時,創建一個StringCounterActor
子代,並將包含要處理的行的消息傳遞給它(第 21-24 行)。
-
- 從最初啟動
-
StringProcessedMsg
(第 27 行)- 當子 StringCounterActor 完成處理分配給它的行時從子
StringCounterActor
接收。 - 收到後,
WordCounterActor
增加文件的行計數器,如果文件中的所有行都已處理(即,當totalLines
和linesProcessed
相等時),它將最終結果發送到原始fileSender
(第 28-31 行)。
- 當子 StringCounterActor 完成處理分配給它的行時從子
再次注意,在 Akka 中,actor 間通信的唯一機制是消息傳遞。 消息是參與者共享的唯一內容,由於參與者可能同時訪問相同的消息,因此它們是不可變的很重要,以避免競爭條件和意外行為。
因此,通常以案例類的形式傳遞消息,因為它們在默認情況下是不可變的,並且它們與模式匹配的無縫集成。
讓我們用運行整個應用程序的代碼示例來結束該示例。
object Sample extends App { import akka.util.Timeout import scala.concurrent.duration._ import akka.pattern.ask import akka.dispatch.ExecutionContexts._ implicit val ec = global override def main(args: Array[String]) { val system = ActorSystem("System") val actor = system.actorOf(Props(new WordCounterActor(args(0)))) implicit val timeout = Timeout(25 seconds) val future = actor ? StartProcessFileMsg() future.map { result => println("Total number of words " + result) system.shutdown } } }
注意這次如何?
方法用於發送消息。 這樣,調用者可以使用返回的 Future 在可用時打印最終結果,並通過關閉 ActorSystem 退出程序。
Akka 容錯和主管策略
在演員系統中,每個演員都是其孩子的監督者。 如果一個actor無法處理一條消息,它會掛起自己和它的所有子節點,並通常以異常的形式向它的主管發送一條消息。
在 Akka 中,主管響應和處理從其子級滲透到它的異常的方式稱為主管策略。 主管策略是定義系統容錯行為的主要且直接的機制。
當表示失敗的消息到達主管時,它可以採取以下操作之一:
- 恢復孩子(及其孩子),保持其內部狀態。 當子狀態沒有被錯誤破壞並且可以繼續正常運行時,可以應用此策略。
- 重新啟動孩子(及其孩子),清除其內部狀態。 此策略可用於與剛剛描述的相反的情況。 如果子狀態已被錯誤破壞,則必須先重置其狀態,然後才能在未來使用。
- 永久停止孩子(及其孩子)。 這種策略可以在錯誤條件被認為是不可糾正的情況下使用,但不會危及正在執行的其餘操作,這可以在沒有失敗的孩子的情況下完成。
- 停止自身併升級錯誤。 當主管不知道如何處理故障並因此將其上報給自己的主管時受僱。
此外,Actor 可以決定僅將動作應用於失敗的孩子或其兄弟姐妹。 為此有兩種預定義的策略:
-
OneForOneStrategy
:僅將指定的操作應用於失敗的孩子 AllForOneStrategy
:將指定的操作應用於其所有子項
這是一個使用OneForOneStrategy
的簡單示例:
import akka.actor.OneForOneStrategy import akka.actor.SupervisorStrategy._ import scala.concurrent.duration._ override val supervisorStrategy = OneForOneStrategy() { case _: ArithmeticException => Resume case _: NullPointerException => Restart case _: IllegalArgumentException => Stop case _: Exception => Escalate }
如果未指定策略,則採用以下默認策略:
- 如果在初始化 Actor 時出錯或者如果 Actor 被殺死,則 Actor 將停止。
- 如果有任何其他類型的異常,則簡單地重新啟動actor。
這個默認策略的 Akka 提供的實現如下:
final val defaultStrategy: SupervisorStrategy = { def defaultDecider: Decider = { case _: ActorInitializationException ⇒ Stop case _: ActorKilledException ⇒ Stop case _: Exception ⇒ Restart } OneForOneStrategy()(defaultDecider) }
Akka 允許實施自定義主管策略,但正如 Akka 文檔所警告的那樣,請謹慎執行,因為不正確的實施可能會導致諸如阻塞 Actor 系統(即永久暫停 Actor)之類的問題。
位置透明度
Akka 架構支持位置透明性,使參與者能夠完全不知道他們收到的消息的來源。 消息的發送者可能與參與者駐留在同一 JVM 中,也可能駐留在單獨的 JVM 中(運行在同一節點或不同節點上)。 Akka 能夠以對參與者(以及開發人員)完全透明的方式處理這些案例中的每一個。 唯一需要注意的是,跨多個節點發送的消息必須是可序列化的。
Actor 系統被設計為在分佈式環境中運行,而不需要任何專門的代碼。 Akka 只需要存在一個配置文件( application.conf
)來指定要向其發送消息的節點。 下面是一個簡單的配置文件示例:
akka { actor { provider = "akka.remote.RemoteActorRefProvider" } remote { transport = "akka.remote.netty.NettyRemoteTransport" netty { hostname = "127.0.0.1" port = 2552 } } }
一些離別技巧……
我們已經看到了 Akka 框架如何幫助實現並發和高性能。 然而,正如本教程所指出的,在設計和實現系統時要牢記以下幾點,以便充分利用 Akka 的力量:
- 盡可能為每個參與者分配盡可能少的任務(如前所述,遵循單一職責原則)
Actor 應該異步處理事件(即處理消息)並且不應該阻塞,否則會發生上下文切換,這會對性能產生不利影響。 具體來說,最好在Future中執行阻塞操作(IO等),以免阻塞actor; IE:
case evt => blockingCall() // BAD case evt => Future { blockingCall() // GOOD }
- 確保您的消息都是不可變的,因為將它們傳遞給彼此的參與者都將在它們自己的線程中同時運行。 可變消息可能會導致意外行為。
- 由於節點之間發送的消息必須是可序列化的,因此重要的是要記住消息越大,序列化、發送和反序列化它們所需的時間就越長,這會對性能產生負面影響。
結論
用 Scala 編寫的 Akka 簡化並促進了高並發、分佈式和容錯應用程序的開發,對開發人員隱藏了大部分複雜性。 做 Akka 完全公正需要的遠不止這個單一的教程,但希望這個介紹和它的例子足夠迷人,讓你想要閱讀更多。
Amazon、VMWare 和 CSC 只是積極使用 Akka 的領先公司的幾個例子。 訪問 Akka 官方網站以了解更多信息並探索 Akka 是否也可以成為您項目的正確答案。