Komunikacja mikroserwisowa: samouczek integracji Spring z Redis

Opublikowany: 2022-03-11

Architektura mikroserwisowa to bardzo popularne podejście w projektowaniu i wdrażaniu wysoce skalowalnych aplikacji internetowych. Komunikacja w ramach aplikacji monolitycznej między komponentami jest zwykle oparta na wywołaniach metod lub funkcji w ramach tego samego procesu. Z drugiej strony aplikacja oparta na mikrousługach to rozproszony system działający na wielu komputerach.

Komunikacja między tymi mikrousługami jest ważna, aby mieć stabilny i skalowalny system. Można to zrobić na wiele sposobów. Komunikacja oparta na wiadomościach jest jednym ze sposobów, aby to zrobić niezawodnie.

Podczas korzystania z komunikatów komponenty współdziałają ze sobą poprzez asynchroniczną wymianę komunikatów. Wiadomości są wymieniane kanałami.

graficzna reprezentacja systemu wiadomości ułatwiającego komunikację między usługą A a usługą B

Gdy Usługa A chce komunikować się z Usługą B, zamiast wysyłać ją bezpośrednio, A wysyła ją do określonego kanału. Gdy Usługa B chce przeczytać wiadomość, odbiera ją z określonego kanału wiadomości.

W tym samouczku Spring Integration dowiesz się, jak zaimplementować wiadomości w aplikacji Spring przy użyciu Redis. Zostaniesz poprowadzony przez przykładową aplikację, w której jedna usługa wypycha zdarzenia w kolejce, a inna usługa przetwarza te zdarzenia jeden po drugim.

Integracja wiosenna

Projekt Spring Integration rozszerza platformę Spring, zapewniając obsługę przesyłania wiadomości między aplikacjami opartymi na Spring lub w ich obrębie. Komponenty są połączone ze sobą za pomocą paradygmatu przesyłania wiadomości. Poszczególne komponenty mogą nie wiedzieć o innych komponentach w aplikacji.

Spring Integration zapewnia szeroki wybór mechanizmów komunikacji z systemami zewnętrznymi. Adaptery kanałów są jednym z takich mechanizmów stosowanych do integracji jednokierunkowej (wysyłanie lub odbieranie). Bramy są używane w scenariuszach żądań/odpowiedzi (przychodzących lub wychodzących).

Apache Camel to szeroko stosowana alternatywa. Integracja Spring jest zwykle preferowana w istniejących usługach opartych na Spring, ponieważ jest częścią ekosystemu Spring.

Redis

Redis to niezwykle szybki magazyn danych w pamięci. Opcjonalnie może również trwać na dysku. Obsługuje różne struktury danych, takie jak proste pary klucz-wartość, zestawy, kolejki itp.

Używanie Redis jako kolejki znacznie ułatwia udostępnianie danych między komponentami i skalowanie w poziomie. Producent lub wielu producentów może wypchnąć dane do kolejki, a konsument lub wielu konsumentów może pobrać dane i przetworzyć zdarzenie.

Wielu konsumentów nie może korzystać z tego samego zdarzenia — dzięki temu jedno zdarzenie jest przetwarzane raz.

schemat przedstawiający architekturę producenta/konsumenta

Korzyści z używania Redis jako kolejki wiadomości:

  • Równoległe wykonywanie dyskretnych zadań w sposób nieblokujący
  • Świetny występ
  • Stabilność
  • Łatwe monitorowanie i debugowanie
  • Łatwa implementacja i użytkowanie

Zasady:

  • Dodanie zadania do kolejki powinno być szybsze niż przetwarzanie samego zadania.
  • Konsumpcja zadań powinna być szybsza niż ich produkcja (a jeśli nie, dodaj więcej konsumentów).

Wiosenna integracja z Redis

Poniżej przedstawiamy tworzenie przykładowej aplikacji, aby wyjaśnić, jak korzystać ze Spring Integration z Redis.

Załóżmy, że masz aplikację, która umożliwia użytkownikom publikowanie postów. I chcesz zbudować funkcję śledzenia. Kolejnym wymogiem jest to, aby za każdym razem, gdy ktoś publikuje post, wszyscy obserwujący powinni być powiadamiani za pośrednictwem jakiegoś kanału komunikacji (np. e-mail lub powiadomienie push).

Jednym ze sposobów na zaimplementowanie tego jest wysłanie wiadomości e-mail do każdego obserwującego, gdy użytkownik coś opublikuje. Ale co się dzieje, gdy użytkownik ma 1000 obserwujących? A kiedy 1000 użytkowników opublikuje coś w 10 sekund, z których każdy ma 1000 obserwujących? Czy post wydawcy będzie czekać na wysłanie wszystkich e-maili?

Systemy rozproszone rozwiązują ten problem.

Ten konkretny problem można rozwiązać za pomocą kolejki. Serwis A (producent), który jest odpowiedzialny za publikowanie postów, właśnie to zrobi. Opublikuje post i wypchnie wydarzenie z listą użytkowników, którzy muszą otrzymać e-mail i sam post. Listę użytkowników można by pobrać w usłudze B, ale dla uproszczenia tego przykładu wyślemy ją z usługi A.

To jest operacja asynchroniczna. Oznacza to, że publikująca usługa nie będzie musiała czekać na wysłanie wiadomości e-mail.

Usługa B (konsument) wyciągnie zdarzenie z kolejki i przetworzy je. W ten sposób moglibyśmy łatwo skalować nasze usługi i moglibyśmy mieć n konsumentów wysyłających e-maile (zdarzenia przetwarzania).

Zacznijmy więc od wdrożenia w serwisie producenta. Niezbędne zależności to:

 <dependency> <groupId>redis.clients</groupId> <artifactId>jedis</artifactId> </dependency> <dependency> <groupId>org.springframework.data</groupId> <artifactId>spring-data-redis</artifactId> </dependency> <dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-redis</artifactId> </dependency>

Te trzy zależności Mavena są niezbędne:

  • Jedis jest klientem Redis.
  • Zależność Spring Data Redis ułatwia korzystanie z Redis w Javie. Zapewnia znajome koncepcje Spring, takie jak klasa szablonów dla podstawowego użycia interfejsu API i lekki dostęp do danych w stylu repozytorium.
  • Spring Integration Redis zapewnia rozszerzenie modelu programowania Spring o obsługę dobrze znanych wzorców integracji przedsiębiorstw.

Następnie musimy skonfigurować klienta Jedis:

 @Configuration public class RedisConfig { @Value("${redis.host}") private String redisHost; @Value("${redis.port:6379}") private int redisPort; @Bean public JedisPoolConfig poolConfig() { JedisPoolConfig poolConfig = new JedisPoolConfig(); poolConfig.setMaxTotal(128); return poolConfig; } @Bean public RedisConnectionFactory redisConnectionFactory(JedisPoolConfig poolConfig) { final JedisConnectionFactory connectionFactory = new JedisConnectionFactory(); connectionFactory.setHostName(redisHost); connectionFactory.setPort(redisPort); connectionFactory.setPoolConfig(poolConfig); connectionFactory.setUsePool(true); return connectionFactory; } }

Adnotacja @Value oznacza, że ​​Spring wprowadzi do pola wartość zdefiniowaną we właściwościach aplikacji. Oznacza to, że wartości redis.host i redis.port powinny być zdefiniowane we właściwościach aplikacji.

Teraz musimy zdefiniować wiadomość, którą chcemy wysłać do kolejki. Prosta przykładowa wiadomość może wyglądać tak:

 @Getter @Setter @Builder public class PostPublishedEvent { private String postUrl; private String postTitle; private List<String> emails; }

Uwaga: Project Lombok (https://projectlombok.org/) udostępnia @Getter , @Setter , @Builder , a także wiele innych adnotacji, aby uniknąć zaśmiecania kodu programami pobierającymi, ustawiającymi i innymi trywialnymi rzeczami. Możesz dowiedzieć się więcej na ten temat z tego artykułu Toptal.

Sama wiadomość zostanie zapisana w kolejce w formacie JSON. Za każdym razem, gdy zdarzenie jest publikowane w kolejce, wiadomość zostanie zserializowana do formatu JSON. A podczas konsumowania z kolejki wiadomość zostanie zdeserializowana.

Po zdefiniowaniu wiadomości musimy zdefiniować samą kolejkę. W Spring Integration można to łatwo zrobić za pomocą konfiguracji .xml . Konfigurację należy umieścić w katalogu resources/WEB-INF .

 <?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:int="http://www.springframework.org/schema/integration" xmlns:int-redis="http://www.springframework.org/schema/integration/redis" xsi:schemaLocation="http://www.springframework.org/schema/integration/redis http://www.springframework.org/schema/integration/redis/spring-integration-redis.xsd http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd"> <int-redis:queue-outbound-channel-adapter channel="eventChannelJson" serializer="serializer" auto-startup="true" connection-factory="redisConnectionFactory" queue="my-event-queue" /> <int:gateway service-interface="org.toptal.queue.RedisChannelGateway" error-channel="errorChannel" default-request-channel="eventChannel"> <int:default-header name="topic" value="queue"/> </int:gateway> <int:channel/> <int:channel/> <bean class="org.springframework.data.redis.serializer.StringRedisSerializer"/> <int:object-to-json-transformer input-channel="eventChannel" output-channel="eventChannelJson"/> </beans>

W konfiguracji możesz zobaczyć część „int-redis:queue-outbound-channel-adapter”. Jego właściwości to:

  • id: nazwa ziarna komponentu.
  • kanał: MessageChannel , z którego ten punkt końcowy odbiera komunikaty.
  • connection-factory: odwołanie do ziarna RedisConnectionFactory .
  • kolejka: nazwa listy Redis, na której wykonywana jest operacja wypychania oparta na kolejce w celu wysłania komunikatów Redis. Ten atrybut wzajemnie się wyklucza z wyrażeniem kolejki.
  • wyrażenie kolejki: wyrażenie SpEL określające nazwę listy Redis przy użyciu wiadomości przychodzącej w czasie wykonywania jako zmiennej #root . Ten atrybut wzajemnie się wyklucza z kolejką.
  • serializator: odwołanie do ziarna RedisSerializer . Domyślnie jest to JdkSerializationRedisSerializer . Jednak w przypadku ładunków String StringRedisSerializer jest używany, jeśli nie podano odwołania do serializatora.
  • extract-payload: Określ, czy ten punkt końcowy powinien wysyłać tylko ładunek do kolejki Redis, czy całą wiadomość. Jego domyślną wartością jest true .
  • left-push: Określ, czy ten punkt końcowy powinien używać lewego wypychania (gdy true ) czy prawego wypychania (gdy false ) do zapisywania komunikatów na liście Redis. Jeśli ma wartość true, lista Redis działa jako kolejka FIFO, gdy jest używana z domyślnym adapterem kanału przychodzącego kolejki Redis. Ustaw na false , aby używać oprogramowania, które odczytuje z listy z lewym wyskakującym okienkiem lub aby uzyskać kolejność wiadomości podobną do stosu. Jego domyślną wartością jest true .

Kolejnym krokiem jest zdefiniowanie bramy, o której mowa w konfiguracji .xml . W przypadku bramy używamy klasy RedisChannelGateway z pakietu org.toptal.queue .

StringRedisSerializer służy do serializacji wiadomości przed zapisaniem w Redis. Również w konfiguracji .xml zdefiniowaliśmy bramę i ustawiliśmy RedisChannelGateway jako usługę bramy. Oznacza to, że fasola RedisChannelGateway może być wstrzykiwana do innych ziaren. Zdefiniowaliśmy właściwość default-request-channel , ponieważ możliwe jest również zapewnienie odwołań do kanałów według metody za pomocą adnotacji @Gateway . Definicja klasy:

 public interface RedisChannelGateway { void enqueue(PostPublishedEvent event); }

Aby podłączyć tę konfigurację do naszej aplikacji, musimy ją zaimportować. Jest to zaimplementowane w klasie SpringIntegrationConfig .

 @ImportResource("classpath:WEB-INF/event-queue-config.xml") @AutoConfigureAfter(RedisConfig.class) @Configuration public class SpringIntegrationConfig { }

Adnotacja @ImportResource służy do importowania plików konfiguracyjnych Spring .xml do @Configuration . Adnotacja @AutoConfigureAfter służy do wskazania, że ​​automatyczna konfiguracja powinna zostać zastosowana po innych określonych klasach automatycznej konfiguracji.

Utworzymy teraz usługę i zaimplementujemy metodę, która będzie umieszczać zdarzenia w kolejce enqueue .

 public interface QueueService { void enqueue(PostPublishedEvent event); }
 @Service public class RedisQueueService implements QueueService { private RedisChannelGateway channelGateway; @Autowired public RedisQueueService(RedisChannelGateway channelGateway) { this.channelGateway = channelGateway; } @Override public void enqueue(PostPublishedEvent event) { channelGateway.enqueue(event); } }

A teraz możesz łatwo wysłać wiadomość do kolejki za pomocą metody enqueue z QueueService .

Kolejki Redis to po prostu listy z jednym lub kilkoma producentami i konsumentami. Aby opublikować wiadomość do kolejki, producenci używają komendy LPUSH Redis. A jeśli monitorujesz Redis (podpowiedź: wpisz redis-cli monitor ), możesz zobaczyć, że wiadomość została dodana do kolejki:

 "LPUSH" "my-event-queue" "{\"postUrl\":\"test\",\"postTitle\":\"test\",\"emails\":[\"test\"]}"

Teraz musimy stworzyć aplikację konsumencką, która wyciągnie te zdarzenia z kolejki i przetworzy je. Usługa konsumencka wymaga tych samych zależności, co usługa producenta.

Teraz możemy ponownie użyć klasy PostPublishedEvent do deserializacji wiadomości.

Musimy stworzyć konfigurację kolejki i ponownie umieścić ją w katalogu resources/WEB-INF . Zawartość konfiguracji kolejki to:

 <?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:int="http://www.springframework.org/schema/integration" xmlns:int-redis="http://www.springframework.org/schema/integration/redis" xsi:schemaLocation="http://www.springframework.org/schema/integration/redis http://www.springframework.org/schema/integration/redis/spring-integration-redis.xsd http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd"> <int-redis:queue-inbound-channel-adapter channel="eventChannelJson" queue="my-event-queue" serializer="serializer" auto-startup="true" connection-factory="redisConnectionFactory"/> <int:channel/> <int:channel> <int:queue/> </int:channel> <bean class="org.springframework.data.redis.serializer.StringRedisSerializer"/> <int:json-to-object-transformer input-channel="eventChannelJson" output-channel="eventChannel" type="com.toptal.integration.spring.model.PostPublishedEvent"/> <int:service-activator input-channel="eventChannel" ref="RedisEventProcessingService" method="process"> <int:poller fixed-delay="10" time-unit="SECONDS" max-messages-per-poll="500"/> </int:service-activator> </beans>

W konfiguracji .xml int-redis:queue-inbound-channel-adapter może mieć następujące właściwości:

  • id: nazwa ziarna komponentu.
  • kanał: MessageChannel , do którego wysyłamy wiadomości z tego punktu końcowego.
  • automatyczne uruchamianie: Atrybut SmartLifecycle określający, czy ten punkt końcowy powinien być uruchamiany automatycznie po uruchomieniu kontekstu aplikacji, czy nie. Jego domyślną wartością jest true .
  • faza: Atrybut SmartLifecycle określający fazę, w której ten punkt końcowy zostanie uruchomiony. Jego wartość domyślna to 0 .
  • connection-factory: odwołanie do ziarna RedisConnectionFactory .
  • kolejka: nazwa listy Redis, na której wykonywana jest operacja wyskakująca oparta na kolejce w celu pobrania komunikatów Redis.
  • error-channel: MessageChannel , do którego wyślemy ErrorMessages z Exceptions od zadania nasłuchiwania Endpoint . Domyślnie bazowy MessagePublishingErrorHandler używa domyślnego errorChannel z kontekstu aplikacji.
  • serializator: odwołanie do ziarna RedisSerializer . Może to być pusty ciąg, co oznacza brak serializatora. W takim przypadku nieprzetworzony byte[] z przychodzącego komunikatu Redis jest wysyłany do kanału jako ładunek Message . Domyślnie jest to JdkSerializationRedisSerializer .
  • limit czasu odbioru: limit czasu (w milisekundach) oczekiwania operacji pop na komunikat Redis z kolejki. Jego domyślna wartość to 1 sekunda.
  • interwał odzyskiwania: czas w milisekundach, przez który zadanie nasłuchiwania powinno uśpić się po wyjątkach operacji pop przed ponownym uruchomieniem zadania nasłuchiwania.
  • oczekiwania-wiadomość: Określ, czy ten punkt końcowy oczekuje, że dane z kolejki Redis będą zawierać całe komunikaty. Jeśli ten atrybut jest ustawiony na true , serializator nie może być pustym ciągiem, ponieważ komunikaty wymagają jakiejś formy deserializacji (domyślnie serializacji JDK). Jego domyślną wartością jest false .
  • task-executor: Odwołanie do komponentu Spring TaskExecutor (lub standardowego JDK 1.5+ Executor). Służy do podstawowego zadania słuchania. Domyślnie używany jest SimpleAsyncTaskExecutor .
  • right-pop: Określ, czy ten punkt końcowy powinien używać prawego pop (gdy true ) czy lewego pop (gdy false ) do odczytywania komunikatów z listy Redis. Jeśli true , lista Redis działa jako kolejka FIFO, gdy jest używana z domyślnym adapterem kanału wychodzącego kolejki Redis. Ustaw na false , aby używać go z oprogramowaniem, które zapisuje na liście prawym przyciskiem myszy lub w celu uzyskania kolejności wiadomości podobnej do stosu. Jego domyślną wartością jest true .

Ważną częścią jest „aktywator usługi”, który określa, jaką usługę i metodę należy zastosować do przetworzenia zdarzenia”.

Ponadto json-to-object-transformer wymaga atrybutu type w celu przekształcenia JSON w obiekty, ustawione powyżej na type="com.toptal.integration.spring.model.PostPublishedEvent" .

Ponownie, aby połączyć tę konfigurację, będziemy potrzebować klasy SpringIntegrationConfig , która może być taka sama jak poprzednio. I na koniec potrzebujemy usługi, która faktycznie obsłuży wydarzenie.

 public interface EventProcessingService { void process(PostPublishedEvent event); } @Service("RedisEventProcessingService") public class RedisEventProcessingService implements EventProcessingService { @Override public void process(PostPublishedEvent event) { // TODO: Send emails here, retry strategy, etc :) } }

Po uruchomieniu aplikacji możesz zobaczyć w Redis:

 "BRPOP" "my-event-queue" "1"

Wniosek

Dzięki Spring Integration i Redis budowanie aplikacji mikrousług Spring nie jest tak trudne, jak zwykle. Dzięki niewielkiej konfiguracji i niewielkiej ilości standardowego kodu możesz w mgnieniu oka zbudować podstawy architektury mikroserwisów.

Nawet jeśli nie planujesz całkowicie zarysować swojego obecnego projektu Spring i przejść na nową architekturę, z pomocą Redis, bardzo łatwo jest uzyskać ogromną poprawę wydajności dzięki kolejkom.