Komunikacja mikroserwisowa: samouczek integracji Spring z Redis
Opublikowany: 2022-03-11Architektura mikroserwisowa to bardzo popularne podejście w projektowaniu i wdrażaniu wysoce skalowalnych aplikacji internetowych. Komunikacja w ramach aplikacji monolitycznej między komponentami jest zwykle oparta na wywołaniach metod lub funkcji w ramach tego samego procesu. Z drugiej strony aplikacja oparta na mikrousługach to rozproszony system działający na wielu komputerach.
Komunikacja między tymi mikrousługami jest ważna, aby mieć stabilny i skalowalny system. Można to zrobić na wiele sposobów. Komunikacja oparta na wiadomościach jest jednym ze sposobów, aby to zrobić niezawodnie.
Podczas korzystania z komunikatów komponenty współdziałają ze sobą poprzez asynchroniczną wymianę komunikatów. Wiadomości są wymieniane kanałami.
Gdy Usługa A chce komunikować się z Usługą B, zamiast wysyłać ją bezpośrednio, A wysyła ją do określonego kanału. Gdy Usługa B chce przeczytać wiadomość, odbiera ją z określonego kanału wiadomości.
W tym samouczku Spring Integration dowiesz się, jak zaimplementować wiadomości w aplikacji Spring przy użyciu Redis. Zostaniesz poprowadzony przez przykładową aplikację, w której jedna usługa wypycha zdarzenia w kolejce, a inna usługa przetwarza te zdarzenia jeden po drugim.
Integracja wiosenna
Projekt Spring Integration rozszerza platformę Spring, zapewniając obsługę przesyłania wiadomości między aplikacjami opartymi na Spring lub w ich obrębie. Komponenty są połączone ze sobą za pomocą paradygmatu przesyłania wiadomości. Poszczególne komponenty mogą nie wiedzieć o innych komponentach w aplikacji.
Spring Integration zapewnia szeroki wybór mechanizmów komunikacji z systemami zewnętrznymi. Adaptery kanałów są jednym z takich mechanizmów stosowanych do integracji jednokierunkowej (wysyłanie lub odbieranie). Bramy są używane w scenariuszach żądań/odpowiedzi (przychodzących lub wychodzących).
Apache Camel to szeroko stosowana alternatywa. Integracja Spring jest zwykle preferowana w istniejących usługach opartych na Spring, ponieważ jest częścią ekosystemu Spring.
Redis
Redis to niezwykle szybki magazyn danych w pamięci. Opcjonalnie może również trwać na dysku. Obsługuje różne struktury danych, takie jak proste pary klucz-wartość, zestawy, kolejki itp.
Używanie Redis jako kolejki znacznie ułatwia udostępnianie danych między komponentami i skalowanie w poziomie. Producent lub wielu producentów może wypchnąć dane do kolejki, a konsument lub wielu konsumentów może pobrać dane i przetworzyć zdarzenie.
Wielu konsumentów nie może korzystać z tego samego zdarzenia — dzięki temu jedno zdarzenie jest przetwarzane raz.
Korzyści z używania Redis jako kolejki wiadomości:
- Równoległe wykonywanie dyskretnych zadań w sposób nieblokujący
- Świetny występ
- Stabilność
- Łatwe monitorowanie i debugowanie
- Łatwa implementacja i użytkowanie
Zasady:
- Dodanie zadania do kolejki powinno być szybsze niż przetwarzanie samego zadania.
- Konsumpcja zadań powinna być szybsza niż ich produkcja (a jeśli nie, dodaj więcej konsumentów).
Wiosenna integracja z Redis
Poniżej przedstawiamy tworzenie przykładowej aplikacji, aby wyjaśnić, jak korzystać ze Spring Integration z Redis.
Załóżmy, że masz aplikację, która umożliwia użytkownikom publikowanie postów. I chcesz zbudować funkcję śledzenia. Kolejnym wymogiem jest to, aby za każdym razem, gdy ktoś publikuje post, wszyscy obserwujący powinni być powiadamiani za pośrednictwem jakiegoś kanału komunikacji (np. e-mail lub powiadomienie push).
Jednym ze sposobów na zaimplementowanie tego jest wysłanie wiadomości e-mail do każdego obserwującego, gdy użytkownik coś opublikuje. Ale co się dzieje, gdy użytkownik ma 1000 obserwujących? A kiedy 1000 użytkowników opublikuje coś w 10 sekund, z których każdy ma 1000 obserwujących? Czy post wydawcy będzie czekać na wysłanie wszystkich e-maili?
Systemy rozproszone rozwiązują ten problem.
Ten konkretny problem można rozwiązać za pomocą kolejki. Serwis A (producent), który jest odpowiedzialny za publikowanie postów, właśnie to zrobi. Opublikuje post i wypchnie wydarzenie z listą użytkowników, którzy muszą otrzymać e-mail i sam post. Listę użytkowników można by pobrać w usłudze B, ale dla uproszczenia tego przykładu wyślemy ją z usługi A.
To jest operacja asynchroniczna. Oznacza to, że publikująca usługa nie będzie musiała czekać na wysłanie wiadomości e-mail.
Usługa B (konsument) wyciągnie zdarzenie z kolejki i przetworzy je. W ten sposób moglibyśmy łatwo skalować nasze usługi i moglibyśmy mieć n konsumentów wysyłających e-maile (zdarzenia przetwarzania).
Zacznijmy więc od wdrożenia w serwisie producenta. Niezbędne zależności to:
<dependency> <groupId>redis.clients</groupId> <artifactId>jedis</artifactId> </dependency> <dependency> <groupId>org.springframework.data</groupId> <artifactId>spring-data-redis</artifactId> </dependency> <dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-redis</artifactId> </dependency>Te trzy zależności Mavena są niezbędne:
- Jedis jest klientem Redis.
- Zależność Spring Data Redis ułatwia korzystanie z Redis w Javie. Zapewnia znajome koncepcje Spring, takie jak klasa szablonów dla podstawowego użycia interfejsu API i lekki dostęp do danych w stylu repozytorium.
- Spring Integration Redis zapewnia rozszerzenie modelu programowania Spring o obsługę dobrze znanych wzorców integracji przedsiębiorstw.
Następnie musimy skonfigurować klienta Jedis:
@Configuration public class RedisConfig { @Value("${redis.host}") private String redisHost; @Value("${redis.port:6379}") private int redisPort; @Bean public JedisPoolConfig poolConfig() { JedisPoolConfig poolConfig = new JedisPoolConfig(); poolConfig.setMaxTotal(128); return poolConfig; } @Bean public RedisConnectionFactory redisConnectionFactory(JedisPoolConfig poolConfig) { final JedisConnectionFactory connectionFactory = new JedisConnectionFactory(); connectionFactory.setHostName(redisHost); connectionFactory.setPort(redisPort); connectionFactory.setPoolConfig(poolConfig); connectionFactory.setUsePool(true); return connectionFactory; } } Adnotacja @Value oznacza, że Spring wprowadzi do pola wartość zdefiniowaną we właściwościach aplikacji. Oznacza to, że wartości redis.host i redis.port powinny być zdefiniowane we właściwościach aplikacji.
Teraz musimy zdefiniować wiadomość, którą chcemy wysłać do kolejki. Prosta przykładowa wiadomość może wyglądać tak:
@Getter @Setter @Builder public class PostPublishedEvent { private String postUrl; private String postTitle; private List<String> emails; } Uwaga: Project Lombok (https://projectlombok.org/) udostępnia @Getter , @Setter , @Builder , a także wiele innych adnotacji, aby uniknąć zaśmiecania kodu programami pobierającymi, ustawiającymi i innymi trywialnymi rzeczami. Możesz dowiedzieć się więcej na ten temat z tego artykułu Toptal.
Sama wiadomość zostanie zapisana w kolejce w formacie JSON. Za każdym razem, gdy zdarzenie jest publikowane w kolejce, wiadomość zostanie zserializowana do formatu JSON. A podczas konsumowania z kolejki wiadomość zostanie zdeserializowana.
Po zdefiniowaniu wiadomości musimy zdefiniować samą kolejkę. W Spring Integration można to łatwo zrobić za pomocą konfiguracji .xml . Konfigurację należy umieścić w katalogu resources/WEB-INF .
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:int="http://www.springframework.org/schema/integration" xmlns:int-redis="http://www.springframework.org/schema/integration/redis" xsi:schemaLocation="http://www.springframework.org/schema/integration/redis http://www.springframework.org/schema/integration/redis/spring-integration-redis.xsd http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd"> <int-redis:queue-outbound-channel-adapter channel="eventChannelJson" serializer="serializer" auto-startup="true" connection-factory="redisConnectionFactory" queue="my-event-queue" /> <int:gateway service-interface="org.toptal.queue.RedisChannelGateway" error-channel="errorChannel" default-request-channel="eventChannel"> <int:default-header name="topic" value="queue"/> </int:gateway> <int:channel/> <int:channel/> <bean class="org.springframework.data.redis.serializer.StringRedisSerializer"/> <int:object-to-json-transformer input-channel="eventChannel" output-channel="eventChannelJson"/> </beans>W konfiguracji możesz zobaczyć część „int-redis:queue-outbound-channel-adapter”. Jego właściwości to:
- id: nazwa ziarna komponentu.
- kanał:
MessageChannel, z którego ten punkt końcowy odbiera komunikaty. - connection-factory: odwołanie do ziarna
RedisConnectionFactory. - kolejka: nazwa listy Redis, na której wykonywana jest operacja wypychania oparta na kolejce w celu wysłania komunikatów Redis. Ten atrybut wzajemnie się wyklucza z wyrażeniem kolejki.
- wyrażenie kolejki: wyrażenie SpEL określające nazwę listy Redis przy użyciu wiadomości przychodzącej w czasie wykonywania jako zmiennej
#root. Ten atrybut wzajemnie się wyklucza z kolejką. - serializator: odwołanie do ziarna
RedisSerializer. Domyślnie jest toJdkSerializationRedisSerializer. Jednak w przypadku ładunkówStringStringRedisSerializerjest używany, jeśli nie podano odwołania do serializatora. - extract-payload: Określ, czy ten punkt końcowy powinien wysyłać tylko ładunek do kolejki Redis, czy całą wiadomość. Jego domyślną wartością jest
true. - left-push: Określ, czy ten punkt końcowy powinien używać lewego wypychania (gdy
true) czy prawego wypychania (gdyfalse) do zapisywania komunikatów na liście Redis. Jeśli ma wartość true, lista Redis działa jako kolejka FIFO, gdy jest używana z domyślnym adapterem kanału przychodzącego kolejki Redis. Ustaw nafalse, aby używać oprogramowania, które odczytuje z listy z lewym wyskakującym okienkiem lub aby uzyskać kolejność wiadomości podobną do stosu. Jego domyślną wartością jesttrue.
Kolejnym krokiem jest zdefiniowanie bramy, o której mowa w konfiguracji .xml . W przypadku bramy używamy klasy RedisChannelGateway z pakietu org.toptal.queue .

StringRedisSerializer służy do serializacji wiadomości przed zapisaniem w Redis. Również w konfiguracji .xml zdefiniowaliśmy bramę i ustawiliśmy RedisChannelGateway jako usługę bramy. Oznacza to, że fasola RedisChannelGateway może być wstrzykiwana do innych ziaren. Zdefiniowaliśmy właściwość default-request-channel , ponieważ możliwe jest również zapewnienie odwołań do kanałów według metody za pomocą adnotacji @Gateway . Definicja klasy:
public interface RedisChannelGateway { void enqueue(PostPublishedEvent event); } Aby podłączyć tę konfigurację do naszej aplikacji, musimy ją zaimportować. Jest to zaimplementowane w klasie SpringIntegrationConfig .
@ImportResource("classpath:WEB-INF/event-queue-config.xml") @AutoConfigureAfter(RedisConfig.class) @Configuration public class SpringIntegrationConfig { } Adnotacja @ImportResource służy do importowania plików konfiguracyjnych Spring .xml do @Configuration . Adnotacja @AutoConfigureAfter służy do wskazania, że automatyczna konfiguracja powinna zostać zastosowana po innych określonych klasach automatycznej konfiguracji.
Utworzymy teraz usługę i zaimplementujemy metodę, która będzie umieszczać zdarzenia w kolejce enqueue .
public interface QueueService { void enqueue(PostPublishedEvent event); } @Service public class RedisQueueService implements QueueService { private RedisChannelGateway channelGateway; @Autowired public RedisQueueService(RedisChannelGateway channelGateway) { this.channelGateway = channelGateway; } @Override public void enqueue(PostPublishedEvent event) { channelGateway.enqueue(event); } } A teraz możesz łatwo wysłać wiadomość do kolejki za pomocą metody enqueue z QueueService .
Kolejki Redis to po prostu listy z jednym lub kilkoma producentami i konsumentami. Aby opublikować wiadomość do kolejki, producenci używają komendy LPUSH Redis. A jeśli monitorujesz Redis (podpowiedź: wpisz redis-cli monitor ), możesz zobaczyć, że wiadomość została dodana do kolejki:
"LPUSH" "my-event-queue" "{\"postUrl\":\"test\",\"postTitle\":\"test\",\"emails\":[\"test\"]}"Teraz musimy stworzyć aplikację konsumencką, która wyciągnie te zdarzenia z kolejki i przetworzy je. Usługa konsumencka wymaga tych samych zależności, co usługa producenta.
Teraz możemy ponownie użyć klasy PostPublishedEvent do deserializacji wiadomości.
Musimy stworzyć konfigurację kolejki i ponownie umieścić ją w katalogu resources/WEB-INF . Zawartość konfiguracji kolejki to:
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:int="http://www.springframework.org/schema/integration" xmlns:int-redis="http://www.springframework.org/schema/integration/redis" xsi:schemaLocation="http://www.springframework.org/schema/integration/redis http://www.springframework.org/schema/integration/redis/spring-integration-redis.xsd http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd"> <int-redis:queue-inbound-channel-adapter channel="eventChannelJson" queue="my-event-queue" serializer="serializer" auto-startup="true" connection-factory="redisConnectionFactory"/> <int:channel/> <int:channel> <int:queue/> </int:channel> <bean class="org.springframework.data.redis.serializer.StringRedisSerializer"/> <int:json-to-object-transformer input-channel="eventChannelJson" output-channel="eventChannel" type="com.toptal.integration.spring.model.PostPublishedEvent"/> <int:service-activator input-channel="eventChannel" ref="RedisEventProcessingService" method="process"> <int:poller fixed-delay="10" time-unit="SECONDS" max-messages-per-poll="500"/> </int:service-activator> </beans> W konfiguracji .xml int-redis:queue-inbound-channel-adapter może mieć następujące właściwości:
- id: nazwa ziarna komponentu.
- kanał:
MessageChannel, do którego wysyłamy wiadomości z tego punktu końcowego. - automatyczne uruchamianie: Atrybut
SmartLifecycleokreślający, czy ten punkt końcowy powinien być uruchamiany automatycznie po uruchomieniu kontekstu aplikacji, czy nie. Jego domyślną wartością jesttrue. - faza: Atrybut
SmartLifecycleokreślający fazę, w której ten punkt końcowy zostanie uruchomiony. Jego wartość domyślna to0. - connection-factory: odwołanie do ziarna
RedisConnectionFactory. - kolejka: nazwa listy Redis, na której wykonywana jest operacja wyskakująca oparta na kolejce w celu pobrania komunikatów Redis.
- error-channel:
MessageChannel, do którego wyślemyErrorMessageszExceptionsod zadania nasłuchiwaniaEndpoint. Domyślnie bazowyMessagePublishingErrorHandlerużywa domyślnegoerrorChannelz kontekstu aplikacji. - serializator: odwołanie do ziarna
RedisSerializer. Może to być pusty ciąg, co oznacza brak serializatora. W takim przypadku nieprzetworzonybyte[]z przychodzącego komunikatu Redis jest wysyłany do kanału jako ładunekMessage. Domyślnie jest toJdkSerializationRedisSerializer. - limit czasu odbioru: limit czasu (w milisekundach) oczekiwania operacji pop na komunikat Redis z kolejki. Jego domyślna wartość to 1 sekunda.
- interwał odzyskiwania: czas w milisekundach, przez który zadanie nasłuchiwania powinno uśpić się po wyjątkach operacji pop przed ponownym uruchomieniem zadania nasłuchiwania.
- oczekiwania-wiadomość: Określ, czy ten punkt końcowy oczekuje, że dane z kolejki Redis będą zawierać całe komunikaty. Jeśli ten atrybut jest ustawiony na
true, serializator nie może być pustym ciągiem, ponieważ komunikaty wymagają jakiejś formy deserializacji (domyślnie serializacji JDK). Jego domyślną wartością jestfalse. - task-executor: Odwołanie do komponentu Spring
TaskExecutor(lub standardowego JDK 1.5+ Executor). Służy do podstawowego zadania słuchania. Domyślnie używany jestSimpleAsyncTaskExecutor. - right-pop: Określ, czy ten punkt końcowy powinien używać prawego pop (gdy
true) czy lewego pop (gdyfalse) do odczytywania komunikatów z listy Redis. Jeślitrue, lista Redis działa jako kolejka FIFO, gdy jest używana z domyślnym adapterem kanału wychodzącego kolejki Redis. Ustaw nafalse, aby używać go z oprogramowaniem, które zapisuje na liście prawym przyciskiem myszy lub w celu uzyskania kolejności wiadomości podobnej do stosu. Jego domyślną wartością jesttrue.
Ważną częścią jest „aktywator usługi”, który określa, jaką usługę i metodę należy zastosować do przetworzenia zdarzenia”.
Ponadto json-to-object-transformer wymaga atrybutu type w celu przekształcenia JSON w obiekty, ustawione powyżej na type="com.toptal.integration.spring.model.PostPublishedEvent" .
Ponownie, aby połączyć tę konfigurację, będziemy potrzebować klasy SpringIntegrationConfig , która może być taka sama jak poprzednio. I na koniec potrzebujemy usługi, która faktycznie obsłuży wydarzenie.
public interface EventProcessingService { void process(PostPublishedEvent event); } @Service("RedisEventProcessingService") public class RedisEventProcessingService implements EventProcessingService { @Override public void process(PostPublishedEvent event) { // TODO: Send emails here, retry strategy, etc :) } }Po uruchomieniu aplikacji możesz zobaczyć w Redis:
"BRPOP" "my-event-queue" "1"Wniosek
Dzięki Spring Integration i Redis budowanie aplikacji mikrousług Spring nie jest tak trudne, jak zwykle. Dzięki niewielkiej konfiguracji i niewielkiej ilości standardowego kodu możesz w mgnieniu oka zbudować podstawy architektury mikroserwisów.
Nawet jeśli nie planujesz całkowicie zarysować swojego obecnego projektu Spring i przejść na nową architekturę, z pomocą Redis, bardzo łatwo jest uzyskać ogromną poprawę wydajności dzięki kolejkom.
