简化软件集成:Apache Camel 教程

已发表: 2022-03-11

软件很少(如果有的话)存在于信息真空中。 至少,这是我们软件工程师可以为我们开发的大多数应用程序做出的假设。

在任何规模上,每个软件——以一种或另一种方式——出于各种原因与其他一些软件进行通信:从某个地方获取参考数据、发送监控信号、在作为分布式系统的一部分时与其他服务联系系统等等。

在本教程中,您将了解集成大型软件的一些最大挑战是什么,以及 Apache Camel 如何轻松解决这些挑战。

问题:系统集成的架构设计

在您的软件工程生涯中,您可能至少做过一次以下事情:

  • 确定应该启动数据发送的业务逻辑片段。
  • 在同一个应用层中,根据接收者的期望编写数据转换。
  • 将数据包装在适合通过网络传输和路由的结构中。
  • 使用适当的驱动程序或客户端 SDK 打开与目标应用程序的连接。
  • 发送数据并处理响应。

为什么这是一个糟糕的行动方针?

虽然您只有少数此类连接,但它仍然可以管理。 随着系统之间的关系越来越多,应用程序的业务逻辑与集成逻辑混合在一起,集成逻辑是关于调整数据,补偿两个系统之间的技术差异,以及使用 SOAP、REST 或更奇特的请求将数据传输到外部系统.

如果您正在集成多个应用程序,那么在此类代码中追溯整个依赖关系将非常困难:数据在哪里产生以及哪些服务使用它? 您将在许多地方复制集成逻辑以进行引导。

使用这种方法,虽然任务在技术上已经完成,但我们最终会遇到集成的可维护性和可扩展性方面的巨大问题。 在这个系统中快速重组数据流几乎是不可能的,更不用说缺乏监控、断路、费力的数据恢复等更深层次的问题。

在相当大的企业范围内集成软件时,这一点尤其重要。 处理企业集成意味着使用一组应用程序,这些应用程序在广泛的平台上运行并存在于不同的位置。 这种软件环境中的数据交换要求很高。 它必须满足行业的高安全标准,并提供可靠的数据传输方式。 在企业环境中,系统集成需要单独的、详尽的架构设计。

本文将向您介绍软件集成面临的独特困难,并为集成任务提供一些经验驱动的解决方案。 我们将熟悉 Apache Camel,这是一个有用的框架,可以减轻集成开发人员最头疼的问题。 我们将通过一个示例来说明 Camel 如何帮助在由 Kubernetes 提供支持的微服务集群中建立通信。

整合困难

解决该问题的一种广泛使用的方法是解耦应用程序中的集成层。 它可以存在于同一个应用程序中,也可以作为独立运行的专用软件存在——在后一种情况下称为中间件。

在开发和支持中间件时,您通常会遇到哪些问题? 一般来说,您有以下关键项目:

  • 所有的数据通道在某种程度上都是不可靠的。 当数据强度从低到中等时,可能不会出现由这种不可靠性引起的问题。 从应用程序内存到较低的缓存和其下的设备的每个存储级别都可能发生故障。 一些罕见的错误只有在大量数据时才会出现。 即使是成熟的生产就绪供应商产品也存在未解决的与数据丢失相关的错误跟踪器问题。 中间件系统应该能够及时通知您这些数据伤亡并提供消息重新传递。
  • 应用程序使用不同的协议和数据格式。 这意味着集成系统是数据转换和适应其他参与者的帷幕,并利用了各种技术。 这些可以包括普通的 REST API 调用,但也可以是访问队列代理、通过 FTP 发送 CSV 订单或将数据批量拉入数据库表。 这是一个很长的清单,而且永远不会变短。
  • 数据格式和路由规则的变化是不可避免的。 应用程序开发过程中的每一步都会改变数据结构,通常会导致集成数据格式和转换的变化。 有时,需要通过重组的企业数据流来改变基础架构。 例如,当引入必须处理整个公司的所有主数据条目的单点验证参考数据时,可能会发生这些变化。 对于N个系统,我们最终可能会在它们之间拥有最多几乎N^2连接,因此必须应用更改的地方的数量增长得非常快。 会像雪崩一样。 为了维持可维护性,中间件层必须通过通用路由和数据转换提供清晰的依赖关系图。

在设计集成和选择最合适的中间件解决方案时应牢记这些想法。 处理它的一种可能方法是利用企业服务总线 (ESB)。 但是主要供应商提供的 ESB 通常过于繁重,而且往往比它们的价值更麻烦:几乎不可能快速开始使用 ESB,它的学习曲线相当陡峭,而且它的灵活性被牺牲了很多的功能和内置工具。 在我看来,轻量级开源集成解决方案要优越得多——它们更具弹性、易于部署到云中并且易于扩展。

软件集成并不容易。 今天,当我们构建微服务架构并处理大量小型服务时,我们对它们的通信效率也寄予厚望。

企业集成模式

正如所料,与一般的软件开发一样,数据路由和转换的开发涉及重复操作。 这方面的经验已经被处理集成问题相当长一段时间的专业人士总结和系统化。 结果,有一组提取的模板称为企业集成模式,用于设计数据流。 这些集成方法在 Gregor Hophe 和 Bobby Wolfe 的同名书中进行了描述,这本书很像四人帮的重要著作,但在胶合软件领域。

举个例子,规范器模式引入了一个组件,该组件将具有不同数据格式的语义相同的消息映射到单个规范模型,或者聚合器是将一系列消息组合成一个的 EIP。

由于它们是用于解决架构问题的已建立的与技术无关的抽象,因此 EIP 有助于编写架构设计,该设计不会深入研究代码级别,而是足够详细地描述数据流。 这种描述集成路径的符号不仅使设计简洁,而且设置了通用的命名法和通用的语言,这对于解决与来自不同业务领域的团队成员的集成任务非常重要。

介绍 Apache Camel

几年前,我正在一个庞大的杂货零售网络中构建企业集成,该网络的商店分布广泛。 我从一个专有的 ESB 解决方案开始,结果证明它维护起来过于麻烦。 然后,我们的团队遇到了 Apache Camel,在做了一些“概念验证”工作后,我们很快将我们所有的数据流重写为 Camel 路由。

Apache Camel 可以被描述为一个“中介路由器”,一个面向消息的中间件框架,实现了我熟悉的 EIP 列表。 它利用这些模式,支持所有常见的传输协议,并包含大量有用的适配器。 Camel 可以处理许多集成例程,而无需编写自己的代码。

除此之外,我会挑选出以下 Apache Camel 功能:

  • 集成路由被编写为由块组成的管道。 它创建了一个完全透明的图片来帮助追踪数据流。
  • Camel 为许多流行的 API 提供了适配器。 例如,从 Apache Kafka 获取数据、监控 AWS EC2 实例、与 Salesforce 集成——所有这些任务都可以使用开箱即用的组件来解决。

Apache Camel 路由可以用 Java 或 Scala DSL 编写。 (也可以使用 XML 配置,但会变得过于冗长且调试能力较差。)它不会对通信服务的技术堆栈施加限制,但如果您使用 Java 或 Scala 编写,则可以将 Camel 嵌入到应用程序中独立运行它。

Camel 使用的路由表示法可以用以下简单的伪代码来描述:

 from(Source) .transform(Transformer) .to(Destination)

SourceTransformerDestination是通过其 URI 引用实现组件的端点。

是什么使 Camel 能够解决我之前描述的集成问题? 我们来看一下。 首先,路由和转换逻辑现在只存在于专用的 Apache Camel 配置中。 其次,通过简洁自然的 DSL 结合 EIP 的使用,呈现出系统间依赖的图景。 它由可理解的抽象组成,并且路由逻辑很容易调整。 最后,我们不必编写大量的转换代码,因为可能已经包含了适当的适配器。

集成

我应该补充一点,Apache Camel 是一个成熟的框架,并且会定期更新。 它有一个很棒的社区和相当多的累积知识库。

它确实有自己的缺点。 Camel 不应该被视为一个复杂的集成套件。 它是一个工具箱,没有业务流程管理工具或活动监视器等高级功能,但它可用于创建此类软件。

例如,替代系统可能是 Spring Integration 或 Mule ESB。 对于 Spring Integration,虽然它被认为是轻量级的,但根据我的经验,将它放在一起并编写大量 XML 配置文件可能会变得出乎意料的复杂,而且很难解决。 Mule ESB 是一个健壮且功能强大的工具集,但顾名思义,它是一个企业服务总线,因此它属于不同的权重类别。 Mule 可以与 Fuse ESB 相提并论,后者是基于 Apache Camel 的类似产品,具有丰富的功能集。 对我来说,今天使用 Apache Camel 来粘合服务是一件轻而易举的事。 它易于使用,并且可以清晰地描述去往何处——同时,它的功能足以构建复杂的集成。

编写示例路线

让我们开始编写代码。 我们将从将消息从单个源路由到收件人列表的同步数据流开始。 路由规则将使用 Java DSL 编写。

我们将使用 Maven 来构建项目。 首先将以下依赖项添加到pom.xml

 <dependencies> ... <dependency> <groupId>org.apache.camel</groupId> <artifactId>camel-core</artifactId> <version>2.20.0</version> </dependency> </dependencies>

或者,应用程序可以构建在camel-archetype-java原型之上。

Camel 路由定义在RouteBuilder.configure方法中声明。

 public void configure() { errorHandler(defaultErrorHandler().maximumRedeliveries(0)); from("file:orders?noop=true").routeId("main") .log("Incoming File: ${file:onlyname}") .unmarshal().json(JsonLibrary.Jackson, Order.class) // unmarshal JSON to Order class containing List<OrderItem> .split().simple("body.items") // split list to process one by one .to("log:inputOrderItem") .choice() .when().simple("${body.type} == 'Drink'") .to("direct:bar") .when().simple("${body.type} == 'Dessert'") .to("direct:dessertStation") .when().simple("${body.type} == 'Hot Meal'") .to("direct:hotMealStation") .when().simple("${body.type} == 'Cold Meal'") .to("direct:coldMealStation") .otherwise() .to("direct:others"); from("direct:bar").routeId("bar").log("Handling Drink"); from("direct:dessertStation").routeId("dessertStation").log("Handling Dessert"); from("direct:hotMealStation").routeId("hotMealStation").log("Handling Hot Meal"); from("direct:coldMealStation").routeId("coldMealStation").log("Handling Cold Meal"); from("direct:others").routeId("others").log("Handling Something Other"); }

在这个定义中,我们创建了一个路由,它从 JSON 文件中获取记录,将它们拆分为项目,并根据消息内容路由到一组处理程序。

让我们在准备好的测试数据上运行它。 我们将得到输出:

 INFO | Total 6 routes, of which 6 are started INFO | Apache Camel 2.20.0 (CamelContext: camel-1) started in 10.716 seconds INFO | Incoming File: order1.json INFO | Exchange[ExchangePattern: InOnly, BodyType: com.antongoncharov.camel.example.model.OrderItem, Body: OrderItem{, type='Drink', name='Americano', qty='1'}] INFO | Handling Drink INFO | Exchange[ExchangePattern: InOnly, BodyType: com.antongoncharov.camel.example.model.OrderItem, Body: OrderItem{, type='Hot Meal', name='French Omelette', qty='1'}] INFO | Handling Hot Meal INFO | Exchange[ExchangePattern: InOnly, BodyType: com.antongoncharov.camel.example.model.OrderItem, Body: OrderItem{, type='Hot Meal', name='Lasagna', qty='1'}] INFO | Handling Hot Meal INFO | Exchange[ExchangePattern: InOnly, BodyType: com.antongoncharov.camel.example.model.OrderItem, Body: OrderItem{, type='Hot Meal', name='Rice Balls', qty='1'}] INFO | Handling Hot Meal INFO | Exchange[ExchangePattern: InOnly, BodyType: com.antongoncharov.camel.example.model.OrderItem, Body: OrderItem{, type='Dessert', name='Blueberry Pie', qty='1'}] INFO | Handling Dessert

正如预期的那样,Camel 将消息路由到目的地。

数据传输选择

在上面的示例中,组件之间的交互是同步的,并通过应用程序内存执行。 但是,当我们处理不共享内存的单独应用程序时,还有更多的通信方式:

  • 文件交换。 一个应用程序生成共享数据文件供另一个应用程序使用。 这是老派精神的所在。 这种通信方法有很多后果:缺乏事务和一致性、性能差以及系统之间的孤立协调。 许多开发人员最终编写了自制的集成解决方案,以使流程或多或少易于管理。
  • 通用数据库。 让应用程序将它们希望共享的数据存储在单个数据库的通用模式中。 设计统一模式和处理对表的并发访问是这种方法最突出的挑战。 与文件交换一样,这很容易成为永久性瓶颈。
  • 远程 API 调用。 提供一个接口以允许应用程序与另一个正在运行的应用程序进行交互,例如典型的方法调用。 应用程序通过 API 调用共享功能,但它在过程中将它们紧密耦合。
  • 消息传递。 让每个应用程序连接到一个公共消息传递系统,并使用消息异步交换数据和调用行为。 发送者和接收者都不必同时启动并运行才能传递消息。

交互的方式还有很多,但我们应该记住,从广义上讲,交互有两种类型:同步和异步。 第一个就像在代码中调用一个函数——执行流程将一直等待,直到它执行并返回一个值。 使用异步方法,相同的数据通过中间消息队列或订阅主题发送。 异步远程函数调用可以实现为请求-回复 EIP。

但是,异步消息传递并不是万能的。 它涉及某些限制。 您很少在 Web 上看到消息传递 API; 同步 REST 服务更受欢迎。 但消息中间件广泛用于企业内网或分布式系统后端基础设施。

使用消息队列

让我们的示例异步。 管理队列和订阅主题的软件系统称为消息代理。 它就像一个用于表和列的 RDBMS。 队列用作点对点集成,而主题用于与许多接收者进行发布-订阅通信。 我们将使用 Apache ActiveMQ 作为 JMS 消息代理,因为它可靠且可嵌入。

添加以下依赖项。 有时将包含所有 ActiveMQ jars 的activemq-all添加到项目中是多余的,但我们将保持应用程序的依赖关系不复杂。

 <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-all</artifactId> <version>5.15.2</version> </dependency>

然后以编程方式启动代理。 在 Spring Boot 中,我们通过插入spring-boot-starter-activemq Maven 依赖项来获得自动配置。

使用以下命令运行新的消息代理,仅指定连接器的端点:

 BrokerService broker = new BrokerService(); broker.addConnector("tcp://localhost:61616"); broker.start();

并将以下配置片段添加到configure方法主体:

 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616"); this.getContext().addComponent("activemq", ActiveMQComponent.jmsComponent(connectionFactory));

现在我们可以使用消息队列更新前面的示例。 队列将在消息传递时自动创建。

 public void configure() { errorHandler(defaultErrorHandler().maximumRedeliveries(0)); ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616"); this.getContext().addComponent("activemq", ActiveMQComponent.jmsComponent(connectionFactory)); from("file:orders?noop=true").routeId("main") .log("Incoming File: ${file:onlyname}") .unmarshal().json(JsonLibrary.Jackson, Order.class) // unmarshal JSON to Order class containing List<OrderItem> .split().simple("body.items") // split list to process one by one .to("log:inputOrderItem") .choice() .when().simple("${body.type} == 'Drink'") .to("activemq:queue:bar") .when().simple("${body.type} == 'Dessert'") .to("activemq:queue:dessertStation") .when().simple("${body.type} == 'Hot Meal'") .to("activemq:queue:hotMealStation") .when().simple("${body.type} == 'Cold Meal'") .to("activemq:queue:coldMealStation") .otherwise() .to("activemq:queue:others"); from("activemq:queue:bar").routeId("barAsync").log("Drinks"); from("activemq:queue:dessertStation").routeId("dessertAsync").log("Dessert"); from("activemq:queue:hotMealStation").routeId("hotMealAsync").log("Hot Meals"); from("activemq:queue:coldMealStation").routeId("coldMealAsync").log("Cold Meals"); from("activemq:queue:others").routeId("othersAsync").log("Others"); }

好了,现在交互变成了异步的。 这些数据的潜在消费者可以在准备好时访问它。 这是一个松散耦合的例子,我们试图在反应式架构中实现。 其中一项服务的不可用不会阻止其他服务。 此外,消费者可以并行扩展队列并从队列中读取。 队列本身可以扩展和分区。 持久队列可以将数据存储在磁盘上,等待处理,即使所有参与者都挂了。 因此,该系统更具容错性。

一个惊人的事实是,CERN 使用 Apache Camel 和 ActiveMQ 来监控大型强子对撞机 (LHC) 的系统。 还有一篇有趣的硕士论文解释了为此任务选择合适的中间件解决方案。 所以,正如他们在主题演讲中所说,“没有 JMS——没有粒子物理学!”

监控

在前面的示例中,我们在两个服务之间创建了数据通道。 这是架构中一个额外的潜在故障点,所以我们必须照顾它。 下面我们来看看 Apache Camel 提供了哪些监控功能。 基本上,它通过 JMX 可访问的 MBean 公开有关其路由的统计信息。 ActiveMQ 以相同的方式公开队列统计信息。

让我们在应用程序中打开 JMX 服务器,使其能够使用命令行选项运行:

 -Dorg.apache.camel.jmx.createRmiConnector=true -Dorg.apache.camel.jmx.mbeanObjectDomainName=org.apache.camel -Dorg.apache.camel.jmx.rmiConnector.registryPort=1099 -Dorg.apache.camel.jmx.serviceUrlPath=camel

现在运行应用程序,这样路由就完成了它的工作。 打开标准的jconsole工具并连接到应用程序进程。 连接到 URL service:jmx:rmi:///jndi/rmi://localhost:1099/camel 。 转到 MBeans 树中的 org.apache.camel 域。

截图 1

我们可以看到,关于路由的一切都在掌控之中。 我们有运行中消息的数量、错误计数和队列中的消息计数。 这些信息可以通过管道传输到一些具有丰富功能的监控工具集,如 Graphana 或 Kibana。 您可以通过实现众所周知的 ELK 堆栈来做到这一点。

还有一个可插入和可扩展的 Web 控制台,它提供了一个用于管理 Camel、ActiveMQ 等的 UI,称为 hawt.io。

截图 2

测试路线

Apache Camel 具有相当广泛的功能,可以使用模拟组件编写测试路由。 它是一个强大的工具,但仅为测试编写单独的路由是一个耗时的过程。 在生产路线上运行测试而不修改其管道会更有效。 Camel 有这个特性,可以使用 AdviceWith 组件来实现。

让我们在示例中启用测试逻辑并运行示例测试。

 <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.11</version> <scope>test</scope> </dependency> <dependency> <groupId>org.apache.camel</groupId> <artifactId>camel-test</artifactId> <version>2.20.0</version> <scope>test</scope> </dependency>

测试类是:

 public class AsyncRouteTest extends CamelTestSupport { @Override protected RouteBuilder createRouteBuilder() throws Exception { return new AsyncRouteBuilder(); } @Before public void mockEndpoints() throws Exception { context.getRouteDefinition("main").adviceWith(context, new AdviceWithRouteBuilder() { @Override public void configure() throws Exception { // we substitute all actual queues with mock endpoints mockEndpointsAndSkip("activemq:queue:bar"); mockEndpointsAndSkip("activemq:queue:dessertStation"); mockEndpointsAndSkip("activemq:queue:hotMealStation"); mockEndpointsAndSkip("activemq:queue:coldMealStation"); mockEndpointsAndSkip("activemq:queue:others"); // and replace the route's source with test endpoint replaceFromWith("file://testInbox"); } }); } @Test public void testSyncInteraction() throws InterruptedException { String testJson = "{\"id\": 1, \"order\": [{\"id\": 1, \"name\": \"Americano\", \"type\": \"Drink\", \"qty\": \"1\"}, {\"id\": 2, \"name\": \"French Omelette\", \"type\": \"Hot Meal\", \"qty\": \"1\"}, {\"id\": 3, \"name\": \"Lasagna\", \"type\": \"Hot Meal\", \"qty\": \"1\"}, {\"id\": 4, \"name\": \"Rice Balls\", \"type\": \"Hot Meal\", \"qty\": \"1\"}, {\"id\": 5, \"name\": \"Blueberry Pie\", \"type\": \"Dessert\", \"qty\": \"1\"}]}"; // get mocked endpoint and set an expectation MockEndpoint mockEndpoint = getMockEndpoint("mock:activemq:queue:hotMealStation"); mockEndpoint.expectedMessageCount(3); // simulate putting file in the inbox folder template.sendBodyAndHeader("file://testInbox", testJson, Exchange.FILE_NAME, "test.json"); //checks that expectations were met assertMockEndpointsSatisfied(); } }

现在使用mvn test为应用程序运行测试。 我们可以看到我们的路由已经成功执行了测试建议。 没有消息通过实际队列,并且测试已通过。

 INFO | Route: main started and consuming from: file://testInbox <...> INFO | Incoming File: test.json <...> INFO | Asserting: mock://activemq:queue:hotMealStation is satisfied

将 Apache Camel 与 Kubernetes 集群一起使用

今天的集成问题之一是应用程序不再是静态的。 在云基础架构中,我们处理同时在多个节点上运行的虚拟服务。 它使微服务架构具有相互交互的小型、轻量级服务网络。 这些服务的生命周期不可靠,我们必须动态地发现它们。

将云服务粘合在一起是一项可以使用 Apache Camel 解决的任务。 这特别有趣,因为 EIP 风格以及 Camel 有大量适配器并支持广泛的协议这一事实。 最近的 2.18 版本添加了 ServiceCall 组件,该组件引入了调用 API 并通过集群发现机制解析其地址的功能。 目前支持 Consul、Kubernetes、Ribbon 等。ServiceCall 配置了 Consul 的一些代码示例很容易找到。 我们将在这里使用 Kubernetes,因为它是我最喜欢的集群解决方案。

集成架构将如下所示:

架构

Order服务和Inventory服务将是几个返回静态数据的简单 Spring Boot 应用程序。 我们在这里不受特定技术堆栈的约束。 这些服务正在生成我们想要处理的数据。

订单服务控制器:

 @RestController public class OrderController { private final OrderStorage orderStorage; @Autowired public OrderController(OrderStorage orderStorage) { this.orderStorage = orderStorage; } @RequestMapping("/info") public String info() { return "Order Service UU/orders") public List<Order> getAll() { return orderStorage.getAll(); } @RequestMapping("/orders/{id}") public Order getOne(@PathVariable Integer id) { return orderStorage.getOne(id); } }

它产生以下格式的数据:

 [{"id":1,"items":[2,3,4]},{"id":2,"items":[5,3]}]

Inventory服务控制器与Order服务的控制器完全相似:

 @RestController public class InventoryController { private final InventoryStorage inventoryStorage; @Autowired public InventoryController(InventoryStorage inventoryStorage) { this.inventoryStorage = inventoryStorage; } @RequestMapping("/info") public String info() { return "Inventory Service UU/items") public List<InventoryItem> getAll() { return inventoryStorage.getAll(); } @RequestMapping("/items/{id}") public InventoryItem getOne(@PathVariable Integer id) { return inventoryStorage.getOne(id); } }

InventoryStorage是一个保存数据的通用存储库。 在此示例中,它返回静态预定义对象,这些对象被编组为以下格式。

 [{"id":1,"name":"Laptop","description":"Up to 12-hours battery life","price":499.9},{"id":2,"name":"Monitor","description":"27-inch, response time: 7ms","price":200.0},{"id":3,"name":"Headphones","description":"Soft leather ear-cups","price":29.9},{"id":4,"name":"Mouse","description":"Designed for comfort and portability","price":19.0},{"id":5,"name":"Keyboard","description":"Layout: US","price":10.5}]

让我们编写一个连接它们的网关路由,但在这一步没有 ServiceCall:

 rest("/orders") .get("/").description("Get all orders with details").outType(TestResponse.class) .route() .setHeader("Content-Type", constant("application/json")) .setHeader("Accept", constant("application/json")) .setHeader(Exchange.HTTP_METHOD, constant("GET")) .removeHeaders("CamelHttp*") .to("http4://localhost:8082/orders?bridgeEndpoint=true") .unmarshal(formatOrder) .enrich("direct:enrichFromInventory", new OrderAggregationStrategy()) .to("log:result") .endRest(); from("direct:enrichFromInventory") .transform().simple("${null}") .setHeader("Content-Type", constant("application/json")) .setHeader("Accept", constant("application/json")) .setHeader(Exchange.HTTP_METHOD, constant("GET")) .removeHeaders("CamelHttp*") .to("http4://localhost:8081/items?bridgeEndpoint=true") .unmarshal(formatInventory);

现在想象一下,每个服务不再是一个特定的实例,而是一组作为一个实例运行的实例。 我们将使用 Minikube 在本地尝试 Kubernetes 集群。

配置网络路由以在本地查看 Kubernetes 节点(给定示例适用于 Mac/Linux 环境):

 # remove existing routes sudo route -n delete 10/24 > /dev/null 2>&1 # add routes sudo route -n add 10.0.0.0/24 $(minikube ip) # 172.17.0.0/16 ip range is used by docker in minikube sudo route -n add 172.17.0.0/16 $(minikube ip) ifconfig 'bridge100' | grep member | awk '{print $2}' # use interface name from the output of the previous command # needed for xhyve driver, which I'm using for testing sudo ifconfig bridge100 -hostfilter en5

使用 Dockerfile 配置将服务包装在 Docker 容器中,如下所示:

 FROM openjdk:8-jdk-alpine VOLUME /tmp ADD target/order-srv-1.0-SNAPSHOT.jar app.jar ADD target/lib lib ENV JAVA_OPTS="" ENTRYPOINT exec java $JAVA_OPTS -Djava.security.egd=file:/dev/./urandom -jar /app.jar

构建服务镜像并将其推送到 Docker 注册表。 现在运行本地 Kubernetes 集群中的节点。

Kubernetes.yaml 部署配置:

 apiVersion: extensions/v1beta1 kind: Deployment metadata: name: inventory spec: replicas: 3 selector: matchLabels: app: inventory template: metadata: labels: app: inventory spec: containers: - name: inventory image: inventory-srv:latest imagePullPolicy: Never ports: - containerPort: 8081

将这些部署公开为集群中的服务:

 kubectl expose deployment order-srv --type=NodePort kubectl expose deployment inventory-srv --type=NodePort

现在我们可以检查请求是否由集群中随机选择的节点提供服务。 依次运行curl -X http://192.168.99.100:30517/info几次以访问 minikube NodePort 以获取公开服务(使用您的主机和端口)。 在输出中,我们看到我们已经实现了请求平衡。

 Inventory Service UUID = 22f8ca6b-f56b-4984-927b-cbf9fcf81da5 Inventory Service UUID = b7a4d326-1e76-4051-a0a6-1016394fafda Inventory Service UUID = b7a4d326-1e76-4051-a0a6-1016394fafda Inventory Service UUID = 22f8ca6b-f56b-4984-927b-cbf9fcf81da5 Inventory Service UUID = 50323ddb-3ace-4424-820a-6b4e85775af4

camel-kubernetescamel-netty4-http依赖项添加到项目的pom.xml中。 然后配置 ServiceCall 组件以使用 Kubernetes 主节点发现共享路由定义中的所有服务调用:

 KubernetesConfiguration kubernetesConfiguration = new KubernetesConfiguration(); kubernetesConfiguration.setMasterUrl("https://192.168.64.2:8443"); kubernetesConfiguration.setClientCertFile("/Users/antongoncharov/.minikube/client.crt"); kubernetesConfiguration.setClientKeyFile("/Users/antongoncharov/.minikube/client.key"); kubernetesConfiguration.setNamespace("default”); ServiceCallConfigurationDefinition config = new ServiceCallConfigurationDefinition(); config.setServiceDiscovery(new KubernetesClientServiceDiscovery(kubernetesConfiguration)); context.setServiceCallConfiguration(config);

ServiceCall EIP 很好地补充了 Spring Boot。 大多数选项可以直接在application.properties文件中配置。

使用 ServiceCall 组件为 Camel 路由赋能:

 rest("/orders") .get("/").description("Get all orders with details").outType(TestResponse.class) .route() .hystrix() .setHeader("Content-Type", constant("application/json")) .setHeader("Accept", constant("application/json")) .setHeader(Exchange.HTTP_METHOD, constant("GET")) .removeHeaders("CamelHttp*") .serviceCall("customer-srv","http4:customer-deployment?bridgeEndpoint=true") .unmarshal(formatOrder) .enrich("direct:enrichFromInventory", new OrderAggregationStrategy()) .to("log:result") .endRest(); from("direct:enrichFromInventory") .transform().simple("${null}") .setHeader("Content-Type", constant("application/json")) .setHeader("Accept", constant("application/json")) .setHeader(Exchange.HTTP_METHOD, constant("GET")) .removeHeaders("CamelHttp*") .serviceCall("order-srv","http4:order-srv?bridgeEndpoint=true") .unmarshal(formatInventory);

我们还在路线中激活了断路器。 它是一个集成挂钩,允许在发生交付错误或收件人不可用的情况下暂停远程系统调用。 这是为了避免级联系统故障。 Hystrix 组件通过实现断路器模式来帮助实现这一点。

让我们运行它并发送一个测试请求; 我们将从这两个服务中获得汇总的响应。

 [{"id":1,"items":[{"id":2,"name":"Monitor","description":"27-inch, response time: 7ms","price":200.0},{"id":3,"name":"Headphones","description":"Soft leather ear-cups","price":29.9},{"id":4,"name":"Mouse","description":"Designed for comfort and portability","price":19.0}]},{"id":2,"items":[{"id":5,"name":"Keyboard","description":"Layout: US","price":10.5},{"id":3,"name":"Headphones","description":"Soft leather ear-cups","price":29.9}]}]

结果符合预期。

其他用例

我展示了 Apache Camel 如何在集群中集成微服务。 这个框架还有什么其他用途? 一般来说,它在基于规则的路由可能是解决方案的任何地方都很有用。 例如,Apache Camel 可以是带有 Eclipse Kura 适配器的物联网中间件。 It can handle monitoring by ferrying log signals from various components and services, like in the CERN system. It can also be an integration framework for enterprise SOA or be a pipeline for batch data processing, although it doesn't compete well with Apache Spark in this area.

结论

You can see that systems integration isn't an easy process. We're lucky because a lot of experience has been gathered. It's important to apply it correctly to build flexible and fault-tolerant solutions.

To ensure correct application, I recommend having a checklist of important integration aspects. Must-have items include:

  • Is there a separate integration layer?
  • Are there tests for integration?
  • Do we know the expected peak data intensity?
  • Do we know the expected data delivery time?
  • Does message correlation matter? What if a sequence breaks?
  • Should we do it in a synchronous or asynchronous way?
  • Where do formats and routing rules change more frequently?
  • Do we have ways to monitor the process?

In this article, we tried Apache Camel, a lightweight integration framework, which helps save time and effort when solving integration problems. As we showed, it can serve as a tool, supporting the relevant microservice architecture by taking full responsibility for data exchange between microservices.

If you're interested in learning more about Apache Camel, I highly recommend the book “Camel in Action” by the framework's creator, Claus Ibsen. Official documentation is available at camel.apache.org.