Оптимизация интеграции программного обеспечения: учебник по Apache Camel
Опубликовано: 2022-03-11Программное обеспечение редко, если вообще существует, в информационном вакууме. По крайней мере, это предположение, которое мы, инженеры-программисты, можем сделать для большинства разрабатываемых нами приложений.
В любом масштабе каждая часть программного обеспечения — так или иначе — взаимодействует с каким-либо другим программным обеспечением по разным причинам: для получения откуда-то справочных данных, для отправки сигналов мониторинга, для связи с другими службами, будучи частью распределенной системы. система и многое другое.
Из этого руководства вы узнаете, какие самые большие проблемы возникают при интеграции большого программного обеспечения и как Apache Camel легко их решает.
Проблема: проектирование архитектуры для системной интеграции
Вы, возможно, сделали следующее по крайней мере один раз в своей жизни разработки программного обеспечения:
- Определите фрагмент вашей бизнес-логики, который должен инициировать отправку данных.
- На том же прикладном уровне запишите преобразования данных в соответствии с ожиданиями получателя.
- Оберните данные в структуру, подходящую для передачи и маршрутизации по сети.
- Откройте подключение к целевому приложению с помощью соответствующего драйвера или клиентского SDK.
- Отправьте данные и обработайте ответ.
Почему это плохая линия поведения?
Пока у вас всего несколько подключений такого рода, он остается управляемым. С ростом числа отношений между системами бизнес-логика приложения смешивается с интеграционной логикой, которая связана с адаптацией данных, компенсацией технологических различий между двумя системами и передачей данных во внешнюю систему с помощью SOAP, REST или более экзотических запросов. .
Если бы вы интегрировали несколько приложений, было бы невероятно сложно проследить всю картину зависимостей в таком коде: где производятся данные и какие службы их потребляют? У вас будет много мест, где интеграционная логика дублируется.
При таком подходе, хотя технически задача решена, мы получаем огромные проблемы с ремонтопригодностью и масштабируемостью интеграции. Быстрая реорганизация потоков данных в этой системе практически невозможна, не говоря уже о более глубоких проблемах, таких как отсутствие мониторинга, разрыв цепи, трудоемкое восстановление данных и т. д.
Все это особенно важно при интеграции программного обеспечения в рамках достаточно крупного предприятия. Интеграция предприятия означает работу с набором приложений, которые работают на самых разных платформах и находятся в разных местах. Обмен данными в таком программном ландшафте довольно требователен. Он должен соответствовать высоким отраслевым стандартам безопасности и обеспечивать надежный способ передачи данных. В корпоративной среде системная интеграция требует отдельного тщательно проработанного архитектурного проекта.
Эта статья познакомит вас с уникальными трудностями, возникающими при интеграции программного обеспечения, а также предоставит некоторые основанные на опыте решения задач интеграции. Мы познакомимся с Apache Camel, полезной структурой, которая может облегчить головную боль разработчика интеграции. Далее мы рассмотрим пример того, как Camel может помочь установить связь в кластере микросервисов на базе Kubernetes.
Трудности интеграции
Широко используемый подход к решению этой проблемы заключается в отделении уровня интеграции в вашем приложении. Он может существовать в том же приложении или в виде независимой выделенной части программного обеспечения, в последнем случае называемого промежуточным программным обеспечением.
С какими проблемами вы обычно сталкиваетесь при разработке и поддержке промежуточного ПО? В общем, у вас есть следующие ключевые элементы:
- Все каналы передачи данных в той или иной степени ненадежны. Проблемы, возникающие из-за этой ненадежности, могут не возникать при низкой или умеренной интенсивности данных. Каждый уровень хранения от памяти приложений до нижнего кэша и оборудования под ним подвержен потенциальному сбою. Некоторые редкие ошибки возникают только при огромных объемах данных. Даже у зрелых, готовых к производству продуктов поставщиков есть нерешенные проблемы отслеживания ошибок, связанные с потерей данных. Система промежуточного программного обеспечения должна быть в состоянии информировать вас об этих потерях данных и своевременно обеспечивать повторную доставку сообщений.
- Приложения используют разные протоколы и форматы данных. Это означает, что система интеграции представляет собой завесу для преобразования данных и адаптеров для других участников и использует различные технологии. Это могут быть простые вызовы REST API, а также доступ к брокеру очередей, отправка заказов в формате CSV по FTP или пакетное извлечение данных в таблицу базы данных. Это длинный список, и он никогда не станет короче.
- Изменения в форматах данных и правилах маршрутизации неизбежны. Каждый шаг в процессе разработки приложения, изменяющий структуру данных, обычно приводит к изменению форматов и преобразований интеграционных данных. Иногда необходимы изменения инфраструктуры с реорганизацией потоков корпоративных данных. Например, эти изменения могут произойти при введении единой точки проверки справочных данных, которая должна обрабатывать все записи основных данных по всей компании. С
N
системами у нас может быть максимальное количество соединений между ними, почтиN^2
, поэтому количество мест, где должны применяться изменения, растет довольно быстро. Это будет похоже на лавину. Для поддержания ремонтопригодности уровень промежуточного программного обеспечения должен предоставлять четкую картину зависимостей с универсальной маршрутизацией и преобразованием данных.
Эти идеи следует учитывать при проектировании интеграции и выборе наиболее подходящего решения промежуточного программного обеспечения. Один из возможных способов справиться с этим — использовать сервисную шину предприятия (ESB). Но ESB, предоставляемые крупными поставщиками, как правило, слишком тяжелы и часто доставляют больше проблем, чем пользы: практически невозможно быстро начать работу с ESB, у нее довольно крутая кривая обучения, а ее гибкость приносится в жертву длинному списку. функций и встроенных инструментов. На мой взгляд, легкие интеграционные решения с открытым исходным кодом намного лучше — они более гибкие, их легко развернуть в облаке и легко масштабировать.
Интеграция программного обеспечения не так проста. Сегодня, когда мы создаем архитектуры микросервисов и имеем дело с множеством небольших сервисов, мы также возлагаем большие надежды на то, насколько эффективно они должны взаимодействовать.
Шаблоны корпоративной интеграции
Как и следовало ожидать, как и разработка программного обеспечения в целом, разработка маршрутизации и преобразования данных включает повторяющиеся операции. Опыт в этой области обобщается и систематизируется профессионалами, давно занимающимися проблемами интеграции. В результате получается набор извлеченных шаблонов, называемых шаблонами корпоративной интеграции, которые используются для проектирования потоков данных. Эти методы интеграции были описаны в одноименной книге Грегора Хофа и Бобби Вулфа, которая очень похожа на значительную книгу Gang of Four, но посвящена склеиванию программного обеспечения.
Например, шаблон нормализатора вводит компонент, который отображает семантически одинаковые сообщения, имеющие разные форматы данных, в единую каноническую модель, или агрегатор — это EIP, объединяющий последовательность сообщений в одно.
Поскольку они представляют собой устоявшиеся технологически независимые абстракции, используемые для решения архитектурных проблем, EIP помогают в написании архитектурного проекта, который не углубляется в уровень кода, но достаточно подробно описывает потоки данных. Такая нотация описания интеграционных маршрутов не только делает дизайн лаконичным, но и задает общую номенклатуру и общий язык, что очень важно в контексте решения интеграционной задачи с участием членов команды из разных сфер бизнеса.
Представляем Apache Camel
Несколько лет назад я создавал корпоративную интеграцию в огромной сети розничной торговли продуктами питания с магазинами, расположенными в разных местах. Я начал с проприетарного решения ESB, которое оказалось слишком громоздким в обслуживании. Затем наша команда столкнулась с Apache Camel, и, проделав некоторую работу по «проверке концепции», мы быстро переписали все наши потоки данных в маршрутах Camel.
Apache Camel можно охарактеризовать как «маршрутизатор-посредник», фреймворк промежуточного программного обеспечения, ориентированный на сообщения, реализующий список EIP, с которым я ознакомился. Он использует эти шаблоны, поддерживает все распространенные транспортные протоколы и включает в себя широкий набор полезных адаптеров. Camel позволяет выполнять ряд процедур интеграции без необходимости написания собственного кода.
Помимо этого, я бы выделил следующие особенности Apache Camel:
- Маршруты интеграции записываются в виде конвейеров, состоящих из блоков. Он создает полностью прозрачную картину, помогающую отслеживать потоки данных.
- У Camel есть адаптеры для многих популярных API. Например, получение данных от Apache Kafka, мониторинг инстансов AWS EC2, интеграция с Salesforce — все эти задачи можно решить с помощью компонентов, доступных «из коробки».
Маршруты Apache Camel могут быть написаны на Java или Scala DSL. (Конфигурация XML также доступна, но становится слишком многословной и имеет худшие возможности отладки.) Она не накладывает ограничений на технический стек взаимодействующих сервисов, но если вы пишете на Java или Scala, вместо этого вы можете встроить Camel в приложение. запустить его в автономном режиме.
Нотацию маршрутизации, используемую Camel, можно описать с помощью следующего простого псевдокода:
from(Source) .transform(Transformer) .to(Destination)
Source
, Transformer
и Destination
являются конечными точками, ссылающимися на компоненты реализации по их URI.
Что позволяет Camel решать описанные выше проблемы интеграции? Давайте посмотрим. Во-первых, логика маршрутизации и преобразования теперь существует только в выделенной конфигурации Apache Camel. Во-вторых, благодаря лаконичному и естественному DSL в сочетании с использованием EIP появляется картина зависимостей между системами. Он состоит из понятных абстракций, а логика маршрутизации легко настраивается. И, наконец, нам не нужно писать кучу кода преобразования, потому что соответствующие адаптеры, скорее всего, уже включены.
Я должен добавить, что Apache Camel — это зрелый фреймворк, который регулярно обновляется. У него большое сообщество и значительная совокупная база знаний.
У него есть свои недостатки. Camel не следует воспринимать как сложный пакет интеграции. Это набор инструментов без функций высокого уровня, таких как инструменты управления бизнес-процессами или мониторы активности, но его можно использовать для создания такого программного обеспечения.
Альтернативными системами могут быть, например, Spring Integration или Mule ESB. Для интеграции Spring, хотя она и считается легкой, по моему опыту, ее сборка и написание большого количества XML-файлов конфигурации может оказаться неожиданно сложным и вряд ли легким выходом. Mule ESB — это надежный и очень функциональный набор инструментов, но, как следует из названия, это сервисная шина предприятия, поэтому он относится к другой весовой категории. Mule можно сравнить с Fuse ESB, аналогичным продуктом на базе Apache Camel с богатым набором функций. Для меня использование Apache Camel для склеивания сервисов сегодня не составляет труда. Он прост в использовании и дает четкое описание того, что и куда нужно, и в то же время он достаточно функционален для создания сложных интеграций.
Написание примера маршрута
Приступим к написанию кода. Мы начнем с синхронного потока данных, который направляет сообщения из одного источника в список получателей. Правила маршрутизации будут написаны на Java DSL.
Мы будем использовать Maven для сборки проекта. Сначала добавьте следующую зависимость в pom.xml
:
<dependencies> ... <dependency> <groupId>org.apache.camel</groupId> <artifactId>camel-core</artifactId> <version>2.20.0</version> </dependency> </dependencies>
Кроме того, приложение может быть построено поверх camel-archetype-java
.
Определения маршрутов Camel объявляются в методе RouteBuilder.configure
.
public void configure() { errorHandler(defaultErrorHandler().maximumRedeliveries(0)); from("file:orders?noop=true").routeId("main") .log("Incoming File: ${file:onlyname}") .unmarshal().json(JsonLibrary.Jackson, Order.class) // unmarshal JSON to Order class containing List<OrderItem> .split().simple("body.items") // split list to process one by one .to("log:inputOrderItem") .choice() .when().simple("${body.type} == 'Drink'") .to("direct:bar") .when().simple("${body.type} == 'Dessert'") .to("direct:dessertStation") .when().simple("${body.type} == 'Hot Meal'") .to("direct:hotMealStation") .when().simple("${body.type} == 'Cold Meal'") .to("direct:coldMealStation") .otherwise() .to("direct:others"); from("direct:bar").routeId("bar").log("Handling Drink"); from("direct:dessertStation").routeId("dessertStation").log("Handling Dessert"); from("direct:hotMealStation").routeId("hotMealStation").log("Handling Hot Meal"); from("direct:coldMealStation").routeId("coldMealStation").log("Handling Cold Meal"); from("direct:others").routeId("others").log("Handling Something Other"); }
В этом определении мы создаем маршрут, который извлекает записи из файла JSON, разбивает их на элементы и направляет к набору обработчиков на основе содержимого сообщения.
Запустим его на подготовленных тестовых данных. Мы получим вывод:
INFO | Total 6 routes, of which 6 are started INFO | Apache Camel 2.20.0 (CamelContext: camel-1) started in 10.716 seconds INFO | Incoming File: order1.json INFO | Exchange[ExchangePattern: InOnly, BodyType: com.antongoncharov.camel.example.model.OrderItem, Body: OrderItem{, type='Drink', name='Americano', qty='1'}] INFO | Handling Drink INFO | Exchange[ExchangePattern: InOnly, BodyType: com.antongoncharov.camel.example.model.OrderItem, Body: OrderItem{, type='Hot Meal', name='French Omelette', qty='1'}] INFO | Handling Hot Meal INFO | Exchange[ExchangePattern: InOnly, BodyType: com.antongoncharov.camel.example.model.OrderItem, Body: OrderItem{, type='Hot Meal', name='Lasagna', qty='1'}] INFO | Handling Hot Meal INFO | Exchange[ExchangePattern: InOnly, BodyType: com.antongoncharov.camel.example.model.OrderItem, Body: OrderItem{, type='Hot Meal', name='Rice Balls', qty='1'}] INFO | Handling Hot Meal INFO | Exchange[ExchangePattern: InOnly, BodyType: com.antongoncharov.camel.example.model.OrderItem, Body: OrderItem{, type='Dessert', name='Blueberry Pie', qty='1'}] INFO | Handling Dessert
Как и ожидалось, Camel перенаправлял сообщения адресатам.
Варианты передачи данных
В приведенном выше примере взаимодействие между компонентами синхронное и осуществляется через память приложения. Однако существует гораздо больше способов общения, когда мы имеем дело с отдельными приложениями, которые не используют общую память:
- Обмен файлами. Одно приложение создает файлы с общими данными для использования другим. Здесь живет дух старой школы. Этот метод связи имеет множество последствий: отсутствие транзакций и согласованности, низкая производительность и изолированная координация между системами. Многие разработчики в конечном итоге написали самодельные интеграционные решения, чтобы сделать процесс более или менее управляемым.
- Общая база данных. Пусть приложения хранят данные, которыми они хотят поделиться, в общей схеме единой базы данных. Разработка унифицированной схемы и обработка параллельного доступа к таблицам являются наиболее важными проблемами этого подхода. Как и в случае с обменом файлами, это может стать постоянным узким местом.
- Удаленный вызов API. Предоставьте интерфейс, позволяющий приложению взаимодействовать с другим работающим приложением, как при обычном вызове метода. Приложения разделяют функциональные возможности через вызовы API, но в процессе они тесно связаны.
- Обмен сообщениями. Пусть каждое приложение подключается к общей системе обмена сообщениями и обменивается данными и вызывает поведение асинхронно, используя сообщения. Ни отправитель, ни получатель не должны работать одновременно, чтобы сообщение было доставлено.
Есть больше способов взаимодействия, но мы должны помнить, что, вообще говоря, есть два типа взаимодействия: синхронное и асинхронное. Первый подобен вызову функции в вашем коде — поток выполнения будет ждать, пока она не выполнится и не вернет значение. При асинхронном подходе одни и те же данные отправляются через промежуточную очередь сообщений или тему подписки. Асинхронный удаленный вызов функции может быть реализован как EIP запрос-ответ.
Однако асинхронный обмен сообщениями — не панацея; это связано с определенными ограничениями. Вы редко видите API обмена сообщениями в Интернете; синхронные службы REST более популярны. Но промежуточное ПО для обмена сообщениями широко используется в интрасети предприятия или в серверной инфраструктуре распределенной системы.
Использование очередей сообщений
Давайте сделаем наш пример асинхронным. Программная система, управляющая очередями и темами подписки, называется брокером сообщений. Это как RDBMS для таблиц и столбцов. Очереди служат для двухточечной интеграции, в то время как темы предназначены для публикации-подписки со многими получателями. Мы будем использовать Apache ActiveMQ в качестве брокера сообщений JMS, потому что он надежный и встраиваемый.
Добавьте следующую зависимость. Иногда излишне добавлять в activemq-all
, который содержит все jar-файлы ActiveMQ, но мы не будем усложнять зависимости нашего приложения.
<dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-all</artifactId> <version>5.15.2</version> </dependency>
Затем запустите брокер программно. В Spring Boot мы получаем автоконфигурацию для этого, подключая зависимость spring-boot-starter-activemq
Maven.
Запустите новый брокер сообщений со следующими командами, указав только конечную точку соединителя:
BrokerService broker = new BrokerService(); broker.addConnector("tcp://localhost:61616"); broker.start();
И добавьте следующий фрагмент configure
в тело метода configure:
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616"); this.getContext().addComponent("activemq", ActiveMQComponent.jmsComponent(connectionFactory));
Теперь мы можем обновить предыдущий пример, используя очереди сообщений. Очереди будут автоматически созданы при доставке сообщения.
public void configure() { errorHandler(defaultErrorHandler().maximumRedeliveries(0)); ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616"); this.getContext().addComponent("activemq", ActiveMQComponent.jmsComponent(connectionFactory)); from("file:orders?noop=true").routeId("main") .log("Incoming File: ${file:onlyname}") .unmarshal().json(JsonLibrary.Jackson, Order.class) // unmarshal JSON to Order class containing List<OrderItem> .split().simple("body.items") // split list to process one by one .to("log:inputOrderItem") .choice() .when().simple("${body.type} == 'Drink'") .to("activemq:queue:bar") .when().simple("${body.type} == 'Dessert'") .to("activemq:queue:dessertStation") .when().simple("${body.type} == 'Hot Meal'") .to("activemq:queue:hotMealStation") .when().simple("${body.type} == 'Cold Meal'") .to("activemq:queue:coldMealStation") .otherwise() .to("activemq:queue:others"); from("activemq:queue:bar").routeId("barAsync").log("Drinks"); from("activemq:queue:dessertStation").routeId("dessertAsync").log("Dessert"); from("activemq:queue:hotMealStation").routeId("hotMealAsync").log("Hot Meals"); from("activemq:queue:coldMealStation").routeId("coldMealAsync").log("Cold Meals"); from("activemq:queue:others").routeId("othersAsync").log("Others"); }
Хорошо, теперь взаимодействие стало асинхронным. Потенциальные потребители этих данных могут получить к ним доступ, когда будут к этому готовы. Это пример слабой связи, которую мы пытаемся достичь в реактивной архитектуре. Недоступность одной из служб не блокирует другие. Более того, потребитель может масштабироваться и читать из очереди параллельно. Сама очередь может масштабироваться и быть разделена. Постоянные очереди могут хранить данные на диске в ожидании обработки, даже если все участники вышли из строя. Следовательно, эта система более отказоустойчива.

Удивительным фактом является то, что ЦЕРН использует Apache Camel и ActiveMQ для мониторинга систем Большого адронного коллайдера (БАК). Также есть интересная магистерская диссертация, объясняющая выбор подходящего промежуточного программного решения для этой задачи. Итак, как говорится в основном докладе: «Нет JMS — нет физики элементарных частиц!»
Мониторинг
В предыдущем примере мы создали канал данных между двумя сервисами. Это дополнительная потенциальная точка отказа в архитектуре, поэтому мы должны о ней позаботиться. Давайте посмотрим, какие функции мониторинга предоставляет Apache Camel. По сути, он предоставляет статистическую информацию о своих маршрутах через компоненты MBean, доступные через JMX. ActiveMQ предоставляет статистику очереди таким же образом.
Давайте включим сервер JMX в приложении, чтобы он мог работать с параметрами командной строки:
-Dorg.apache.camel.jmx.createRmiConnector=true -Dorg.apache.camel.jmx.mbeanObjectDomainName=org.apache.camel -Dorg.apache.camel.jmx.rmiConnector.registryPort=1099 -Dorg.apache.camel.jmx.serviceUrlPath=camel
Теперь запустите приложение, чтобы маршрут сделал свое дело. Откройте стандартный инструмент jconsole
и подключитесь к процессу приложения. Подключитесь к service:jmx:rmi:///jndi/rmi://localhost:1099/camel
. Перейдите в домен org.apache.camel в дереве MBeans.
Мы видим, что все, что касается маршрутизации, находится под контролем. У нас есть количество сообщений в пути, количество ошибок и количество сообщений в очередях. Эта информация может быть передана в какой-либо набор инструментов мониторинга с богатым функционалом, например Graphana или Kibana. Вы можете сделать это, внедрив известный стек ELK.
Существует также подключаемая и расширяемая веб-консоль, которая предоставляет пользовательский интерфейс для управления Camel, ActiveMQ и многими другими, называемая hawt.io.
Тестирование маршрутов
Apache Camel имеет достаточно широкий функционал для написания тестовых маршрутов с mock-компонентами. Это мощный инструмент, но написание отдельных маршрутов только для тестирования — трудоемкий процесс. Было бы более эффективно запускать тесты на производственных маршрутах, не изменяя их пайплайн. Верблюд имеет эту функцию и может быть реализован с помощью компонента AdviceWith.
Давайте включим тестовую логику в нашем примере и запустим пробный тест.
<dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.11</version> <scope>test</scope> </dependency> <dependency> <groupId>org.apache.camel</groupId> <artifactId>camel-test</artifactId> <version>2.20.0</version> <scope>test</scope> </dependency>
Тестовый класс:
public class AsyncRouteTest extends CamelTestSupport { @Override protected RouteBuilder createRouteBuilder() throws Exception { return new AsyncRouteBuilder(); } @Before public void mockEndpoints() throws Exception { context.getRouteDefinition("main").adviceWith(context, new AdviceWithRouteBuilder() { @Override public void configure() throws Exception { // we substitute all actual queues with mock endpoints mockEndpointsAndSkip("activemq:queue:bar"); mockEndpointsAndSkip("activemq:queue:dessertStation"); mockEndpointsAndSkip("activemq:queue:hotMealStation"); mockEndpointsAndSkip("activemq:queue:coldMealStation"); mockEndpointsAndSkip("activemq:queue:others"); // and replace the route's source with test endpoint replaceFromWith("file://testInbox"); } }); } @Test public void testSyncInteraction() throws InterruptedException { String testJson = "{\"id\": 1, \"order\": [{\"id\": 1, \"name\": \"Americano\", \"type\": \"Drink\", \"qty\": \"1\"}, {\"id\": 2, \"name\": \"French Omelette\", \"type\": \"Hot Meal\", \"qty\": \"1\"}, {\"id\": 3, \"name\": \"Lasagna\", \"type\": \"Hot Meal\", \"qty\": \"1\"}, {\"id\": 4, \"name\": \"Rice Balls\", \"type\": \"Hot Meal\", \"qty\": \"1\"}, {\"id\": 5, \"name\": \"Blueberry Pie\", \"type\": \"Dessert\", \"qty\": \"1\"}]}"; // get mocked endpoint and set an expectation MockEndpoint mockEndpoint = getMockEndpoint("mock:activemq:queue:hotMealStation"); mockEndpoint.expectedMessageCount(3); // simulate putting file in the inbox folder template.sendBodyAndHeader("file://testInbox", testJson, Exchange.FILE_NAME, "test.json"); //checks that expectations were met assertMockEndpointsSatisfied(); } }
Теперь запустите тесты для приложения с помощью mvn test
. Мы видим, что наш маршрут успешно выполнен с советом по тестированию. Через фактические очереди не проходят никакие сообщения, и тесты пройдены.
INFO | Route: main started and consuming from: file://testInbox <...> INFO | Incoming File: test.json <...> INFO | Asserting: mock://activemq:queue:hotMealStation is satisfied
Использование Apache Camel с кластером Kubernetes
Одна из проблем интеграции сегодня заключается в том, что приложения больше не статичны. В облачной инфраструктуре мы имеем дело с виртуальными сервисами, которые работают на нескольких узлах одновременно. Это позволяет использовать архитектуру микросервисов с сетью небольших, легковесных сервисов, взаимодействующих между собой. Срок службы этих сервисов ненадежен, и нам приходится обнаруживать их динамически.
Склеивание облачных сервисов — задача, которую можно решить с помощью Apache Camel. Это особенно интересно из-за варианта EIP и того факта, что Camel имеет множество адаптеров и поддерживает широкий спектр протоколов. В последней версии 2.18 добавлен компонент ServiceCall, который представляет функцию вызова API и разрешения его адреса с помощью механизмов обнаружения кластера. В настоящее время он поддерживает Consul, Kubernetes, Ribbon и т. д. Некоторые примеры кода, где ServiceCall настроен с помощью Consul, можно легко найти. Здесь мы будем использовать Kubernetes, потому что это мое любимое решение для кластеризации.
Схема интеграции будет следующей:
Служба Order
и служба Inventory
будут представлять собой пару тривиальных приложений Spring Boot, возвращающих статические данные. Здесь мы не привязаны к определенному стеку технологий. Эти сервисы производят данные, которые мы хотим обработать.
Заказать Сервисный контроллер:
@RestController public class OrderController { private final OrderStorage orderStorage; @Autowired public OrderController(OrderStorage orderStorage) { this.orderStorage = orderStorage; } @RequestMapping("/info") public String info() { return "Order Service UU/orders") public List<Order> getAll() { return orderStorage.getAll(); } @RequestMapping("/orders/{id}") public Order getOne(@PathVariable Integer id) { return orderStorage.getOne(id); } }
Он выдает данные в формате:
[{"id":1,"items":[2,3,4]},{"id":2,"items":[5,3]}]
Контроллер сервиса Inventory
абсолютно аналогичен сервису Order
:
@RestController public class InventoryController { private final InventoryStorage inventoryStorage; @Autowired public InventoryController(InventoryStorage inventoryStorage) { this.inventoryStorage = inventoryStorage; } @RequestMapping("/info") public String info() { return "Inventory Service UU/items") public List<InventoryItem> getAll() { return inventoryStorage.getAll(); } @RequestMapping("/items/{id}") public InventoryItem getOne(@PathVariable Integer id) { return inventoryStorage.getOne(id); } }
InventoryStorage
— это общий репозиторий, в котором хранятся данные. В этом примере он возвращает статические предварительно определенные объекты, которые маршалируются в следующем формате.
[{"id":1,"name":"Laptop","description":"Up to 12-hours battery life","price":499.9},{"id":2,"name":"Monitor","description":"27-inch, response time: 7ms","price":200.0},{"id":3,"name":"Headphones","description":"Soft leather ear-cups","price":29.9},{"id":4,"name":"Mouse","description":"Designed for comfort and portability","price":19.0},{"id":5,"name":"Keyboard","description":"Layout: US","price":10.5}]
Напишем маршрут шлюза, соединяющий их, но без ServiceCall на этом шаге:
rest("/orders") .get("/").description("Get all orders with details").outType(TestResponse.class) .route() .setHeader("Content-Type", constant("application/json")) .setHeader("Accept", constant("application/json")) .setHeader(Exchange.HTTP_METHOD, constant("GET")) .removeHeaders("CamelHttp*") .to("http4://localhost:8082/orders?bridgeEndpoint=true") .unmarshal(formatOrder) .enrich("direct:enrichFromInventory", new OrderAggregationStrategy()) .to("log:result") .endRest(); from("direct:enrichFromInventory") .transform().simple("${null}") .setHeader("Content-Type", constant("application/json")) .setHeader("Accept", constant("application/json")) .setHeader(Exchange.HTTP_METHOD, constant("GET")) .removeHeaders("CamelHttp*") .to("http4://localhost:8081/items?bridgeEndpoint=true") .unmarshal(formatInventory);
Теперь представьте, что каждый сервис больше не является отдельным экземпляром, а представляет собой облако экземпляров, работающих как единое целое. Мы будем использовать Minikube, чтобы попробовать кластер Kubernetes локально.
Настройте сетевые маршруты для локального просмотра узлов Kubernetes (данный пример относится к среде Mac/Linux):
# remove existing routes sudo route -n delete 10/24 > /dev/null 2>&1 # add routes sudo route -n add 10.0.0.0/24 $(minikube ip) # 172.17.0.0/16 ip range is used by docker in minikube sudo route -n add 172.17.0.0/16 $(minikube ip) ifconfig 'bridge100' | grep member | awk '{print $2}' # use interface name from the output of the previous command # needed for xhyve driver, which I'm using for testing sudo ifconfig bridge100 -hostfilter en5
Оберните службы в контейнерах Docker с помощью конфигурации Dockerfile следующим образом:
FROM openjdk:8-jdk-alpine VOLUME /tmp ADD target/order-srv-1.0-SNAPSHOT.jar app.jar ADD target/lib lib ENV JAVA_OPTS="" ENTRYPOINT exec java $JAVA_OPTS -Djava.security.egd=file:/dev/./urandom -jar /app.jar
Создавайте и отправляйте образы службы в реестр Docker. Теперь запустите узлы в локальном кластере Kubernetes.
Конфигурация развертывания Kubernetes.yaml:
apiVersion: extensions/v1beta1 kind: Deployment metadata: name: inventory spec: replicas: 3 selector: matchLabels: app: inventory template: metadata: labels: app: inventory spec: containers: - name: inventory image: inventory-srv:latest imagePullPolicy: Never ports: - containerPort: 8081
Предоставьте эти развертывания как службы в кластере:
kubectl expose deployment order-srv --type=NodePort kubectl expose deployment inventory-srv --type=NodePort
Теперь мы можем проверить, обслуживаются ли запросы случайно выбранными узлами из кластера. Запустите curl -X http://192.168.99.100:30517/info
несколько раз последовательно, чтобы получить доступ к minikube NodePort для открытой службы (используя ваш хост и порт). В выводе мы видим, что мы достигли балансировки запросов.
Inventory Service UUID = 22f8ca6b-f56b-4984-927b-cbf9fcf81da5 Inventory Service UUID = b7a4d326-1e76-4051-a0a6-1016394fafda Inventory Service UUID = b7a4d326-1e76-4051-a0a6-1016394fafda Inventory Service UUID = 22f8ca6b-f56b-4984-927b-cbf9fcf81da5 Inventory Service UUID = 50323ddb-3ace-4424-820a-6b4e85775af4
Добавьте camel-kubernetes
и camel-netty4-http
в pom.xml
проекта. Затем настройте компонент ServiceCall для использования обнаружения главного узла Kubernetes, общего для всех вызовов службы среди определений маршрута:
KubernetesConfiguration kubernetesConfiguration = new KubernetesConfiguration(); kubernetesConfiguration.setMasterUrl("https://192.168.64.2:8443"); kubernetesConfiguration.setClientCertFile("/Users/antongoncharov/.minikube/client.crt"); kubernetesConfiguration.setClientKeyFile("/Users/antongoncharov/.minikube/client.key"); kubernetesConfiguration.setNamespace("default”); ServiceCallConfigurationDefinition config = new ServiceCallConfigurationDefinition(); config.setServiceDiscovery(new KubernetesClientServiceDiscovery(kubernetesConfiguration)); context.setServiceCallConfiguration(config);
ServiceCall EIP хорошо дополняет Spring Boot. Большинство параметров можно настроить непосредственно в файле application.properties
.
Расширьте возможности маршрута Camel с помощью компонента ServiceCall:
rest("/orders") .get("/").description("Get all orders with details").outType(TestResponse.class) .route() .hystrix() .setHeader("Content-Type", constant("application/json")) .setHeader("Accept", constant("application/json")) .setHeader(Exchange.HTTP_METHOD, constant("GET")) .removeHeaders("CamelHttp*") .serviceCall("customer-srv","http4:customer-deployment?bridgeEndpoint=true") .unmarshal(formatOrder) .enrich("direct:enrichFromInventory", new OrderAggregationStrategy()) .to("log:result") .endRest(); from("direct:enrichFromInventory") .transform().simple("${null}") .setHeader("Content-Type", constant("application/json")) .setHeader("Accept", constant("application/json")) .setHeader(Exchange.HTTP_METHOD, constant("GET")) .removeHeaders("CamelHttp*") .serviceCall("order-srv","http4:order-srv?bridgeEndpoint=true") .unmarshal(formatInventory);
Мы также активировали автоматический выключатель в маршруте. Это интеграционный крючок, который позволяет приостанавливать удаленные системные вызовы в случае ошибок доставки или недоступности получателя. Это сделано для того, чтобы избежать каскадного отказа системы. Компонент Hystrix помогает достичь этого, реализуя шаблон прерывателя цепи.
Запустим его и отправим тестовый запрос; мы получим агрегированный ответ от обеих служб.
[{"id":1,"items":[{"id":2,"name":"Monitor","description":"27-inch, response time: 7ms","price":200.0},{"id":3,"name":"Headphones","description":"Soft leather ear-cups","price":29.9},{"id":4,"name":"Mouse","description":"Designed for comfort and portability","price":19.0}]},{"id":2,"items":[{"id":5,"name":"Keyboard","description":"Layout: US","price":10.5},{"id":3,"name":"Headphones","description":"Soft leather ear-cups","price":29.9}]}]
Результат ожидаемый.
Другие варианты использования
Я показал, как Apache Camel может интегрировать микросервисы в кластер. Каковы другие варианты использования этой структуры? В общем, это полезно везде, где маршрутизация на основе правил может быть решением. For instance, Apache Camel can be a middleware for the Internet of Things with the Eclipse Kura adapter. It can handle monitoring by ferrying log signals from various components and services, like in the CERN system. It can also be an integration framework for enterprise SOA or be a pipeline for batch data processing, although it doesn't compete well with Apache Spark in this area.
Заключение
You can see that systems integration isn't an easy process. We're lucky because a lot of experience has been gathered. It's important to apply it correctly to build flexible and fault-tolerant solutions.
To ensure correct application, I recommend having a checklist of important integration aspects. Must-have items include:
- Is there a separate integration layer?
- Are there tests for integration?
- Do we know the expected peak data intensity?
- Do we know the expected data delivery time?
- Does message correlation matter? What if a sequence breaks?
- Should we do it in a synchronous or asynchronous way?
- Where do formats and routing rules change more frequently?
- Do we have ways to monitor the process?
In this article, we tried Apache Camel, a lightweight integration framework, which helps save time and effort when solving integration problems. As we showed, it can serve as a tool, supporting the relevant microservice architecture by taking full responsibility for data exchange between microservices.
If you're interested in learning more about Apache Camel, I highly recommend the book “Camel in Action” by the framework's creator, Claus Ibsen. Official documentation is available at camel.apache.org.