Mikro Hizmet İletişimi: Redis ile Bir Bahar Entegrasyonu Eğitimi

Yayınlanan: 2022-03-11

Mikro hizmet mimarisi, yüksek düzeyde ölçeklenebilir web uygulamaları tasarlama ve uygulamada çok popüler bir yaklaşımdır. Bileşenler arasındaki monolitik bir uygulama içindeki iletişim, genellikle aynı süreç içindeki yöntem veya işlev çağrılarına dayanır. Mikro hizmet tabanlı bir uygulama ise, birden çok makinede çalışan dağıtılmış bir sistemdir.

Bu mikro hizmetler arasındaki iletişim, istikrarlı ve ölçeklenebilir bir sisteme sahip olmak için önemlidir. Bunu yapmanın birden fazla yolu vardır. Mesaj tabanlı iletişim, bunu güvenilir bir şekilde yapmanın bir yoludur.

Mesajlaşmayı kullanırken, bileşenler eşzamansız olarak mesaj alışverişi yaparak birbirleriyle etkileşime girer. Mesajlar kanallar aracılığıyla değiştirilir.

A hizmeti ve B hizmeti arasındaki iletişimi kolaylaştıran bir mesajlaşma sisteminin grafik gösterimi

Servis A, Servis B ile iletişim kurmak istediğinde, doğrudan göndermek yerine, A belirli bir kanala gönderir. Servis B mesajı okumak istediğinde, mesajı belirli bir mesaj kanalından alır.

Bu Bahar Entegrasyonu eğitiminde, Redis kullanarak bir Spring uygulamasında mesajlaşmayı nasıl uygulayacağınızı öğreneceksiniz. Bir hizmetin kuyruktaki olayları ittiği ve başka bir hizmetin bu olayları tek tek işlediği bir örnek uygulama üzerinden geçeceksiniz.

Bahar Entegrasyonu

Spring Integration projesi, Spring tabanlı uygulamalar arasında veya içinde mesajlaşma desteği sağlamak için Spring çerçevesini genişletir. Bileşenler, mesajlaşma paradigması aracılığıyla birbirine bağlanır. Tek tek bileşenler, uygulamadaki diğer bileşenlerin farkında olmayabilir.

Spring Integration, harici sistemlerle iletişim kurmak için çok çeşitli mekanizmalar sağlar. Kanal bağdaştırıcıları, tek yönlü entegrasyon (gönderme veya alma) için kullanılan bu tür mekanizmalardan biridir. Ağ geçitleri, istek/yanıt senaryoları (gelen veya giden) için kullanılır.

Apache Camel, yaygın olarak kullanılan bir alternatiftir. Spring entegrasyonu, Spring ekosisteminin bir parçası olduğu için genellikle mevcut Spring tabanlı hizmetlerde tercih edilir.

redis

Redis, son derece hızlı bir bellek içi veri deposudur. İsteğe bağlı olarak bir diskte de kalabilir. Basit anahtar/değer çiftleri, kümeler, kuyruklar vb. gibi farklı veri yapılarını destekler.

Redis'i kuyruk olarak kullanmak, bileşenler arasında veri paylaşımını ve yatay ölçeklendirmeyi çok daha kolay hale getirir. Bir üretici veya birden fazla üretici verileri kuyruğa gönderebilir ve bir tüketici veya birden fazla tüketici verileri çekip olayı işleyebilir.

Birden fazla tüketici aynı olayı tüketemez; bu, bir olayın bir kez işlenmesini sağlar.

üretici/tüketici mimarisini gösteren diyagram

Redis'i mesaj kuyruğu olarak kullanmanın faydaları:

  • Ayrık görevlerin engelleyici olmayan bir şekilde paralel yürütülmesi
  • Harika performans
  • istikrar
  • Kolay izleme ve hata ayıklama
  • Kolay uygulama ve kullanım

Tüzük:

  • Kuyruğa bir görev eklemek, görevin kendisini işlemekten daha hızlı olmalıdır.
  • Görevleri tüketmek, onları üretmekten daha hızlı olmalıdır (ve değilse, daha fazla tüketici ekleyin).

Redis ile Bahar Entegrasyonu

Aşağıda, Spring Integration'ın Redis ile nasıl kullanılacağını açıklamak için örnek bir uygulamanın oluşturulması anlatılmaktadır.

Diyelim ki, kullanıcıların gönderileri yayınlamasına izin veren bir uygulamanız var. Ve bir takip özelliği oluşturmak istiyorsunuz. Diğer bir gereklilik ise, biri her gönderi yayınladığında, tüm takipçilere bir iletişim kanalı (örn. e-posta veya anlık bildirim) aracılığıyla bilgi verilmesidir.

Bunu uygulamanın bir yolu, kullanıcı bir şey yayınladığında her takipçiye bir e-posta göndermektir. Ancak kullanıcının 1.000 takipçisi olduğunda ne olur? Ve her birinin 1000 takipçisi olan 1000 kullanıcı 10 saniyede bir şey yayınladığında? Ayrıca, yayıncının gönderisi tüm e-postalar gönderilene kadar bekleyecek mi?

Dağıtık sistemler bu sorunu çözer.

Bu özel sorun, bir kuyruk kullanılarak çözülebilir. Gönderilerin yayınlanmasından sorumlu olan Hizmet A (yapımcı) bunu yapacaktır. Bir gönderi yayınlayacak ve bir e-posta alması gereken kullanıcıların listesi ve gönderinin kendisini içeren bir etkinlik gönderecektir. Kullanıcıların listesi B hizmetinde alınabilir, ancak bu örneğin basitliği için onu A hizmetinden göndereceğiz.

Bu asenkron bir işlemdir. Bu, yayınlayan hizmetin e-posta göndermek için beklemek zorunda kalmayacağı anlamına gelir.

Servis B (tüketici) olayı kuyruktan çeker ve işler. Bu şekilde hizmetlerimizi kolayca ölçeklendirebilir ve e-posta gönderen (işleme olayları) n tüketicimiz olabilir.

Öyleyse, üreticinin hizmetinde bir uygulama ile başlayalım. Gerekli bağımlılıklar şunlardır:

 <dependency> <groupId>redis.clients</groupId> <artifactId>jedis</artifactId> </dependency> <dependency> <groupId>org.springframework.data</groupId> <artifactId>spring-data-redis</artifactId> </dependency> <dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-redis</artifactId> </dependency>

Bu üç Maven bağımlılığı gereklidir:

  • Jedis bir Redis istemcisidir.
  • Spring Data Redis bağımlılığı, Java'da Redis'i kullanmayı kolaylaştırır. Çekirdek API kullanımı için bir şablon sınıfı ve hafif depo tarzı veri erişimi gibi tanıdık Spring konseptleri sağlar.
  • Spring Integration Redis, iyi bilinen Kurumsal Entegrasyon Modellerini desteklemek için Spring programlama modelinin bir uzantısını sağlar.

Ardından, Jedis istemcisini yapılandırmamız gerekiyor:

 @Configuration public class RedisConfig { @Value("${redis.host}") private String redisHost; @Value("${redis.port:6379}") private int redisPort; @Bean public JedisPoolConfig poolConfig() { JedisPoolConfig poolConfig = new JedisPoolConfig(); poolConfig.setMaxTotal(128); return poolConfig; } @Bean public RedisConnectionFactory redisConnectionFactory(JedisPoolConfig poolConfig) { final JedisConnectionFactory connectionFactory = new JedisConnectionFactory(); connectionFactory.setHostName(redisHost); connectionFactory.setPort(redisPort); connectionFactory.setPoolConfig(poolConfig); connectionFactory.setUsePool(true); return connectionFactory; } }

@Value ek açıklaması, Spring'in uygulama özelliklerinde tanımlanan değeri alana enjekte edeceği anlamına gelir. Bu, uygulama özelliklerinde redis.host ve redis.port değerlerinin tanımlanması gerektiği anlamına gelir.

Şimdi kuyruğa göndermek istediğimiz mesajı tanımlamamız gerekiyor. Basit bir örnek mesaj şöyle görünebilir:

 @Getter @Setter @Builder public class PostPublishedEvent { private String postUrl; private String postTitle; private List<String> emails; }

Not: Lombok Projesi (https://projectlombok.org/), alıcılar, ayarlayıcılar ve diğer önemsiz şeylerle kod karmaşasını önlemek için @Getter , @Setter , @Builder ve diğer birçok ek açıklama sağlar. Bu Toptal makalesinden bunun hakkında daha fazla bilgi edinebilirsiniz.

Mesajın kendisi kuyruğa JSON formatında kaydedilecektir. Kuyruğa bir olay her yayınlandığında, mesaj JSON'a serileştirilir. Ve kuyruktan tüketirken, mesaj seri hale getirilecek.

Tanımlanan mesaj ile kuyruğun kendisini tanımlamamız gerekiyor. Spring Integration'da bir .xml konfigürasyonu ile kolayca yapılabilir. Yapılandırma, resources/WEB-INF dizini içine yerleştirilmelidir.

 <?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:int="http://www.springframework.org/schema/integration" xmlns:int-redis="http://www.springframework.org/schema/integration/redis" xsi:schemaLocation="http://www.springframework.org/schema/integration/redis http://www.springframework.org/schema/integration/redis/spring-integration-redis.xsd http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd"> <int-redis:queue-outbound-channel-adapter channel="eventChannelJson" serializer="serializer" auto-startup="true" connection-factory="redisConnectionFactory" queue="my-event-queue" /> <int:gateway service-interface="org.toptal.queue.RedisChannelGateway" error-channel="errorChannel" default-request-channel="eventChannel"> <int:default-header name="topic" value="queue"/> </int:gateway> <int:channel/> <int:channel/> <bean class="org.springframework.data.redis.serializer.StringRedisSerializer"/> <int:object-to-json-transformer input-channel="eventChannel" output-channel="eventChannelJson"/> </beans>

Yapılandırmada, “int-redis:queue-outbound-channel-adapter” bölümünü görebilirsiniz. Özellikleri şunlardır:

  • id: Bileşenin çekirdek adı.
  • kanal: Bu uç noktanın mesajları aldığı MessageChannel .
  • bağlantı fabrikası: Bir RedisConnectionFactory çekirdeğine başvuru.
  • kuyruk: Redis iletileri göndermek için sıra tabanlı gönderme işleminin gerçekleştirildiği Redis listesinin adı. Bu öznitelik, kuyruk ifadesi ile birbirini dışlar.
  • sıra-ifadesi: Çalışma zamanında gelen mesajı #root değişkeni olarak kullanarak Redis listesinin adını belirlemek için bir SpEL ifadesi. Bu öznitelik, sıra ile karşılıklı olarak özeldir.
  • serileştirici: Bir RedisSerializer fasulye referansı. Varsayılan olarak, bir JdkSerializationRedisSerializer . Ancak, bir seri hale getirici referansı sağlanmadıysa, String yükleri için bir StringRedisSerializer kullanılır.
  • çıkarma yükü: Bu uç noktanın yalnızca yükü Redis kuyruğuna mı yoksa tüm iletiye mi göndermesi gerektiğini belirtin. Varsayılan değeri true .
  • left-push: Bu uç noktanın Redis listesine mesaj yazmak için sol push ( true olduğunda) veya sağ push ( false olduğunda) kullanıp kullanmayacağını belirtin. Doğruysa, Redis listesi, varsayılan bir Redis kuyruğu gelen kanal bağdaştırıcısı ile kullanıldığında bir FIFO kuyruğu görevi görür. Sol pop ile listeden okuyan yazılımla kullanmak veya yığın benzeri bir mesaj sırası elde etmek için false olarak ayarlayın. Varsayılan değeri true .

Bir sonraki adım, .xml yapılandırmasında bahsedilen ağ geçidini tanımlamaktır. Ağ geçidi için org.toptal.queue paketindeki RedisChannelGateway sınıfını kullanıyoruz.

StringRedisSerializer , mesajı Redis'e kaydetmeden önce seri hale getirmek için kullanılır. Ayrıca .xml konfigürasyonunda ağ geçidini tanımladık ve RedisChannelGateway bir ağ geçidi servisi olarak ayarladık. Bu, RedisChannelGateway çekirdeğinin diğer çekirdeklere enjekte edilebileceği anlamına gelir. @Gateway ek açıklamasını kullanarak yöntem başına kanal referansları sağlamak da mümkün olduğundan, default-request-channel özelliğini tanımladık. Sınıf tanımı:

 public interface RedisChannelGateway { void enqueue(PostPublishedEvent event); }

Bu yapılandırmayı uygulamamıza bağlamak için içe aktarmalıyız. Bu, SpringIntegrationConfig sınıfında uygulanır.

 @ImportResource("classpath:WEB-INF/event-queue-config.xml") @AutoConfigureAfter(RedisConfig.class) @Configuration public class SpringIntegrationConfig { }

@ImportResource ek açıklaması, Spring .xml yapılandırma dosyalarını @Configuration içine aktarmak için kullanılır. Ve @AutoConfigureAfter notu, belirtilen diğer otomatik konfigürasyon sınıflarından sonra bir otomatik konfigürasyonun uygulanması gerektiğini ima etmek için kullanılır.

Şimdi bir servis oluşturacağız ve olayları Redis kuyruğuna enqueue alacak yöntemi uygulayacağız.

 public interface QueueService { void enqueue(PostPublishedEvent event); }
 @Service public class RedisQueueService implements QueueService { private RedisChannelGateway channelGateway; @Autowired public RedisQueueService(RedisChannelGateway channelGateway) { this.channelGateway = channelGateway; } @Override public void enqueue(PostPublishedEvent event) { channelGateway.enqueue(event); } }

Ve şimdi, QueueService enqueue yöntemini kullanarak kuyruğa kolayca bir mesaj gönderebilirsiniz.

Redis kuyrukları, bir veya daha fazla üretici ve tüketici içeren basit listelerdir. Bir kuyruğa mesaj yayınlamak için üreticiler LPUSH Redis komutunu kullanır. Ve Redis'i izlerseniz (ipucu: yazın redis-cli monitor ), mesajın kuyruğa eklendiğini görebilirsiniz:

 "LPUSH" "my-event-queue" "{\"postUrl\":\"test\",\"postTitle\":\"test\",\"emails\":[\"test\"]}"

Şimdi bu olayları kuyruktan çekip işleyecek bir tüketici uygulaması oluşturmamız gerekiyor. Tüketici hizmeti, üretici hizmetiyle aynı bağımlılıklara ihtiyaç duyar.

Artık mesajları seri durumdan çıkarmak için PostPublishedEvent sınıfını yeniden kullanabiliriz.

Kuyruk yapılandırmasını oluşturmamız gerekiyor ve yine bunu resources/WEB-INF dizini içine yerleştirmemiz gerekiyor. Kuyruk yapılandırmasının içeriği:

 <?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:int="http://www.springframework.org/schema/integration" xmlns:int-redis="http://www.springframework.org/schema/integration/redis" xsi:schemaLocation="http://www.springframework.org/schema/integration/redis http://www.springframework.org/schema/integration/redis/spring-integration-redis.xsd http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd"> <int-redis:queue-inbound-channel-adapter channel="eventChannelJson" queue="my-event-queue" serializer="serializer" auto-startup="true" connection-factory="redisConnectionFactory"/> <int:channel/> <int:channel> <int:queue/> </int:channel> <bean class="org.springframework.data.redis.serializer.StringRedisSerializer"/> <int:json-to-object-transformer input-channel="eventChannelJson" output-channel="eventChannel" type="com.toptal.integration.spring.model.PostPublishedEvent"/> <int:service-activator input-channel="eventChannel" ref="RedisEventProcessingService" method="process"> <int:poller fixed-delay="10" time-unit="SECONDS" max-messages-per-poll="500"/> </int:service-activator> </beans>

.xml yapılandırmasında int-redis:queue-inbound-channel-adapter aşağıdaki özelliklere sahip olabilir:

  • id: Bileşenin çekirdek adı.
  • kanal: Bu uç noktadan mesaj gönderdiğimiz MessageChannel .
  • auto-startup: Bu uç noktanın uygulama bağlamı başladıktan sonra otomatik olarak başlayıp başlamayacağını belirtmek için bir SmartLifecycle özniteliği. Varsayılan değeri true .
  • aşama: Bu uç noktanın başlatılacağı aşamayı belirtmek için bir SmartLifecycle özniteliği. Varsayılan değeri 0 .
  • bağlantı fabrikası: Bir RedisConnectionFactory çekirdeğine başvuru.
  • sıra: Redis mesajlarını almak için sıra tabanlı pop işleminin gerçekleştirildiği Redis listesinin adı.
  • error-channel: Endpoint dinleme görevinden Exceptions ErrorMessages göndereceğimiz MessageChannel . Varsayılan olarak, temel alınan MessagePublishingErrorHandler , uygulama bağlamından varsayılan errorChannel kullanır.
  • serileştirici: RedisSerializer fasulye referansı. Serileştirici olmadığı anlamına gelen boş bir dize olabilir. Bu durumda, gelen Redis mesajından gelen ham byte[] kanala Message yükü olarak gönderilir. Varsayılan olarak, bir JdkSerializationRedisSerializer .
  • alma zaman aşımı: Pop işleminin kuyruktan bir Redis iletisini beklemesi için milisaniye cinsinden zaman aşımı. Varsayılan değeri 1 saniyedir.
  • kurtarma aralığı: Dinleyici görevini yeniden başlatmadan önce, pop işlemindeki istisnalardan sonra dinleyici görevinin uyuması gereken milisaniye cinsinden süre.
  • wait-message: Bu uç noktanın Redis kuyruğundan tüm iletileri içermesini bekleyip beklemediğini belirtin. Bu öznitelik true olarak ayarlanırsa, iletiler bir tür seri durumdan çıkarma (varsayılan olarak JDK serileştirme) gerektirdiğinden serileştirici boş bir dize olamaz. Varsayılan değeri false .
  • görev yürütücü: Spring TaskExecutor (veya standart JDK 1.5+ Executor) çekirdeğine referans. Temel dinleme görevi için kullanılır. Varsayılan olarak SimpleAsyncTaskExecutor kullanılır.
  • right-pop: Bu uç noktanın Redis listesinden mesajları okumak için sağ pop ( true olduğunda) veya sol pop ( false olduğunda) kullanıp kullanmayacağını belirtin. true ise, Redis listesi, varsayılan bir Redis kuyruğu giden kanal bağdaştırıcısı ile kullanıldığında bir FIFO kuyruğu görevi görür. Sağ tuşla listeye yazan yazılımla kullanmak veya yığın benzeri bir mesaj sırası elde etmek için false olarak ayarlayın. Varsayılan değeri true .

Önemli kısım, olayı işlemek için hangi hizmetin ve yöntemin kullanılması gerektiğini tanımlayan “hizmet etkinleştirici”dir.'

Ayrıca, json-to-object-transformer , JSON'u nesnelere dönüştürmek için bir type özniteliğine ihtiyaç duyar, yukarıda type="com.toptal.integration.spring.model.PostPublishedEvent" olarak ayarlanmıştır.

Yine, bu yapılandırmayı bağlamak için, öncekiyle aynı olabilen SpringIntegrationConfig sınıfına ihtiyacımız olacak. Ve son olarak, olayı gerçekten işleyecek bir hizmete ihtiyacımız var.

 public interface EventProcessingService { void process(PostPublishedEvent event); } @Service("RedisEventProcessingService") public class RedisEventProcessingService implements EventProcessingService { @Override public void process(PostPublishedEvent event) { // TODO: Send emails here, retry strategy, etc :) } }

Uygulamayı çalıştırdıktan sonra Redis'te şunları görebilirsiniz:

 "BRPOP" "my-event-queue" "1"

Çözüm

Spring Entegrasyonu ve Redis ile bir Spring mikro hizmet uygulaması oluşturmak normalde olduğu kadar göz korkutucu değildir. Küçük bir yapılandırma ve az miktarda ortak kod ile mikro hizmet mimarinizin temellerini çok kısa sürede oluşturabilirsiniz.

Mevcut Spring projenizi tamamen kazımayı ve yeni bir mimariye geçmeyi düşünmüyorsanız bile, Redis'in yardımıyla, kuyruklarla büyük performans iyileştirmeleri elde etmek çok basit.