Microservice-Kommunikation: Ein Spring-Integrations-Tutorial mit Redis

Veröffentlicht: 2022-03-11

Die Microservice-Architektur ist ein sehr beliebter Ansatz beim Entwerfen und Implementieren hochskalierbarer Webanwendungen. Die Kommunikation innerhalb einer monolithischen Anwendung zwischen Komponenten basiert normalerweise auf Methoden- oder Funktionsaufrufen innerhalb desselben Prozesses. Eine auf Microservices basierende Anwendung hingegen ist ein verteiltes System, das auf mehreren Computern ausgeführt wird.

Die Kommunikation zwischen diesen Microservices ist wichtig, um ein stabiles und skalierbares System zu haben. Es gibt mehrere Möglichkeiten, dies zu tun. Nachrichtenbasierte Kommunikation ist eine Möglichkeit, dies zuverlässig zu tun.

Bei der Verwendung von Messaging interagieren Komponenten miteinander, indem sie asynchron Nachrichten austauschen. Nachrichten werden über Kanäle ausgetauscht.

Grafische Darstellung eines Messaging-Systems, das die Kommunikation zwischen Dienst A und Dienst B erleichtert

Wenn Dienst A mit Dienst B kommunizieren möchte, sendet A ihn nicht direkt, sondern an einen bestimmten Kanal. Wenn Dienst B die Nachricht lesen möchte, holt er die Nachricht von einem bestimmten Nachrichtenkanal ab.

In diesem Tutorial zur Spring-Integration erfahren Sie, wie Sie Messaging in einer Spring-Anwendung mit Redis implementieren. Sie werden durch eine Beispielanwendung geführt, in der ein Dienst Ereignisse in die Warteschlange schiebt und ein anderer Dienst diese Ereignisse nacheinander verarbeitet.

Frühlingsintegration

Das Projekt Spring Integration erweitert das Spring-Framework, um Unterstützung für das Messaging zwischen oder innerhalb von Spring-basierten Anwendungen bereitzustellen. Komponenten werden über das Messaging-Paradigma miteinander verbunden. Einzelne Komponenten kennen möglicherweise andere Komponenten in der Anwendung nicht.

Spring Integration bietet eine große Auswahl an Mechanismen zur Kommunikation mit externen Systemen. Kanaladapter sind ein solcher Mechanismus, der für die unidirektionale Integration (Senden oder Empfangen) verwendet wird. Und Gateways werden für Anforderungs-/Antwortszenarien (eingehend oder ausgehend) verwendet.

Apache Camel ist eine weit verbreitete Alternative. Die Spring-Integration wird normalerweise in bestehenden Spring-basierten Diensten bevorzugt, da sie Teil des Spring-Ökosystems ist.

Redis

Redis ist ein extrem schneller In-Memory-Datenspeicher. Es kann optional auch auf einem Datenträger bestehen bleiben. Es unterstützt verschiedene Datenstrukturen wie einfache Schlüssel-Wert-Paare, Sätze, Warteschlangen usw.

Die Verwendung von Redis als Warteschlange erleichtert die gemeinsame Nutzung von Daten zwischen Komponenten und die horizontale Skalierung erheblich. Ein Producer oder mehrere Producer können Daten in die Warteschlange verschieben, und ein Consumer oder mehrere Consumer können die Daten abrufen und das Ereignis verarbeiten.

Mehrere Verbraucher können nicht dasselbe Ereignis konsumieren – dies stellt sicher, dass ein Ereignis einmal verarbeitet wird.

Diagramm, das die Producer/Consumer-Architektur zeigt

Vorteile der Verwendung von Redis als Nachrichtenwarteschlange:

  • Parallele Ausführung diskreter Aufgaben auf nicht blockierende Weise
  • Gute Leistung
  • Stabilität
  • Einfache Überwachung und Fehlersuche
  • Einfache Implementierung und Nutzung

Regeln:

  • Das Hinzufügen einer Aufgabe zur Warteschlange sollte schneller sein als die Verarbeitung der Aufgabe selbst.
  • Das Konsumieren von Aufgaben sollte schneller sein als das Produzieren (und falls nicht, fügen Sie weitere Konsumenten hinzu).

Spring-Integration mit Redis

Im Folgenden wird die Erstellung einer Beispielanwendung erläutert, um die Verwendung von Spring Integration mit Redis zu erläutern.

Angenommen, Sie haben eine Anwendung, mit der Benutzer Beiträge veröffentlichen können. Und Sie möchten eine Follow-Funktion erstellen. Eine weitere Anforderung ist, dass jedes Mal, wenn jemand einen Beitrag veröffentlicht, alle Follower über einen Kommunikationskanal (z. B. E-Mail oder Push-Benachrichtigung) benachrichtigt werden sollten.

Eine Möglichkeit, dies zu implementieren, besteht darin, jedem Follower eine E-Mail zu senden, sobald der Benutzer etwas veröffentlicht. Aber was passiert, wenn der Benutzer 1.000 Follower hat? Und wenn 1.000 User in 10 Sekunden etwas veröffentlichen, hat jeder von ihnen 1.000 Follower? Wird der Post des Herausgebers warten, bis alle E-Mails gesendet wurden?

Verteilte Systeme lösen dieses Problem.

Dieses spezielle Problem könnte durch die Verwendung einer Warteschlange gelöst werden. Service A (der Produzent), der für das Veröffentlichen von Posts verantwortlich ist, wird genau das tun. Es veröffentlicht einen Post und pusht ein Ereignis mit der Liste der Benutzer, die eine E-Mail und den Post selbst erhalten müssen. Die Liste der Benutzer könnte in Dienst B abgerufen werden, aber der Einfachheit halber senden wir sie in diesem Beispiel von Dienst A.

Dies ist ein asynchroner Vorgang. Das bedeutet, dass der veröffentlichende Dienst nicht warten muss, um E-Mails zu senden.

Dienst B (der Konsument) zieht das Ereignis aus der Warteschlange und verarbeitet es. Auf diese Weise könnten wir unsere Dienste einfach skalieren, und wir könnten n Verbraucher haben, die E-Mails senden (Verarbeitungsereignisse).

Beginnen wir also mit einer Implementierung im Dienst des Produzenten. Notwendige Abhängigkeiten sind:

 <dependency> <groupId>redis.clients</groupId> <artifactId>jedis</artifactId> </dependency> <dependency> <groupId>org.springframework.data</groupId> <artifactId>spring-data-redis</artifactId> </dependency> <dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-redis</artifactId> </dependency>

Diese drei Maven-Abhängigkeiten sind notwendig:

  • Jedis ist ein Redis-Client.
  • Die Abhängigkeit von Spring Data Redis erleichtert die Verwendung von Redis in Java. Es bietet vertraute Spring-Konzepte wie eine Vorlagenklasse für die Kern-API-Nutzung und einen einfachen Datenzugriff im Repository-Stil.
  • Spring Integration Redis bietet eine Erweiterung des Spring-Programmiermodells zur Unterstützung der bekannten Enterprise Integration Patterns.

Als nächstes müssen wir den Jedis-Client konfigurieren:

 @Configuration public class RedisConfig { @Value("${redis.host}") private String redisHost; @Value("${redis.port:6379}") private int redisPort; @Bean public JedisPoolConfig poolConfig() { JedisPoolConfig poolConfig = new JedisPoolConfig(); poolConfig.setMaxTotal(128); return poolConfig; } @Bean public RedisConnectionFactory redisConnectionFactory(JedisPoolConfig poolConfig) { final JedisConnectionFactory connectionFactory = new JedisConnectionFactory(); connectionFactory.setHostName(redisHost); connectionFactory.setPort(redisPort); connectionFactory.setPoolConfig(poolConfig); connectionFactory.setUsePool(true); return connectionFactory; } }

Die Annotation @Value bedeutet, dass Spring den in den Anwendungseigenschaften definierten Wert in das Feld einfügt. Das bedeutet, dass redis.host und redis.port Werte in den Anwendungseigenschaften definiert werden sollten.

Jetzt müssen wir die Nachricht definieren, die wir an die Warteschlange senden möchten. Eine einfache Beispielnachricht könnte so aussehen:

 @Getter @Setter @Builder public class PostPublishedEvent { private String postUrl; private String postTitle; private List<String> emails; }

Hinweis: Project Lombok (https://projectlombok.org/) stellt @Getter , @Setter , @Builder und viele andere Anmerkungen bereit, um zu vermeiden, dass der Code mit Gettern, Settern und anderen trivialen Dingen überladen wird. In diesem Toptal-Artikel erfahren Sie mehr darüber.

Die Nachricht selbst wird im JSON-Format in der Warteschlange gespeichert. Jedes Mal, wenn ein Ereignis in der Warteschlange veröffentlicht wird, wird die Nachricht in JSON serialisiert. Und beim Konsumieren aus der Warteschlange wird die Nachricht deserialisiert.

Nachdem die Nachricht definiert ist, müssen wir die Warteschlange selbst definieren. In Spring Integration kann dies einfach über eine .xml -Konfiguration erfolgen. Die Konfiguration sollte im Verzeichnis resources/WEB-INF abgelegt werden.

 <?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:int="http://www.springframework.org/schema/integration" xmlns:int-redis="http://www.springframework.org/schema/integration/redis" xsi:schemaLocation="http://www.springframework.org/schema/integration/redis http://www.springframework.org/schema/integration/redis/spring-integration-redis.xsd http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd"> <int-redis:queue-outbound-channel-adapter channel="eventChannelJson" serializer="serializer" auto-startup="true" connection-factory="redisConnectionFactory" queue="my-event-queue" /> <int:gateway service-interface="org.toptal.queue.RedisChannelGateway" error-channel="errorChannel" default-request-channel="eventChannel"> <int:default-header name="topic" value="queue"/> </int:gateway> <int:channel/> <int:channel/> <bean class="org.springframework.data.redis.serializer.StringRedisSerializer"/> <int:object-to-json-transformer input-channel="eventChannel" output-channel="eventChannelJson"/> </beans>

In der Konfiguration sehen Sie den Teil „int-redis:queue-outbound-channel-adapter“. Seine Eigenschaften sind:

  • id: Der Bean-Name der Komponente.
  • channel: MessageChannel , von dem dieser Endpunkt Nachrichten empfängt.
  • connection-factory: Ein Verweis auf eine RedisConnectionFactory -Bean.
  • queue: Der Name der Redis-Liste, auf der der warteschlangenbasierte Push-Vorgang ausgeführt wird, um Redis-Nachrichten zu senden. Dieses Attribut schließt sich gegenseitig mit queue-expression aus.
  • queue-expression: Ein SpEL-Ausdruck zum Ermitteln des Namens der Redis-Liste unter Verwendung der eingehenden Nachricht zur Laufzeit als #root Variable. Dieses Attribut schließt sich gegenseitig mit der Warteschlange aus.
  • serializer: Eine RedisSerializer -Bean-Referenz. Standardmäßig ist es ein JdkSerializationRedisSerializer . Für String -Payloads wird jedoch ein StringRedisSerializer verwendet, wenn keine Serializer-Referenz bereitgestellt wird.
  • Extract-Payload: Geben Sie an, ob dieser Endpunkt nur die Payload an die Redis-Warteschlange oder die gesamte Nachricht senden soll. Sein Standardwert ist true .
  • left-push: Geben Sie an, ob dieser Endpunkt Push nach links (wenn true ) oder Push nach rechts (wenn false ) verwenden soll, um Nachrichten in die Redis-Liste zu schreiben. Bei „true“ fungiert die Redis-Liste als FIFO-Warteschlange, wenn sie mit einem standardmäßigen Eingangskanaladapter für Redis-Warteschlangen verwendet wird. Auf „ false setzen, um mit Software verwendet zu werden, die mit linkem Pop aus der Liste liest, oder um eine stapelartige Nachrichtenreihenfolge zu erreichen. Sein Standardwert ist true .

Der nächste Schritt besteht darin, das Gateway zu definieren, das in der .xml Konfiguration erwähnt wird. Für ein Gateway verwenden wir die Klasse RedisChannelGateway aus dem Paket org.toptal.queue .

StringRedisSerializer wird zum Serialisieren von Nachrichten vor dem Speichern in Redis verwendet. Auch in der .xml -Konfiguration haben wir das Gateway definiert und RedisChannelGateway als Gateway-Dienst festgelegt. Dies bedeutet, dass die RedisChannelGateway -Bean in andere Beans injiziert werden könnte. Wir haben die Eigenschaft default-request-channel definiert, da es auch möglich ist, Kanalreferenzen pro Methode bereitzustellen, indem die Annotation @Gateway verwendet wird. Klassendefinition:

 public interface RedisChannelGateway { void enqueue(PostPublishedEvent event); }

Um diese Konfiguration in unsere Anwendung zu integrieren, müssen wir sie importieren. Dies ist in der SpringIntegrationConfig -Klasse implementiert.

 @ImportResource("classpath:WEB-INF/event-queue-config.xml") @AutoConfigureAfter(RedisConfig.class) @Configuration public class SpringIntegrationConfig { }

Die Annotation @ImportResource wird verwendet, um Spring-XML-Konfigurationsdateien in .xml zu @Configuration . Und die Annotation @AutoConfigureAfter wird verwendet, um darauf hinzuweisen, dass eine Autokonfiguration nach anderen angegebenen Autokonfigurationsklassen angewendet werden sollte.

Wir werden jetzt einen Dienst erstellen und die Methode implementieren, die Ereignisse in die Redis-Warteschlange enqueue .

 public interface QueueService { void enqueue(PostPublishedEvent event); }
 @Service public class RedisQueueService implements QueueService { private RedisChannelGateway channelGateway; @Autowired public RedisQueueService(RedisChannelGateway channelGateway) { this.channelGateway = channelGateway; } @Override public void enqueue(PostPublishedEvent event) { channelGateway.enqueue(event); } }

Und jetzt können Sie mit der Methode enqueue von QueueService ganz einfach eine Nachricht an die Warteschlange senden.

Redis-Warteschlangen sind einfach Listen mit einem oder mehreren Produzenten und Konsumenten. Um eine Nachricht in einer Warteschlange zu veröffentlichen, verwenden Produzenten den Befehl LPUSH Redis. Und wenn Sie Redis überwachen (Tipp: Geben Sie redis-cli monitor ein), können Sie sehen, dass die Nachricht zur Warteschlange hinzugefügt wird:

 "LPUSH" "my-event-queue" "{\"postUrl\":\"test\",\"postTitle\":\"test\",\"emails\":[\"test\"]}"

Jetzt müssen wir eine Verbraucheranwendung erstellen, die diese Ereignisse aus der Warteschlange zieht und verarbeitet. Der Consumer-Service benötigt die gleichen Abhängigkeiten wie der Producer-Service.

Jetzt können wir die PostPublishedEvent -Klasse wiederverwenden, um Nachrichten zu deserialisieren.

Wir müssen die Warteschlangenkonfiguration erstellen und sie muss wiederum im Verzeichnis resources/WEB-INF abgelegt werden. Der Inhalt der Warteschlangenkonfiguration ist:

 <?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:int="http://www.springframework.org/schema/integration" xmlns:int-redis="http://www.springframework.org/schema/integration/redis" xsi:schemaLocation="http://www.springframework.org/schema/integration/redis http://www.springframework.org/schema/integration/redis/spring-integration-redis.xsd http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd"> <int-redis:queue-inbound-channel-adapter channel="eventChannelJson" queue="my-event-queue" serializer="serializer" auto-startup="true" connection-factory="redisConnectionFactory"/> <int:channel/> <int:channel> <int:queue/> </int:channel> <bean class="org.springframework.data.redis.serializer.StringRedisSerializer"/> <int:json-to-object-transformer input-channel="eventChannelJson" output-channel="eventChannel" type="com.toptal.integration.spring.model.PostPublishedEvent"/> <int:service-activator input-channel="eventChannel" ref="RedisEventProcessingService" method="process"> <int:poller fixed-delay="10" time-unit="SECONDS" max-messages-per-poll="500"/> </int:service-activator> </beans>

In der .xml -Konfiguration kann int-redis:queue-inbound-channel-adapter die folgenden Eigenschaften haben:

  • id: Der Bean-Name der Komponente.
  • channel: Der MessageChannel , an den wir Nachrichten von diesem Endpunkt senden.
  • auto-startup: Ein SmartLifecycle -Attribut, um anzugeben, ob dieser Endpunkt nach dem Start des Anwendungskontexts automatisch gestartet werden soll oder nicht. Sein Standardwert ist true .
  • phase: Ein SmartLifecycle Attribut zur Angabe der Phase, in der dieser Endpunkt gestartet wird. Sein Standardwert ist 0 .
  • connection-factory: Ein Verweis auf eine RedisConnectionFactory -Bean.
  • queue: Der Name der Redis-Liste, für die der warteschlangenbasierte Pop-Vorgang ausgeführt wird, um Redis-Nachrichten abzurufen.
  • error-channel: Der MessageChannel , an den wir ErrorMessages mit Exceptions von der Listening-Task des Endpoint senden. Standardmäßig verwendet der zugrunde liegende MessagePublishingErrorHandler den Standard errorChannel aus dem Anwendungskontext.
  • serializer: Die RedisSerializer -Bean-Referenz. Es kann eine leere Zeichenfolge sein, was bedeutet, dass kein Serializer vorhanden ist. In diesem Fall wird das rohe byte[] aus der eingehenden Redis-Nachricht als Message an den Kanal gesendet. Standardmäßig ist es ein JdkSerializationRedisSerializer .
  • Receive-Timeout: Das Timeout in Millisekunden für den Pop-Vorgang, um auf eine Redis-Nachricht aus der Warteschlange zu warten. Sein Standardwert ist 1 Sekunde.
  • recovery-interval: Die Zeit in Millisekunden, für die die Listener-Task nach Ausnahmen bei der Pop-Operation schlafen soll, bevor die Listener-Task neu gestartet wird.
  • Expect-Message: Geben Sie an, ob dieser Endpunkt erwartet, dass Daten aus der Redis-Warteschlange ganze Nachrichten enthalten. Wenn dieses Attribut auf true gesetzt ist, kann der Serializer keine leere Zeichenfolge sein, da Nachrichten eine Form der Deserialisierung erfordern (standardmäßig JDK-Serialisierung). Sein Standardwert ist false .
  • task-executor: Ein Verweis auf eine Spring TaskExecutor (oder Standard-JDK 1.5+ Executor)-Bean. Es wird für die zugrunde liegende Höraufgabe verwendet. Standardmäßig wird ein SimpleAsyncTaskExecutor verwendet.
  • right-pop: Geben Sie an, ob dieser Endpunkt right pop (wenn true ) oder left pop (wenn false ) verwenden soll, um Nachrichten aus der Redis-Liste zu lesen. Wenn true , fungiert die Redis-Liste als FIFO-Warteschlange, wenn sie mit einem standardmäßigen Redis-Warteschlangenausgangskanaladapter verwendet wird. Auf „ false setzen, um mit Software verwendet zu werden, die mit der rechten Maustaste in die Liste schreibt, oder um eine stapelartige Nachrichtenreihenfolge zu erreichen. Sein Standardwert ist true .

Der wichtige Teil ist der „Dienstaktivator“, der definiert, welcher Dienst und welche Methode zur Verarbeitung des Ereignisses verwendet werden sollen.'

Außerdem benötigt der json-to-object-transformer ein type-Attribut, um JSON in Objekte umzuwandeln, das oben auf type="com.toptal.integration.spring.model.PostPublishedEvent" .

Um diese Konfiguration zu verbinden, benötigen wir erneut die SpringIntegrationConfig -Klasse, die dieselbe wie zuvor sein kann. Und schließlich brauchen wir einen Dienst, der das Ereignis tatsächlich verarbeitet.

 public interface EventProcessingService { void process(PostPublishedEvent event); } @Service("RedisEventProcessingService") public class RedisEventProcessingService implements EventProcessingService { @Override public void process(PostPublishedEvent event) { // TODO: Send emails here, retry strategy, etc :) } }

Sobald Sie die Anwendung ausgeführt haben, können Sie in Redis Folgendes sehen:

 "BRPOP" "my-event-queue" "1"

Fazit

Mit Spring Integration und Redis ist das Erstellen einer Spring-Microservices-Anwendung nicht so entmutigend, wie es normalerweise der Fall wäre. Mit ein wenig Konfiguration und wenig Boilerplate-Code können Sie im Handumdrehen die Grundlagen Ihrer Microservice-Architektur schaffen.

Auch wenn Sie nicht vorhaben, Ihr aktuelles Spring-Projekt komplett zu streichen und auf eine neue Architektur umzusteigen, ist es mit Hilfe von Redis sehr einfach, enorme Leistungsverbesserungen mit Warteschlangen zu erzielen.