마이크로서비스 커뮤니케이션: Redis를 사용한 Spring 통합 튜토리얼
게시 됨: 2022-03-11마이크로서비스 아키텍처는 확장성이 뛰어난 웹 애플리케이션을 설계하고 구현하는 데 있어 매우 널리 사용되는 접근 방식입니다. 구성 요소 간의 모놀리식 응용 프로그램 내의 통신은 일반적으로 동일한 프로세스 내의 메서드 또는 함수 호출을 기반으로 합니다. 반면에 마이크로서비스 기반 애플리케이션은 여러 시스템에서 실행되는 분산 시스템입니다.
이러한 마이크로 서비스 간의 통신은 안정적이고 확장 가능한 시스템을 갖기 위해 중요합니다. 이를 수행하는 방법에는 여러 가지가 있습니다. 메시지 기반 통신은 이를 안정적으로 수행하는 한 가지 방법입니다.
메시징을 사용할 때 구성 요소는 비동기적으로 메시지를 교환하여 서로 상호 작용합니다. 메시지는 채널을 통해 교환됩니다.
서비스 A는 서비스 B와 통신을 원할 때 직접 보내는 대신 특정 채널로 보냅니다. 서비스 B가 메시지를 읽으려고 할 때 특정 메시지 채널에서 메시지를 선택합니다.
이 Spring Integration 튜토리얼에서는 Redis를 사용하여 Spring 애플리케이션에서 메시징을 구현하는 방법을 배웁니다. 한 서비스가 대기열의 이벤트를 푸시하고 다른 서비스가 이러한 이벤트를 하나씩 처리하는 예제 애플리케이션을 살펴보겠습니다.
스프링 통합
Spring Integration 프로젝트는 Spring 프레임워크를 확장하여 Spring 기반 애플리케이션 간 또는 내에서 메시징 지원을 제공합니다. 구성 요소는 메시징 패러다임을 통해 함께 연결됩니다. 개별 구성 요소는 응용 프로그램의 다른 구성 요소를 인식하지 못할 수 있습니다.
Spring Integration은 외부 시스템과 통신하기 위한 다양한 메커니즘을 제공합니다. 채널 어댑터는 단방향 통합(보내기 또는 받기)에 사용되는 메커니즘 중 하나입니다. 그리고 게이트웨이는 요청/회신 시나리오(인바운드 또는 아웃바운드)에 사용됩니다.
Apache Camel은 널리 사용되는 대안입니다. Spring 통합은 일반적으로 Spring 생태계의 일부이기 때문에 기존 Spring 기반 서비스에서 선호됩니다.
레디스
Redis는 매우 빠른 인메모리 데이터 저장소입니다. 선택적으로 디스크에도 지속할 수 있습니다. 간단한 키-값 쌍, 집합, 대기열 등과 같은 다양한 데이터 구조를 지원합니다.
Redis를 대기열로 사용하면 구성 요소 간 데이터 공유와 수평 확장이 훨씬 쉬워집니다. 생산자 또는 여러 생산자는 데이터를 대기열에 푸시할 수 있으며 소비자 또는 여러 소비자는 데이터를 가져와 이벤트를 처리할 수 있습니다.
여러 소비자가 동일한 이벤트를 사용할 수 없습니다. 이렇게 하면 하나의 이벤트가 한 번 처리됩니다.
Redis를 메시지 대기열로 사용할 때의 이점:
- 비차단 방식으로 개별 작업의 병렬 실행
- 뛰어난 성능
- 안정
- 간편한 모니터링 및 디버깅
- 간편한 구현 및 사용
규칙:
- 대기열에 작업을 추가하는 것이 작업 자체를 처리하는 것보다 더 빨라야 합니다.
- 작업을 소비하는 것이 생산하는 것보다 빨라야 합니다(그렇지 않은 경우 더 많은 소비자를 추가해야 함).
Redis와 스프링 통합
다음은 Redis와 Spring 통합을 사용하는 방법을 설명하는 샘플 애플리케이션 생성을 안내합니다.
사용자가 게시물을 게시할 수 있는 애플리케이션이 있다고 가정해 보겠습니다. 그리고 팔로우 기능을 만들고 싶습니다. 또 다른 요구 사항은 누군가가 게시물을 게시할 때마다 모든 팔로워가 일부 커뮤니케이션 채널(예: 이메일 또는 푸시 알림)을 통해 알림을 받아야 한다는 것입니다.
이를 구현하는 한 가지 방법은 사용자가 무언가를 게시하면 각 팔로워에게 이메일을 보내는 것입니다. 하지만 사용자가 1,000명의 팔로워를 가지고 있으면 어떻게 될까요? 그리고 1,000명의 사용자가 10초 안에 무언가를 게시하면 각각의 팔로워가 1,000명입니까? 또한 게시자의 게시물은 모든 이메일이 전송될 때까지 기다려야 합니까?
분산 시스템은 이 문제를 해결합니다.
이 특정 문제는 대기열을 사용하여 해결할 수 있습니다. 게시물 게시를 담당하는 서비스 A(프로듀서)가 하면 됩니다. 게시물을 게시하고 이메일과 게시물 자체를 수신해야 하는 사용자 목록과 함께 이벤트를 푸시합니다. 사용자 목록은 서비스 B에서 가져올 수 있지만 이 예의 단순성을 위해 서비스 A에서 보냅니다.
이것은 비동기 작업입니다. 이는 게시 중인 서비스가 이메일을 보내기 위해 기다릴 필요가 없음을 의미합니다.
서비스 B(소비자)는 대기열에서 이벤트를 가져와 처리합니다. 이렇게 하면 서비스를 쉽게 확장할 수 있고 n
명의 소비자가 이메일을 보내도록 할 수 있습니다(이벤트 처리).
이제 생산자 서비스에서 구현을 시작하겠습니다. 필요한 종속성은 다음과 같습니다.
<dependency> <groupId>redis.clients</groupId> <artifactId>jedis</artifactId> </dependency> <dependency> <groupId>org.springframework.data</groupId> <artifactId>spring-data-redis</artifactId> </dependency> <dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-redis</artifactId> </dependency>
다음 세 가지 Maven 종속성이 필요합니다.
- Jedis는 Redis 클라이언트입니다.
- Spring Data Redis 종속성은 Java에서 Redis를 더 쉽게 사용할 수 있도록 합니다. 핵심 API 사용 및 경량 저장소 스타일 데이터 액세스를 위한 템플릿 클래스와 같은 친숙한 Spring 개념을 제공합니다.
- Spring Integration Redis는 잘 알려진 엔터프라이즈 통합 패턴을 지원하기 위해 Spring 프로그래밍 모델의 확장을 제공합니다.
다음으로 Jedis 클라이언트를 구성해야 합니다.
@Configuration public class RedisConfig { @Value("${redis.host}") private String redisHost; @Value("${redis.port:6379}") private int redisPort; @Bean public JedisPoolConfig poolConfig() { JedisPoolConfig poolConfig = new JedisPoolConfig(); poolConfig.setMaxTotal(128); return poolConfig; } @Bean public RedisConnectionFactory redisConnectionFactory(JedisPoolConfig poolConfig) { final JedisConnectionFactory connectionFactory = new JedisConnectionFactory(); connectionFactory.setHostName(redisHost); connectionFactory.setPort(redisPort); connectionFactory.setPoolConfig(poolConfig); connectionFactory.setUsePool(true); return connectionFactory; } }
@Value
주석은 Spring이 애플리케이션 속성에 정의된 값을 필드에 주입함을 의미합니다. 이는 redis.host
및 redis.port
값이 애플리케이션 속성에 정의되어야 함을 의미합니다.
이제 대기열에 보낼 메시지를 정의해야 합니다. 간단한 예제 메시지는 다음과 같습니다.
@Getter @Setter @Builder public class PostPublishedEvent { private String postUrl; private String postTitle; private List<String> emails; }
참고: Project Lombok(https://projectlombok.org/)은 @Getter
, @Setter
, @Builder
및 기타 여러 주석을 제공하여 getter, setter 및 기타 사소한 것들로 코드를 복잡하게 만드는 것을 방지합니다. 이 Toptal 기사에서 자세히 알아볼 수 있습니다.
메시지 자체는 대기열에 JSON 형식으로 저장됩니다. 이벤트가 큐에 게시될 때마다 메시지는 JSON으로 직렬화됩니다. 그리고 큐에서 소비할 때 메시지는 역직렬화됩니다.
메시지가 정의되면 대기열 자체를 정의해야 합니다. Spring Integration에서는 .xml
구성을 통해 쉽게 수행할 수 있습니다. 구성은 resources/WEB-INF
디렉토리 안에 있어야 합니다.
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:int="http://www.springframework.org/schema/integration" xmlns:int-redis="http://www.springframework.org/schema/integration/redis" xsi:schemaLocation="http://www.springframework.org/schema/integration/redis http://www.springframework.org/schema/integration/redis/spring-integration-redis.xsd http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd"> <int-redis:queue-outbound-channel-adapter channel="eventChannelJson" serializer="serializer" auto-startup="true" connection-factory="redisConnectionFactory" queue="my-event-queue" /> <int:gateway service-interface="org.toptal.queue.RedisChannelGateway" error-channel="errorChannel" default-request-channel="eventChannel"> <int:default-header name="topic" value="queue"/> </int:gateway> <int:channel/> <int:channel/> <bean class="org.springframework.data.redis.serializer.StringRedisSerializer"/> <int:object-to-json-transformer input-channel="eventChannel" output-channel="eventChannelJson"/> </beans>
구성에서 "int-redis:queue-outbound-channel-adapter" 부분을 볼 수 있습니다. 속성은 다음과 같습니다.
- id: 컴포넌트의 빈 이름.
- channel: 이 끝점이 메시지를 받는
MessageChannel
입니다. - connection-factory:
RedisConnectionFactory
빈에 대한 참조입니다. - queue: Redis 메시지를 보내기 위해 큐 기반 푸시 작업이 수행되는 Redis 목록의 이름입니다. 이 속성은 queue-expression과 함께 사용할 수 없습니다.
- queue-expression: 런타임 시 들어오는 메시지를
#root
변수로 사용하여 Redis 목록의 이름을 결정하는 SpEL 표현식입니다. 이 속성은 대기열과 상호 배타적입니다. - serializer:
RedisSerializer
빈 참조입니다. 기본적으로JdkSerializationRedisSerializer
입니다. 그러나String
페이로드의 경우 직렬 변환기 참조가 제공되지 않으면StringRedisSerializer
가 사용됩니다. - extract-payload: 이 끝점이 Redis 대기열 또는 전체 메시지에 페이로드만 보내야 하는지 여부를 지정합니다. 기본값은
true
입니다. - left-push: 이 엔드포인트가 Redis 목록에 메시지를 쓰기 위해 왼쪽 푸시(
true
인 경우) 또는 오른쪽 푸시(false
)를 사용해야 하는지 여부를 지정합니다. true인 경우 Redis 목록은 기본 Redis 대기열 인바운드 채널 어댑터와 함께 사용될 때 FIFO 대기열로 작동합니다. 왼쪽 팝으로 목록에서 읽는 소프트웨어와 함께 사용하거나 스택과 같은 메시지 순서를 달성하려면false
로 설정하십시오. 기본값은true
입니다.
다음 단계는 .xml
구성에 언급된 게이트웨이를 정의하는 것입니다. 게이트웨이의 경우 org.toptal.queue
패키지의 RedisChannelGateway
클래스를 사용하고 있습니다.

StringRedisSerializer
는 Redis에 저장하기 전에 메시지를 직렬화하는 데 사용됩니다. 또한 .xml
구성에서 게이트웨이를 정의하고 RedisChannelGateway
를 게이트웨이 서비스로 설정했습니다. 이는 RedisChannelGateway
빈을 다른 빈에 주입할 수 있음을 의미합니다. @Gateway
주석을 사용하여 메서드별 채널 참조를 제공할 수도 있기 때문에 default-request-channel
속성을 정의했습니다. 클래스 정의:
public interface RedisChannelGateway { void enqueue(PostPublishedEvent event); }
이 구성을 애플리케이션에 연결하려면 가져와야 합니다. 이것은 SpringIntegrationConfig
클래스에서 구현됩니다.
@ImportResource("classpath:WEB-INF/event-queue-config.xml") @AutoConfigureAfter(RedisConfig.class) @Configuration public class SpringIntegrationConfig { }
@ImportResource
주석은 Spring .xml
구성 파일을 @Configuration
으로 가져오는 데 사용됩니다. 그리고 @AutoConfigureAfter
주석은 자동 구성이 지정된 다른 자동 구성 클래스 다음에 적용되어야 함을 암시하는 데 사용됩니다.
이제 서비스를 생성하고 이벤트를 Redis enqueue
에 추가하는 메서드를 구현합니다.
public interface QueueService { void enqueue(PostPublishedEvent event); }
@Service public class RedisQueueService implements QueueService { private RedisChannelGateway channelGateway; @Autowired public RedisQueueService(RedisChannelGateway channelGateway) { this.channelGateway = channelGateway; } @Override public void enqueue(PostPublishedEvent event) { channelGateway.enqueue(event); } }
이제 QueueService
의 enqueue
메서드를 사용하여 대기열에 메시지를 쉽게 보낼 수 있습니다.
Redis 대기열은 단순히 하나 이상의 생산자와 소비자가 있는 목록입니다. 메시지를 대기열에 게시하기 위해 생산자는 LPUSH
Redis 명령을 사용합니다. 그리고 Redis(힌트: type redis-cli monitor
)를 모니터링하면 메시지가 대기열에 추가되는 것을 볼 수 있습니다.
"LPUSH" "my-event-queue" "{\"postUrl\":\"test\",\"postTitle\":\"test\",\"emails\":[\"test\"]}"
이제 대기열에서 이러한 이벤트를 가져와 처리할 소비자 응용 프로그램을 만들어야 합니다. 소비자 서비스에는 생산자 서비스와 동일한 종속성이 필요합니다.
이제 PostPublishedEvent
클래스를 재사용하여 메시지를 역직렬화할 수 있습니다.
대기열 구성을 생성해야 하며, 다시 resources/WEB-INF
디렉토리에 배치해야 합니다. 대기열 구성의 내용은 다음과 같습니다.
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:int="http://www.springframework.org/schema/integration" xmlns:int-redis="http://www.springframework.org/schema/integration/redis" xsi:schemaLocation="http://www.springframework.org/schema/integration/redis http://www.springframework.org/schema/integration/redis/spring-integration-redis.xsd http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd"> <int-redis:queue-inbound-channel-adapter channel="eventChannelJson" queue="my-event-queue" serializer="serializer" auto-startup="true" connection-factory="redisConnectionFactory"/> <int:channel/> <int:channel> <int:queue/> </int:channel> <bean class="org.springframework.data.redis.serializer.StringRedisSerializer"/> <int:json-to-object-transformer input-channel="eventChannelJson" output-channel="eventChannel" type="com.toptal.integration.spring.model.PostPublishedEvent"/> <int:service-activator input-channel="eventChannel" ref="RedisEventProcessingService" method="process"> <int:poller fixed-delay="10" time-unit="SECONDS" max-messages-per-poll="500"/> </int:service-activator> </beans>
.xml
구성에서 int-redis:queue-inbound-channel-adapter
는 다음 속성을 가질 수 있습니다.
- id: 컴포넌트의 빈 이름.
- channel: 이 끝점에서 메시지를 보내는
MessageChannel
입니다. - auto-startup: 이 끝점이 응용 프로그램 컨텍스트 시작 후 자동으로 시작되어야 하는지 여부를 지정하는
SmartLifecycle
속성입니다. 기본값은true
입니다. - 단계: 이 끝점이 시작될 단계를 지정하는
SmartLifecycle
속성입니다. 기본값은0
입니다. - connection-factory:
RedisConnectionFactory
빈에 대한 참조입니다. - queue: Redis 메시지를 가져오기 위해 큐 기반 팝 작업이 수행되는 Redis 목록의 이름입니다.
- error-channel:
Endpoint
의 수신 작업에서Exceptions
가 있는ErrorMessages
를 보낼MessageChannel
입니다. 기본적으로 기본MessagePublishingErrorHandler
는 애플리케이션 컨텍스트의 기본errorChannel
을 사용합니다. - serializer:
RedisSerializer
빈 참조입니다. 직렬 변환기가 없음을 의미하는 빈 문자열일 수 있습니다. 이 경우 인바운드 Redis 메시지의 rawbyte[]
가Message
페이로드로 채널에 전송됩니다. 기본적으로JdkSerializationRedisSerializer
입니다. - receive-timeout: 팝 작업이 대기열에서 Redis 메시지를 기다리는 시간 제한(밀리초)입니다. 기본값은 1초입니다.
- Recovery-interval: 리스너 작업을 다시 시작하기 전에 팝 작업에 대한 예외 이후 리스너 작업이 휴면해야 하는 시간(밀리초)입니다.
- expect-message: 이 끝점이 Redis 대기열의 데이터에 전체 메시지가 포함될 것으로 예상하는지 여부를 지정합니다. 이 속성이
true
로 설정되면 메시지에는 일종의 역직렬화(기본적으로 JDK 직렬화)가 필요하기 때문에 직렬 변환기는 빈 문자열일 수 없습니다. 기본값은false
입니다. - task-executor: Spring
TaskExecutor
(또는 표준 JDK 1.5+ Executor) 빈에 대한 참조. 기본 청취 작업에 사용됩니다. 기본적으로SimpleAsyncTaskExecutor
가 사용됩니다. - right-pop: 이 엔드포인트가 Redis 목록에서 메시지를 읽기 위해 오른쪽 팝(
true
인 경우) 또는 왼쪽 팝(false
)을 사용해야 하는지 여부를 지정합니다.true
인 경우 Redis 목록은 기본 Redis 대기열 아웃바운드 채널 어댑터와 함께 사용될 때 FIFO 대기열로 작동합니다. 오른쪽 푸시로 목록에 쓰는 소프트웨어와 함께 사용하거나 스택과 같은 메시지 순서를 달성하려면false
로 설정하십시오. 기본값은true
입니다.
중요한 부분은 이벤트를 처리하는 데 사용해야 하는 서비스와 방법을 정의하는 "서비스 액티베이터"입니다.'
또한 json-to-object-transformer
는 JSON을 객체로 변환하기 위해 type 속성이 필요하며 위에서 type="com.toptal.integration.spring.model.PostPublishedEvent"
로 설정했습니다.
다시 말하지만, 이 구성을 연결하려면 이전과 동일할 수 있는 SpringIntegrationConfig
클래스가 필요합니다. 마지막으로 이벤트를 실제로 처리할 서비스가 필요합니다.
public interface EventProcessingService { void process(PostPublishedEvent event); } @Service("RedisEventProcessingService") public class RedisEventProcessingService implements EventProcessingService { @Override public void process(PostPublishedEvent event) { // TODO: Send emails here, retry strategy, etc :) } }
애플리케이션을 실행하면 Redis에서 다음을 볼 수 있습니다.
"BRPOP" "my-event-queue" "1"
결론
Spring Integration 및 Redis를 사용하면 Spring 마이크로서비스 애플리케이션을 구축하는 것이 일반적으로 하는 것만큼 어렵지 않습니다. 약간의 구성과 소량의 상용구 코드만 있으면 마이크로서비스 아키텍처의 기초를 순식간에 구축할 수 있습니다.
현재 Spring 프로젝트를 완전히 긁고 새 아키텍처로 전환할 계획이 없더라도 Redis의 도움으로 대기열을 사용하여 엄청난 성능 향상을 얻는 것은 매우 간단합니다.