Comunicarea cu microservicii: un tutorial de integrare Spring cu Redis

Publicat: 2022-03-11

Arhitectura de microservicii este o abordare foarte populară în proiectarea și implementarea aplicațiilor web foarte scalabile. Comunicarea într-o aplicație monolitică între componente se bazează de obicei pe apeluri de metodă sau de funcție în cadrul aceluiași proces. O aplicație bazată pe microservicii, pe de altă parte, este un sistem distribuit care rulează pe mai multe mașini.

Comunicarea între aceste microservicii este importantă pentru a avea un sistem stabil și scalabil. Există mai multe moduri de a face acest lucru. Comunicarea bazată pe mesaje este o modalitate de a face acest lucru în mod fiabil.

Când se utilizează mesageria, componentele interacționează între ele prin schimbul asincron de mesaje. Mesajele sunt schimbate prin canale.

reprezentare grafică a unui sistem de mesagerie care facilitează comunicarea între serviciul A și serviciul B

Când Serviciul A dorește să comunice cu Serviciul B, în loc să îl trimită direct, A îl trimite către un anumit canal. Când Serviciul B dorește să citească mesajul, preia mesajul de la un anumit canal de mesaje.

În acest tutorial de integrare Spring, veți învăța cum să implementați mesageria într-o aplicație Spring folosind Redis. Veți fi parcurs printr-un exemplu de aplicație în care un serviciu împinge evenimente în coadă și un alt serviciu procesează aceste evenimente unul câte unul.

Integrare de primăvară

Proiectul Spring Integration extinde cadrul Spring pentru a oferi suport pentru mesageria între sau în cadrul aplicațiilor bazate pe Spring. Componentele sunt conectate împreună prin paradigma de mesagerie. Este posibil ca componentele individuale să nu fie conștiente de alte componente din aplicație.

Spring Integration oferă o selecție largă de mecanisme pentru a comunica cu sistemele externe. Adaptoarele de canal sunt un astfel de mecanism utilizat pentru integrarea unidirecțională (trimitere sau primire). Și gateway-urile sunt folosite pentru scenarii de solicitare/răspuns (inbound sau outbound).

Apache Camel este o alternativă utilizată pe scară largă. Integrarea Spring este de obicei preferată în serviciile existente bazate pe Spring, deoarece face parte din ecosistemul Spring.

Redis

Redis este un depozit de date extrem de rapid în memorie. Opțional, poate persista și pe un disc. Acceptă diferite structuri de date, cum ar fi perechi cheie-valoare simple, seturi, cozi etc.

Utilizarea Redis ca coadă face partajarea datelor între componente și scalarea orizontală mult mai ușoară. Un producător sau mai mulți producători pot împinge datele în coadă, iar un consumator sau mai mulți consumatori pot extrage datele și procesa evenimentul.

Mai mulți consumatori nu pot consuma același eveniment - acest lucru asigură că un eveniment este procesat o dată.

diagramă care prezintă arhitectura producător/consumator

Avantajele utilizării Redis ca coadă de mesaje:

  • Executarea paralelă a sarcinilor discrete într-un mod neblocant
  • Performanță grozavă
  • Stabilitate
  • Monitorizare și depanare ușoară
  • Implementare și utilizare ușoară

Reguli:

  • Adăugarea unei sarcini la coadă ar trebui să fie mai rapidă decât procesarea sarcinii în sine.
  • Consumarea sarcinilor ar trebui să fie mai rapidă decât producerea lor (și dacă nu, adăugați mai mulți consumatori).

Integrare Spring cu Redis

Următoarele parcurge crearea unui exemplu de aplicație pentru a explica cum să utilizați Spring Integration cu Redis.

Să presupunem că aveți o aplicație care permite utilizatorilor să publice postări. Și doriți să construiți o funcție de urmărire. O altă cerință este ca de fiecare dată când cineva publică o postare, toți adepții să fie notificați prin intermediul unui canal de comunicare (de exemplu, e-mail sau notificare push).

O modalitate de a implementa acest lucru este să trimiteți un e-mail fiecărui utilizator odată ce utilizatorul publică ceva. Dar ce se întâmplă când utilizatorul are 1.000 de urmăritori? Și când 1.000 de utilizatori publică ceva în 10 secunde, fiecare dintre ei are 1.000 de urmăritori? De asemenea, postarea editorului va aștepta până când toate e-mailurile vor fi trimise?

Sistemele distribuite rezolvă această problemă.

Această problemă specifică ar putea fi rezolvată folosind o coadă. Serviciul A (producătorul), care este responsabil pentru publicarea postărilor, va face doar asta. Acesta va publica o postare și va împinge un eveniment cu lista de utilizatori care trebuie să primească un e-mail și postarea în sine. Lista utilizatorilor ar putea fi preluată în serviciul B, dar pentru simplitatea acestui exemplu, o vom trimite de la serviciul A.

Aceasta este o operație asincronă. Aceasta înseamnă că serviciul care publică nu va trebui să aștepte pentru a trimite e-mailuri.

Serviciul B (consumatorul) va scoate evenimentul din coadă și îl va procesa. În acest fel, ne-am putea scala cu ușurință serviciile și am putea avea n consumatori care trimit e-mailuri (procesează evenimente).

Deci să începem cu o implementare în serviciul producătorului. Dependentele necesare sunt:

 <dependency> <groupId>redis.clients</groupId> <artifactId>jedis</artifactId> </dependency> <dependency> <groupId>org.springframework.data</groupId> <artifactId>spring-data-redis</artifactId> </dependency> <dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-redis</artifactId> </dependency>

Aceste trei dependențe Maven sunt necesare:

  • Jedis este un client Redis.
  • Dependența Spring Data Redis facilitează utilizarea Redis în Java. Oferă concepte Spring familiare, cum ar fi o clasă de șablon pentru utilizarea API-ului de bază și acces ușor la date în stil depozit.
  • Spring Integration Redis oferă o extensie a modelului de programare Spring pentru a susține binecunoscutele modele de integrare a întreprinderii.

În continuare, trebuie să configuram clientul Jedis:

 @Configuration public class RedisConfig { @Value("${redis.host}") private String redisHost; @Value("${redis.port:6379}") private int redisPort; @Bean public JedisPoolConfig poolConfig() { JedisPoolConfig poolConfig = new JedisPoolConfig(); poolConfig.setMaxTotal(128); return poolConfig; } @Bean public RedisConnectionFactory redisConnectionFactory(JedisPoolConfig poolConfig) { final JedisConnectionFactory connectionFactory = new JedisConnectionFactory(); connectionFactory.setHostName(redisHost); connectionFactory.setPort(redisPort); connectionFactory.setPoolConfig(poolConfig); connectionFactory.setUsePool(true); return connectionFactory; } }

Adnotarea @Value înseamnă că Spring va injecta în câmp valoarea definită în proprietățile aplicației. Aceasta înseamnă că valorile redis.host și redis.port ar trebui definite în proprietățile aplicației.

Acum, trebuie să definim mesajul pe care vrem să-l trimitem în coadă. Un exemplu simplu de mesaj ar putea arăta astfel:

 @Getter @Setter @Builder public class PostPublishedEvent { private String postUrl; private String postTitle; private List<String> emails; }

Notă: Proiectul Lombok (https://projectlombok.org/) oferă @Getter , @Setter , @Builder și multe alte adnotări pentru a evita aglomerarea codului cu getters, setters și alte chestii banale. Puteți afla mai multe despre aceasta din acest articol Toptal.

Mesajul în sine va fi salvat în format JSON în coadă. De fiecare dată când un eveniment este publicat în coadă, mesajul va fi serializat în JSON. Iar atunci când se consumă din coadă, mesajul va fi deserializat.

Cu mesajul definit, trebuie să definim coada în sine. În Spring Integration, se poate face cu ușurință printr-o configurație .xml . Configurația trebuie plasată în directorul resources/WEB-INF .

 <?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:int="http://www.springframework.org/schema/integration" xmlns:int-redis="http://www.springframework.org/schema/integration/redis" xsi:schemaLocation="http://www.springframework.org/schema/integration/redis http://www.springframework.org/schema/integration/redis/spring-integration-redis.xsd http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd"> <int-redis:queue-outbound-channel-adapter channel="eventChannelJson" serializer="serializer" auto-startup="true" connection-factory="redisConnectionFactory" queue="my-event-queue" /> <int:gateway service-interface="org.toptal.queue.RedisChannelGateway" error-channel="errorChannel" default-request-channel="eventChannel"> <int:default-header name="topic" value="queue"/> </int:gateway> <int:channel/> <int:channel/> <bean class="org.springframework.data.redis.serializer.StringRedisSerializer"/> <int:object-to-json-transformer input-channel="eventChannel" output-channel="eventChannelJson"/> </beans>

În configurație, puteți vedea partea „int-redis:queue-outbound-channel-adapter”. Proprietățile sale sunt:

  • id: Numele bean-ului componentei.
  • canal: MessageChannel de la care acest punct final primește mesaje.
  • connection-factory: O referință la un bean RedisConnectionFactory .
  • coadă: numele listei Redis pe care se efectuează operația push bazată pe coadă pentru a trimite mesaje Redis. Acest atribut se exclude reciproc cu expresia coadă.
  • queue-expression: O expresie SpEL pentru a determina numele listei Redis folosind mesajul primit în timpul execuției ca variabilă #root . Acest atribut se exclude reciproc cu coada.
  • serializator: o referință RedisSerializer bean. În mod implicit, este un JdkSerializationRedisSerializer . Cu toate acestea, pentru încărcăturile utile String , se folosește un StringRedisSerializer dacă nu este furnizată o referință pentru serializator.
  • extract-payload: specificați dacă acest punct final ar trebui să trimită doar sarcina utilă la coada Redis sau întregul mesaj. Valoarea sa implicită este true .
  • left-push: specificați dacă acest punct final ar trebui să folosească apăsarea stângă (când este true ) sau apăsare dreapta (când false ) pentru a scrie mesaje în lista Redis. Dacă este adevărată, lista Redis acționează ca o coadă FIFO atunci când este utilizată cu un adaptor de canal de intrare pentru coadă Redis implicit. Setați la false pentru a fi utilizat cu software care citește din listă cu pop stânga sau pentru a obține o ordine de mesaje asemănătoare stivei. Valoarea sa implicită este true .

Următorul pas este definirea gateway-ului, care este menționat în configurația .xml . Pentru un gateway, folosim clasa RedisChannelGateway din pachetul org.toptal.queue .

StringRedisSerializer este utilizat pentru a serializa mesajul înainte de a salva în Redis. De asemenea, în configurația .xml , am definit gateway-ul și am setat RedisChannelGateway ca serviciu gateway. Aceasta înseamnă că RedisChannelGateway ar putea fi injectat în alte boabe. Am definit proprietatea default-request-channel deoarece este, de asemenea, posibil să furnizăm referințe de canal pentru fiecare metodă folosind adnotarea @Gateway . Definiția clasei:

 public interface RedisChannelGateway { void enqueue(PostPublishedEvent event); }

Pentru a conecta această configurație în aplicația noastră, trebuie să o importăm. Acest lucru este implementat în clasa SpringIntegrationConfig .

 @ImportResource("classpath:WEB-INF/event-queue-config.xml") @AutoConfigureAfter(RedisConfig.class) @Configuration public class SpringIntegrationConfig { }

Adnotarea @ImportResource este utilizată pentru a importa fișierele de configurare Spring .xml în @Configuration . Și adnotarea @AutoConfigureAfter este folosită pentru a sugera că o configurare automată ar trebui să fie aplicată după alte clase de configurare automată specificate.

Acum vom crea un serviciu și vom implementa metoda care va pune în enqueue evenimentele în coada Redis.

 public interface QueueService { void enqueue(PostPublishedEvent event); }
 @Service public class RedisQueueService implements QueueService { private RedisChannelGateway channelGateway; @Autowired public RedisQueueService(RedisChannelGateway channelGateway) { this.channelGateway = channelGateway; } @Override public void enqueue(PostPublishedEvent event) { channelGateway.enqueue(event); } }

Și acum, puteți trimite cu ușurință un mesaj la coadă utilizând metoda enqueue de la QueueService .

Cozile Redis sunt pur și simplu liste cu unul sau mai mulți producători și consumatori. Pentru a publica un mesaj într-o coadă, producătorii folosesc comanda LPUSH Redis. Și dacă monitorizați Redis (hint: tastați redis-cli monitor ), puteți vedea că mesajul este adăugat în coadă:

 "LPUSH" "my-event-queue" "{\"postUrl\":\"test\",\"postTitle\":\"test\",\"emails\":[\"test\"]}"

Acum, trebuie să creăm o aplicație de consum care va extrage aceste evenimente din coadă și le va procesa. Serviciul pentru consumatori are nevoie de aceleași dependențe ca și serviciul de producător.

Acum putem reutiliza clasa PostPublishedEvent pentru a deserializa mesajele.

Trebuie să creăm configurația cozii și, din nou, trebuie să fie plasată în directorul resources/WEB-INF . Conținutul configurației cozii este:

 <?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:int="http://www.springframework.org/schema/integration" xmlns:int-redis="http://www.springframework.org/schema/integration/redis" xsi:schemaLocation="http://www.springframework.org/schema/integration/redis http://www.springframework.org/schema/integration/redis/spring-integration-redis.xsd http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd"> <int-redis:queue-inbound-channel-adapter channel="eventChannelJson" queue="my-event-queue" serializer="serializer" auto-startup="true" connection-factory="redisConnectionFactory"/> <int:channel/> <int:channel> <int:queue/> </int:channel> <bean class="org.springframework.data.redis.serializer.StringRedisSerializer"/> <int:json-to-object-transformer input-channel="eventChannelJson" output-channel="eventChannel" type="com.toptal.integration.spring.model.PostPublishedEvent"/> <int:service-activator input-channel="eventChannel" ref="RedisEventProcessingService" method="process"> <int:poller fixed-delay="10" time-unit="SECONDS" max-messages-per-poll="500"/> </int:service-activator> </beans>

În configurația .xml , int-redis:queue-inbound-channel-adapter poate avea următoarele proprietăți:

  • id: Numele bean-ului componentei.
  • canal: canalul MessageChannel mesaje către care trimitem mesaje de la acest punct final.
  • auto-startup: un atribut SmartLifecycle pentru a specifica dacă acest punct final ar trebui să pornească automat după pornirea contextului aplicației sau nu. Valoarea sa implicită este true .
  • fază: un atribut SmartLifecycle pentru a specifica faza în care va fi pornit acest punct final. Valoarea sa implicită este 0 .
  • connection-factory: O referință la un bean RedisConnectionFactory .
  • coadă: numele listei Redis pe care se efectuează operația pop bazată pe coadă pentru a primi mesaje Redis.
  • error-channel: MessageChannel către care vom trimite ErrorMessages cu Exceptions de la sarcina de ascultare a Endpoint -ului. În mod implicit, MessagePublishingErrorHandler de bază utilizează errorChannel implicit din contextul aplicației.
  • serializator: referința bean RedisSerializer . Poate fi un șir gol, ceea ce înseamnă că nu există un serializator. În acest caz, byte[] din mesajul Redis de intrare este trimis canalului ca sarcină utilă a Message . În mod implicit, este un JdkSerializationRedisSerializer .
  • receive-timeout: Timeout în milisecunde pentru ca operațiunea pop să aștepte un mesaj Redis din coadă. Valoarea sa implicită este de 1 secundă.
  • intervalul de recuperare: timpul în milisecunde pentru care sarcina de ascultător ar trebui să stea după excepții la operația pop înainte de a reporni sarcina de ascultător.
  • wait-message: Specificați dacă acest punct final se așteaptă ca datele din coada Redis să conțină mesaje întregi. Dacă acest atribut este setat la true , serializatorul nu poate fi un șir gol, deoarece mesajele necesită o formă de deserializare (serializare JDK în mod implicit). Valoarea sa implicită este false .
  • task-executor: o referință la un bean Spring TaskExecutor (sau standard JDK 1.5+ Executor). Este folosit pentru sarcina de ascultare de bază. În mod implicit, este utilizat un SimpleAsyncTaskExecutor .
  • dreapta-pop: specificați dacă acest punct final ar trebui să folosească pop-ul dreapta (când este true ) sau pop-ul stânga (când false ) pentru a citi mesajele din lista Redis. Dacă este true , lista Redis acționează ca o coadă FIFO atunci când este utilizată cu un adaptor de canal de ieșire pentru coadă Redis implicit. Setați la false pentru a fi utilizat cu software care scrie în listă prin apăsare dreapta sau pentru a obține o ordine de mesaje asemănătoare stivei. Valoarea sa implicită este true .

Partea importantă este „activatorul de servicii”, care definește ce serviciu și metodă ar trebui utilizate pentru a procesa evenimentul.'

De asemenea, json-to-object-transformer are nevoie de un atribut de tip pentru a transforma JSON în obiecte, setat mai sus la type="com.toptal.integration.spring.model.PostPublishedEvent" .

Din nou, pentru a conecta această configurație, vom avea nevoie de clasa SpringIntegrationConfig , care poate fi aceeași ca înainte. Și, în sfârșit, avem nevoie de un serviciu care va procesa efectiv evenimentul.

 public interface EventProcessingService { void process(PostPublishedEvent event); } @Service("RedisEventProcessingService") public class RedisEventProcessingService implements EventProcessingService { @Override public void process(PostPublishedEvent event) { // TODO: Send emails here, retry strategy, etc :) } }

Odată ce rulați aplicația, puteți vedea în Redis:

 "BRPOP" "my-event-queue" "1"

Concluzie

Cu Spring Integration și Redis, construirea unei aplicații de microservicii Spring nu este atât de descurajantă pe cât ar fi în mod normal. Cu o configurație mică și o cantitate mică de cod standard, puteți construi bazele arhitecturii dvs. de microservicii în cel mai scurt timp.

Chiar dacă nu intenționați să vă zgâriați în întregime actualul proiect Spring și să treceți la o nouă arhitectură, cu ajutorul Redis, este foarte simplu să obțineți îmbunătățiri uriașe de performanță cu cozi.