簡化軟件集成:Apache Camel 教程
已發表: 2022-03-11軟件很少(如果有的話)存在於信息真空中。 至少,這是我們軟件工程師可以為我們開發的大多數應用程序做出的假設。
在任何規模上,每個軟件——以一種或另一種方式——出於各種原因與其他一些軟件進行通信:從某個地方獲取參考數據、發送監控信號、在作為分佈式系統的一部分時與其他服務聯繫系統等等。
在本教程中,您將了解集成大型軟件的一些最大挑戰是什麼,以及 Apache Camel 如何輕鬆解決這些挑戰。
問題:系統集成的架構設計
在您的軟件工程生涯中,您可能至少做過一次以下事情:
- 確定應該啟動數據發送的業務邏輯片段。
- 在同一個應用層中,根據接收者的期望編寫數據轉換。
- 將數據包裝在適合通過網絡傳輸和路由的結構中。
- 使用適當的驅動程序或客戶端 SDK 打開與目標應用程序的連接。
- 發送數據並處理響應。
為什麼這是一個糟糕的行動方針?
雖然您只有少數此類連接,但它仍然可以管理。 隨著系統之間的關係越來越多,應用程序的業務邏輯與集成邏輯混合在一起,集成邏輯是關於調整數據,補償兩個系統之間的技術差異,以及使用 SOAP、REST 或更奇特的請求將數據傳輸到外部系統.
如果您正在集成多個應用程序,那麼在此類代碼中追溯整個依賴關係將非常困難:數據在哪裡產生以及哪些服務使用它? 您將在許多地方復制集成邏輯以進行引導。
使用這種方法,雖然任務在技術上已經完成,但我們最終會遇到集成的可維護性和可擴展性方面的巨大問題。 在這個系統中快速重組數據流幾乎是不可能的,更不用說缺乏監控、斷路、費力的數據恢復等更深層次的問題。
在相當大的企業範圍內集成軟件時,這一點尤其重要。 處理企業集成意味著使用一組應用程序,這些應用程序在廣泛的平台上運行並存在於不同的位置。 這種軟件環境中的數據交換要求很高。 它必須滿足行業的高安全標準,並提供可靠的數據傳輸方式。 在企業環境中,系統集成需要單獨的、詳盡的架構設計。
本文將向您介紹軟件集成面臨的獨特困難,並為集成任務提供一些經驗驅動的解決方案。 我們將熟悉 Apache Camel,這是一個有用的框架,可以減輕集成開發人員最頭疼的問題。 我們將通過一個示例來說明 Camel 如何幫助在由 Kubernetes 提供支持的微服務集群中建立通信。
整合困難
解決該問題的一種廣泛使用的方法是解耦應用程序中的集成層。 它可以存在於同一個應用程序中,也可以作為獨立運行的專用軟件存在——在後一種情況下稱為中間件。
在開發和支持中間件時,您通常會遇到哪些問題? 一般來說,您有以下關鍵項目:
- 所有的數據通道在某種程度上都是不可靠的。 當數據強度從低到中等時,可能不會出現由這種不可靠性引起的問題。 從應用程序內存到較低的緩存和其下的設備的每個存儲級別都可能發生故障。 一些罕見的錯誤只有在大量數據時才會出現。 即使是成熟的生產就緒供應商產品也存在未解決的與數據丟失相關的錯誤跟踪器問題。 中間件系統應該能夠及時通知您這些數據傷亡並提供消息重新傳遞。
- 應用程序使用不同的協議和數據格式。 這意味著集成系統是數據轉換和適應其他參與者的帷幕,並利用了各種技術。 這些可以包括普通的 REST API 調用,但也可以是訪問隊列代理、通過 FTP 發送 CSV 訂單或將數據批量拉入數據庫表。 這是一個很長的清單,而且永遠不會變短。
- 數據格式和路由規則的變化是不可避免的。 應用程序開發過程中的每一步都會改變數據結構,通常會導致集成數據格式和轉換的變化。 有時,需要通過重組的企業數據流來改變基礎架構。 例如,當引入必須處理整個公司的所有主數據條目的單點驗證參考數據時,可能會發生這些變化。 對於
N
個系統,我們最終可能會在它們之間擁有最多幾乎N^2
連接,因此必須應用更改的地方的數量增長得非常快。 會像雪崩一樣。 為了維持可維護性,中間件層必須通過通用路由和數據轉換提供清晰的依賴關係圖。
在設計集成和選擇最合適的中間件解決方案時應牢記這些想法。 處理它的一種可能方法是利用企業服務總線 (ESB)。 但是主要供應商提供的 ESB 通常過於繁重,而且往往比它們的價值更麻煩:幾乎不可能快速開始使用 ESB,它的學習曲線相當陡峭,而且它的靈活性被犧牲了很多的功能和內置工具。 在我看來,輕量級開源集成解決方案要優越得多——它們更具彈性、易於部署到雲中並且易於擴展。
軟件集成並不容易。 今天,當我們構建微服務架構並處理大量小型服務時,我們對它們的通信效率也寄予厚望。
企業集成模式
正如所料,與一般的軟件開發一樣,數據路由和轉換的開發涉及重複操作。 這方面的經驗已經被處理集成問題相當長一段時間的專業人士總結和系統化。 結果,有一組提取的模板稱為企業集成模式,用於設計數據流。 這些集成方法在 Gregor Hophe 和 Bobby Wolfe 的同名書中進行了描述,這本書很像四人幫的重要著作,但在膠合軟件領域。
舉個例子,規範器模式引入了一個組件,該組件將具有不同數據格式的語義相同的消息映射到單個規範模型,或者聚合器是將一系列消息組合成一個的 EIP。
由於它們是用於解決架構問題的已建立的與技術無關的抽象,因此 EIP 有助於編寫架構設計,它不會深入研究代碼級別,而是足夠詳細地描述數據流。 這種描述集成路徑的符號不僅使設計簡潔,而且設置了通用的命名法和通用的語言,這對於解決與來自不同業務領域的團隊成員的集成任務非常重要。
介紹 Apache Camel
幾年前,我正在一個龐大的雜貨零售網絡中構建企業集成,該網絡的商店分佈廣泛。 我從一個專有的 ESB 解決方案開始,結果證明它維護起來過於麻煩。 然後,我們的團隊遇到了 Apache Camel,在做了一些“概念驗證”工作後,我們很快將我們所有的數據流重寫為 Camel 路由。
Apache Camel 可以被描述為一個“中介路由器”,一個面向消息的中間件框架,實現了我熟悉的 EIP 列表。 它利用這些模式,支持所有常見的傳輸協議,並包含大量有用的適配器。 Camel 可以處理許多集成例程,而無需編寫自己的代碼。
除此之外,我會挑選出以下 Apache Camel 功能:
- 集成路由被編寫為由塊組成的管道。 它創建了一個完全透明的圖片來幫助追踪數據流。
- Camel 為許多流行的 API 提供了適配器。 例如,從 Apache Kafka 獲取數據、監控 AWS EC2 實例、與 Salesforce 集成——所有這些任務都可以使用開箱即用的組件來解決。
Apache Camel 路由可以用 Java 或 Scala DSL 編寫。 (也可以使用 XML 配置,但會變得過於冗長且調試能力較差。)它不會對通信服務的技術堆棧施加限制,但如果您使用 Java 或 Scala 編寫,則可以將 Camel 嵌入到應用程序中獨立運行它。
Camel 使用的路由表示法可以用以下簡單的偽代碼來描述:
from(Source) .transform(Transformer) .to(Destination)
Source
、 Transformer
和Destination
是通過其 URI 引用實現組件的端點。
是什麼使 Camel 能夠解決我之前描述的集成問題? 我們來看一下。 首先,路由和轉換邏輯現在只存在於專用的 Apache Camel 配置中。 其次,通過簡潔自然的 DSL 結合 EIP 的使用,呈現出系統間依賴的圖景。 它由可理解的抽象組成,並且路由邏輯很容易調整。 最後,我們不必編寫大量的轉換代碼,因為可能已經包含了適當的適配器。
我應該補充一點,Apache Camel 是一個成熟的框架,並且會定期更新。 它有一個很棒的社區和相當多的累積知識庫。
它確實有自己的缺點。 Camel 不應該被視為一個複雜的集成套件。 它是一個工具箱,沒有業務流程管理工具或活動監視器等高級功能,但它可用於創建此類軟件。
例如,替代系統可能是 Spring Integration 或 Mule ESB。 對於 Spring Integration,儘管它被認為是輕量級的,但根據我的經驗,將它放在一起並編寫大量 XML 配置文件可能會變得異常複雜,而且很難解決。 Mule ESB 是一個健壯且功能強大的工具集,但顧名思義,它是一個企業服務總線,因此它屬於不同的權重類別。 Mule 可以與 Fuse ESB 相提並論,後者是基於 Apache Camel 的類似產品,具有豐富的功能集。 對我來說,今天使用 Apache Camel 來粘合服務是一件輕而易舉的事。 它易於使用,並且可以清晰地描述去往何處——同時,它的功能足以構建複雜的集成。
編寫示例路線
讓我們開始編寫代碼。 我們將從將消息從單個源路由到收件人列表的同步數據流開始。 路由規則將使用 Java DSL 編寫。
我們將使用 Maven 來構建項目。 首先將以下依賴項添加到pom.xml
:
<dependencies> ... <dependency> <groupId>org.apache.camel</groupId> <artifactId>camel-core</artifactId> <version>2.20.0</version> </dependency> </dependencies>
或者,應用程序可以構建在camel-archetype-java
原型之上。
Camel 路由定義在RouteBuilder.configure
方法中聲明。
public void configure() { errorHandler(defaultErrorHandler().maximumRedeliveries(0)); from("file:orders?noop=true").routeId("main") .log("Incoming File: ${file:onlyname}") .unmarshal().json(JsonLibrary.Jackson, Order.class) // unmarshal JSON to Order class containing List<OrderItem> .split().simple("body.items") // split list to process one by one .to("log:inputOrderItem") .choice() .when().simple("${body.type} == 'Drink'") .to("direct:bar") .when().simple("${body.type} == 'Dessert'") .to("direct:dessertStation") .when().simple("${body.type} == 'Hot Meal'") .to("direct:hotMealStation") .when().simple("${body.type} == 'Cold Meal'") .to("direct:coldMealStation") .otherwise() .to("direct:others"); from("direct:bar").routeId("bar").log("Handling Drink"); from("direct:dessertStation").routeId("dessertStation").log("Handling Dessert"); from("direct:hotMealStation").routeId("hotMealStation").log("Handling Hot Meal"); from("direct:coldMealStation").routeId("coldMealStation").log("Handling Cold Meal"); from("direct:others").routeId("others").log("Handling Something Other"); }
在這個定義中,我們創建了一個路由,它從 JSON 文件中獲取記錄,將它們拆分為項目,並根據消息內容路由到一組處理程序。
讓我們在準備好的測試數據上運行它。 我們將得到輸出:
INFO | Total 6 routes, of which 6 are started INFO | Apache Camel 2.20.0 (CamelContext: camel-1) started in 10.716 seconds INFO | Incoming File: order1.json INFO | Exchange[ExchangePattern: InOnly, BodyType: com.antongoncharov.camel.example.model.OrderItem, Body: OrderItem{, type='Drink', name='Americano', qty='1'}] INFO | Handling Drink INFO | Exchange[ExchangePattern: InOnly, BodyType: com.antongoncharov.camel.example.model.OrderItem, Body: OrderItem{, type='Hot Meal', name='French Omelette', qty='1'}] INFO | Handling Hot Meal INFO | Exchange[ExchangePattern: InOnly, BodyType: com.antongoncharov.camel.example.model.OrderItem, Body: OrderItem{, type='Hot Meal', name='Lasagna', qty='1'}] INFO | Handling Hot Meal INFO | Exchange[ExchangePattern: InOnly, BodyType: com.antongoncharov.camel.example.model.OrderItem, Body: OrderItem{, type='Hot Meal', name='Rice Balls', qty='1'}] INFO | Handling Hot Meal INFO | Exchange[ExchangePattern: InOnly, BodyType: com.antongoncharov.camel.example.model.OrderItem, Body: OrderItem{, type='Dessert', name='Blueberry Pie', qty='1'}] INFO | Handling Dessert
正如預期的那樣,Camel 將消息路由到目的地。
數據傳輸選擇
在上面的示例中,組件之間的交互是同步的,並通過應用程序內存執行。 但是,當我們處理不共享內存的單獨應用程序時,還有更多的通信方式:
- 文件交換。 一個應用程序生成共享數據文件供另一個應用程序使用。 這是老派精神的所在。 這種通信方法有很多後果:缺乏事務和一致性、性能差以及系統之間的孤立協調。 許多開發人員最終編寫了自製的集成解決方案,以使流程或多或少易於管理。
- 通用數據庫。 讓應用程序將它們希望共享的數據存儲在單個數據庫的通用模式中。 設計統一模式和處理對錶的並發訪問是這種方法最突出的挑戰。 與文件交換一樣,這很容易成為永久性瓶頸。
- 遠程 API 調用。 提供一個接口以允許應用程序與另一個正在運行的應用程序進行交互,例如典型的方法調用。 應用程序通過 API 調用共享功能,但它在過程中將它們緊密耦合。
- 消息傳遞。 讓每個應用程序連接到一個公共消息傳遞系統,並使用消息異步交換數據和調用行為。 發送者和接收者都不必同時啟動並運行才能傳遞消息。
交互的方式還有很多,但我們應該記住,從廣義上講,交互有兩種類型:同步和異步。 第一個就像在代碼中調用一個函數——執行流程將一直等待,直到它執行並返回一個值。 使用異步方法,相同的數據通過中間消息隊列或訂閱主題發送。 異步遠程函數調用可以實現為請求-回复 EIP。
但是,異步消息傳遞並不是萬能的。 它涉及某些限制。 您很少在 Web 上看到消息傳遞 API; 同步 REST 服務更受歡迎。 但消息中間件廣泛用於企業內網或分佈式系統後端基礎設施。
使用消息隊列
讓我們的示例異步。 管理隊列和訂閱主題的軟件系統稱為消息代理。 它就像一個用於表和列的 RDBMS。 隊列用作點對點集成,而主題用於與許多接收者進行發布-訂閱通信。 我們將使用 Apache ActiveMQ 作為 JMS 消息代理,因為它可靠且可嵌入。
添加以下依賴項。 有時將包含所有 ActiveMQ jars 的activemq-all
添加到項目中是多餘的,但我們將保持應用程序的依賴關係不復雜。
<dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-all</artifactId> <version>5.15.2</version> </dependency>
然後以編程方式啟動代理。 在 Spring Boot 中,我們通過插入spring-boot-starter-activemq
Maven 依賴項來獲得自動配置。
使用以下命令運行新的消息代理,僅指定連接器的端點:
BrokerService broker = new BrokerService(); broker.addConnector("tcp://localhost:61616"); broker.start();
並將以下配置片段添加到configure
方法主體:
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616"); this.getContext().addComponent("activemq", ActiveMQComponent.jmsComponent(connectionFactory));
現在我們可以使用消息隊列更新前面的示例。 隊列將在消息傳遞時自動創建。
public void configure() { errorHandler(defaultErrorHandler().maximumRedeliveries(0)); ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616"); this.getContext().addComponent("activemq", ActiveMQComponent.jmsComponent(connectionFactory)); from("file:orders?noop=true").routeId("main") .log("Incoming File: ${file:onlyname}") .unmarshal().json(JsonLibrary.Jackson, Order.class) // unmarshal JSON to Order class containing List<OrderItem> .split().simple("body.items") // split list to process one by one .to("log:inputOrderItem") .choice() .when().simple("${body.type} == 'Drink'") .to("activemq:queue:bar") .when().simple("${body.type} == 'Dessert'") .to("activemq:queue:dessertStation") .when().simple("${body.type} == 'Hot Meal'") .to("activemq:queue:hotMealStation") .when().simple("${body.type} == 'Cold Meal'") .to("activemq:queue:coldMealStation") .otherwise() .to("activemq:queue:others"); from("activemq:queue:bar").routeId("barAsync").log("Drinks"); from("activemq:queue:dessertStation").routeId("dessertAsync").log("Dessert"); from("activemq:queue:hotMealStation").routeId("hotMealAsync").log("Hot Meals"); from("activemq:queue:coldMealStation").routeId("coldMealAsync").log("Cold Meals"); from("activemq:queue:others").routeId("othersAsync").log("Others"); }
好了,現在交互變成了異步的。 這些數據的潛在消費者可以在準備好時訪問它。 這是一個鬆散耦合的例子,我們試圖在反應式架構中實現。 其中一項服務的不可用不會阻止其他服務。 此外,消費者可以並行擴展隊列並從隊列中讀取。 隊列本身可以擴展和分區。 持久隊列可以將數據存儲在磁盤上,等待處理,即使所有參與者都掛了。 因此,該系統更具容錯性。
一個驚人的事實是,CERN 使用 Apache Camel 和 ActiveMQ 來監控大型強子對撞機 (LHC) 的系統。 還有一篇有趣的碩士論文解釋了為此任務選擇合適的中間件解決方案。 所以,正如他們在主題演講中所說,“沒有 JMS——沒有粒子物理學!”
監控
在前面的示例中,我們在兩個服務之間創建了數據通道。 這是架構中一個額外的潛在故障點,所以我們必須照顧它。 下面我們來看看 Apache Camel 提供了哪些監控功能。 基本上,它通過 JMX 可訪問的 MBean 公開有關其路由的統計信息。 ActiveMQ 以相同的方式公開隊列統計信息。
讓我們在應用程序中打開 JMX 服務器,使其能夠使用命令行選項運行:
-Dorg.apache.camel.jmx.createRmiConnector=true -Dorg.apache.camel.jmx.mbeanObjectDomainName=org.apache.camel -Dorg.apache.camel.jmx.rmiConnector.registryPort=1099 -Dorg.apache.camel.jmx.serviceUrlPath=camel
現在運行應用程序,這樣路由就完成了它的工作。 打開標準的jconsole
工具並連接到應用程序進程。 連接到 URL service:jmx:rmi:///jndi/rmi://localhost:1099/camel
。 轉到 MBeans 樹中的 org.apache.camel 域。
我們可以看到,關於路由的一切都在掌控之中。 我們有運行中消息的數量、錯誤計數和隊列中的消息計數。 這些信息可以通過管道傳輸到一些具有豐富功能的監控工具集,如 Graphana 或 Kibana。 您可以通過實現眾所周知的 ELK 堆棧來做到這一點。
還有一個可插入和可擴展的 Web 控制台,它提供了一個用於管理 Camel、ActiveMQ 等的 UI,稱為 hawt.io。
測試路線
Apache Camel 具有相當廣泛的功能,可以使用模擬組件編寫測試路由。 它是一個強大的工具,但僅為測試編寫單獨的路由是一個耗時的過程。 在生產路線上運行測試而不修改其管道會更有效。 Camel 有這個特性,可以使用 AdviceWith 組件來實現。
讓我們在示例中啟用測試邏輯並運行示例測試。
<dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.11</version> <scope>test</scope> </dependency> <dependency> <groupId>org.apache.camel</groupId> <artifactId>camel-test</artifactId> <version>2.20.0</version> <scope>test</scope> </dependency>
測試類是:
public class AsyncRouteTest extends CamelTestSupport { @Override protected RouteBuilder createRouteBuilder() throws Exception { return new AsyncRouteBuilder(); } @Before public void mockEndpoints() throws Exception { context.getRouteDefinition("main").adviceWith(context, new AdviceWithRouteBuilder() { @Override public void configure() throws Exception { // we substitute all actual queues with mock endpoints mockEndpointsAndSkip("activemq:queue:bar"); mockEndpointsAndSkip("activemq:queue:dessertStation"); mockEndpointsAndSkip("activemq:queue:hotMealStation"); mockEndpointsAndSkip("activemq:queue:coldMealStation"); mockEndpointsAndSkip("activemq:queue:others"); // and replace the route's source with test endpoint replaceFromWith("file://testInbox"); } }); } @Test public void testSyncInteraction() throws InterruptedException { String testJson = "{\"id\": 1, \"order\": [{\"id\": 1, \"name\": \"Americano\", \"type\": \"Drink\", \"qty\": \"1\"}, {\"id\": 2, \"name\": \"French Omelette\", \"type\": \"Hot Meal\", \"qty\": \"1\"}, {\"id\": 3, \"name\": \"Lasagna\", \"type\": \"Hot Meal\", \"qty\": \"1\"}, {\"id\": 4, \"name\": \"Rice Balls\", \"type\": \"Hot Meal\", \"qty\": \"1\"}, {\"id\": 5, \"name\": \"Blueberry Pie\", \"type\": \"Dessert\", \"qty\": \"1\"}]}"; // get mocked endpoint and set an expectation MockEndpoint mockEndpoint = getMockEndpoint("mock:activemq:queue:hotMealStation"); mockEndpoint.expectedMessageCount(3); // simulate putting file in the inbox folder template.sendBodyAndHeader("file://testInbox", testJson, Exchange.FILE_NAME, "test.json"); //checks that expectations were met assertMockEndpointsSatisfied(); } }
現在使用mvn test
為應用程序運行測試。 我們可以看到我們的路由已經成功執行了測試建議。 沒有消息通過實際隊列,並且測試已通過。

INFO | Route: main started and consuming from: file://testInbox <...> INFO | Incoming File: test.json <...> INFO | Asserting: mock://activemq:queue:hotMealStation is satisfied
將 Apache Camel 與 Kubernetes 集群一起使用
今天的集成問題之一是應用程序不再是靜態的。 在雲基礎架構中,我們處理同時在多個節點上運行的虛擬服務。 它使微服務架構具有相互交互的小型、輕量級服務網絡。 這些服務的生命週期不可靠,我們必須動態地發現它們。
將雲服務粘合在一起是一項可以使用 Apache Camel 解決的任務。 這特別有趣,因為 EIP 風格以及 Camel 有大量適配器並支持廣泛的協議這一事實。 最近的 2.18 版本添加了 ServiceCall 組件,該組件引入了調用 API 並通過集群發現機制解析其地址的功能。 目前支持 Consul、Kubernetes、Ribbon 等。ServiceCall 配置了 Consul 的一些代碼示例很容易找到。 我們將在這裡使用 Kubernetes,因為它是我最喜歡的集群解決方案。
集成架構將如下所示:
Order
服務和Inventory
服務將是幾個返回靜態數據的簡單 Spring Boot 應用程序。 我們在這裡不受特定技術堆棧的約束。 這些服務正在生成我們想要處理的數據。
訂單服務控制器:
@RestController public class OrderController { private final OrderStorage orderStorage; @Autowired public OrderController(OrderStorage orderStorage) { this.orderStorage = orderStorage; } @RequestMapping("/info") public String info() { return "Order Service UU/orders") public List<Order> getAll() { return orderStorage.getAll(); } @RequestMapping("/orders/{id}") public Order getOne(@PathVariable Integer id) { return orderStorage.getOne(id); } }
它產生以下格式的數據:
[{"id":1,"items":[2,3,4]},{"id":2,"items":[5,3]}]
Inventory
服務控制器與Order
服務的控制器完全相似:
@RestController public class InventoryController { private final InventoryStorage inventoryStorage; @Autowired public InventoryController(InventoryStorage inventoryStorage) { this.inventoryStorage = inventoryStorage; } @RequestMapping("/info") public String info() { return "Inventory Service UU/items") public List<InventoryItem> getAll() { return inventoryStorage.getAll(); } @RequestMapping("/items/{id}") public InventoryItem getOne(@PathVariable Integer id) { return inventoryStorage.getOne(id); } }
InventoryStorage
是一個保存數據的通用存儲庫。 在此示例中,它返回靜態預定義對象,這些對像被編組為以下格式。
[{"id":1,"name":"Laptop","description":"Up to 12-hours battery life","price":499.9},{"id":2,"name":"Monitor","description":"27-inch, response time: 7ms","price":200.0},{"id":3,"name":"Headphones","description":"Soft leather ear-cups","price":29.9},{"id":4,"name":"Mouse","description":"Designed for comfort and portability","price":19.0},{"id":5,"name":"Keyboard","description":"Layout: US","price":10.5}]
讓我們編寫一個連接它們的網關路由,但在這一步沒有 ServiceCall:
rest("/orders") .get("/").description("Get all orders with details").outType(TestResponse.class) .route() .setHeader("Content-Type", constant("application/json")) .setHeader("Accept", constant("application/json")) .setHeader(Exchange.HTTP_METHOD, constant("GET")) .removeHeaders("CamelHttp*") .to("http4://localhost:8082/orders?bridgeEndpoint=true") .unmarshal(formatOrder) .enrich("direct:enrichFromInventory", new OrderAggregationStrategy()) .to("log:result") .endRest(); from("direct:enrichFromInventory") .transform().simple("${null}") .setHeader("Content-Type", constant("application/json")) .setHeader("Accept", constant("application/json")) .setHeader(Exchange.HTTP_METHOD, constant("GET")) .removeHeaders("CamelHttp*") .to("http4://localhost:8081/items?bridgeEndpoint=true") .unmarshal(formatInventory);
現在想像一下,每個服務不再是一個特定的實例,而是一組作為一個實例運行的實例。 我們將使用 Minikube 在本地嘗試 Kubernetes 集群。
配置網絡路由以在本地查看 Kubernetes 節點(給定示例適用於 Mac/Linux 環境):
# remove existing routes sudo route -n delete 10/24 > /dev/null 2>&1 # add routes sudo route -n add 10.0.0.0/24 $(minikube ip) # 172.17.0.0/16 ip range is used by docker in minikube sudo route -n add 172.17.0.0/16 $(minikube ip) ifconfig 'bridge100' | grep member | awk '{print $2}' # use interface name from the output of the previous command # needed for xhyve driver, which I'm using for testing sudo ifconfig bridge100 -hostfilter en5
使用 Dockerfile 配置將服務包裝在 Docker 容器中,如下所示:
FROM openjdk:8-jdk-alpine VOLUME /tmp ADD target/order-srv-1.0-SNAPSHOT.jar app.jar ADD target/lib lib ENV JAVA_OPTS="" ENTRYPOINT exec java $JAVA_OPTS -Djava.security.egd=file:/dev/./urandom -jar /app.jar
構建服務鏡像並將其推送到 Docker 註冊表。 現在運行本地 Kubernetes 集群中的節點。
Kubernetes.yaml 部署配置:
apiVersion: extensions/v1beta1 kind: Deployment metadata: name: inventory spec: replicas: 3 selector: matchLabels: app: inventory template: metadata: labels: app: inventory spec: containers: - name: inventory image: inventory-srv:latest imagePullPolicy: Never ports: - containerPort: 8081
將這些部署公開為集群中的服務:
kubectl expose deployment order-srv --type=NodePort kubectl expose deployment inventory-srv --type=NodePort
現在我們可以檢查請求是否由集群中隨機選擇的節點提供服務。 依次運行curl -X http://192.168.99.100:30517/info
幾次以訪問 minikube NodePort 以獲取公開服務(使用您的主機和端口)。 在輸出中,我們看到我們已經實現了請求平衡。
Inventory Service UUID = 22f8ca6b-f56b-4984-927b-cbf9fcf81da5 Inventory Service UUID = b7a4d326-1e76-4051-a0a6-1016394fafda Inventory Service UUID = b7a4d326-1e76-4051-a0a6-1016394fafda Inventory Service UUID = 22f8ca6b-f56b-4984-927b-cbf9fcf81da5 Inventory Service UUID = 50323ddb-3ace-4424-820a-6b4e85775af4
將camel-kubernetes
和camel-netty4-http
依賴項添加到項目的pom.xml
中。 然後配置 ServiceCall 組件以使用 Kubernetes 主節點發現共享路由定義中的所有服務調用:
KubernetesConfiguration kubernetesConfiguration = new KubernetesConfiguration(); kubernetesConfiguration.setMasterUrl("https://192.168.64.2:8443"); kubernetesConfiguration.setClientCertFile("/Users/antongoncharov/.minikube/client.crt"); kubernetesConfiguration.setClientKeyFile("/Users/antongoncharov/.minikube/client.key"); kubernetesConfiguration.setNamespace("default”); ServiceCallConfigurationDefinition config = new ServiceCallConfigurationDefinition(); config.setServiceDiscovery(new KubernetesClientServiceDiscovery(kubernetesConfiguration)); context.setServiceCallConfiguration(config);
ServiceCall EIP 很好地補充了 Spring Boot。 大多數選項可以直接在application.properties
文件中配置。
使用 ServiceCall 組件為 Camel 路由賦能:
rest("/orders") .get("/").description("Get all orders with details").outType(TestResponse.class) .route() .hystrix() .setHeader("Content-Type", constant("application/json")) .setHeader("Accept", constant("application/json")) .setHeader(Exchange.HTTP_METHOD, constant("GET")) .removeHeaders("CamelHttp*") .serviceCall("customer-srv","http4:customer-deployment?bridgeEndpoint=true") .unmarshal(formatOrder) .enrich("direct:enrichFromInventory", new OrderAggregationStrategy()) .to("log:result") .endRest(); from("direct:enrichFromInventory") .transform().simple("${null}") .setHeader("Content-Type", constant("application/json")) .setHeader("Accept", constant("application/json")) .setHeader(Exchange.HTTP_METHOD, constant("GET")) .removeHeaders("CamelHttp*") .serviceCall("order-srv","http4:order-srv?bridgeEndpoint=true") .unmarshal(formatInventory);
我們還在路線中激活了斷路器。 它是一個集成掛鉤,允許在發生交付錯誤或收件人不可用的情況下暫停遠程系統調用。 這是為了避免級聯繫統故障。 Hystrix 組件通過實現斷路器模式來幫助實現這一點。
讓我們運行它並發送一個測試請求; 我們將從這兩個服務中獲得匯總的響應。
[{"id":1,"items":[{"id":2,"name":"Monitor","description":"27-inch, response time: 7ms","price":200.0},{"id":3,"name":"Headphones","description":"Soft leather ear-cups","price":29.9},{"id":4,"name":"Mouse","description":"Designed for comfort and portability","price":19.0}]},{"id":2,"items":[{"id":5,"name":"Keyboard","description":"Layout: US","price":10.5},{"id":3,"name":"Headphones","description":"Soft leather ear-cups","price":29.9}]}]
結果符合預期。
其他用例
我展示了 Apache Camel 如何在集群中集成微服務。 這個框架還有什麼其他用途? 一般來說,它在基於規則的路由可能是解決方案的任何地方都很有用。 例如,Apache Camel 可以是帶有 Eclipse Kura 適配器的物聯網中間件。 It can handle monitoring by ferrying log signals from various components and services, like in the CERN system. It can also be an integration framework for enterprise SOA or be a pipeline for batch data processing, although it doesn't compete well with Apache Spark in this area.
結論
You can see that systems integration isn't an easy process. We're lucky because a lot of experience has been gathered. It's important to apply it correctly to build flexible and fault-tolerant solutions.
To ensure correct application, I recommend having a checklist of important integration aspects. Must-have items include:
- Is there a separate integration layer?
- Are there tests for integration?
- Do we know the expected peak data intensity?
- Do we know the expected data delivery time?
- Does message correlation matter? What if a sequence breaks?
- Should we do it in a synchronous or asynchronous way?
- Where do formats and routing rules change more frequently?
- Do we have ways to monitor the process?
In this article, we tried Apache Camel, a lightweight integration framework, which helps save time and effort when solving integration problems. As we showed, it can serve as a tool, supporting the relevant microservice architecture by taking full responsibility for data exchange between microservices.
If you're interested in learning more about Apache Camel, I highly recommend the book “Camel in Action” by the framework's creator, Claus Ibsen. Official documentation is available at camel.apache.org.