ソフトウェア統合の合理化:ApacheCamelチュートリアル
公開: 2022-03-11ソフトウェアが情報の空白の中に存在することはほとんどありません。 少なくとも、それは私たちのソフトウェアエンジニアが私たちが開発するほとんどのアプリケーションに対して行うことができる仮定です。
あらゆる規模で、ソフトウェアのすべての部分は、何らかの方法で他のソフトウェアと通信します。さまざまな理由で、どこかから参照データを取得する、監視信号を送信する、分散の一部でありながら他のサービスと連絡を取るなどです。システムなど。
このチュートリアルでは、大規模なソフトウェアを統合する際の最大の課題のいくつかと、ApacheCamelがそれらを簡単に解決する方法を学習します。
問題:システム統合のためのアーキテクチャ設計
ソフトウェアエンジニアリングの生活の中で、少なくとも1回は次のことを行ったことがあるかもしれません。
- データ送信を開始する必要があるビジネスロジックのフラグメントを特定します。
- 同じアプリケーション層で、受信者が期待していることに応じてデータ変換を記述します。
- ネットワークを介した転送とルーティングに適した構造でデータをラップします。
- 適切なドライバーまたはクライアントSDKを使用して、ターゲットアプリケーションへの接続を開きます。
- データを送信し、応答を処理します。
なぜこれが悪い行動なのですか?
この種の接続はごくわずかですが、管理しやすいままです。 システム間の関係が増えるにつれ、アプリケーションのビジネスロジックは統合ロジックと混ざり合います。統合ロジックは、データの適応、2つのシステム間の技術的な違いの補正、SOAP、REST、またはよりエキゾチックなリクエストによる外部システムへのデータの転送に関するものです。 。
複数のアプリケーションを統合する場合、そのようなコードの依存関係の全体像をたどるのは非常に困難です。データはどこで生成され、どのサービスがそれを消費するのでしょうか。 統合ロジックが複製されて起動する場所がたくさんあります。
このようなアプローチでは、タスクは技術的には達成されますが、統合の保守性とスケーラビリティに関する大きな問題が発生します。 このシステムでのデータフローの迅速な再編成は、監視の欠如、回路の破損、面倒なデータ回復などのより深い問題は言うまでもなく、ほぼ不可能です。
これはすべて、かなり大規模な企業の範囲でソフトウェアを統合する場合に特に重要です。 エンタープライズ統合に対処するということは、さまざまなプラットフォームで動作し、さまざまな場所に存在する一連のアプリケーションを操作することを意味します。 このようなソフトウェアランドスケープでのデータ交換は非常に困難です。 業界の高セキュリティ基準を満たし、データを転送するための信頼できる方法を提供する必要があります。 エンタープライズ環境では、システム統合には、個別の徹底的に精巧なアーキテクチャ設計が必要です。
この記事では、ソフトウェア統合で直面する固有の問題を紹介し、統合タスクの経験に基づくソリューションをいくつか提供します。 統合開発者の頭痛の種の最悪の部分を軽減できる便利なフレームワークであるApacheCamelについて理解します。 次に、CamelがKubernetesを利用したマイクロサービスのクラスターで通信を確立するのにどのように役立つかを示す例を示します。
統合の難しさ
この問題を解決するために広く使用されているアプローチは、アプリケーションの統合レイヤーを分離することです。 同じアプリケーション内に存在することも、独立して実行される専用のソフトウェアとして存在することもできます。後者の場合、ミドルウェアと呼ばれます。
ミドルウェアを開発およびサポートするときに通常直面する問題は何ですか? 一般的に、次の重要な項目があります。
- すべてのデータチャネルはある程度信頼できません。 この信頼性の欠如に起因する問題は、データ強度が低から中程度の場合は発生しない可能性があります。 アプリケーションメモリからその下の下位キャッシュおよび機器までの各ストレージレベルは、潜在的な障害の影響を受けます。 まれなエラーは、大量のデータでのみ発生します。 成熟した本番環境に対応したベンダー製品でさえ、データ損失に関連する未解決のバグトラッカーの問題があります。 ミドルウェアシステムは、これらのデータの犠牲者を通知し、メッセージの再配信をタイムリーに提供できる必要があります。
- アプリケーションは、さまざまなプロトコルとデータ形式を使用します。 これは、統合システムがデータ変換と他の参加者へのアダプターのカーテンであり、さまざまなテクノロジーを利用していることを意味します。 これらには、プレーンなREST API呼び出しが含まれる場合がありますが、キューブローカーへのアクセス、FTP経由でのCSV注文の送信、またはデータベーステーブルへのデータのバッチプルも含まれる可能性があります。 これは長いリストであり、短くなることはありません。
- データ形式とルーティングルールの変更は避けられません。 データ構造を変更するアプリケーション開発プロセスの各ステップは、通常、統合データ形式と変換の変更につながります。 場合によっては、再編成されたエンタープライズデータフローによるインフラストラクチャの変更が必要になります。 たとえば、これらの変更は、会社全体のすべてのマスターデータエントリを処理する必要がある参照データを検証する単一のポイントを導入するときに発生する可能性があります。
N
のシステムでは、システム間に最大でほぼN^2
の接続が存在する可能性があるため、変更を適用する必要のある場所の数は非常に急速に増加します。 雪崩のようになります。 保守性を維持するために、ミドルウェアレイヤーは、多様なルーティングとデータ変換を使用して依存関係を明確に把握する必要があります。
統合を設計し、最適なミドルウェアソリューションを選択するときは、これらのアイデアを念頭に置く必要があります。 これを処理するための可能な方法の1つは、エンタープライズサービスバス(ESB)を活用することです。 しかし、主要ベンダーが提供するESBは一般に重すぎて、価値があるよりも厄介なことがよくあります。ESBをすぐに使い始めることはほとんど不可能であり、学習曲線が非常に急であり、その柔軟性が長いリストに犠牲になっています。機能と組み込みツールの。 私の意見では、軽量のオープンソース統合ソリューションははるかに優れています。より弾力性があり、クラウドへのデプロイが簡単で、拡張も簡単です。
ソフトウェア統合は簡単ではありません。 今日、私たちはマイクロサービスアーキテクチャを構築し、小さなサービスの群れに対処するため、それらがどれほど効率的に通信する必要があるかについても大きな期待を寄せています。
エンタープライズ統合パターン
予想されるように、一般的なソフトウェア開発と同様に、データのルーティングと変換の開発には繰り返しの操作が含まれます。 この分野での経験は、かなり長い間統合の問題を扱う専門家によって要約され、体系化されてきました。 結果として、データフローの設計に使用されるエンタープライズ統合パターンと呼ばれる抽出されたテンプレートのセットがあります。 これらの統合方法は、GregorHopheとBobbyWolfeによる同名の本で説明されています。これは、重要なGang of Fourの本によく似ていますが、ソフトウェアの接着の分野です。
例を挙げると、ノーマライザーパターンは、異なるデータ形式を持つ意味的に等しいメッセージを単一の正規モデルにマップするコンポーネントを導入します。または、アグリゲーターは、メッセージのシーケンスを1つに結合するEIPです。
これらは、アーキテクチャの問題を解決するために使用される確立されたテクノロジーにとらわれない抽象化であるため、EIPは、コードレベルを掘り下げることなく、データフローを十分に詳細に説明するアーキテクチャ設計の作成に役立ちます。 統合ルートを記述するためのこのような表記は、設計を簡潔にするだけでなく、さまざまなビジネス分野のチームメンバーとの統合タスクを解決するために非常に重要な共通の命名法と共通の言語を設定します。
ApacheCamelの紹介
数年前、私は広く分散した場所に店舗を持つ巨大な食料品小売ネットワークで企業統合を構築していました。 私は独自のESBソリューションから始めましたが、それを維持するのは非常に面倒であることがわかりました。 次に、私たちのチームはApache Camelに出くわし、「概念実証」作業を行った後、Camelルートのすべてのデータフローをすばやく書き直しました。
Apache Camelは、私がよく知っているEIPのリストを実装するメッセージ指向ミドルウェアフレームワークである「メディエーションルーター」として説明できます。 これらのパターンを利用し、すべての一般的なトランスポートプロトコルをサポートし、便利なアダプタの膨大なセットが含まれています。 Camelを使用すると、独自のコードを記述しなくても、多数の統合ルーチンを処理できます。
これとは別に、私は次のApacheCamel機能を選び出します。
- 統合ルートは、ブロックで構成されたパイプラインとして記述されます。 データフローを追跡するのに役立つ完全に透明な画像を作成します。
- Camelには、多くの一般的なAPI用のアダプターがあります。 たとえば、Apache Kafkaからのデータの取得、AWS EC2インスタンスのモニタリング、Salesforceとの統合など、これらのタスクはすべて、すぐに使用できるコンポーネントを使用して解決できます。
Apache Camelルートは、JavaまたはScalaDSLで記述できます。 (XML構成も利用できますが、冗長になりすぎてデバッグ機能が低下します。)通信サービスの技術スタックに制限はありませんが、JavaまたはScalaで作成する場合は、代わりにCamelをアプリケーションに埋め込むことができます。スタンドアロンで実行します。
Camelで使用されるルーティング表記は、次の簡単な擬似コードで記述できます。
from(Source) .transform(Transformer) .to(Destination)
Source
、 Transformer
、およびDestination
は、URIによって実装コンポーネントを参照するエンドポイントです。
Camelが以前に説明した統合の問題を解決できるのは何ですか? みてみましょう。 まず、ルーティングと変換のロジックは、専用のApacheCamel構成でのみ機能するようになりました。 次に、EIPの使用と組み合わせた簡潔で自然なDSLを通じて、システム間の依存関係の図が表示されます。 わかりやすい抽象化で構成されており、ルーティングロジックは簡単に調整できます。 そして最後に、適切なアダプターがすでに含まれている可能性が高いため、変換コードのヒープを記述する必要はありません。
追加する必要があります。ApacheCamelは成熟したフレームワークであり、定期的に更新されます。 素晴らしいコミュニティとかなりの蓄積された知識ベースがあります。
それには独自の欠点があります。 Camelを複雑な統合スイートと見なすべきではありません。 これは、ビジネスプロセス管理ツールやアクティビティモニターなどの高レベルの機能を備えていないツールボックスですが、そのようなソフトウェアを作成するために使用できます。
代替システムには、たとえば、SpringIntegrationやMuleESBなどがあります。 Spring Integrationの場合、軽量であると考えられていますが、私の経験では、Spring Integrationをまとめて、大量のXML構成ファイルを作成すると、予想外に複雑になり、簡単な方法ではありません。 Mule ESBは堅牢で非常に機能的なツールセットですが、その名前が示すように、エンタープライズサービスバスであるため、別のウェイトカテゴリに属しています。 Muleは、豊富な機能を備えたApacheCamelをベースにした同様の製品であるFuseESBと比較できます。 私にとって、サービスの接着にApache Camelを使用することは、今日では簡単です。 使いやすく、何がどこに行くのかを明確に説明すると同時に、複雑な統合を構築するのに十分な機能を備えています。
サンプルルートの作成
コードを書き始めましょう。 まず、単一の送信元から受信者のリストにメッセージをルーティングする同期データフローから始めます。 ルーティングルールはJavaDSLで記述されます。
Mavenを使用してプロジェクトをビルドします。 まず、次の依存関係をpom.xml
に追加します。
<dependencies> ... <dependency> <groupId>org.apache.camel</groupId> <artifactId>camel-core</artifactId> <version>2.20.0</version> </dependency> </dependencies>
または、 camel-archetype-java
アーキタイプの上にアプリケーションを構築することもできます。
キャメルルート定義は、 RouteBuilder.configure
メソッドで宣言されます。
public void configure() { errorHandler(defaultErrorHandler().maximumRedeliveries(0)); from("file:orders?noop=true").routeId("main") .log("Incoming File: ${file:onlyname}") .unmarshal().json(JsonLibrary.Jackson, Order.class) // unmarshal JSON to Order class containing List<OrderItem> .split().simple("body.items") // split list to process one by one .to("log:inputOrderItem") .choice() .when().simple("${body.type} == 'Drink'") .to("direct:bar") .when().simple("${body.type} == 'Dessert'") .to("direct:dessertStation") .when().simple("${body.type} == 'Hot Meal'") .to("direct:hotMealStation") .when().simple("${body.type} == 'Cold Meal'") .to("direct:coldMealStation") .otherwise() .to("direct:others"); from("direct:bar").routeId("bar").log("Handling Drink"); from("direct:dessertStation").routeId("dessertStation").log("Handling Dessert"); from("direct:hotMealStation").routeId("hotMealStation").log("Handling Hot Meal"); from("direct:coldMealStation").routeId("coldMealStation").log("Handling Cold Meal"); from("direct:others").routeId("others").log("Handling Something Other"); }
この定義では、JSONファイルからレコードをフェッチし、それらをアイテムに分割し、メッセージの内容に基づいて一連のハンドラーにルーティングするルートを作成します。
準備したテストデータで実行してみましょう。 出力を取得します。
INFO | Total 6 routes, of which 6 are started INFO | Apache Camel 2.20.0 (CamelContext: camel-1) started in 10.716 seconds INFO | Incoming File: order1.json INFO | Exchange[ExchangePattern: InOnly, BodyType: com.antongoncharov.camel.example.model.OrderItem, Body: OrderItem{, type='Drink', name='Americano', qty='1'}] INFO | Handling Drink INFO | Exchange[ExchangePattern: InOnly, BodyType: com.antongoncharov.camel.example.model.OrderItem, Body: OrderItem{, type='Hot Meal', name='French Omelette', qty='1'}] INFO | Handling Hot Meal INFO | Exchange[ExchangePattern: InOnly, BodyType: com.antongoncharov.camel.example.model.OrderItem, Body: OrderItem{, type='Hot Meal', name='Lasagna', qty='1'}] INFO | Handling Hot Meal INFO | Exchange[ExchangePattern: InOnly, BodyType: com.antongoncharov.camel.example.model.OrderItem, Body: OrderItem{, type='Hot Meal', name='Rice Balls', qty='1'}] INFO | Handling Hot Meal INFO | Exchange[ExchangePattern: InOnly, BodyType: com.antongoncharov.camel.example.model.OrderItem, Body: OrderItem{, type='Dessert', name='Blueberry Pie', qty='1'}] INFO | Handling Dessert
予想通り、Camelはメッセージを宛先にルーティングしました。
データ転送の選択肢
上記の例では、コンポーネント間の相互作用は同期的であり、アプリケーションメモリを介して実行されます。 ただし、メモリを共有しない個別のアプリケーションを処理する場合、通信する方法は他にもたくさんあります。
- ファイル交換。 1つのアプリケーションは、別のアプリケーションが使用する共有データのファイルを生成します。 昔ながらの精神が宿っているところです。 この通信方法には、トランザクションと一貫性の欠如、パフォーマンスの低下、システム間の孤立した調整など、多くの結果があります。 多くの開発者は、プロセスを多かれ少なかれ管理しやすくするために、自家製の統合ソリューションを作成することになりました。
- 共通データベース。 共有したいデータを単一のデータベースの共通スキーマにアプリケーションに保存させます。 統合スキーマの設計とテーブルへの同時アクセスの処理は、このアプローチの最も顕著な課題です。 ファイル交換と同様に、これが永続的なボトルネックになるのは簡単です。
- リモートAPI呼び出し。 通常のメソッド呼び出しのように、アプリケーションが実行中の別のアプリケーションと対話できるようにするためのインターフェースを提供します。 アプリケーションはAPI呼び出しを介して機能を共有しますが、プロセスでそれらを緊密に結合します。
- メッセージング。 各アプリケーションを共通のメッセージングシステムに接続し、データを交換し、メッセージを使用して非同期で動作を呼び出します。 メッセージを配信するために、送信者と受信者のどちらも同時に稼働している必要はありません。
対話する方法は他にもありますが、大まかに言えば、対話には同期と非同期の2つのタイプがあることに注意してください。 1つ目は、コードで関数を呼び出すようなものです。実行フローは、実行されて値が返されるまで待機します。 非同期アプローチでは、同じデータが中間メッセージキューまたはサブスクリプショントピックを介して送信されます。 非同期リモート関数呼び出しは、要求/応答EIPとして実装できます。
ただし、非同期メッセージングは万能薬ではありません。 特定の制限があります。 Web上でメッセージングAPIを目にすることはめったにありません。 同期RESTサービスははるかに人気があります。 しかし、メッセージングミドルウェアは、エンタープライズイントラネットまたは分散システムのバックエンドインフラストラクチャで広く使用されています。
メッセージキューの使用
この例を非同期にしましょう。 キューとサブスクリプショントピックを管理するソフトウェアシステムは、メッセージブローカーと呼ばれます。 これは、テーブルと列のRDBMSのようなものです。 キューはポイントツーポイント統合として機能し、トピックは多くの受信者とのパブリッシュ/サブスクライブ通信用です。 Apache ActiveMQは堅牢で埋め込み可能であるため、JMSメッセージブローカーとして使用します。
次の依存関係を追加します。 すべてのActiveMQjarを含むactivemq-all
をプロジェクトに追加しすぎる場合がありますが、アプリケーションの依存関係を複雑にしないようにします。
<dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-all</artifactId> <version>5.15.2</version> </dependency>
次に、プログラムでブローカーを開始します。 Spring Bootでは、 spring-boot-starter-activemq
Maven依存関係をプラグインすることにより、このための自動構成を取得します。
次のコマンドを使用して、コネクタのエンドポイントのみを指定して、新しいメッセージブローカを実行します。
BrokerService broker = new BrokerService(); broker.addConnector("tcp://localhost:61616"); broker.start();
そして、次の構成スニペットをconfigure
メソッド本体に追加します。
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616"); this.getContext().addComponent("activemq", ActiveMQComponent.jmsComponent(connectionFactory));
これで、メッセージキューを使用して前の例を更新できます。 キューはメッセージ配信時に自動的に作成されます。
public void configure() { errorHandler(defaultErrorHandler().maximumRedeliveries(0)); ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616"); this.getContext().addComponent("activemq", ActiveMQComponent.jmsComponent(connectionFactory)); from("file:orders?noop=true").routeId("main") .log("Incoming File: ${file:onlyname}") .unmarshal().json(JsonLibrary.Jackson, Order.class) // unmarshal JSON to Order class containing List<OrderItem> .split().simple("body.items") // split list to process one by one .to("log:inputOrderItem") .choice() .when().simple("${body.type} == 'Drink'") .to("activemq:queue:bar") .when().simple("${body.type} == 'Dessert'") .to("activemq:queue:dessertStation") .when().simple("${body.type} == 'Hot Meal'") .to("activemq:queue:hotMealStation") .when().simple("${body.type} == 'Cold Meal'") .to("activemq:queue:coldMealStation") .otherwise() .to("activemq:queue:others"); from("activemq:queue:bar").routeId("barAsync").log("Drinks"); from("activemq:queue:dessertStation").routeId("dessertAsync").log("Dessert"); from("activemq:queue:hotMealStation").routeId("hotMealAsync").log("Hot Meals"); from("activemq:queue:coldMealStation").routeId("coldMealAsync").log("Cold Meals"); from("activemq:queue:others").routeId("othersAsync").log("Others"); }
さて、これで相互作用は非同期になりました。 このデータの潜在的な消費者は、準備ができたときにデータにアクセスできます。 これは、リアクティブアーキテクチャで実現しようとしている緩い結合の例です。 1つのサービスが利用できなくても、他のサービスはブロックされません。 さらに、コンシューマーは、キューから並行してスケーリングおよび読み取りを行う場合があります。 キュー自体がスケーリングされ、パーティション化される場合があります。 永続キューは、すべての参加者がダウンした場合でも、処理されるのを待って、データをディスクに保存できます。 その結果、このシステムはよりフォールトトレラントになります。
驚くべき事実は、CERNがApache CamelとActiveMQを使用して、大型ハドロン衝突型加速器(LHC)のシステムを監視していることです。 このタスクに適切なミドルウェアソリューションの選択を説明する興味深い修士論文もあります。 したがって、基調講演で述べられているように、「JMSはありません。素粒子物理学はありません!」
モニタリング
前の例では、2つのサービス間にデータチャネルを作成しました。 これは、アーキテクチャの追加の潜在的な障害点であるため、注意が必要です。 ApacheCamelが提供する監視機能を見てみましょう。 基本的に、JMXからアクセス可能なMBeanを介したルートに関する統計情報を公開します。 ActiveMQは、同じ方法でキュー統計を公開します。
アプリケーションでJMXサーバーをオンにして、コマンドラインオプションで実行できるようにします。
-Dorg.apache.camel.jmx.createRmiConnector=true -Dorg.apache.camel.jmx.mbeanObjectDomainName=org.apache.camel -Dorg.apache.camel.jmx.rmiConnector.registryPort=1099 -Dorg.apache.camel.jmx.serviceUrlPath=camel
次に、アプリケーションを実行して、ルートがその役割を果たします。 標準のjconsole
ツールを開き、アプリケーションプロセスに接続します。 URL service:jmx:rmi:///jndi/rmi://localhost:1099/camel
に接続します。 MBeansツリーのorg.apache.camelドメインに移動します。
ルーティングに関するすべてが制御されていることがわかります。 キューには、処理中のメッセージの数、エラー数、およびメッセージ数があります。 この情報は、GraphanaやKibanaなどの豊富な機能を備えた一部の監視ツールセットにパイプラインで送ることができます。 これを行うには、よく知られているELKスタックを実装します。

Camel、ActiveMQ、およびhawt.ioと呼ばれるその他の多くを管理するためのUIを提供するプラグイン可能で拡張可能なWebコンソールもあります。
ルートのテスト
Apache Camelには、モックコンポーネントを使用してテストルートを作成するための非常に幅広い機能があります。 これは強力なツールですが、テストのためだけに個別のルートを作成するのは時間のかかるプロセスです。 パイプラインを変更せずに本番ルートでテストを実行する方が効率的です。 Camelにはこの機能があり、AdviceWithコンポーネントを使用して実装できます。
この例でテストロジックを有効にして、サンプルテストを実行してみましょう。
<dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.11</version> <scope>test</scope> </dependency> <dependency> <groupId>org.apache.camel</groupId> <artifactId>camel-test</artifactId> <version>2.20.0</version> <scope>test</scope> </dependency>
テストクラスは次のとおりです。
public class AsyncRouteTest extends CamelTestSupport { @Override protected RouteBuilder createRouteBuilder() throws Exception { return new AsyncRouteBuilder(); } @Before public void mockEndpoints() throws Exception { context.getRouteDefinition("main").adviceWith(context, new AdviceWithRouteBuilder() { @Override public void configure() throws Exception { // we substitute all actual queues with mock endpoints mockEndpointsAndSkip("activemq:queue:bar"); mockEndpointsAndSkip("activemq:queue:dessertStation"); mockEndpointsAndSkip("activemq:queue:hotMealStation"); mockEndpointsAndSkip("activemq:queue:coldMealStation"); mockEndpointsAndSkip("activemq:queue:others"); // and replace the route's source with test endpoint replaceFromWith("file://testInbox"); } }); } @Test public void testSyncInteraction() throws InterruptedException { String testJson = "{\"id\": 1, \"order\": [{\"id\": 1, \"name\": \"Americano\", \"type\": \"Drink\", \"qty\": \"1\"}, {\"id\": 2, \"name\": \"French Omelette\", \"type\": \"Hot Meal\", \"qty\": \"1\"}, {\"id\": 3, \"name\": \"Lasagna\", \"type\": \"Hot Meal\", \"qty\": \"1\"}, {\"id\": 4, \"name\": \"Rice Balls\", \"type\": \"Hot Meal\", \"qty\": \"1\"}, {\"id\": 5, \"name\": \"Blueberry Pie\", \"type\": \"Dessert\", \"qty\": \"1\"}]}"; // get mocked endpoint and set an expectation MockEndpoint mockEndpoint = getMockEndpoint("mock:activemq:queue:hotMealStation"); mockEndpoint.expectedMessageCount(3); // simulate putting file in the inbox folder template.sendBodyAndHeader("file://testInbox", testJson, Exchange.FILE_NAME, "test.json"); //checks that expectations were met assertMockEndpointsSatisfied(); } }
次に、 mvn test
を使用してアプリケーションのテストを実行します。 テストのアドバイスにより、ルートが正常に実行されたことがわかります。 実際のキューを通過するメッセージはなく、テストに合格しています。
INFO | Route: main started and consuming from: file://testInbox <...> INFO | Incoming File: test.json <...> INFO | Asserting: mock://activemq:queue:hotMealStation is satisfied
KubernetesクラスターでのApacheCamelの使用
今日の統合の問題の1つは、アプリケーションが静的ではなくなったことです。 クラウドインフラストラクチャでは、複数のノードで同時に実行される仮想サービスを処理します。 これにより、マイクロサービスアーキテクチャが可能になり、小規模で軽量なサービスが相互作用します。 これらのサービスの有効期間は信頼できないため、動的に検出する必要があります。
クラウドサービスを結合することは、ApacheCamelで解決できるタスクです。 これは、EIPフレーバーと、Camelに多数のアダプターがあり、幅広いプロトコルをサポートしているという事実のために特に興味深いものです。 最近のバージョン2.18では、ServiceCallコンポーネントが追加されています。このコンポーネントは、APIを呼び出し、クラスター検出メカニズムを介してそのアドレスを解決する機能を導入しています。 現在、Consul、Kubernetes、Ribbonなどをサポートしています。ServiceCallがConsulで構成されているコードの例は、簡単に見つけることができます。 ここではKubernetesを使用します。これは、私のお気に入りのクラスタリングソリューションだからです。
統合スキーマは次のようになります。
Order
サービスとInventory
サービスは、静的データを返す簡単なSpringBootアプリケーションのカップルになります。 ここでは、特定の技術スタックに縛られていません。 これらのサービスは、処理したいデータを生成しています。
注文サービスコントローラー:
@RestController public class OrderController { private final OrderStorage orderStorage; @Autowired public OrderController(OrderStorage orderStorage) { this.orderStorage = orderStorage; } @RequestMapping("/info") public String info() { return "Order Service UU/orders") public List<Order> getAll() { return orderStorage.getAll(); } @RequestMapping("/orders/{id}") public Order getOne(@PathVariable Integer id) { return orderStorage.getOne(id); } }
次の形式でデータを生成します。
[{"id":1,"items":[2,3,4]},{"id":2,"items":[5,3]}]
Inventory
サービスコントローラーは、 Order
サービスのコントローラーと完全に似ています。
@RestController public class InventoryController { private final InventoryStorage inventoryStorage; @Autowired public InventoryController(InventoryStorage inventoryStorage) { this.inventoryStorage = inventoryStorage; } @RequestMapping("/info") public String info() { return "Inventory Service UU/items") public List<InventoryItem> getAll() { return inventoryStorage.getAll(); } @RequestMapping("/items/{id}") public InventoryItem getOne(@PathVariable Integer id) { return inventoryStorage.getOne(id); } }
InventoryStorage
は、データを保持する汎用リポジトリです。 この例では、静的な事前定義されたオブジェクトを返します。これらのオブジェクトは、次の形式にマーシャリングされます。
[{"id":1,"name":"Laptop","description":"Up to 12-hours battery life","price":499.9},{"id":2,"name":"Monitor","description":"27-inch, response time: 7ms","price":200.0},{"id":3,"name":"Headphones","description":"Soft leather ear-cups","price":29.9},{"id":4,"name":"Mouse","description":"Designed for comfort and portability","price":19.0},{"id":5,"name":"Keyboard","description":"Layout: US","price":10.5}]
それらを接続するゲートウェイルートを作成しましょう。ただし、このステップではServiceCallを使用しません。
rest("/orders") .get("/").description("Get all orders with details").outType(TestResponse.class) .route() .setHeader("Content-Type", constant("application/json")) .setHeader("Accept", constant("application/json")) .setHeader(Exchange.HTTP_METHOD, constant("GET")) .removeHeaders("CamelHttp*") .to("http4://localhost:8082/orders?bridgeEndpoint=true") .unmarshal(formatOrder) .enrich("direct:enrichFromInventory", new OrderAggregationStrategy()) .to("log:result") .endRest(); from("direct:enrichFromInventory") .transform().simple("${null}") .setHeader("Content-Type", constant("application/json")) .setHeader("Accept", constant("application/json")) .setHeader(Exchange.HTTP_METHOD, constant("GET")) .removeHeaders("CamelHttp*") .to("http4://localhost:8081/items?bridgeEndpoint=true") .unmarshal(formatInventory);
ここで、各サービスが特定のインスタンスではなく、1つのインスタンスとして動作するインスタンスのクラウドであると想像してください。 Minikubeを使用して、Kubernetesクラスターをローカルで試します。
Kubernetesノードをローカルで表示するようにネットワークルートを構成します(指定された例はMac / Linux環境用です)。
# remove existing routes sudo route -n delete 10/24 > /dev/null 2>&1 # add routes sudo route -n add 10.0.0.0/24 $(minikube ip) # 172.17.0.0/16 ip range is used by docker in minikube sudo route -n add 172.17.0.0/16 $(minikube ip) ifconfig 'bridge100' | grep member | awk '{print $2}' # use interface name from the output of the previous command # needed for xhyve driver, which I'm using for testing sudo ifconfig bridge100 -hostfilter en5
次のようなDockerfile構成を使用して、Dockerコンテナー内のサービスをラップします。
FROM openjdk:8-jdk-alpine VOLUME /tmp ADD target/order-srv-1.0-SNAPSHOT.jar app.jar ADD target/lib lib ENV JAVA_OPTS="" ENTRYPOINT exec java $JAVA_OPTS -Djava.security.egd=file:/dev/./urandom -jar /app.jar
サービスイメージをビルドしてDockerレジストリにプッシュします。 次に、ローカルのKubernetesクラスターでノードを実行します。
Kubernetes.yamlデプロイメント構成:
apiVersion: extensions/v1beta1 kind: Deployment metadata: name: inventory spec: replicas: 3 selector: matchLabels: app: inventory template: metadata: labels: app: inventory spec: containers: - name: inventory image: inventory-srv:latest imagePullPolicy: Never ports: - containerPort: 8081
これらのデプロイメントをクラスター内のサービスとして公開します。
kubectl expose deployment order-srv --type=NodePort kubectl expose deployment inventory-srv --type=NodePort
これで、クラスターからランダムに選択されたノードによってリクエストが処理されるかどうかを確認できます。 curl -X http://192.168.99.100:30517/info
を順番に数回実行して、公開されたサービスのminikube NodePortにアクセスします(ホストとポートを使用)。 出力では、リクエストバランシングを達成したことがわかります。
Inventory Service UUID = 22f8ca6b-f56b-4984-927b-cbf9fcf81da5 Inventory Service UUID = b7a4d326-1e76-4051-a0a6-1016394fafda Inventory Service UUID = b7a4d326-1e76-4051-a0a6-1016394fafda Inventory Service UUID = 22f8ca6b-f56b-4984-927b-cbf9fcf81da5 Inventory Service UUID = 50323ddb-3ace-4424-820a-6b4e85775af4
camel-kubernetes
とcamel-netty4-http
の依存関係をプロジェクトのpom.xml
に追加します。 次に、ルート定義間ですべてのサービスコールに共有されるKubernetesマスターノード検出を使用するようにServiceCallコンポーネントを設定します。
KubernetesConfiguration kubernetesConfiguration = new KubernetesConfiguration(); kubernetesConfiguration.setMasterUrl("https://192.168.64.2:8443"); kubernetesConfiguration.setClientCertFile("/Users/antongoncharov/.minikube/client.crt"); kubernetesConfiguration.setClientKeyFile("/Users/antongoncharov/.minikube/client.key"); kubernetesConfiguration.setNamespace("default”); ServiceCallConfigurationDefinition config = new ServiceCallConfigurationDefinition(); config.setServiceDiscovery(new KubernetesClientServiceDiscovery(kubernetesConfiguration)); context.setServiceCallConfiguration(config);
ServiceCall EIPは、SpringBootを十分に補完します。 ほとんどのオプションは、 application.properties
ファイルで直接構成できます。
ServiceCallコンポーネントを使用してCamelルートを強化します。
rest("/orders") .get("/").description("Get all orders with details").outType(TestResponse.class) .route() .hystrix() .setHeader("Content-Type", constant("application/json")) .setHeader("Accept", constant("application/json")) .setHeader(Exchange.HTTP_METHOD, constant("GET")) .removeHeaders("CamelHttp*") .serviceCall("customer-srv","http4:customer-deployment?bridgeEndpoint=true") .unmarshal(formatOrder) .enrich("direct:enrichFromInventory", new OrderAggregationStrategy()) .to("log:result") .endRest(); from("direct:enrichFromInventory") .transform().simple("${null}") .setHeader("Content-Type", constant("application/json")) .setHeader("Accept", constant("application/json")) .setHeader(Exchange.HTTP_METHOD, constant("GET")) .removeHeaders("CamelHttp*") .serviceCall("order-srv","http4:order-srv?bridgeEndpoint=true") .unmarshal(formatInventory);
また、ルートでサーキットブレーカーをアクティブにしました。 これは、配信エラーや受信者が利用できない場合にリモートシステムコールを一時停止できる統合フックです。 これは、カスケードシステム障害を回避するように設計されています。 Hystrixコンポーネントは、サーキットブレーカーパターンを実装することでこれを実現するのに役立ちます。
それを実行してテストリクエストを送信しましょう。 両方のサービスから集計された応答を取得します。
[{"id":1,"items":[{"id":2,"name":"Monitor","description":"27-inch, response time: 7ms","price":200.0},{"id":3,"name":"Headphones","description":"Soft leather ear-cups","price":29.9},{"id":4,"name":"Mouse","description":"Designed for comfort and portability","price":19.0}]},{"id":2,"items":[{"id":5,"name":"Keyboard","description":"Layout: US","price":10.5},{"id":3,"name":"Headphones","description":"Soft leather ear-cups","price":29.9}]}]
結果は期待どおりです。
その他のユースケース
ApacheCamelがマイクロサービスをクラスターに統合する方法を示しました。 このフレームワークの他の用途は何ですか? 一般に、ルールベースのルーティングが解決策となる可能性があるすべての場所で役立ちます。 For instance, Apache Camel can be a middleware for the Internet of Things with the Eclipse Kura adapter. It can handle monitoring by ferrying log signals from various components and services, like in the CERN system. It can also be an integration framework for enterprise SOA or be a pipeline for batch data processing, although it doesn't compete well with Apache Spark in this area.
結論
You can see that systems integration isn't an easy process. We're lucky because a lot of experience has been gathered. It's important to apply it correctly to build flexible and fault-tolerant solutions.
To ensure correct application, I recommend having a checklist of important integration aspects. Must-have items include:
- Is there a separate integration layer?
- Are there tests for integration?
- Do we know the expected peak data intensity?
- Do we know the expected data delivery time?
- Does message correlation matter? What if a sequence breaks?
- Should we do it in a synchronous or asynchronous way?
- Where do formats and routing rules change more frequently?
- Do we have ways to monitor the process?
In this article, we tried Apache Camel, a lightweight integration framework, which helps save time and effort when solving integration problems. As we showed, it can serve as a tool, supporting the relevant microservice architecture by taking full responsibility for data exchange between microservices.
If you're interested in learning more about Apache Camel, I highly recommend the book “Camel in Action” by the framework's creator, Claus Ibsen. Official documentation is available at camel.apache.org.