マイクロサービス通信:Redisを使用したSpring統合チュートリアル
公開: 2022-03-11マイクロサービスアーキテクチャは、高度にスケーラブルなWebアプリケーションを設計および実装する上で非常に人気のあるアプローチです。 コンポーネント間のモノリシックアプリケーション内の通信は、通常、同じプロセス内のメソッドまたは関数呼び出しに基づいています。 一方、マイクロサービスベースのアプリケーションは、複数のマシンで実行される分散システムです。
これらのマイクロサービス間の通信は、安定したスケーラブルなシステムを実現するために重要です。 これを行うには複数の方法があります。 メッセージベースの通信は、これを確実に行うための1つの方法です。
メッセージングを使用する場合、コンポーネントは非同期でメッセージを交換することによって相互に作用します。 メッセージはチャネルを介して交換されます。
サービスAがサービスBと直接通信するのではなく、サービスBと通信する場合、Aはサービスを特定のチャネルに送信します。 サービスBがメッセージを読みたい場合、特定のメッセージチャネルからメッセージを取得します。
このSpring統合チュートリアルでは、Redisを使用してSpringアプリケーションにメッセージングを実装する方法を学習します。 1つのサービスがキュー内のイベントをプッシュし、別のサービスがこれらのイベントを1つずつ処理しているサンプルアプリケーションについて説明します。
春の統合
プロジェクトSpringIntegrationは、Springフレームワークを拡張して、Springベースのアプリケーション間またはアプリケーション内でのメッセージングのサポートを提供します。 コンポーネントは、メッセージングパラダイムを介して相互に接続されます。 個々のコンポーネントは、アプリケーション内の他のコンポーネントを認識していない場合があります。
Spring Integrationは、外部システムと通信するための幅広いメカニズムを提供します。 チャネルアダプタは、一方向の統合(送信または受信)に使用されるそのようなメカニズムの1つです。 また、ゲートウェイは要求/応答シナリオ(インバウンドまたはアウトバウンド)に使用されます。
Apache Camelは、広く使用されている代替手段です。 Spring統合は、Springエコシステムの一部であるため、通常、既存のSpringベースのサービスで推奨されます。
Redis
Redisは、非常に高速なインメモリデータストアです。 オプションでディスクに永続化することもできます。 単純なキーと値のペア、セット、キューなどのさまざまなデータ構造をサポートします。
Redisをキューとして使用すると、コンポーネント間でのデータの共有と水平方向のスケーリングがはるかに簡単になります。 プロデューサーまたは複数のプロデューサーはデータをキューにプッシュでき、コンシューマーまたは複数のコンシューマーはデータをプルしてイベントを処理できます。
複数のコンシューマーが同じイベントを消費することはできません。これにより、1つのイベントが1回処理されることが保証されます。
Redisをメッセージキューとして使用する利点:
- 非ブロッキング方式での個別タスクの並列実行
- 素晴らしいパフォーマンス
- 安定
- 簡単な監視とデバッグ
- 簡単な実装と使用法
ルール:
- キューへのタスクの追加は、タスク自体の処理よりも高速である必要があります。
- タスクの消費は、タスクの生成よりも高速である必要があります(そうでない場合は、コンシューマーを追加します)。
SpringとRedisの統合
以下では、Spring IntegrationwithRedisの使用方法を説明するサンプルアプリケーションの作成について説明します。
ユーザーが投稿を公開できるようにするアプリケーションがあるとします。 そして、フォロー機能を構築したいとします。 もう1つの要件は、誰かが投稿を公開するたびに、すべてのフォロワーに何らかのコミュニケーションチャネル(電子メールやプッシュ通知など)を介して通知する必要があることです。
これを実装する1つの方法は、ユーザーが何かを公開したら、各フォロワーに電子メールを送信することです。 しかし、ユーザーに1,000人のフォロワーがいるとどうなりますか? そして、1,000人のユーザーが10秒で何かを公開すると、それぞれに1,000人のフォロワーがいますか? また、発行者の投稿は、すべての電子メールが送信されるまで待機しますか?
分散システムはこの問題を解決します。
この特定の問題は、キューを使用することで解決できます。 投稿の公開を担当するサービスA(プロデューサー)がそれを行います。 投稿を公開し、メールを受信する必要のあるユーザーのリストと投稿自体を含むイベントをプッシュします。 ユーザーのリストはサービスBで取得できますが、この例を簡単にするために、サービスAから送信します。
これは非同期操作です。 これは、公開しているサービスが電子メールの送信を待つ必要がないことを意味します。
サービスB(コンシューマー)は、キューからイベントをプルして処理します。 このようにして、サービスを簡単に拡張でき、 n
人のコンシューマーが電子メール(イベントの処理)を送信できるようになりました。
それでは、プロデューサーのサービスでの実装から始めましょう。 必要な依存関係は次のとおりです。
<dependency> <groupId>redis.clients</groupId> <artifactId>jedis</artifactId> </dependency> <dependency> <groupId>org.springframework.data</groupId> <artifactId>spring-data-redis</artifactId> </dependency> <dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-redis</artifactId> </dependency>
これらの3つのMaven依存関係が必要です。
- JedisはRedisクライアントです。
- Spring Data Redisの依存関係により、JavaでのRedisの使用が容易になります。 コアAPIを使用するためのテンプレートクラスや軽量のリポジトリスタイルのデータアクセスなど、おなじみのSpringの概念を提供します。
- Spring Integration Redisは、Springプログラミングモデルの拡張機能を提供して、よく知られているエンタープライズ統合パターンをサポートします。
次に、Jedisクライアントを構成する必要があります。
@Configuration public class RedisConfig { @Value("${redis.host}") private String redisHost; @Value("${redis.port:6379}") private int redisPort; @Bean public JedisPoolConfig poolConfig() { JedisPoolConfig poolConfig = new JedisPoolConfig(); poolConfig.setMaxTotal(128); return poolConfig; } @Bean public RedisConnectionFactory redisConnectionFactory(JedisPoolConfig poolConfig) { final JedisConnectionFactory connectionFactory = new JedisConnectionFactory(); connectionFactory.setHostName(redisHost); connectionFactory.setPort(redisPort); connectionFactory.setPoolConfig(poolConfig); connectionFactory.setUsePool(true); return connectionFactory; } }
アノテーション@Value
は、Springがアプリケーションプロパティで定義された値をフィールドに挿入することを意味します。 これは、 redis.host
とredis.port
値をアプリケーションのプロパティで定義する必要があることを意味します。
次に、キューに送信するメッセージを定義する必要があります。 簡単なメッセージ例は次のようになります。
@Getter @Setter @Builder public class PostPublishedEvent { private String postUrl; private String postTitle; private List<String> emails; }
注:Project Lombok(https://projectlombok.org/)は、 @Getter
、 @Setter
、 @Builder
、およびその他の多くのアノテーションを提供して、ゲッター、セッター、およびその他の些細なものでコードが乱雑にならないようにします。 あなたはこのToptalの記事からそれについてもっと学ぶことができます。
メッセージ自体はJSON形式でキューに保存されます。 イベントがキューに公開されるたびに、メッセージはJSONにシリアル化されます。 そして、キューから消費するとき、メッセージは逆シリアル化されます。
メッセージを定義したら、キュー自体を定義する必要があります。 Spring Integrationでは、 .xml
構成を介して簡単に実行できます。 構成は、 resources/WEB-INF
ディレクトリー内に配置する必要があります。
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:int="http://www.springframework.org/schema/integration" xmlns:int-redis="http://www.springframework.org/schema/integration/redis" xsi:schemaLocation="http://www.springframework.org/schema/integration/redis http://www.springframework.org/schema/integration/redis/spring-integration-redis.xsd http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd"> <int-redis:queue-outbound-channel-adapter channel="eventChannelJson" serializer="serializer" auto-startup="true" connection-factory="redisConnectionFactory" queue="my-event-queue" /> <int:gateway service-interface="org.toptal.queue.RedisChannelGateway" error-channel="errorChannel" default-request-channel="eventChannel"> <int:default-header name="topic" value="queue"/> </int:gateway> <int:channel/> <int:channel/> <bean class="org.springframework.data.redis.serializer.StringRedisSerializer"/> <int:object-to-json-transformer input-channel="eventChannel" output-channel="eventChannelJson"/> </beans>
構成では、「int-redis:queue-outbound-channel-adapter」の部分を確認できます。 そのプロパティは次のとおりです。
- id:コンポーネントのBean名。
- channel:このエンドポイントがメッセージを受信する
MessageChannel
。 - connection-factory:
RedisConnectionFactory
への参照。 - queue: Redisメッセージを送信するためにキューベースのプッシュ操作が実行されるRedisリストの名前。 この属性は、queue-expressionと相互に排他的です。
- queue-expression:実行時に着信メッセージを
#root
変数として使用してRedisリストの名前を決定するSpEL式。 この属性は、キューと相互に排他的です。 - シリアライザー:
RedisSerializer
リファレンス。 デフォルトでは、これはJdkSerializationRedisSerializer
です。 ただし、String
ペイロードの場合、シリアライザー参照が提供されていない場合はStringRedisSerializer
が使用されます。 - extract-payload:このエンドポイントがペイロードのみをRedisキューに送信するか、メッセージ全体を送信するかを指定します。 デフォルト値は
true
です。 - left-push:このエンドポイントがメッセージをRedisリストに書き込むために左プッシュ(
true
)または右プッシュ(false
の場合)のどちらを使用するかを指定します。 trueの場合、デフォルトのRedisキューインバウンドチャネルアダプターで使用すると、RedisリストはFIFOキューとして機能します。 左ポップでリストから読み取るソフトウェアで使用する場合、またはスタックのようなメッセージ順序を実現する場合は、false
に設定します。 デフォルト値はtrue
です。
次のステップは、 .xml
構成に記載されているゲートウェイを定義することです。 ゲートウェイには、 org.toptal.queue
パッケージのRedisChannelGateway
クラスを使用しています。

StringRedisSerializer
は、Redisに保存する前にメッセージをシリアル化するために使用されます。 また、 RedisChannelGateway
.xml
ゲートウェイサービスとして設定しました。 これは、 RedisChannelGateway
が他のBeanに注入される可能性があることを意味します。 @Gateway
アノテーションを使用してメソッドごとのチャネル参照を提供することもできるため、プロパティdefault-request-channel
を定義しました。 クラス定義:
public interface RedisChannelGateway { void enqueue(PostPublishedEvent event); }
この構成をアプリケーションに接続するには、インポートする必要があります。 これは、 SpringIntegrationConfig
クラスに実装されています。
@ImportResource("classpath:WEB-INF/event-queue-config.xml") @AutoConfigureAfter(RedisConfig.class) @Configuration public class SpringIntegrationConfig { }
@ImportResource
アノテーションは、 .xml
構成ファイルを@Configuration
にインポートするために使用されます。 また、 @AutoConfigureAfter
アノテーションは、他の指定された自動構成クラスの後に自動構成を適用する必要があることを示唆するために使用されます。
次に、サービスを作成し、イベントをRedisキューにenqueue
するメソッドを実装します。
public interface QueueService { void enqueue(PostPublishedEvent event); }
@Service public class RedisQueueService implements QueueService { private RedisChannelGateway channelGateway; @Autowired public RedisQueueService(RedisChannelGateway channelGateway) { this.channelGateway = channelGateway; } @Override public void enqueue(PostPublishedEvent event) { channelGateway.enqueue(event); } }
そして今、 QueueService
のenqueue
メソッドを使用してキューにメッセージを簡単に送信できます。
Redisキューは、1つ以上のプロデューサーとコンシューマーのリストです。 メッセージをキューに公開するには、プロデューサーはLPUSH
コマンドを使用します。 また、Redisを監視する場合(ヒント: redis-cli monitor
入力)、メッセージがキューに追加されていることがわかります。
"LPUSH" "my-event-queue" "{\"postUrl\":\"test\",\"postTitle\":\"test\",\"emails\":[\"test\"]}"
次に、これらのイベントをキューからプルして処理するコンシューマーアプリケーションを作成する必要があります。 コンシューマーサービスには、プロデューサーサービスと同じ依存関係が必要です。
これで、 PostPublishedEvent
クラスを再利用してメッセージを逆シリアル化できます。
キュー構成を作成する必要があります。これも、 resources/WEB-INF
ディレクトリ内に配置する必要があります。 キュー設定の内容は次のとおりです。
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:int="http://www.springframework.org/schema/integration" xmlns:int-redis="http://www.springframework.org/schema/integration/redis" xsi:schemaLocation="http://www.springframework.org/schema/integration/redis http://www.springframework.org/schema/integration/redis/spring-integration-redis.xsd http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd"> <int-redis:queue-inbound-channel-adapter channel="eventChannelJson" queue="my-event-queue" serializer="serializer" auto-startup="true" connection-factory="redisConnectionFactory"/> <int:channel/> <int:channel> <int:queue/> </int:channel> <bean class="org.springframework.data.redis.serializer.StringRedisSerializer"/> <int:json-to-object-transformer input-channel="eventChannelJson" output-channel="eventChannel" type="com.toptal.integration.spring.model.PostPublishedEvent"/> <int:service-activator input-channel="eventChannel" ref="RedisEventProcessingService" method="process"> <int:poller fixed-delay="10" time-unit="SECONDS" max-messages-per-poll="500"/> </int:service-activator> </beans>
.xml
構成では、 int-redis:queue-inbound-channel-adapter
は次のプロパティを持つことができます。
- id:コンポーネントのBean名。
- channel:このエンドポイントからメッセージを送信する
MessageChannel
。 - auto-startup:アプリケーションコンテキストの開始後にこのエンドポイントを自動的に開始するかどうかを指定する
SmartLifecycle
属性。 デフォルト値はtrue
です。 - フェーズ:このエンドポイントが開始されるフェーズを指定する
SmartLifecycle
属性。 デフォルト値は0
です。 - connection-factory:
RedisConnectionFactory
への参照。 - queue: Redisメッセージを取得するためにキューベースのポップ操作が実行されるRedisリストの名前。
- error-channel:
Endpoint
のリスニングタスクからExceptions
付きのErrorMessages
を送信するMessageChannel
。 デフォルトでは、基礎となるMessagePublishingErrorHandler
は、アプリケーションコンテキストのデフォルトのerrorChannel
を使用します。 - シリアライザー:
RedisSerializer
リファレンス。 空の文字列にすることができます。これは、シリアライザーがないことを意味します。 この場合、インバウンドRedisメッセージからの生のbyte[]
は、Message
ペイロードとしてチャネルに送信されます。 デフォルトでは、これはJdkSerializationRedisSerializer
です。 - receive-timeout: pop操作がキューからのRedisメッセージを待機するためのミリ秒単位のタイムアウト。 デフォルト値は1秒です。
- Recovery-interval:ポップ操作の例外の後、リスナータスクを再起動する前に、リスナータスクがスリープする必要がある時間(ミリ秒単位)。
- expected-message:このエンドポイントがRedisキューからのデータにメッセージ全体が含まれることを期待するかどうかを指定します。 この属性が
true
に設定されている場合、メッセージには何らかの形式の逆シリアル化(デフォルトではJDKシリアル化)が必要なため、シリアライザーを空の文字列にすることはできません。 デフォルト値はfalse
です。 - task-executor: Spring
TaskExecutor
(または標準のJDK 1.5+ Executor)Beanへの参照。 基礎となるリスニングタスクに使用されます。 デフォルトでは、SimpleAsyncTaskExecutor
が使用されます。 - right-pop:このエンドポイントがRedisリストからメッセージを読み取るためにright pop(
true
)またはleft pop(false
の場合)のどちらを使用するかを指定します。true
の場合、デフォルトのRedisキューアウトバウンドチャネルアダプタで使用すると、RedisリストはFIFOキューとして機能します。 右プッシュでリストに書き込むソフトウェアで使用する場合、またはスタックのようなメッセージ順序を実現する場合は、false
に設定します。 デフォルト値はtrue
です。
重要な部分は、イベントの処理に使用するサービスとメソッドを定義する「サービスアクティベーター」です。
また、 json-to-object-transformer
は、JSONをオブジェクトに変換するためにtype属性を必要とします。これは、上記のtype="com.toptal.integration.spring.model.PostPublishedEvent"
に設定されています。
この場合も、この構成を配線するには、 SpringIntegrationConfig
クラスが必要になります。これは、以前と同じにすることができます。 そして最後に、実際にイベントを処理するサービスが必要です。
public interface EventProcessingService { void process(PostPublishedEvent event); } @Service("RedisEventProcessingService") public class RedisEventProcessingService implements EventProcessingService { @Override public void process(PostPublishedEvent event) { // TODO: Send emails here, retry strategy, etc :) } }
アプリケーションを実行すると、Redisで次のことがわかります。
"BRPOP" "my-event-queue" "1"
結論
Spring IntegrationとRedisを使用すると、Springマイクロサービスアプリケーションの構築は通常ほど困難ではありません。 少しの構成と少量の定型コードで、マイクロサービスアーキテクチャの基盤をすぐに構築できます。
現在のSpringプロジェクトを完全にスクラッチして新しいアーキテクチャに切り替える予定がない場合でも、Redisを使用すると、キューを使用してパフォーマンスを大幅に向上させることが非常に簡単です。