Связь с микросервисами: учебник Spring по интеграции с Redis
Опубликовано: 2022-03-11Микросервисная архитектура — очень популярный подход к разработке и внедрению масштабируемых веб-приложений. Связь внутри монолитного приложения между компонентами обычно основана на вызовах методов или функций внутри одного и того же процесса. С другой стороны, приложение на основе микросервисов представляет собой распределенную систему, работающую на нескольких машинах.
Связь между этими микросервисами важна для стабильной и масштабируемой системы. Есть несколько способов сделать это. Коммуникация на основе сообщений — один из надежных способов сделать это.
При использовании обмена сообщениями компоненты взаимодействуют друг с другом путем асинхронного обмена сообщениями. Обмен сообщениями осуществляется по каналам.
Когда служба A хочет связаться с службой B, вместо того, чтобы отправлять ее напрямую, A отправляет ее на определенный канал. Когда служба B хочет прочитать сообщение, она выбирает сообщение из определенного канала сообщений.
В этом руководстве по интеграции Spring вы узнаете, как реализовать обмен сообщениями в приложении Spring с помощью Redis. Вы познакомитесь с примером приложения, в котором одна служба отправляет события в очередь, а другая служба обрабатывает эти события одно за другим.
Весенняя интеграция
Проект Spring Integration расширяет среду Spring, обеспечивая поддержку обмена сообщениями между приложениями на основе Spring или внутри них. Компоненты связаны друг с другом через парадигму обмена сообщениями. Отдельные компоненты могут не знать о других компонентах приложения.
Spring Integration предоставляет широкий выбор механизмов для связи с внешними системами. Канальные адаптеры являются одним из таких механизмов, используемых для односторонней интеграции (отправки или получения). А шлюзы используются для сценариев запроса/ответа (входящего или исходящего).
Apache Camel — широко используемая альтернатива. Интеграция Spring обычно предпочтительнее в существующих сервисах на основе Spring, поскольку она является частью экосистемы Spring.
Редис
Redis — чрезвычайно быстрое хранилище данных в памяти. При желании он также может сохраняться на диск. Он поддерживает различные структуры данных, такие как простые пары ключ-значение, наборы, очереди и т. д.
Использование Redis в качестве очереди значительно упрощает обмен данными между компонентами и горизонтальное масштабирование. Производитель или несколько производителей могут помещать данные в очередь, а потребитель или несколько потребителей могут извлекать данные и обрабатывать событие.
Несколько потребителей не могут использовать одно и то же событие — это гарантирует, что одно событие будет обработано один раз.
Преимущества использования Redis в качестве очереди сообщений:
- Параллельное выполнение дискретных задач неблокирующим образом
- Отличное выступление
- Стабильность
- Простой мониторинг и отладка
- Простота внедрения и использования
Правила:
- Добавление задачи в очередь должно быть быстрее, чем обработка самой задачи.
- Потребление задач должно быть быстрее, чем их создание (а если нет, добавьте больше потребителей).
Интеграция Spring с Redis
Далее показано создание примера приложения, чтобы объяснить, как использовать интеграцию Spring с Redis.
Допустим, у вас есть приложение, которое позволяет пользователям публиковать сообщения. И вы хотите создать функцию отслеживания. Еще одно требование заключается в том, что каждый раз, когда кто-то публикует сообщение, все подписчики должны быть уведомлены через какой-либо канал связи (например, по электронной почте или push-уведомлению).
Один из способов реализовать это — отправить электронное письмо каждому подписчику, как только пользователь что-то опубликует. Но что происходит, когда у пользователя 1000 подписчиков? А когда за 10 секунд что-то публикуют 1000 пользователей, у каждого из которых по 1000 подписчиков? Кроме того, будет ли сообщение издателя ждать, пока не будут отправлены все электронные письма?
Распределенные системы решают эту проблему.
Эта конкретная проблема может быть решена с помощью очереди. Сервис А (производитель), который отвечает за публикацию постов, как раз этим и занимается. Он опубликует сообщение и отправит событие со списком пользователей, которым необходимо получить электронное письмо, и само сообщение. Список пользователей можно получить в сервисе B, но для простоты этого примера мы отправим его из сервиса A.
Это асинхронная операция. Это означает, что службе, которая публикует сообщения, не придется ждать, чтобы отправить электронные письма.
Служба B (потребитель) извлечет событие из очереди и обработает его. Таким образом, мы могли бы легко масштабировать наши сервисы, и у нас могло бы быть n
потребителей, отправляющих электронные письма (обрабатывающих события).
Итак, давайте начнем с реализации в сервисе производителя. Необходимые зависимости:
<dependency> <groupId>redis.clients</groupId> <artifactId>jedis</artifactId> </dependency> <dependency> <groupId>org.springframework.data</groupId> <artifactId>spring-data-redis</artifactId> </dependency> <dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-redis</artifactId> </dependency>
Эти три зависимости Maven необходимы:
- Jedis — это клиент Redis.
- Зависимость Spring Data Redis упрощает использование Redis в Java. Он предоставляет знакомые концепции Spring, такие как класс шаблона для использования основного API и упрощенный доступ к данным в стиле репозитория.
- Spring Integration Redis предоставляет расширение модели программирования Spring для поддержки известных шаблонов интеграции предприятия.
Далее нам нужно настроить клиент Jedis:
@Configuration public class RedisConfig { @Value("${redis.host}") private String redisHost; @Value("${redis.port:6379}") private int redisPort; @Bean public JedisPoolConfig poolConfig() { JedisPoolConfig poolConfig = new JedisPoolConfig(); poolConfig.setMaxTotal(128); return poolConfig; } @Bean public RedisConnectionFactory redisConnectionFactory(JedisPoolConfig poolConfig) { final JedisConnectionFactory connectionFactory = new JedisConnectionFactory(); connectionFactory.setHostName(redisHost); connectionFactory.setPort(redisPort); connectionFactory.setPoolConfig(poolConfig); connectionFactory.setUsePool(true); return connectionFactory; } }
Аннотация @Value
означает, что Spring вставит в поле значение, определенное в свойствах приложения. Это означает, что значения redis.host
и redis.port
должны быть определены в свойствах приложения.
Теперь нам нужно определить сообщение, которое мы хотим отправить в очередь. Простой пример сообщения может выглядеть так:
@Getter @Setter @Builder public class PostPublishedEvent { private String postUrl; private String postTitle; private List<String> emails; }
Примечание: Project Lombok (https://projectlombok.org/) предоставляет @Getter
, @Setter
, @Builder
и многие другие аннотации, чтобы не загромождать код геттерами, сеттерами и другими тривиальными вещами. Вы можете узнать больше об этом из этой статьи Toptal.
Само сообщение будет сохранено в формате JSON в очереди. Каждый раз, когда событие публикуется в очереди, сообщение сериализуется в JSON. А при потреблении из очереди сообщение будет десериализовано.
Когда сообщение определено, нам нужно определить саму очередь. В Spring Integration это можно легко сделать с помощью конфигурации .xml
. Конфигурация должна быть помещена в каталог resources/WEB-INF
.
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:int="http://www.springframework.org/schema/integration" xmlns:int-redis="http://www.springframework.org/schema/integration/redis" xsi:schemaLocation="http://www.springframework.org/schema/integration/redis http://www.springframework.org/schema/integration/redis/spring-integration-redis.xsd http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd"> <int-redis:queue-outbound-channel-adapter channel="eventChannelJson" serializer="serializer" auto-startup="true" connection-factory="redisConnectionFactory" queue="my-event-queue" /> <int:gateway service-interface="org.toptal.queue.RedisChannelGateway" error-channel="errorChannel" default-request-channel="eventChannel"> <int:default-header name="topic" value="queue"/> </int:gateway> <int:channel/> <int:channel/> <bean class="org.springframework.data.redis.serializer.StringRedisSerializer"/> <int:object-to-json-transformer input-channel="eventChannel" output-channel="eventChannelJson"/> </beans>
В конфигурации вы можете увидеть часть «int-redis:queue-outbound-channel-adapter». Его свойства:
- id: имя компонента компонента.
- канал:
MessageChannel
из которого эта конечная точка получает сообщения. - connection-factory: ссылка на bean-компонент
RedisConnectionFactory
. - очередь: имя списка Redis, в котором выполняется операция push на основе очереди для отправки сообщений Redis. Этот атрибут является взаимоисключающим с выражением очереди.
- выражение-очереди: выражение SpEL для определения имени списка Redis с использованием входящего сообщения во время выполнения в качестве переменной
#root
. Этот атрибут является взаимоисключающим с очередью. - serializer: ссылка на bean-компонент
RedisSerializer
. По умолчанию этоJdkSerializationRedisSerializer
. Однако для полезных данныхString
используетсяStringRedisSerializer
, если не указана ссылка на сериализатор. - Extract-payload: укажите, должна ли эта конечная точка отправлять в очередь Redis только полезные данные или все сообщение. Его значение по умолчанию равно
true
. - левый толчок: укажите, должна ли эта конечная точка использовать левый толчок (когда
true
) или правый толчок (когдаfalse
) для записи сообщений в список Redis. Если задано значение true, список Redis действует как очередь FIFO при использовании с адаптером входящего канала очереди Redis по умолчанию. Установите значениеfalse
для использования с программным обеспечением, которое читает из списка с помощью левого всплывающего окна, или для достижения порядка сообщений, подобного стеку. Его значение по умолчанию равноtrue
.
Следующим шагом является определение шлюза, который указан в конфигурации .xml
. В качестве шлюза мы используем класс RedisChannelGateway
из пакета org.toptal.queue
.

StringRedisSerializer
используется для сериализации сообщения перед сохранением в Redis. Также в конфигурации .xml
мы определили шлюз и установили RedisChannelGateway
в качестве службы шлюза. Это означает, что компонент RedisChannelGateway
можно внедрить в другие компоненты. Мы определили свойство default-request-channel
, потому что с помощью аннотации @Gateway
также можно предоставить ссылки на каналы для каждого метода. Определение класса:
public interface RedisChannelGateway { void enqueue(PostPublishedEvent event); }
Чтобы связать эту конфигурацию с нашим приложением, мы должны ее импортировать. Это реализовано в классе SpringIntegrationConfig
.
@ImportResource("classpath:WEB-INF/event-queue-config.xml") @AutoConfigureAfter(RedisConfig.class) @Configuration public class SpringIntegrationConfig { }
Аннотация @ImportResource
используется для импорта файлов конфигурации Spring .xml
в @Configuration
. А аннотация @AutoConfigureAfter
используется, чтобы намекнуть, что автоконфигурация должна применяться после других указанных классов автоконфигурации.
Теперь мы создадим сервис и реализуем метод, который будет enqueue
события в очередь Redis.
public interface QueueService { void enqueue(PostPublishedEvent event); }
@Service public class RedisQueueService implements QueueService { private RedisChannelGateway channelGateway; @Autowired public RedisQueueService(RedisChannelGateway channelGateway) { this.channelGateway = channelGateway; } @Override public void enqueue(PostPublishedEvent event) { channelGateway.enqueue(event); } }
И теперь вы можете легко отправить сообщение в очередь, используя метод enqueue
из QueueService
.
Очереди Redis — это просто списки с одним или несколькими производителями и потребителями. Чтобы опубликовать сообщение в очереди, производители используют команду Redis LPUSH
. И если вы отслеживаете Redis (подсказка: введите redis-cli monitor
), вы увидите, что сообщение добавляется в очередь:
"LPUSH" "my-event-queue" "{\"postUrl\":\"test\",\"postTitle\":\"test\",\"emails\":[\"test\"]}"
Теперь нам нужно создать потребительское приложение, которое будет извлекать эти события из очереди и обрабатывать их. Службе-потребителю нужны те же зависимости, что и службе-производителю.
Теперь мы можем повторно использовать класс PostPublishedEvent
для десериализации сообщений.
Нам нужно создать конфигурацию очереди, и, опять же, ее нужно поместить в каталог resources/WEB-INF
. Содержимое конфигурации очереди:
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:int="http://www.springframework.org/schema/integration" xmlns:int-redis="http://www.springframework.org/schema/integration/redis" xsi:schemaLocation="http://www.springframework.org/schema/integration/redis http://www.springframework.org/schema/integration/redis/spring-integration-redis.xsd http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd"> <int-redis:queue-inbound-channel-adapter channel="eventChannelJson" queue="my-event-queue" serializer="serializer" auto-startup="true" connection-factory="redisConnectionFactory"/> <int:channel/> <int:channel> <int:queue/> </int:channel> <bean class="org.springframework.data.redis.serializer.StringRedisSerializer"/> <int:json-to-object-transformer input-channel="eventChannelJson" output-channel="eventChannel" type="com.toptal.integration.spring.model.PostPublishedEvent"/> <int:service-activator input-channel="eventChannel" ref="RedisEventProcessingService" method="process"> <int:poller fixed-delay="10" time-unit="SECONDS" max-messages-per-poll="500"/> </int:service-activator> </beans>
В конфигурации .xml
int-redis:queue-inbound-channel-adapter
может иметь следующие свойства:
- id: имя компонента компонента.
- канал:
MessageChannel
, на который мы отправляем сообщения из этой конечной точки. - auto-startup: атрибут
SmartLifecycle
, указывающий, должна ли эта конечная точка запускаться автоматически после запуска контекста приложения или нет. Его значение по умолчанию равноtrue
. - фаза: атрибут
SmartLifecycle
для указания фазы, в которой будет запущена эта конечная точка. Его значение по умолчанию равно0
. - connection-factory: ссылка на bean-компонент
RedisConnectionFactory
. - очередь: имя списка Redis, в котором выполняется всплывающая операция на основе очереди для получения сообщений Redis.
- error-channel:
MessageChannel
, на который мы будем отправлятьErrorMessages
сExceptions
из задачи прослушиванияEndpoint
. По умолчанию базовыйMessagePublishingErrorHandler
используетerrorChannel
по умолчанию из контекста приложения. - serializer: ссылка на bean-компонент
RedisSerializer
. Это может быть пустая строка, что означает отсутствие сериализатора. В этом случае необработанныйbyte[]
из входящего сообщения Redis отправляется в канал в качестве полезной нагрузкиMessage
. По умолчанию этоJdkSerializationRedisSerializer
. - тайм-аут приема: тайм-аут в миллисекундах, по истечении которого операция всплывающего окна ожидает сообщения Redis из очереди. Его значение по умолчанию равно 1 секунде.
- интервал восстановления: время в миллисекундах, в течение которого задача прослушивателя должна находиться в спящем режиме после исключений в операции извлечения перед перезапуском задачи прослушивателя.
- ожидаемое сообщение: укажите, ожидает ли эта конечная точка, что данные из очереди Redis будут содержать целые сообщения. Если для этого атрибута задано значение
true
, сериализатор не может быть пустой строкой, поскольку сообщения требуют некоторой формы десериализации (сериализация JDK по умолчанию). Его значение по умолчанию —false
. - task-executor: ссылка на bean-компонент Spring
TaskExecutor
(или стандартный JDK 1.5+ Executor). Он используется для основной задачи прослушивания. По умолчанию используетсяSimpleAsyncTaskExecutor
. - right-pop: укажите, должна ли эта конечная точка использовать правое всплывающее окно (когда
true
) или левое всплывающее окно (когдаfalse
) для чтения сообщений из списка Redis. Если задано значениеtrue
, список Redis действует как очередь FIFO при использовании с адаптером исходящего канала очереди Redis по умолчанию. Установите значениеfalse
для использования с программным обеспечением, которое записывает в список с правым нажатием, или для достижения порядка сообщений, подобного стеку. Его значение по умолчанию равноtrue
.
Важной частью является «активатор сервиса», который определяет, какой сервис и метод следует использовать для обработки события».
Кроме того, json-to-object-transformer
требуется атрибут type для преобразования JSON в объекты, установленный выше для type="com.toptal.integration.spring.model.PostPublishedEvent"
.
Опять же, для подключения этой конфигурации нам понадобится класс SpringIntegrationConfig
, который может быть таким же, как и раньше. И, наконец, нам нужен сервис, который действительно будет обрабатывать событие.
public interface EventProcessingService { void process(PostPublishedEvent event); } @Service("RedisEventProcessingService") public class RedisEventProcessingService implements EventProcessingService { @Override public void process(PostPublishedEvent event) { // TODO: Send emails here, retry strategy, etc :) } }
Запустив приложение, вы увидите в Redis:
"BRPOP" "my-event-queue" "1"
Заключение
С Spring Integration и Redis создание приложения микросервисов Spring не так сложно, как обычно. С небольшой настройкой и небольшим количеством стандартного кода вы сможете быстро создать основу своей микросервисной архитектуры.
Даже если вы не планируете полностью стирать свой текущий проект Spring и переходить на новую архитектуру, с помощью Redis очень просто добиться значительного повышения производительности с помощью очередей.