Komunikacja mikroserwisowa: samouczek integracji Spring z Redis
Opublikowany: 2022-03-11Architektura mikroserwisowa to bardzo popularne podejście w projektowaniu i wdrażaniu wysoce skalowalnych aplikacji internetowych. Komunikacja w ramach aplikacji monolitycznej między komponentami jest zwykle oparta na wywołaniach metod lub funkcji w ramach tego samego procesu. Z drugiej strony aplikacja oparta na mikrousługach to rozproszony system działający na wielu komputerach.
Komunikacja między tymi mikrousługami jest ważna, aby mieć stabilny i skalowalny system. Można to zrobić na wiele sposobów. Komunikacja oparta na wiadomościach jest jednym ze sposobów, aby to zrobić niezawodnie.
Podczas korzystania z komunikatów komponenty współdziałają ze sobą poprzez asynchroniczną wymianę komunikatów. Wiadomości są wymieniane kanałami.
Gdy Usługa A chce komunikować się z Usługą B, zamiast wysyłać ją bezpośrednio, A wysyła ją do określonego kanału. Gdy Usługa B chce przeczytać wiadomość, odbiera ją z określonego kanału wiadomości.
W tym samouczku Spring Integration dowiesz się, jak zaimplementować wiadomości w aplikacji Spring przy użyciu Redis. Zostaniesz poprowadzony przez przykładową aplikację, w której jedna usługa wypycha zdarzenia w kolejce, a inna usługa przetwarza te zdarzenia jeden po drugim.
Integracja wiosenna
Projekt Spring Integration rozszerza platformę Spring, zapewniając obsługę przesyłania wiadomości między aplikacjami opartymi na Spring lub w ich obrębie. Komponenty są połączone ze sobą za pomocą paradygmatu przesyłania wiadomości. Poszczególne komponenty mogą nie wiedzieć o innych komponentach w aplikacji.
Spring Integration zapewnia szeroki wybór mechanizmów komunikacji z systemami zewnętrznymi. Adaptery kanałów są jednym z takich mechanizmów stosowanych do integracji jednokierunkowej (wysyłanie lub odbieranie). Bramy są używane w scenariuszach żądań/odpowiedzi (przychodzących lub wychodzących).
Apache Camel to szeroko stosowana alternatywa. Integracja Spring jest zwykle preferowana w istniejących usługach opartych na Spring, ponieważ jest częścią ekosystemu Spring.
Redis
Redis to niezwykle szybki magazyn danych w pamięci. Opcjonalnie może również trwać na dysku. Obsługuje różne struktury danych, takie jak proste pary klucz-wartość, zestawy, kolejki itp.
Używanie Redis jako kolejki znacznie ułatwia udostępnianie danych między komponentami i skalowanie w poziomie. Producent lub wielu producentów może wypchnąć dane do kolejki, a konsument lub wielu konsumentów może pobrać dane i przetworzyć zdarzenie.
Wielu konsumentów nie może korzystać z tego samego zdarzenia — dzięki temu jedno zdarzenie jest przetwarzane raz.
Korzyści z używania Redis jako kolejki wiadomości:
- Równoległe wykonywanie dyskretnych zadań w sposób nieblokujący
- Świetny występ
- Stabilność
- Łatwe monitorowanie i debugowanie
- Łatwa implementacja i użytkowanie
Zasady:
- Dodanie zadania do kolejki powinno być szybsze niż przetwarzanie samego zadania.
- Konsumpcja zadań powinna być szybsza niż ich produkcja (a jeśli nie, dodaj więcej konsumentów).
Wiosenna integracja z Redis
Poniżej przedstawiamy tworzenie przykładowej aplikacji, aby wyjaśnić, jak korzystać ze Spring Integration z Redis.
Załóżmy, że masz aplikację, która umożliwia użytkownikom publikowanie postów. I chcesz zbudować funkcję śledzenia. Kolejnym wymogiem jest to, aby za każdym razem, gdy ktoś publikuje post, wszyscy obserwujący powinni być powiadamiani za pośrednictwem jakiegoś kanału komunikacji (np. e-mail lub powiadomienie push).
Jednym ze sposobów na zaimplementowanie tego jest wysłanie wiadomości e-mail do każdego obserwującego, gdy użytkownik coś opublikuje. Ale co się dzieje, gdy użytkownik ma 1000 obserwujących? A kiedy 1000 użytkowników opublikuje coś w 10 sekund, z których każdy ma 1000 obserwujących? Czy post wydawcy będzie czekać na wysłanie wszystkich e-maili?
Systemy rozproszone rozwiązują ten problem.
Ten konkretny problem można rozwiązać za pomocą kolejki. Serwis A (producent), który jest odpowiedzialny za publikowanie postów, właśnie to zrobi. Opublikuje post i wypchnie wydarzenie z listą użytkowników, którzy muszą otrzymać e-mail i sam post. Listę użytkowników można by pobrać w usłudze B, ale dla uproszczenia tego przykładu wyślemy ją z usługi A.
To jest operacja asynchroniczna. Oznacza to, że publikująca usługa nie będzie musiała czekać na wysłanie wiadomości e-mail.
Usługa B (konsument) wyciągnie zdarzenie z kolejki i przetworzy je. W ten sposób moglibyśmy łatwo skalować nasze usługi i moglibyśmy mieć n
konsumentów wysyłających e-maile (zdarzenia przetwarzania).
Zacznijmy więc od wdrożenia w serwisie producenta. Niezbędne zależności to:
<dependency> <groupId>redis.clients</groupId> <artifactId>jedis</artifactId> </dependency> <dependency> <groupId>org.springframework.data</groupId> <artifactId>spring-data-redis</artifactId> </dependency> <dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-redis</artifactId> </dependency>
Te trzy zależności Mavena są niezbędne:
- Jedis jest klientem Redis.
- Zależność Spring Data Redis ułatwia korzystanie z Redis w Javie. Zapewnia znajome koncepcje Spring, takie jak klasa szablonów dla podstawowego użycia interfejsu API i lekki dostęp do danych w stylu repozytorium.
- Spring Integration Redis zapewnia rozszerzenie modelu programowania Spring o obsługę dobrze znanych wzorców integracji przedsiębiorstw.
Następnie musimy skonfigurować klienta Jedis:
@Configuration public class RedisConfig { @Value("${redis.host}") private String redisHost; @Value("${redis.port:6379}") private int redisPort; @Bean public JedisPoolConfig poolConfig() { JedisPoolConfig poolConfig = new JedisPoolConfig(); poolConfig.setMaxTotal(128); return poolConfig; } @Bean public RedisConnectionFactory redisConnectionFactory(JedisPoolConfig poolConfig) { final JedisConnectionFactory connectionFactory = new JedisConnectionFactory(); connectionFactory.setHostName(redisHost); connectionFactory.setPort(redisPort); connectionFactory.setPoolConfig(poolConfig); connectionFactory.setUsePool(true); return connectionFactory; } }
Adnotacja @Value
oznacza, że Spring wprowadzi do pola wartość zdefiniowaną we właściwościach aplikacji. Oznacza to, że wartości redis.host
i redis.port
powinny być zdefiniowane we właściwościach aplikacji.
Teraz musimy zdefiniować wiadomość, którą chcemy wysłać do kolejki. Prosta przykładowa wiadomość może wyglądać tak:
@Getter @Setter @Builder public class PostPublishedEvent { private String postUrl; private String postTitle; private List<String> emails; }
Uwaga: Project Lombok (https://projectlombok.org/) udostępnia @Getter
, @Setter
, @Builder
, a także wiele innych adnotacji, aby uniknąć zaśmiecania kodu programami pobierającymi, ustawiającymi i innymi trywialnymi rzeczami. Możesz dowiedzieć się więcej na ten temat z tego artykułu Toptal.
Sama wiadomość zostanie zapisana w kolejce w formacie JSON. Za każdym razem, gdy zdarzenie jest publikowane w kolejce, wiadomość zostanie zserializowana do formatu JSON. A podczas konsumowania z kolejki wiadomość zostanie zdeserializowana.
Po zdefiniowaniu wiadomości musimy zdefiniować samą kolejkę. W Spring Integration można to łatwo zrobić za pomocą konfiguracji .xml
. Konfigurację należy umieścić w katalogu resources/WEB-INF
.
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:int="http://www.springframework.org/schema/integration" xmlns:int-redis="http://www.springframework.org/schema/integration/redis" xsi:schemaLocation="http://www.springframework.org/schema/integration/redis http://www.springframework.org/schema/integration/redis/spring-integration-redis.xsd http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd"> <int-redis:queue-outbound-channel-adapter channel="eventChannelJson" serializer="serializer" auto-startup="true" connection-factory="redisConnectionFactory" queue="my-event-queue" /> <int:gateway service-interface="org.toptal.queue.RedisChannelGateway" error-channel="errorChannel" default-request-channel="eventChannel"> <int:default-header name="topic" value="queue"/> </int:gateway> <int:channel/> <int:channel/> <bean class="org.springframework.data.redis.serializer.StringRedisSerializer"/> <int:object-to-json-transformer input-channel="eventChannel" output-channel="eventChannelJson"/> </beans>
W konfiguracji możesz zobaczyć część „int-redis:queue-outbound-channel-adapter”. Jego właściwości to:
- id: nazwa ziarna komponentu.
- kanał:
MessageChannel
, z którego ten punkt końcowy odbiera komunikaty. - connection-factory: odwołanie do ziarna
RedisConnectionFactory
. - kolejka: nazwa listy Redis, na której wykonywana jest operacja wypychania oparta na kolejce w celu wysłania komunikatów Redis. Ten atrybut wzajemnie się wyklucza z wyrażeniem kolejki.
- wyrażenie kolejki: wyrażenie SpEL określające nazwę listy Redis przy użyciu wiadomości przychodzącej w czasie wykonywania jako zmiennej
#root
. Ten atrybut wzajemnie się wyklucza z kolejką. - serializator: odwołanie do ziarna
RedisSerializer
. Domyślnie jest toJdkSerializationRedisSerializer
. Jednak w przypadku ładunkówString
StringRedisSerializer
jest używany, jeśli nie podano odwołania do serializatora. - extract-payload: Określ, czy ten punkt końcowy powinien wysyłać tylko ładunek do kolejki Redis, czy całą wiadomość. Jego domyślną wartością jest
true
. - left-push: Określ, czy ten punkt końcowy powinien używać lewego wypychania (gdy
true
) czy prawego wypychania (gdyfalse
) do zapisywania komunikatów na liście Redis. Jeśli ma wartość true, lista Redis działa jako kolejka FIFO, gdy jest używana z domyślnym adapterem kanału przychodzącego kolejki Redis. Ustaw nafalse
, aby używać oprogramowania, które odczytuje z listy z lewym wyskakującym okienkiem lub aby uzyskać kolejność wiadomości podobną do stosu. Jego domyślną wartością jesttrue
.
Kolejnym krokiem jest zdefiniowanie bramy, o której mowa w konfiguracji .xml
. W przypadku bramy używamy klasy RedisChannelGateway
z pakietu org.toptal.queue
.

StringRedisSerializer
służy do serializacji wiadomości przed zapisaniem w Redis. Również w konfiguracji .xml
zdefiniowaliśmy bramę i ustawiliśmy RedisChannelGateway
jako usługę bramy. Oznacza to, że fasola RedisChannelGateway
może być wstrzykiwana do innych ziaren. Zdefiniowaliśmy właściwość default-request-channel
, ponieważ możliwe jest również zapewnienie odwołań do kanałów według metody za pomocą adnotacji @Gateway
. Definicja klasy:
public interface RedisChannelGateway { void enqueue(PostPublishedEvent event); }
Aby podłączyć tę konfigurację do naszej aplikacji, musimy ją zaimportować. Jest to zaimplementowane w klasie SpringIntegrationConfig
.
@ImportResource("classpath:WEB-INF/event-queue-config.xml") @AutoConfigureAfter(RedisConfig.class) @Configuration public class SpringIntegrationConfig { }
Adnotacja @ImportResource
służy do importowania plików konfiguracyjnych Spring .xml
do @Configuration
. Adnotacja @AutoConfigureAfter
służy do wskazania, że automatyczna konfiguracja powinna zostać zastosowana po innych określonych klasach automatycznej konfiguracji.
Utworzymy teraz usługę i zaimplementujemy metodę, która będzie umieszczać zdarzenia w kolejce enqueue
.
public interface QueueService { void enqueue(PostPublishedEvent event); }
@Service public class RedisQueueService implements QueueService { private RedisChannelGateway channelGateway; @Autowired public RedisQueueService(RedisChannelGateway channelGateway) { this.channelGateway = channelGateway; } @Override public void enqueue(PostPublishedEvent event) { channelGateway.enqueue(event); } }
A teraz możesz łatwo wysłać wiadomość do kolejki za pomocą metody enqueue
z QueueService
.
Kolejki Redis to po prostu listy z jednym lub kilkoma producentami i konsumentami. Aby opublikować wiadomość do kolejki, producenci używają komendy LPUSH
Redis. A jeśli monitorujesz Redis (podpowiedź: wpisz redis-cli monitor
), możesz zobaczyć, że wiadomość została dodana do kolejki:
"LPUSH" "my-event-queue" "{\"postUrl\":\"test\",\"postTitle\":\"test\",\"emails\":[\"test\"]}"
Teraz musimy stworzyć aplikację konsumencką, która wyciągnie te zdarzenia z kolejki i przetworzy je. Usługa konsumencka wymaga tych samych zależności, co usługa producenta.
Teraz możemy ponownie użyć klasy PostPublishedEvent
do deserializacji wiadomości.
Musimy stworzyć konfigurację kolejki i ponownie umieścić ją w katalogu resources/WEB-INF
. Zawartość konfiguracji kolejki to:
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:int="http://www.springframework.org/schema/integration" xmlns:int-redis="http://www.springframework.org/schema/integration/redis" xsi:schemaLocation="http://www.springframework.org/schema/integration/redis http://www.springframework.org/schema/integration/redis/spring-integration-redis.xsd http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd"> <int-redis:queue-inbound-channel-adapter channel="eventChannelJson" queue="my-event-queue" serializer="serializer" auto-startup="true" connection-factory="redisConnectionFactory"/> <int:channel/> <int:channel> <int:queue/> </int:channel> <bean class="org.springframework.data.redis.serializer.StringRedisSerializer"/> <int:json-to-object-transformer input-channel="eventChannelJson" output-channel="eventChannel" type="com.toptal.integration.spring.model.PostPublishedEvent"/> <int:service-activator input-channel="eventChannel" ref="RedisEventProcessingService" method="process"> <int:poller fixed-delay="10" time-unit="SECONDS" max-messages-per-poll="500"/> </int:service-activator> </beans>
W konfiguracji .xml
int-redis:queue-inbound-channel-adapter
może mieć następujące właściwości:
- id: nazwa ziarna komponentu.
- kanał:
MessageChannel
, do którego wysyłamy wiadomości z tego punktu końcowego. - automatyczne uruchamianie: Atrybut
SmartLifecycle
określający, czy ten punkt końcowy powinien być uruchamiany automatycznie po uruchomieniu kontekstu aplikacji, czy nie. Jego domyślną wartością jesttrue
. - faza: Atrybut
SmartLifecycle
określający fazę, w której ten punkt końcowy zostanie uruchomiony. Jego wartość domyślna to0
. - connection-factory: odwołanie do ziarna
RedisConnectionFactory
. - kolejka: nazwa listy Redis, na której wykonywana jest operacja wyskakująca oparta na kolejce w celu pobrania komunikatów Redis.
- error-channel:
MessageChannel
, do którego wyślemyErrorMessages
zExceptions
od zadania nasłuchiwaniaEndpoint
. Domyślnie bazowyMessagePublishingErrorHandler
używa domyślnegoerrorChannel
z kontekstu aplikacji. - serializator: odwołanie do ziarna
RedisSerializer
. Może to być pusty ciąg, co oznacza brak serializatora. W takim przypadku nieprzetworzonybyte[]
z przychodzącego komunikatu Redis jest wysyłany do kanału jako ładunekMessage
. Domyślnie jest toJdkSerializationRedisSerializer
. - limit czasu odbioru: limit czasu (w milisekundach) oczekiwania operacji pop na komunikat Redis z kolejki. Jego domyślna wartość to 1 sekunda.
- interwał odzyskiwania: czas w milisekundach, przez który zadanie nasłuchiwania powinno uśpić się po wyjątkach operacji pop przed ponownym uruchomieniem zadania nasłuchiwania.
- oczekiwania-wiadomość: Określ, czy ten punkt końcowy oczekuje, że dane z kolejki Redis będą zawierać całe komunikaty. Jeśli ten atrybut jest ustawiony na
true
, serializator nie może być pustym ciągiem, ponieważ komunikaty wymagają jakiejś formy deserializacji (domyślnie serializacji JDK). Jego domyślną wartością jestfalse
. - task-executor: Odwołanie do komponentu Spring
TaskExecutor
(lub standardowego JDK 1.5+ Executor). Służy do podstawowego zadania słuchania. Domyślnie używany jestSimpleAsyncTaskExecutor
. - right-pop: Określ, czy ten punkt końcowy powinien używać prawego pop (gdy
true
) czy lewego pop (gdyfalse
) do odczytywania komunikatów z listy Redis. Jeślitrue
, lista Redis działa jako kolejka FIFO, gdy jest używana z domyślnym adapterem kanału wychodzącego kolejki Redis. Ustaw nafalse
, aby używać go z oprogramowaniem, które zapisuje na liście prawym przyciskiem myszy lub w celu uzyskania kolejności wiadomości podobnej do stosu. Jego domyślną wartością jesttrue
.
Ważną częścią jest „aktywator usługi”, który określa, jaką usługę i metodę należy zastosować do przetworzenia zdarzenia”.
Ponadto json-to-object-transformer
wymaga atrybutu type w celu przekształcenia JSON w obiekty, ustawione powyżej na type="com.toptal.integration.spring.model.PostPublishedEvent"
.
Ponownie, aby połączyć tę konfigurację, będziemy potrzebować klasy SpringIntegrationConfig
, która może być taka sama jak poprzednio. I na koniec potrzebujemy usługi, która faktycznie obsłuży wydarzenie.
public interface EventProcessingService { void process(PostPublishedEvent event); } @Service("RedisEventProcessingService") public class RedisEventProcessingService implements EventProcessingService { @Override public void process(PostPublishedEvent event) { // TODO: Send emails here, retry strategy, etc :) } }
Po uruchomieniu aplikacji możesz zobaczyć w Redis:
"BRPOP" "my-event-queue" "1"
Wniosek
Dzięki Spring Integration i Redis budowanie aplikacji mikrousług Spring nie jest tak trudne, jak zwykle. Dzięki niewielkiej konfiguracji i niewielkiej ilości standardowego kodu możesz w mgnieniu oka zbudować podstawy architektury mikroserwisów.
Nawet jeśli nie planujesz całkowicie zarysować swojego obecnego projektu Spring i przejść na nową architekturę, z pomocą Redis, bardzo łatwo jest uzyskać ogromną poprawę wydajności dzięki kolejkom.