微服务通信:使用 Redis 的 Spring 集成教程
已发表: 2022-03-11微服务架构是设计和实现高度可扩展的 Web 应用程序的一种非常流行的方法。 组件之间的单体应用程序内的通信通常基于同一进程内的方法或函数调用。 另一方面,基于微服务的应用程序是在多台机器上运行的分布式系统。
这些微服务之间的通信对于拥有稳定且可扩展的系统非常重要。 有多种方法可以做到这一点。 基于消息的通信是可靠地做到这一点的一种方法。
使用消息传递时,组件之间通过异步交换消息进行交互。 消息通过渠道交换。
当服务 A 想要与服务 B 通信时,A 不会直接发送,而是将其发送到特定的通道。 当服务 B 想要读取消息时,它会从特定的消息通道中获取消息。
在这个 Spring 集成教程中,您将学习如何使用 Redis 在 Spring 应用程序中实现消息传递。 您将看到一个示例应用程序,其中一项服务正在将事件推送到队列中,而另一项服务正在逐一处理这些事件。
弹簧集成
Spring Integration 项目扩展了 Spring 框架,为基于 Spring 的应用程序之间或内部的消息传递提供支持。 组件通过消息传递范式连接在一起。 单个组件可能不知道应用程序中的其他组件。
Spring Integration 提供了多种与外部系统通信的机制。 通道适配器是一种用于单向集成(发送或接收)的机制。 网关用于请求/回复场景(入站或出站)。
Apache Camel 是一种被广泛使用的替代方案。 Spring 集成在现有的基于 Spring 的服务中通常是首选,因为它是 Spring 生态系统的一部分。
雷迪斯
Redis 是一种速度极快的内存数据存储。 它也可以选择持久化到磁盘。 它支持不同的数据结构,如简单的键值对、集合、队列等。
使用 Redis 作为队列可以更轻松地在组件之间共享数据和水平扩展。 一个生产者或多个生产者可以将数据推送到队列中,一个消费者或多个消费者可以拉取数据并处理事件。
多个消费者不能消费同一个事件——这确保了一个事件被处理一次。
使用 Redis 作为消息队列的好处:
- 以非阻塞方式并行执行离散任务
- 很棒的演出
- 稳定
- 易于监控和调试
- 易于实施和使用
规则:
- 将任务添加到队列应该比处理任务本身更快。
- 消费任务应该比生产任务更快(如果不是,请添加更多消费者)。
与 Redis 的 Spring 集成
下面通过创建一个示例应用程序来解释如何使用 Spring Integration with Redis。
假设您有一个允许用户发布帖子的应用程序。 你想建立一个跟随功能。 另一个要求是,每次有人发布帖子时,都应通过某种通信渠道(例如,电子邮件或推送通知)通知所有关注者。
实现这一点的一种方法是在用户发布内容后向每个关注者发送电子邮件。 但是当用户有 1000 个关注者时会发生什么? 而当 1000 名用户在 10 秒内发布内容时,每个用户都有 1000 名粉丝? 另外,发布者的帖子会等到所有电子邮件都发送完毕吗?
分布式系统解决了这个问题。
这个特定的问题可以通过使用队列来解决。 负责发布帖子的服务 A(生产者)将这样做。 它将发布一个帖子并推送一个包含需要接收电子邮件的用户列表和帖子本身的事件。 用户列表可以在服务 B 中获取,但为了简单起见,我们将从服务 A 发送它。
这是一个异步操作。 这意味着正在发布的服务不必等待发送电子邮件。
服务 B(消费者)将从队列中拉出事件并进行处理。 这样,我们可以轻松扩展我们的服务,并且我们可以让n
消费者发送电子邮件(处理事件)。
因此,让我们从生产者服务中的实现开始。 必要的依赖是:
<dependency> <groupId>redis.clients</groupId> <artifactId>jedis</artifactId> </dependency> <dependency> <groupId>org.springframework.data</groupId> <artifactId>spring-data-redis</artifactId> </dependency> <dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-redis</artifactId> </dependency>
这三个 Maven 依赖项是必需的:
- Jedis 是一个 Redis 客户端。
- Spring Data Redis 依赖使得在 Java 中使用 Redis 变得更加容易。 它提供了熟悉的 Spring 概念,例如用于核心 API 使用的模板类和轻量级存储库式数据访问。
- Spring 集成 Redis 提供了 Spring 编程模型的扩展,以支持著名的企业集成模式。
接下来,我们需要配置 Jedis 客户端:
@Configuration public class RedisConfig { @Value("${redis.host}") private String redisHost; @Value("${redis.port:6379}") private int redisPort; @Bean public JedisPoolConfig poolConfig() { JedisPoolConfig poolConfig = new JedisPoolConfig(); poolConfig.setMaxTotal(128); return poolConfig; } @Bean public RedisConnectionFactory redisConnectionFactory(JedisPoolConfig poolConfig) { final JedisConnectionFactory connectionFactory = new JedisConnectionFactory(); connectionFactory.setHostName(redisHost); connectionFactory.setPort(redisPort); connectionFactory.setPoolConfig(poolConfig); connectionFactory.setUsePool(true); return connectionFactory; } }
注解@Value
意味着 Spring 会将应用程序属性中定义的值注入到字段中。 这意味着redis.host
和redis.port
值应该在应用程序属性中定义。
现在,我们需要定义要发送到队列的消息。 一个简单的示例消息可能如下所示:
@Getter @Setter @Builder public class PostPublishedEvent { private String postUrl; private String postTitle; private List<String> emails; }
注意:Lombok 项目 (https://projectlombok.org/) 提供了@Getter
、 @Setter 、 @Setter
和许多其他注释,以避免将代码与 getter、setter 和其他琐碎的东西@Builder
。 您可以从这篇 Toptal 文章中了解更多信息。
消息本身将以 JSON 格式保存在队列中。 每次将事件发布到队列时,消息都会被序列化为 JSON。 当从队列中消费时,消息将被反序列化。
定义完消息后,我们需要定义队列本身。 在 Spring Integration 中,可以通过.xml
配置轻松完成。 配置应放在resources/WEB-INF
目录中。
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:int="http://www.springframework.org/schema/integration" xmlns:int-redis="http://www.springframework.org/schema/integration/redis" xsi:schemaLocation="http://www.springframework.org/schema/integration/redis http://www.springframework.org/schema/integration/redis/spring-integration-redis.xsd http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd"> <int-redis:queue-outbound-channel-adapter channel="eventChannelJson" serializer="serializer" auto-startup="true" connection-factory="redisConnectionFactory" queue="my-event-queue" /> <int:gateway service-interface="org.toptal.queue.RedisChannelGateway" error-channel="errorChannel" default-request-channel="eventChannel"> <int:default-header name="topic" value="queue"/> </int:gateway> <int:channel/> <int:channel/> <bean class="org.springframework.data.redis.serializer.StringRedisSerializer"/> <int:object-to-json-transformer input-channel="eventChannel" output-channel="eventChannelJson"/> </beans>
在配置中,您可以看到“int-redis:queue-outbound-channel-adapter”部分。 它的属性是:
- id:组件的bean名称。
- channel:此端点从中接收消息的
MessageChannel
。 - connection-factory:对
RedisConnectionFactory
bean 的引用。 - queue:基于队列的push操作发送Redis消息的Redis列表名称。 该属性与 queue-expression 互斥。
- queue-expression:一个 SpEL 表达式,用于使用运行时传入的消息作为
#root
变量来确定 Redis 列表的名称。 该属性与队列互斥。 - 序列化器:一个
RedisSerializer
bean 引用。 默认情况下,它是一个JdkSerializationRedisSerializer
。 但是,对于String
有效负载,如果未提供序列化程序引用,则使用StringRedisSerializer
。 - extract-payload:指定此端点是否应仅将有效负载发送到 Redis 队列或整个消息。 它的默认值为
true
。 - left-push:指定此端点应该使用左推(当
true
时)还是右推(当false
时)将消息写入 Redis 列表。 如果为 true,则 Redis 列表在与默认 Redis 队列入站通道适配器一起使用时充当 FIFO 队列。 设置为false
以与从左弹出列表中读取的软件一起使用或实现类似堆栈的消息顺序。 它的默认值为true
。
下一步是定义网关,在.xml
配置中提到。 对于网关,我们使用org.toptal.queue
包中的RedisChannelGateway
类。

StringRedisSerializer
用于在保存到 Redis 之前对消息进行序列化。 同样在.xml
配置中,我们定义了网关并将RedisChannelGateway
设置为网关服务。 这意味着可以将RedisChannelGateway
bean 注入到其他 bean 中。 我们定义了default-request-channel
属性,因为它还可以通过使用@Gateway
注释来提供每个方法的通道引用。 类定义:
public interface RedisChannelGateway { void enqueue(PostPublishedEvent event); }
要将这个配置连接到我们的应用程序中,我们必须导入它。 这是在SpringIntegrationConfig
类中实现的。
@ImportResource("classpath:WEB-INF/event-queue-config.xml") @AutoConfigureAfter(RedisConfig.class) @Configuration public class SpringIntegrationConfig { }
@ImportResource
注解用于将 Spring .xml
配置文件导入@Configuration
。 @AutoConfigureAfter
注解用于提示自动配置应该在其他指定的自动配置类之后应用。
我们现在将创建一个服务并实现将事件enqueue
Redis 队列的方法。
public interface QueueService { void enqueue(PostPublishedEvent event); }
@Service public class RedisQueueService implements QueueService { private RedisChannelGateway channelGateway; @Autowired public RedisQueueService(RedisChannelGateway channelGateway) { this.channelGateway = channelGateway; } @Override public void enqueue(PostPublishedEvent event) { channelGateway.enqueue(event); } }
现在,您可以使用QueueService
中的enqueue
方法轻松地将消息发送到队列。
Redis 队列只是包含一个或多个生产者和消费者的列表。 要将消息发布到队列,生产者使用LPUSH
Redis 命令。 如果您监控 Redis(提示:输入redis-cli monitor
),您可以看到消息已添加到队列中:
"LPUSH" "my-event-queue" "{\"postUrl\":\"test\",\"postTitle\":\"test\",\"emails\":[\"test\"]}"
现在,我们需要创建一个消费者应用程序,它将从队列中提取这些事件并进行处理。 消费者服务需要与生产者服务相同的依赖关系。
现在我们可以重用PostPublishedEvent
类来反序列化消息。
我们需要创建队列配置,同样,它必须放在resources/WEB-INF
目录中。 队列配置的内容是:
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:int="http://www.springframework.org/schema/integration" xmlns:int-redis="http://www.springframework.org/schema/integration/redis" xsi:schemaLocation="http://www.springframework.org/schema/integration/redis http://www.springframework.org/schema/integration/redis/spring-integration-redis.xsd http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd"> <int-redis:queue-inbound-channel-adapter channel="eventChannelJson" queue="my-event-queue" serializer="serializer" auto-startup="true" connection-factory="redisConnectionFactory"/> <int:channel/> <int:channel> <int:queue/> </int:channel> <bean class="org.springframework.data.redis.serializer.StringRedisSerializer"/> <int:json-to-object-transformer input-channel="eventChannelJson" output-channel="eventChannel" type="com.toptal.integration.spring.model.PostPublishedEvent"/> <int:service-activator input-channel="eventChannel" ref="RedisEventProcessingService" method="process"> <int:poller fixed-delay="10" time-unit="SECONDS" max-messages-per-poll="500"/> </int:service-activator> </beans>
在.xml
配置中, int-redis:queue-inbound-channel-adapter
可以具有以下属性:
- id:组件的bean名称。
- channel:我们从这个端点发送消息的
MessageChannel
。 - auto-startup:一个
SmartLifecycle
属性,用于指定此端点是否应在应用程序上下文启动后自动启动。 它的默认值为true
。 - 阶段:一个
SmartLifecycle
属性,用于指定此端点将在哪个阶段启动。 它的默认值为0
。 - connection-factory:对
RedisConnectionFactory
bean 的引用。 - queue:基于队列的pop操作获取Redis消息的Redis列表的名称。
- error-channel:我们将从
Endpoint
的侦听任务向其发送带有Exceptions
的ErrorMessages
的MessageChannel
。 默认情况下,底层MessagePublishingErrorHandler
使用应用程序上下文中的默认errorChannel
。 - 序列化器:
RedisSerializer
bean 引用。 它可以是一个空字符串,这意味着没有序列化程序。 在这种情况下,来自入站 Redis 消息的原始byte[]
将作为Message
负载发送到通道。 默认情况下,它是一个JdkSerializationRedisSerializer
。 - receive-timeout: pop 操作等待队列中的 Redis 消息的超时时间(以毫秒为单位)。 其默认值为 1 秒。
- recovery-interval:在重新启动侦听器任务之前,在弹出操作异常后侦听器任务应休眠的时间(以毫秒为单位)。
- 期望消息:指定此端点是否期望来自 Redis 队列的数据包含整个消息。 如果此属性设置为
true
,则序列化程序不能为空字符串,因为消息需要某种形式的反序列化(JDK 默认为序列化)。 它的默认值为false
。 - task-executor:对 Spring
TaskExecutor
(或标准 JDK 1.5+ Executor)bean 的引用。 它用于底层监听任务。 默认情况下,使用SimpleAsyncTaskExecutor
。 - right-pop:指定该端点应该使用右弹出(当
true
时)还是左弹出(当false
时)从 Redis 列表中读取消息。 如果为true
,则 Redis 列表在与默认 Redis 队列出站通道适配器一起使用时充当 FIFO 队列。 设置为false
以与通过右推写入列表的软件一起使用或实现类似堆栈的消息顺序。 它的默认值为true
。
重要的部分是“服务激活器”,它定义了应该使用哪个服务和方法来处理事件。
此外, json-to-object-transformer
需要一个 type 属性才能将 JSON 转换为对象,上面设置为type="com.toptal.integration.spring.model.PostPublishedEvent"
。
同样,要连接这个配置,我们需要SpringIntegrationConfig
类,它可以和以前一样。 最后,我们需要一个能够实际处理事件的服务。
public interface EventProcessingService { void process(PostPublishedEvent event); } @Service("RedisEventProcessingService") public class RedisEventProcessingService implements EventProcessingService { @Override public void process(PostPublishedEvent event) { // TODO: Send emails here, retry strategy, etc :) } }
运行应用程序后,您可以在 Redis 中看到:
"BRPOP" "my-event-queue" "1"
结论
使用 Spring Integration 和 Redis,构建 Spring 微服务应用程序并不像通常那样令人生畏。 只需少量配置和少量样板代码,您就可以立即构建微服务架构的基础。
即使您不打算完全从头开始当前的 Spring 项目并切换到新的架构,在 Redis 的帮助下,使用队列获得巨大的性能改进也是非常简单的。