微服務通信:使用 Redis 的 Spring 集成教程
已發表: 2022-03-11微服務架構是設計和實現高度可擴展的 Web 應用程序的一種非常流行的方法。 組件之間的單體應用程序內的通信通常基於同一進程內的方法或函數調用。 另一方面,基於微服務的應用程序是在多台機器上運行的分佈式系統。
這些微服務之間的通信對於擁有穩定且可擴展的系統非常重要。 有多種方法可以做到這一點。 基於消息的通信是可靠地做到這一點的一種方法。
使用消息傳遞時,組件之間通過異步交換消息進行交互。 消息通過渠道交換。
當服務 A 想要與服務 B 通信時,A 不會直接發送,而是將其發送到特定的通道。 當服務 B 想要讀取消息時,它會從特定的消息通道中獲取消息。
在這個 Spring 集成教程中,您將學習如何使用 Redis 在 Spring 應用程序中實現消息傳遞。 您將看到一個示例應用程序,其中一項服務正在將事件推送到隊列中,而另一項服務正在逐一處理這些事件。
彈簧集成
Spring Integration 項目擴展了 Spring 框架,為基於 Spring 的應用程序之間或內部的消息傳遞提供支持。 組件通過消息傳遞範式連接在一起。 單個組件可能不知道應用程序中的其他組件。
Spring Integration 提供了多種與外部系統通信的機制。 通道適配器是一種用於單向集成(發送或接收)的機制。 網關用於請求/回复場景(入站或出站)。
Apache Camel 是一種被廣泛使用的替代方案。 Spring 集成在現有的基於 Spring 的服務中通常是首選,因為它是 Spring 生態系統的一部分。
雷迪斯
Redis 是一種速度極快的內存數據存儲。 它也可以選擇持久化到磁盤。 它支持不同的數據結構,如簡單的鍵值對、集合、隊列等。
使用 Redis 作為隊列可以更輕鬆地在組件之間共享數據和水平擴展。 一個生產者或多個生產者可以將數據推送到隊列中,一個消費者或多個消費者可以拉取數據並處理事件。
多個消費者不能消費同一個事件——這確保了一個事件被處理一次。
使用 Redis 作為消息隊列的好處:
- 以非阻塞方式並行執行離散任務
- 很棒的演出
- 穩定
- 易於監控和調試
- 易於實施和使用
規則:
- 將任務添加到隊列應該比處理任務本身更快。
- 消費任務應該比生產任務更快(如果不是,請添加更多消費者)。
與 Redis 的 Spring 集成
下面通過創建一個示例應用程序來解釋如何使用 Spring Integration with Redis。
假設您有一個允許用戶發布帖子的應用程序。 你想建立一個跟隨功能。 另一個要求是,每次有人發布帖子時,都應通過某種通信渠道(例如,電子郵件或推送通知)通知所有關注者。
實現這一點的一種方法是在用戶發佈內容後向每個關注者發送電子郵件。 但是當用戶有 1000 個關注者時會發生什麼? 而當 1000 名用戶在 10 秒內發佈內容時,每個用戶都有 1000 名粉絲? 另外,發布者的帖子會等到所有電子郵件都發送完畢嗎?
分佈式系統解決了這個問題。
這個特定的問題可以通過使用隊列來解決。 負責發布帖子的服務 A(生產者)將這樣做。 它將發布一個帖子並推送一個包含需要接收電子郵件的用戶列表和帖子本身的事件。 用戶列表可以在服務 B 中獲取,但為了簡單起見,我們將從服務 A 發送它。
這是一個異步操作。 這意味著正在發布的服務不必等待發送電子郵件。
服務 B(消費者)將從隊列中拉出事件並進行處理。 這樣,我們可以輕鬆擴展我們的服務,並且我們可以讓n
消費者發送電子郵件(處理事件)。
因此,讓我們從生產者服務中的實現開始。 必要的依賴是:
<dependency> <groupId>redis.clients</groupId> <artifactId>jedis</artifactId> </dependency> <dependency> <groupId>org.springframework.data</groupId> <artifactId>spring-data-redis</artifactId> </dependency> <dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-redis</artifactId> </dependency>
這三個 Maven 依賴項是必需的:
- Jedis 是一個 Redis 客戶端。
- Spring Data Redis 依賴使得在 Java 中使用 Redis 變得更加容易。 它提供了熟悉的 Spring 概念,例如用於核心 API 使用的模板類和輕量級存儲庫式數據訪問。
- Spring 集成 Redis 提供了 Spring 編程模型的擴展,以支持著名的企業集成模式。
接下來,我們需要配置 Jedis 客戶端:
@Configuration public class RedisConfig { @Value("${redis.host}") private String redisHost; @Value("${redis.port:6379}") private int redisPort; @Bean public JedisPoolConfig poolConfig() { JedisPoolConfig poolConfig = new JedisPoolConfig(); poolConfig.setMaxTotal(128); return poolConfig; } @Bean public RedisConnectionFactory redisConnectionFactory(JedisPoolConfig poolConfig) { final JedisConnectionFactory connectionFactory = new JedisConnectionFactory(); connectionFactory.setHostName(redisHost); connectionFactory.setPort(redisPort); connectionFactory.setPoolConfig(poolConfig); connectionFactory.setUsePool(true); return connectionFactory; } }
註解@Value
意味著 Spring 會將應用程序屬性中定義的值注入到字段中。 這意味著redis.host
和redis.port
值應該在應用程序屬性中定義。
現在,我們需要定義要發送到隊列的消息。 一個簡單的示例消息可能如下所示:
@Getter @Setter @Builder public class PostPublishedEvent { private String postUrl; private String postTitle; private List<String> emails; }
注意:Lombok 項目 (https://projectlombok.org/) 提供了@Getter
、 @Setter 、 @Setter
和許多其他註釋,以避免將代碼與 getter、setter 和其他瑣碎的東西@Builder
。 您可以從這篇 Toptal 文章中了解更多信息。
消息本身將以 JSON 格式保存在隊列中。 每次將事件發佈到隊列時,消息都會被序列化為 JSON。 當從隊列中消費時,消息將被反序列化。
定義完消息後,我們需要定義隊列本身。 在 Spring Integration 中,可以通過.xml
配置輕鬆完成。 配置應放在resources/WEB-INF
目錄中。
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:int="http://www.springframework.org/schema/integration" xmlns:int-redis="http://www.springframework.org/schema/integration/redis" xsi:schemaLocation="http://www.springframework.org/schema/integration/redis http://www.springframework.org/schema/integration/redis/spring-integration-redis.xsd http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd"> <int-redis:queue-outbound-channel-adapter channel="eventChannelJson" serializer="serializer" auto-startup="true" connection-factory="redisConnectionFactory" queue="my-event-queue" /> <int:gateway service-interface="org.toptal.queue.RedisChannelGateway" error-channel="errorChannel" default-request-channel="eventChannel"> <int:default-header name="topic" value="queue"/> </int:gateway> <int:channel/> <int:channel/> <bean class="org.springframework.data.redis.serializer.StringRedisSerializer"/> <int:object-to-json-transformer input-channel="eventChannel" output-channel="eventChannelJson"/> </beans>
在配置中,您可以看到“int-redis:queue-outbound-channel-adapter”部分。 它的屬性是:
- id:組件的bean名稱。
- channel:此端點從中接收消息的
MessageChannel
。 - connection-factory:對
RedisConnectionFactory
bean 的引用。 - queue:基於隊列的push操作發送Redis消息的Redis列表名稱。 該屬性與 queue-expression 互斥。
- queue-expression:一個 SpEL 表達式,用於使用運行時傳入的消息作為
#root
變量來確定 Redis 列表的名稱。 該屬性與隊列互斥。 - 序列化器:一個
RedisSerializer
bean 引用。 默認情況下,它是一個JdkSerializationRedisSerializer
。 但是,對於String
有效負載,如果未提供序列化程序引用,則使用StringRedisSerializer
。 - extract-payload:指定此端點是否應僅將有效負載發送到 Redis 隊列或整個消息。 它的默認值為
true
。 - left-push:指定此端點應該使用左推(當
true
時)還是右推(當false
時)將消息寫入 Redis 列表。 如果為 true,則 Redis 列表在與默認 Redis 隊列入站通道適配器一起使用時充當 FIFO 隊列。 設置為false
以與從左彈出列表中讀取的軟件一起使用或實現類似堆棧的消息順序。 它的默認值為true
。
下一步是定義網關,在.xml
配置中提到。 對於網關,我們使用org.toptal.queue
包中的RedisChannelGateway
類。

StringRedisSerializer
用於在保存到 Redis 之前對消息進行序列化。 同樣在.xml
配置中,我們定義了網關並將RedisChannelGateway
設置為網關服務。 這意味著可以將RedisChannelGateway
bean 注入到其他 bean 中。 我們定義了default-request-channel
屬性,因為它還可以通過使用@Gateway
註釋來提供每個方法的通道引用。 類定義:
public interface RedisChannelGateway { void enqueue(PostPublishedEvent event); }
要將這個配置連接到我們的應用程序中,我們必須導入它。 這是在SpringIntegrationConfig
類中實現的。
@ImportResource("classpath:WEB-INF/event-queue-config.xml") @AutoConfigureAfter(RedisConfig.class) @Configuration public class SpringIntegrationConfig { }
@ImportResource
註解用於將 Spring .xml
配置文件導入@Configuration
。 @AutoConfigureAfter
註解用於提示自動配置應該在其他指定的自動配置類之後應用。
我們現在將創建一個服務並實現將事件enqueue
Redis 隊列的方法。
public interface QueueService { void enqueue(PostPublishedEvent event); }
@Service public class RedisQueueService implements QueueService { private RedisChannelGateway channelGateway; @Autowired public RedisQueueService(RedisChannelGateway channelGateway) { this.channelGateway = channelGateway; } @Override public void enqueue(PostPublishedEvent event) { channelGateway.enqueue(event); } }
現在,您可以使用QueueService
中的enqueue
方法輕鬆地將消息發送到隊列。
Redis 隊列只是包含一個或多個生產者和消費者的列表。 要將消息發佈到隊列,生產者使用LPUSH
Redis 命令。 如果您監控 Redis(提示:輸入redis-cli monitor
),您可以看到消息已添加到隊列中:
"LPUSH" "my-event-queue" "{\"postUrl\":\"test\",\"postTitle\":\"test\",\"emails\":[\"test\"]}"
現在,我們需要創建一個消費者應用程序,它將從隊列中提取這些事件並進行處理。 消費者服務需要與生產者服務相同的依賴關係。
現在我們可以重用PostPublishedEvent
類來反序列化消息。
我們需要創建隊列配置,同樣,它必須放在resources/WEB-INF
目錄中。 隊列配置的內容是:
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:int="http://www.springframework.org/schema/integration" xmlns:int-redis="http://www.springframework.org/schema/integration/redis" xsi:schemaLocation="http://www.springframework.org/schema/integration/redis http://www.springframework.org/schema/integration/redis/spring-integration-redis.xsd http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd"> <int-redis:queue-inbound-channel-adapter channel="eventChannelJson" queue="my-event-queue" serializer="serializer" auto-startup="true" connection-factory="redisConnectionFactory"/> <int:channel/> <int:channel> <int:queue/> </int:channel> <bean class="org.springframework.data.redis.serializer.StringRedisSerializer"/> <int:json-to-object-transformer input-channel="eventChannelJson" output-channel="eventChannel" type="com.toptal.integration.spring.model.PostPublishedEvent"/> <int:service-activator input-channel="eventChannel" ref="RedisEventProcessingService" method="process"> <int:poller fixed-delay="10" time-unit="SECONDS" max-messages-per-poll="500"/> </int:service-activator> </beans>
在.xml
配置中, int-redis:queue-inbound-channel-adapter
可以具有以下屬性:
- id:組件的bean名稱。
- channel:我們從這個端點發送消息的
MessageChannel
。 - auto-startup:一個
SmartLifecycle
屬性,用於指定此端點是否應在應用程序上下文啟動後自動啟動。 它的默認值為true
。 - 階段:一個
SmartLifecycle
屬性,用於指定此端點將在哪個階段啟動。 它的默認值為0
。 - connection-factory:對
RedisConnectionFactory
bean 的引用。 - queue:基於隊列的pop操作獲取Redis消息的Redis列表的名稱。
- error-channel:我們將從
Endpoint
的偵聽任務向其發送帶有Exceptions
的ErrorMessages
的MessageChannel
。 默認情況下,底層MessagePublishingErrorHandler
使用應用程序上下文中的默認errorChannel
。 - 序列化器:
RedisSerializer
bean 引用。 它可以是一個空字符串,這意味著沒有序列化程序。 在這種情況下,來自入站 Redis 消息的原始byte[]
將作為Message
負載發送到通道。 默認情況下,它是一個JdkSerializationRedisSerializer
。 - receive-timeout: pop 操作等待隊列中的 Redis 消息的超時時間(以毫秒為單位)。 其默認值為 1 秒。
- recovery-interval:在重新啟動偵聽器任務之前,在彈出操作異常後偵聽器任務應休眠的時間(以毫秒為單位)。
- 期望消息:指定此端點是否期望來自 Redis 隊列的數據包含整個消息。 如果此屬性設置為
true
,則序列化程序不能為空字符串,因為消息需要某種形式的反序列化(JDK 默認為序列化)。 它的默認值為false
。 - task-executor:對 Spring
TaskExecutor
(或標準 JDK 1.5+ Executor)bean 的引用。 它用於底層監聽任務。 默認情況下,使用SimpleAsyncTaskExecutor
。 - right-pop:指定該端點應該使用右彈出(當
true
時)還是左彈出(當false
時)從 Redis 列表中讀取消息。 如果為true
,則 Redis 列表在與默認 Redis 隊列出站通道適配器一起使用時充當 FIFO 隊列。 設置為false
以與通過右推寫入列表的軟件一起使用或實現類似堆棧的消息順序。 它的默認值為true
。
重要的部分是“服務激活器”,它定義了應該使用哪個服務和方法來處理事件。
此外, json-to-object-transformer
需要一個 type 屬性才能將 JSON 轉換為對象,上面設置為type="com.toptal.integration.spring.model.PostPublishedEvent"
。
同樣,要連接這個配置,我們需要SpringIntegrationConfig
類,它可以和以前一樣。 最後,我們需要一個能夠實際處理事件的服務。
public interface EventProcessingService { void process(PostPublishedEvent event); } @Service("RedisEventProcessingService") public class RedisEventProcessingService implements EventProcessingService { @Override public void process(PostPublishedEvent event) { // TODO: Send emails here, retry strategy, etc :) } }
運行應用程序後,您可以在 Redis 中看到:
"BRPOP" "my-event-queue" "1"
結論
使用 Spring Integration 和 Redis,構建 Spring 微服務應用程序並不像通常那樣令人生畏。 只需少量配置和少量樣板代碼,您就可以立即構建微服務架構的基礎。
即使您不打算完全從頭開始當前的 Spring 項目並切換到新的架構,在 Redis 的幫助下,使用隊列獲得巨大的性能改進也是非常簡單的。