並行性とフォールトトレランスが容易に:例を含むAkkaチュートリアル
公開: 2022-03-11挑戦
並行プログラムを書くのは難しいです。 スレッド、ロック、競合状態などを処理する必要があると、エラーが発生しやすくなり、コードの読み取り、テスト、および保守が困難になる可能性があります。
したがって、多くの人はマルチスレッドを完全に避けることを好みます。 代わりに、シングルスレッドプロセスのみを採用し、外部サービス(データベース、キューなど)に依存して、必要な同時または非同期操作を処理します。 このアプローチは正当な代替手段である場合もありますが、それが単に実行可能なオプションではない多くのシナリオがあります。 トレーディングやバンキングアプリケーション、リアルタイムゲームなど、多くのリアルタイムシステムには、シングルスレッドプロセスが完了するのを待つ余裕がありません(今すぐ答えが必要です!)。 他のシステムは、計算またはリソースを大量に消費するため、コードに並列化を導入せずに実行するには、非常に長い時間(場合によっては数時間または数日)かかります。
かなり一般的なシングルスレッドアプローチの1つ(たとえば、Node.jsの世界で広く使用されている)は、イベントベースの非ブロッキングパラダイムを使用することです。 これは、コンテキストスイッチ、ロック、およびブロックを回避することでパフォーマンスを向上させますが、複数のプロセッサを同時に使用する問題には対処しません(そのためには、複数の独立したプロセスを起動して調整する必要があります)。
つまり、並行アプリケーションを構築するために、スレッド、ロック、および競合状態の内部に深く入り込むしかないということですか?
Akkaフレームワークのおかげで、答えはノーです。 このチュートリアルでは、Akkaの例を紹介し、並行分散アプリケーションの実装を容易にし、簡素化する方法を探ります。
Akkaフレームワークとは何ですか?
Akkaは、JVM上で高度な並行性、分散性、およびフォールトトレラントなアプリケーションを構築するためのツールキットおよびランタイムです。 AkkaはScalaで書かれており、ScalaとJavaの両方に言語バインディングが提供されています。
並行性を処理するためのAkkaのアプローチは、アクターモデルに基づいています。 アクターベースのシステムでは、すべてがオブジェクト指向設計のオブジェクトであるのとほぼ同じように、すべてがアクターです。 ただし、主な違いは、特に私たちの議論に関連して、アクターモデルが並行モデルとして機能するように特別に設計および設計されているのに対し、オブジェクト指向モデルはそうではないことです。 より具体的には、Scalaアクターシステムでは、アクターは、シーケンシャル性を前提とせずに、相互作用して情報を共有します。 アクターが互いに情報を共有し、互いにタスクを実行するメカニズムは、メッセージパッシングです。
Akkaは、アクターと基盤となるシステムの間にレイヤーを作成し、アクターがメッセージを処理するだけで済むようにします。 スレッドの作成とスケジューリング、メッセージの受信とディスパッチ、競合状態と同期の処理のすべての複雑さは、透過的に処理するためのフレームワークに委ねられています。
Akkaは、リアクティブマニフェストを厳守します。 リアクティブアプリケーションは、従来のマルチスレッドアプリケーションを、次の1つ以上の要件を満たすアーキテクチャに置き換えることを目的としています。
- イベント駆動型。 アクターを使用すると、リクエストを非同期で処理し、非ブロッキング操作を排他的に使用するコードを記述できます。
- スケーラブル。 Akkaでは、メッセージパッシングと場所の透過性の両方のおかげで、コードを変更せずにノードを追加できます。
- 弾力性。 すべてのアプリケーションでエラーが発生し、ある時点で失敗します。 Akkaは、自己回復システムを促進するための「監視」(フォールトトレランス)戦略を提供します。
- レスポンシブ。 今日の高性能で迅速な応答アプリケーションの多くは、ユーザーに迅速なフィードバックを提供する必要があるため、非常にタイムリーにイベントに対応する必要があります。 Akkaのノンブロッキング、メッセージベースの戦略はこれを達成するのに役立ちます。
アッカの俳優とは何ですか?
アクターは本質的に、メッセージを受信し、それらを処理するためのアクションを実行するオブジェクトにすぎません。 メッセージの送信元から切り離されており、受信したメッセージのタイプを適切に認識し、それに応じてアクションを実行することが唯一の責任です。
メッセージを受信すると、アクターは次の1つ以上のアクションを実行できます。
- 一部の操作自体を実行します(計算の実行、データの永続化、外部Webサービスの呼び出しなど)
- メッセージまたは派生メッセージを別のアクターに転送する
- 新しいアクターをインスタンス化し、メッセージを転送します
あるいは、アクターは、メッセージを完全に無視することを選択する場合があります(つまり、実行しないことを選択する場合があります)。
アクターを実装するには、akka.actor.Actorトレイトを拡張し、receiveメソッドを実装する必要があります。 アクターのreceiveメソッドは、メッセージがそのアクターに送信されるときに(Akkaによって)呼び出されます。 その典型的な実装は、次のAkkaの例に示すように、メッセージタイプを識別し、それに応じて反応するパターンマッチングで構成されます。
import akka.actor.Actor import akka.actor.Props import akka.event.Logging class MyActor extends Actor { def receive = { case value: String => doSomething(value) case _ => println("received unknown message") } }
パターンマッチングは、メッセージを処理するための比較的洗練された手法であり、コールバックに基づく同等の実装よりも「クリーン」でナビゲートしやすいコードを生成する傾向があります。 たとえば、単純なHTTP要求/応答の実装について考えてみます。
まず、JavaScriptでコールバックベースのパラダイムを使用してこれを実装しましょう。
route(url, function(request){ var query = buildQuery(request); dbCall(query, function(dbResponse){ var wsRequest = buildWebServiceRequest(dbResponse); wsCall(wsRequest, function(wsResponse) { sendReply(wsResponse); }); }); });
次に、これをパターンマッチングベースの実装と比較してみましょう。
msg match { case HttpRequest(request) => { val query = buildQuery(request) dbCall(query) } case DbResponse(dbResponse) => { var wsRequest = buildWebServiceRequest(dbResponse); wsCall(dbResponse) } case WsResponse(wsResponse) => sendReply(wsResponse) }
コールバックベースのJavaScriptコードは確かにコンパクトですが、読みやすく、ナビゲートするのは確かに困難です。 比較すると、パターンマッチングベースのコードは、どのケースが考慮され、それぞれがどのように処理されているかをより即座に明らかにします。
アクターシステム
複雑な問題を取り上げ、それをより小さなサブ問題に再帰的に分割することは、一般的に適切な問題解決手法です。 このアプローチは、冗長性がほとんどまたはまったくなく、保守が比較的容易な、クリーンでモジュール化されたコードを生成する傾向があるため、(単一責任の原則と一致する)コンピューターサイエンスで特に有益です。
アクターベースの設計では、この手法を使用すると、アクターシステムとして知られる階層構造へのアクターの論理的な編成が容易になります。 アクターシステムは、アクターが相互作用するためのインフラストラクチャを提供します。
Akkaでは、アクターと通信する唯一の方法はActorRef
を使用することです。 ActorRef
は、他のオブジェクトがそのアクターの内部および状態に直接アクセスまたは操作することを妨げるアクターへの参照を表します。 メッセージは、次の構文プロトコルのいずれかを使用して、 ActorRef
を介してアクターに送信できます。
-
!
(「伝える」)–メッセージを送信し、すぐに戻ります ?
(「尋ねる」)–メッセージを送信し、可能な応答を表すFutureを返します
各アクターには、着信メッセージの配信先となるメールボックスがあります。 選択できるメールボックスの実装は複数あり、デフォルトの実装はFIFOです。
アクターには、複数のメッセージを処理している間、状態を維持するための多くのインスタンス変数が含まれています。 Akkaは、アクターの各インスタンスが独自の軽量スレッドで実行され、メッセージが一度に1つずつ処理されるようにします。 このようにして、開発者が同期や競合状態について明示的に心配する必要なしに、各アクターの状態を確実に維持できます。
各アクターには、AkkaActorAPIを介してタスクを実行するための次の有用な情報が提供されます。
-
sender
:現在処理中のメッセージの送信者へのActorRef
-
context
:アクターが実行されているコンテキストに関連する情報とメソッド(たとえば、新しいアクターをインスタンス化するためのactorOf
メソッドを含む) -
supervisionStrategy
:エラーからの回復に使用される戦略を定義します self
:アクター自体のActorRef
これらのチュートリアルを結び付けるために、テキストファイル内の単語数を数える簡単な例を考えてみましょう。
Akkaの例では、問題を2つのサブタスクに分解します。 つまり、(1)1行の単語数をカウントする「子」タスク、および(2)ファイル内の単語の総数を取得するためにそれらの行ごとの単語数を合計する「親」タスクです。
親アクターはファイルから各行をロードし、次にその行の単語をカウントするタスクを子アクターに委任します。 子が完了すると、結果とともにメッセージが親に返送されます。 親は(各行の)単語数を含むメッセージを受信し、ファイル全体の単語の総数のカウンターを保持し、完了時に呼び出し元に戻ります。
(以下に示すAkkaチュートリアルコードサンプルは、教訓的なもののみを目的としているため、必ずしもすべてのエッジ条件やパフォーマンスの最適化などに関係するわけではないことに注意してください。また、以下に示すコードサンプルの完全なコンパイル可能なバージョンは、この要点。)
まず、子StringCounterActor
クラスのサンプル実装を見てみましょう。
case class ProcessStringMsg(string: String) case class StringProcessedMsg(words: Integer) class StringCounterActor extends Actor { def receive = { case ProcessStringMsg(string) => { val wordsInLine = string.split(" ").length sender ! StringProcessedMsg(wordsInLine) } case _ => println("Error: message not recognized") } }
このアクターのタスクは非常に単純ですProcessStringMsg
メッセージ(テキスト行を含む)を消費し、指定された行の単語数をカウントし、 StringProcessedMsg
メッセージを介して結果を送信者に返します。 !
を使用するようにクラスを実装したことに注意してください。 (「tell」) StringProcessedMsg
メッセージを送信する(つまり、メッセージを送信してすぐに戻る)メソッド。
では、親のWordCounterActor
クラスに注目しましょう。
1. case class StartProcessFileMsg() 2. 3. class WordCounterActor(filename: String) extends Actor { 4. 5. private var running = false 6. private var totalLines = 0 7. private var linesProcessed = 0 8. private var result = 0 9. private var fileSender: Option[ActorRef] = None 10. 11. def receive = { 12. case StartProcessFileMsg() => { 13. if (running) { 14. // println just used for example purposes; 15. // Akka logger should be used instead 16. println("Warning: duplicate start message received") 17. } else { 18. running = true 19. fileSender = Some(sender) // save reference to process invoker 20. import scala.io.Source._ 21. fromFile(filename).getLines.foreach { line => 22. context.actorOf(Props[StringCounterActor]) ! ProcessStringMsg(line) 23. totalLines += 1 24. } 25. } 26. } 27. case StringProcessedMsg(words) => { 28. result += words 29. linesProcessed += 1 30. if (linesProcessed == totalLines) { 31. fileSender.map(_ ! result) // provide result to process invoker 32. } 33. } 34. case _ => println("message not recognized!") 35. } 36. }
ここでは多くのことが行われているので、それぞれをさらに詳しく調べてみましょう(以下の説明で参照されている行番号は、上記のコードサンプルに基づいていることに注意してください) …

まず、処理するファイルの名前がWordCounterActor
コンストラクターに渡されることに注意してください(3行目)。 これは、アクターが単一のファイルを処理するためにのみ使用されることを示しています。 これにより、インスタンスは1回だけ使用されるため(つまり、単一のファイルを処理するため)、ジョブの完了時に状態変数( running
、 totalLines
、 linesProcessed
、およびresult
)をリセットする必要がなくなるため、開発者のコーディングジョブも簡素化されます。その後、破棄されます。
次に、 WordCounterActor
が2種類のメッセージを処理することを確認します。
-
StartProcessFileMsg
(12行目)-
WordCounterActor
を最初に開始する外部アクターから受信します。 - 受信すると、
WordCounterActor
は最初に冗長な要求を受信していないことを確認します。 - 要求が冗長である場合、
WordCounterActor
は警告を生成し、それ以上何も行われません(16行目)。 - リクエストが冗長でない場合:
-
WordCounterActor
は、送信者への参照をfileSender
インスタンス変数に格納します(これはOption[Option[Actor]
]ではなくOption[ActorRef]
であることに注意してください。9行目を参照してください)。 このActorRef
は、後で最後のStringProcessedMsg
(以下で説明するように、StringCounterActor
の子から受信される)を処理するときにアクセスして応答するために必要です。 - 次に、
WordCounterActor
がファイルを読み取り、ファイルの各行がロードされると、StringCounterActor
の子が作成され、処理される行を含むメッセージがそれに渡されます(21〜24行目)。
-
-
-
StringProcessedMsg
(27行目)- 割り当てられた行の処理が完了すると、子
StringCounterActor
から受信されます。 -
WordCounterActor
は、受信するとファイルの行カウンターをインクリメントし、ファイル内のすべての行が処理された場合(つまり、totalLines
とlinesProcessed
が等しい場合)、最終結果を元のfileSender
に送信します(28〜31行目)。
- 割り当てられた行の処理が完了すると、子
繰り返しになりますが、Akkaでは、アクター間の通信の唯一のメカニズムはメッセージパッシングであることに注意してください。 アクターが共有するのはメッセージだけです。アクターは同じメッセージに同時にアクセスできる可能性があるため、競合状態や予期しない動作を回避するために、メッセージが不変であることが重要です。
したがって、メッセージはデフォルトで不変であり、パターンマッチングとシームレスに統合されるため、ケースクラスの形式でメッセージを渡すのが一般的です。
アプリ全体を実行するためのコードサンプルで例を締めくくりましょう。
object Sample extends App { import akka.util.Timeout import scala.concurrent.duration._ import akka.pattern.ask import akka.dispatch.ExecutionContexts._ implicit val ec = global override def main(args: Array[String]) { val system = ActorSystem("System") val actor = system.actorOf(Props(new WordCounterActor(args(0)))) implicit val timeout = Timeout(25 seconds) val future = actor ? StartProcessFileMsg() future.map { result => println("Total number of words " + result) system.shutdown } } }
今回はどのように?
メソッドはメッセージの送信に使用されます。 このようにして、呼び出し元は返されたFutureを使用して、これが使用可能な場合に最終結果を出力し、ActorSystemをシャットダウンしてプログラムを終了できます。
Akkaのフォールトトレランスとスーパーバイザー戦略
アクターシステムでは、各アクターはその子のスーパーバイザーです。 アクターがメッセージの処理に失敗した場合、アクターは自身とそのすべての子を一時停止し、通常は例外の形式でスーパーバイザーにメッセージを送信します。
Akkaでは、スーパーバイザーがその子からそれに浸透する例外に反応して処理する方法は、スーパーバイザー戦略と呼ばれます。 スーパーバイザー戦略は、システムのフォールトトレラントな動作を定義するための主要で直接的なメカニズムです。
障害を示すメッセージがスーパーバイザーに到達すると、次のいずれかのアクションを実行できます。
- 内部状態を維持しながら、子(およびその子)を再開します。 この戦略は、子の状態がエラーによって破損しておらず、正しく機能し続けることができる場合に適用できます。
- 子(およびその子)を再起動して、内部状態をクリアします。 この戦略は、今説明したものとは逆のシナリオで使用できます。 子の状態がエラーによって破損している場合は、将来使用する前に状態をリセットする必要があります。
- 子供(およびその子供)を永久に停止します。 この戦略は、エラー状態が修正可能であるとは思われないが、実行中の残りの操作を危険にさらさない場合に使用できます。これは、失敗した子がいない場合に完了することができます。
- 自分自身を停止し、エラーをエスカレーションします。 スーパーバイザーが障害の処理方法を知らないため、自身のスーパーバイザーにエスカレーションする場合に使用されます。
さらに、アクターは、失敗した子またはその兄弟だけにアクションを適用することを決定できます。 これには、2つの事前定義された戦略があります。
-
OneForOneStrategy
:指定されたアクションを失敗した子にのみ適用します AllForOneStrategy
:指定されたアクションをそのすべての子に適用します
OneForOneStrategy
を使用した簡単な例を次に示します。
import akka.actor.OneForOneStrategy import akka.actor.SupervisorStrategy._ import scala.concurrent.duration._ override val supervisorStrategy = OneForOneStrategy() { case _: ArithmeticException => Resume case _: NullPointerException => Restart case _: IllegalArgumentException => Stop case _: Exception => Escalate }
戦略が指定されていない場合、次のデフォルト戦略が採用されます。
- アクターの初期化中にエラーが発生した場合、またはアクターが強制終了された場合、アクターは停止されます。
- 他の種類の例外があった場合、アクターは単に再起動されます。
このデフォルト戦略のAkka提供の実装は次のとおりです。
final val defaultStrategy: SupervisorStrategy = { def defaultDecider: Decider = { case _: ActorInitializationException ⇒ Stop case _: ActorKilledException ⇒ Stop case _: Exception ⇒ Restart } OneForOneStrategy()(defaultDecider) }
Akkaはカスタムスーパーバイザー戦略の実装を許可しますが、Akkaのドキュメントで警告されているように、実装が正しくないとアクターシステムのブロック(つまり、アクターの永続的な停止)などの問題が発生する可能性があるため、注意して行ってください。
場所の透明性
Akkaアーキテクチャは場所の透過性をサポートしているため、アクターは受信したメッセージの発信元を完全に認識できません。 メッセージの送信者は、アクターと同じJVMに存在する場合と、別のJVM(同じノードまたは別のノードで実行されている)に存在する場合があります。 Akkaを使用すると、これらの各ケースを、アクター(したがって開発者)に対して完全に透過的な方法で処理できます。 唯一の注意点は、複数のノードを介して送信されるメッセージはシリアル化可能でなければならないということです。
アクターシステムは、特別なコードを必要とせずに分散環境で実行するように設計されています。 Akkaは、メッセージの送信先ノードを指定する構成ファイル( application.conf
)の存在のみを必要とします。 構成ファイルの簡単な例を次に示します。
akka { actor { provider = "akka.remote.RemoteActorRefProvider" } remote { transport = "akka.remote.netty.NettyRemoteTransport" netty { hostname = "127.0.0.1" port = 2552 } } }
別れのヒント…
Akkaフレームワークが並行性と高性能の実現にどのように役立つかを見てきました。 ただし、このチュートリアルで指摘したように、Akkaの能力を最大限に活用するために、システムを設計および実装する際に留意すべき点がいくつかあります。
- 可能な限り、各アクターには可能な限り最小のタスクを割り当てる必要があります(前述のように、単一責任の原則に従って)
アクターはイベント(つまり、メッセージの処理)を非同期的に処理する必要があり、ブロックしないでください。そうしないと、パフォーマンスに悪影響を与える可能性のあるコンテキストスイッチが発生します。 具体的には、アクターをブロックしないように、将来的にブロック操作(IOなど)を実行するのが最善です。 すなわち:
case evt => blockingCall() // BAD case evt => Future { blockingCall() // GOOD }
- メッセージを相互に渡すアクターはすべて、独自のスレッドで同時に実行されるため、メッセージがすべて不変であることを確認してください。 変更可能なメッセージは、予期しない動作を引き起こす可能性があります。
- ノード間で送信されるメッセージはシリアル化可能である必要があるため、メッセージが大きいほど、メッセージのシリアル化、送信、および逆シリアル化に時間がかかり、パフォーマンスに悪影響を与える可能性があることに注意してください。
結論
Scalaで書かれたAkkaは、高度に並行し、分散され、フォールトトレラントなアプリケーションの開発を簡素化および促進し、開発者から複雑さの多くを隠します。 Akkaの完全な正義を行うには、この単一のチュートリアルよりもはるかに多くのことが必要になりますが、この紹介とその例が、もっと読みたくなるほど十分に魅力的であったことを願っています。
Amazon、VMWare、およびCSCは、Akkaを積極的に使用している大手企業のほんの一例です。 Akkaの公式ウェブサイトにアクセスして、詳細を確認し、Akkaがプロジェクトの正しい答えになるかどうかを確認してください。