Optimizați integrarea software-ului: un tutorial Apache Camel

Publicat: 2022-03-11

Software-ul rareori, dacă este deloc, există într-un vid informațional. Cel puțin, aceasta este presupunerea pe care noi inginerii de software o putem face pentru majoritatea aplicațiilor pe care le dezvoltăm.

La orice scară, fiecare program – într-un fel sau altul – comunică cu un alt software din diverse motive: pentru a obține date de referință de undeva, pentru a trimite semnale de monitorizare, pentru a fi în contact cu alte servicii în timp ce face parte dintr-un program distribuit. sistem și multe altele.

În acest tutorial, veți afla care sunt unele dintre cele mai mari provocări ale integrării software-ului mare și cum le rezolvă cu ușurință Apache Camel.

Problema: Proiectarea arhitecturii pentru integrarea sistemelor

Este posibil să fi făcut următoarele cel puțin o dată în viața de inginerie software:

  • Identificați un fragment din logica dvs. de afaceri care ar trebui să inițieze trimiterea datelor.
  • În același strat de aplicație, scrieți transformările datelor în conformitate cu ceea ce se așteaptă destinatarul.
  • Înveliți datele într-o structură adecvată pentru transferul și rutarea printr-o rețea.
  • Deschideți o conexiune la o aplicație țintă folosind un driver adecvat sau un SDK client.
  • Trimiteți datele și gestionați răspunsul.

De ce este aceasta o linie de acțiune proastă?

Deși aveți doar câteva conexiuni de acest fel, rămâne ușor de gestionat. Cu un număr tot mai mare de relații între sisteme, logica de afaceri a aplicației se amestecă cu logica de integrare, care se referă la adaptarea datelor, compensarea diferențelor tehnologice dintre două sisteme și transferul de date către sistemul extern cu solicitări SOAP, REST sau mai exotice. .

Dacă ai integra mai multe aplicații, ar fi incredibil de dificil să retragi întreaga imagine a dependențelor dintr-un astfel de cod: Unde sunt produse datele și ce servicii le consumă? Veți avea multe locuri în care logica de integrare este duplicată, pentru a porni.

Cu o astfel de abordare, deși sarcina este îndeplinită din punct de vedere tehnic, ne confruntăm cu probleme uriașe cu mentenabilitatea și scalabilitatea integrării. Reorganizarea rapidă a fluxurilor de date în acest sistem este aproape imposibilă, ca să nu mai vorbim de probleme mai profunde precum lipsa monitorizării, întreruperea circuitului, recuperarea laborioasă a datelor etc.

Toate acestea sunt deosebit de importante atunci când se integrează software în domeniul unei întreprinderi considerabil mari. A face față integrării întreprinderii înseamnă a lucra cu un set de aplicații, care operează pe o gamă largă de platforme și există în locații diferite. Schimbul de date într-un astfel de peisaj software este destul de solicitant. Trebuie să îndeplinească standardele de înaltă securitate ale industriei și să ofere o modalitate fiabilă de a transfera date. Într-un mediu de întreprindere, integrarea sistemelor necesită o arhitectură separată, bine elaborată.

Acest articol vă va prezenta dificultățile unice cu care se confruntă integrarea software-ului și vă va oferi câteva soluții bazate pe experiență pentru sarcinile de integrare. Ne vom familiariza cu Apache Camel, un cadru util care poate atenua cele mai rele părți ale durerii de cap a dezvoltatorului de integrare. Vom urma cu un exemplu despre modul în care Camel poate ajuta la stabilirea comunicării într-un cluster de microservicii alimentate de Kubernetes.

Dificultăți de integrare

O abordare utilizată pe scară largă pentru rezolvarea problemei este decuplarea unui strat de integrare în aplicația dvs. Poate exista în cadrul aceleiași aplicații sau ca o bucată de software dedicată care rulează independent - în ultimul caz numit middleware.

Cu ce ​​probleme vă confruntați de obicei când dezvoltați și susțineți middleware? În general, aveți următoarele elemente cheie:

  • Toate canalele de date sunt nesigure într-o oarecare măsură. Problemele care decurg din această lipsă de încredere pot să nu apară în timp ce intensitatea datelor este scăzută spre moderată. Fiecare nivel de stocare de la memoria aplicației până la cache-urile și echipamentele de sub acesta este supus unei potențiale defecțiuni. Unele erori rare apar numai cu volume uriașe de date. Chiar și produsele mature ale furnizorilor gata de producție au probleme nerezolvate de urmărire a erorilor legate de pierderea de date. Un sistem middleware ar trebui să vă poată informa cu privire la aceste pierderi de date și să furnizeze reluarea mesajelor în timp util.
  • Aplicațiile folosesc diferite protocoale și formate de date. Aceasta înseamnă că un sistem de integrare este o cortină pentru transformările de date și adaptoarele pentru alți participanți și utilizează o varietate de tehnologii. Acestea pot include apeluri simple API REST, dar ar putea fi, de asemenea, accesarea unui broker de coadă, trimiterea de comenzi CSV prin FTP sau extragerea în loturi de date într-un tabel de bază de date. Aceasta este o listă lungă și nu se va scurta niciodată.
  • Schimbările în formatele de date și regulile de rutare sunt inevitabile. Fiecare pas din procesul de dezvoltare a unei aplicații, care modifică structura datelor, duce de obicei la modificări ale formatelor și transformărilor datelor de integrare. Uneori, sunt necesare modificări ale infrastructurii cu fluxuri de date reorganizate ale întreprinderii. De exemplu, aceste modificări se pot întâmpla atunci când se introduce un singur punct de validare a datelor de referință care trebuie să proceseze toate intrările de date de bază în întreaga companie. Cu N sisteme, putem ajunge să avem maxim aproape N^2 conexiuni între ele, astfel încât numărul locurilor în care trebuie aplicate modificări crește destul de repede. Va fi ca o avalanșă. Pentru a susține mentenabilitatea, un strat middleware trebuie să ofere o imagine clară a dependențelor cu rutare versatilă și transformare a datelor.

Aceste idei trebuie avute în vedere atunci când proiectați integrarea și alegeți cea mai potrivită soluție middleware. Una dintre modalitățile posibile de a le gestiona este utilizarea unei magistrale de servicii enterprise (ESB). Dar ESB-urile furnizate de furnizorii importanți sunt în general prea grele și sunt adesea mai multe probleme decât merită: este aproape imposibil să porniți rapid cu un ESB, are o curbă de învățare destul de abruptă și flexibilitatea sa este sacrificată unei liste lungi. de caracteristici și instrumente încorporate. În opinia mea, soluțiile ușoare de integrare open-source sunt cu mult superioare – sunt mai elastice, ușor de implementat în cloud și ușor de scalat.

Integrarea software-ului nu este ușor de făcut. Astăzi, pe măsură ce construim arhitecturi de microservicii și ne confruntăm cu roiuri de servicii mici, avem și așteptări mari pentru cât de eficient ar trebui să comunice.

Modele de integrare a întreprinderii

După cum era de așteptat, ca și dezvoltarea de software în general, dezvoltarea rutării și transformării datelor implică operații repetitive. Experiența în acest domeniu a fost rezumată și sistematizată de profesioniști care se ocupă de problemele de integrare de ceva timp. În rezultat, există un set de șabloane extrase numite modele de integrare întreprindere utilizate pentru proiectarea fluxurilor de date. Aceste metode de integrare au fost descrise în cartea cu același nume de Gregor Hophe și Bobby Wolfe, care seamănă mult cu cartea semnificativă a lui Gang of Four, dar în domeniul software-ului de lipire.

Pentru a da un exemplu, modelul de normalizator introduce o componentă care mapează mesaje egale din punct de vedere semantic care au formate de date diferite la un singur model canonic, sau agregatorul este un EIP care combină o secvență de mesaje într-unul singur.

Deoarece sunt abstracții agnostice de tehnologie stabilite utilizate pentru rezolvarea problemelor arhitecturale, EIP-urile ajută la scrierea unui design de arhitectură, care nu se adâncește la nivelul codului, ci descrie fluxurile de date în detaliu suficient. O astfel de notație pentru descrierea rutelor de integrare nu numai că face proiectarea concisă, ci și stabilește o nomenclatură comună și un limbaj comun, care sunt foarte importante în contextul rezolvării unei sarcini de integrare cu membrii echipei din diverse domenii de afaceri.

Vă prezentăm Apache Camel

Cu câțiva ani în urmă, construiam o integrare întreprinzătoare într-o rețea uriașă de magazine alimentare cu magazine în locații larg distribuite. Am început cu o soluție proprietară ESB, care s-a dovedit a fi prea greoaie de întreținut. Apoi, echipa noastră a dat peste Apache Camel și, după ce am făcut niște lucrări „dovada conceptului”, am rescris rapid toate fluxurile noastre de date în rutele Camel.

Apache Camel poate fi descris ca un „router de mediere”, un cadru middleware orientat pe mesaje care implementează lista de EIP-uri, cu care m-am familiarizat. Folosește aceste modele, acceptă toate protocoalele de transport comune și are un set vast de adaptoare utile incluse. Camel permite gestionarea unui număr de rutine de integrare fără a fi nevoie să scrieți propriul cod.

În afară de aceasta, aș evidenția următoarele caracteristici Apache Camel:

  • Rutele de integrare sunt scrise ca conducte formate din blocuri. Acesta creează o imagine total transparentă pentru a ajuta la urmărirea fluxurilor de date.
  • Camel are adaptoare pentru multe API-uri populare. De exemplu, obținerea de date de la Apache Kafka, monitorizarea instanțelor AWS EC2, integrarea cu Salesforce — toate aceste sarcini pot fi rezolvate folosind componente disponibile imediat.

Rutele Apache Camel pot fi scrise în Java sau Scala DSL. (Este disponibilă și o configurație XML, dar devine prea verbosă și are capacități de depanare mai proaste.) Nu impune restricții asupra stivei tehnologice a serviciilor de comunicare, dar dacă scrieți în Java sau Scala, puteți încorpora în schimb Camel într-o aplicație. de a-l rula independent.

Notația de rutare folosită de Camel poate fi descrisă cu următorul pseudocod simplu:

 from(Source) .transform(Transformer) .to(Destination)

Source , Transformer și Destination sunt puncte finale care se referă la componentele de implementare prin URI-urile lor.

Ce îi permite lui Camel să rezolve problemele de integrare descrise anterior? Hai să aruncăm o privire. În primul rând, logica de rutare și transformare trăiește acum doar într-o configurație dedicată Apache Camel. În al doilea rând, prin DSL-ul succint și natural, împreună cu utilizarea EIP-urilor, apare o imagine a dependențelor dintre sisteme. Este făcut din abstracții inteligibile, iar logica de rutare este ușor de reglat. Și, în sfârșit, nu trebuie să scriem grămezi de cod de transformare, deoarece adaptoarele adecvate sunt probabil incluse deja.

Integrari

Ar trebui să adaug că Apache Camel este un cadru matur și primește actualizări regulate. Are o comunitate grozavă și o bază de cunoștințe cumulativă considerabilă.

Are propriile sale dezavantaje. Camel nu ar trebui luată ca o suită complexă de integrare. Este o cutie de instrumente fără funcții de nivel înalt, cum ar fi instrumente de management al proceselor de afaceri sau monitoare de activitate, dar poate fi folosită pentru a crea un astfel de software.

Sistemele alternative ar putea fi, de exemplu, Spring Integration sau Mule ESB. Pentru Spring Integration, deși este considerat a fi ușor, din experiența mea, asamblarea și scrierea multor fișiere de configurare XML se poate dovedi a fi neașteptat de complicată și nu este o cale de ieșire ușoară. Mule ESB este un set de instrumente robust și foarte funcțional, dar, după cum sugerează și numele, este un autobuz de servicii pentru întreprinderi, deci aparține unei categorii de greutate diferite. Mule poate fi comparat cu Fuse ESB, un produs similar bazat pe Apache Camel cu un set bogat de caracteristici. Pentru mine, folosirea Apache Camel pentru serviciile de lipire este o idee deloc azi. Este ușor de utilizat și produce o descriere clară a ceea ce se întâmplă unde - în același timp, este suficient de funcțional pentru a construi integrări complexe.

Scrierea unui exemplu de traseu

Să începem să scriem codul. Vom începe de la un flux de date sincron care direcționează mesajele dintr-o singură sursă către o listă de destinatari. Regulile de rutare vor fi scrise în Java DSL.

Vom folosi Maven pentru a construi proiectul. Mai întâi adăugați următoarea dependență la pom.xml :

 <dependencies> ... <dependency> <groupId>org.apache.camel</groupId> <artifactId>camel-core</artifactId> <version>2.20.0</version> </dependency> </dependencies>

Alternativ, aplicația poate fi construită pe deasupra camel-archetype-java .

Definițiile rutei pentru cămilă sunt declarate în metoda RouteBuilder.configure .

 public void configure() { errorHandler(defaultErrorHandler().maximumRedeliveries(0)); from("file:orders?noop=true").routeId("main") .log("Incoming File: ${file:onlyname}") .unmarshal().json(JsonLibrary.Jackson, Order.class) // unmarshal JSON to Order class containing List<OrderItem> .split().simple("body.items") // split list to process one by one .to("log:inputOrderItem") .choice() .when().simple("${body.type} == 'Drink'") .to("direct:bar") .when().simple("${body.type} == 'Dessert'") .to("direct:dessertStation") .when().simple("${body.type} == 'Hot Meal'") .to("direct:hotMealStation") .when().simple("${body.type} == 'Cold Meal'") .to("direct:coldMealStation") .otherwise() .to("direct:others"); from("direct:bar").routeId("bar").log("Handling Drink"); from("direct:dessertStation").routeId("dessertStation").log("Handling Dessert"); from("direct:hotMealStation").routeId("hotMealStation").log("Handling Hot Meal"); from("direct:coldMealStation").routeId("coldMealStation").log("Handling Cold Meal"); from("direct:others").routeId("others").log("Handling Something Other"); }

În această definiție, creăm o rută care preia înregistrările din fișierul JSON, le împarte în elemente și le direcționează către un set de handlere bazate pe conținutul mesajului.

Să-l rulăm pe datele de testare pregătite. Vom obține rezultatul:

 INFO | Total 6 routes, of which 6 are started INFO | Apache Camel 2.20.0 (CamelContext: camel-1) started in 10.716 seconds INFO | Incoming File: order1.json INFO | Exchange[ExchangePattern: InOnly, BodyType: com.antongoncharov.camel.example.model.OrderItem, Body: OrderItem{, type='Drink', name='Americano', qty='1'}] INFO | Handling Drink INFO | Exchange[ExchangePattern: InOnly, BodyType: com.antongoncharov.camel.example.model.OrderItem, Body: OrderItem{, type='Hot Meal', name='French Omelette', qty='1'}] INFO | Handling Hot Meal INFO | Exchange[ExchangePattern: InOnly, BodyType: com.antongoncharov.camel.example.model.OrderItem, Body: OrderItem{, type='Hot Meal', name='Lasagna', qty='1'}] INFO | Handling Hot Meal INFO | Exchange[ExchangePattern: InOnly, BodyType: com.antongoncharov.camel.example.model.OrderItem, Body: OrderItem{, type='Hot Meal', name='Rice Balls', qty='1'}] INFO | Handling Hot Meal INFO | Exchange[ExchangePattern: InOnly, BodyType: com.antongoncharov.camel.example.model.OrderItem, Body: OrderItem{, type='Dessert', name='Blueberry Pie', qty='1'}] INFO | Handling Dessert

După cum era de așteptat, Camel a direcționat mesajele către destinații.

Opțiuni de transfer de date

În exemplul de mai sus, interacțiunea dintre componente este sincronă și se realizează prin memoria aplicației. Cu toate acestea, există mai multe moduri de a comunica atunci când avem de-a face cu aplicații separate care nu partajează memoria:

  • Schimb de fișiere. O aplicație produce fișiere de date partajate pentru ca cealaltă să le consume. Acolo trăiește spiritul vechii școli. Această metodă de comunicare are o multitudine de consecințe: lipsă de tranzacții și consecvență, performanță slabă și coordonare izolată între sisteme. Mulți dezvoltatori au ajuns să scrie soluții de integrare de casă pentru a face procesul mai mult sau mai puțin ușor de gestionat.
  • Baza de date comună. Cereți aplicațiilor să stocheze datele pe care doresc să le partajeze într-o schemă comună a unei singure baze de date. Proiectarea unei scheme unificate și gestionarea accesului concurent la tabele sunt cele mai importante provocări ale acestei abordări. Ca și în cazul schimbului de fișiere, este ușor ca acesta să devină un blocaj permanent.
  • Apel API la distanță. Furnizați o interfață pentru a permite unei aplicații să interacționeze cu o altă aplicație care rulează, cum ar fi un apel tipic de metodă. Aplicațiile partajează funcționalități prin invocări API, dar le cuplează strâns în acest proces.
  • Mesaje. Puneți fiecare aplicație să se conecteze la un sistem de mesagerie comun și să facă schimb de date și să invoce comportamentul asincron folosind mesaje. Nici expeditorul, nici destinatarul nu trebuie să fie în funcțiune în același timp pentru a primi mesajul.

Există mai multe moduri de a interacționa, dar ar trebui să ținem cont de faptul că, în linii mari, există două tipuri de interacțiune: sincronă și asincronă. Primul este ca și cum ați apela o funcție din codul dvs. - fluxul de execuție va aștepta până când se execută și returnează o valoare. Cu o abordare asincronă, aceleași date sunt trimise printr-o coadă intermediară de mesaje sau printr-un subiect de abonament. Un apel asincron al funcției de la distanță poate fi implementat ca EIP cerere-răspuns.

Mesageria asincronă nu este totuși un panaceu; implică anumite restricții. Rareori vezi API-uri de mesagerie pe web; Serviciile REST sincrone sunt mult mai populare. Dar middleware-ul de mesagerie este utilizat pe scară largă în intranetul întreprinderii sau în infrastructura back-end a sistemului distribuit.

Utilizarea cozilor de mesaje

Să facem exemplul nostru asincron. Un sistem software care gestionează cozile și subiectele de abonament se numește broker de mesaje. Este ca un RDBMS pentru tabele și coloane. Cozile servesc ca integrare punct la punct, în timp ce subiectele sunt pentru comunicarea de publicare-abonare cu mulți destinatari. Vom folosi Apache ActiveMQ ca broker de mesaje JMS, deoarece este solid și încorporabil.

Adăugați următoarea dependență. Uneori este excesiv să adăugați activemq-all , care conține toate jarurile ActiveMQ, la proiect, dar vom păstra dependențele aplicației noastre necomplicate.

 <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-all</artifactId> <version>5.15.2</version> </dependency>

Apoi porniți brokerul în mod programatic. În Spring Boot, obținem o configurare automată pentru aceasta prin conectarea dependenței spring-boot-starter-activemq Maven.

Rulați un nou broker de mesaje cu următoarele comenzi, specificând doar punctul final al conectorului:

 BrokerService broker = new BrokerService(); broker.addConnector("tcp://localhost:61616"); broker.start();

Și adăugați următorul fragment de configure în corpul metodei de configurare:

 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616"); this.getContext().addComponent("activemq", ActiveMQComponent.jmsComponent(connectionFactory));

Acum putem actualiza exemplul anterior folosind cozi de mesaje. Cozile vor fi create automat la livrarea mesajelor.

 public void configure() { errorHandler(defaultErrorHandler().maximumRedeliveries(0)); ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616"); this.getContext().addComponent("activemq", ActiveMQComponent.jmsComponent(connectionFactory)); from("file:orders?noop=true").routeId("main") .log("Incoming File: ${file:onlyname}") .unmarshal().json(JsonLibrary.Jackson, Order.class) // unmarshal JSON to Order class containing List<OrderItem> .split().simple("body.items") // split list to process one by one .to("log:inputOrderItem") .choice() .when().simple("${body.type} == 'Drink'") .to("activemq:queue:bar") .when().simple("${body.type} == 'Dessert'") .to("activemq:queue:dessertStation") .when().simple("${body.type} == 'Hot Meal'") .to("activemq:queue:hotMealStation") .when().simple("${body.type} == 'Cold Meal'") .to("activemq:queue:coldMealStation") .otherwise() .to("activemq:queue:others"); from("activemq:queue:bar").routeId("barAsync").log("Drinks"); from("activemq:queue:dessertStation").routeId("dessertAsync").log("Dessert"); from("activemq:queue:hotMealStation").routeId("hotMealAsync").log("Hot Meals"); from("activemq:queue:coldMealStation").routeId("coldMealAsync").log("Cold Meals"); from("activemq:queue:others").routeId("othersAsync").log("Others"); }

Bine, acum interacțiunea a devenit asincronă. Consumatorii potențiali ai acestor date le pot accesa atunci când sunt pregătiți. Acesta este un exemplu de cuplare liberă, pe care încercăm să-l realizăm într-o arhitectură reactivă. Indisponibilitatea unuia dintre servicii nu le va bloca pe celelalte. Mai mult, un consumator poate scala și citi din coadă în paralel. Coada în sine se poate scala și poate fi partiționată. Cozile persistente pot stoca datele pe disc, așteaptă să fie procesate, chiar și atunci când toți participanții au căzut. În consecință, acest sistem este mai tolerant la erori.

Un fapt uimitor este că CERN folosește Apache Camel și ActiveMQ pentru a monitoriza sistemele Large Hadron Collider (LHC). Există, de asemenea, o teză de master interesantă care explică alegerea unei soluții middleware adecvate pentru această sarcină. Deci, așa cum se spune în discursul principal, „Fără JMS, fără fizica particulelor!”

Monitorizarea

În exemplul anterior, am creat canalul de date între două servicii. Este un punct de eșec potențial suplimentar într-o arhitectură, așa că trebuie să avem grijă de el. Să aruncăm o privire la ce funcții de monitorizare oferă Apache Camel. Practic, expune informații statistice despre rutele sale prin MBeans, accesibile de JMX. ActiveMQ expune statisticile cozii în același mod.

Să pornim serverul JMX în aplicație, pentru a-l permite să ruleze cu opțiunile din linia de comandă:

 -Dorg.apache.camel.jmx.createRmiConnector=true -Dorg.apache.camel.jmx.mbeanObjectDomainName=org.apache.camel -Dorg.apache.camel.jmx.rmiConnector.registryPort=1099 -Dorg.apache.camel.jmx.serviceUrlPath=camel

Acum rulați aplicația, astfel încât ruta să-și facă treaba. Deschideți instrumentul standard jconsole și conectați-vă la procesul de aplicare. Conectați-vă la service:jmx:rmi:///jndi/rmi://localhost:1099/camel . Accesați domeniul org.apache.camel din arborele MBeans.

Captură de ecran 1

Putem vedea că totul despre rutare este sub control. Avem numărul de mesaje în timpul zborului, numărul de erori și numărul de mesaje în cozi. Aceste informații pot fi transmise la un set de instrumente de monitorizare cu funcționalități bogate, cum ar fi Graphana sau Kibana. Puteți face acest lucru prin implementarea binecunoscutei stive ELK.

Există, de asemenea, o consolă web conectabilă și extensibilă, care oferă o interfață de utilizare pentru gestionarea Camel, ActiveMQ și multe altele, numită hawt.io.

Captură de ecran 2

Testarea rutelor

Apache Camel are o funcționalitate destul de largă pentru scrierea rutelor de testare cu componente simulate. Este un instrument puternic, dar scrierea rutelor separate doar pentru testare este un proces care consumă timp. Ar fi mai eficient să se execute teste pe rutele de producție fără a le modifica conducta. Camel are această caracteristică și poate fi implementată folosind componenta AdviceWith.

Să activăm logica de testare în exemplul nostru și să rulăm un test de probă.

 <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.11</version> <scope>test</scope> </dependency> <dependency> <groupId>org.apache.camel</groupId> <artifactId>camel-test</artifactId> <version>2.20.0</version> <scope>test</scope> </dependency>

Clasa de testare este:

 public class AsyncRouteTest extends CamelTestSupport { @Override protected RouteBuilder createRouteBuilder() throws Exception { return new AsyncRouteBuilder(); } @Before public void mockEndpoints() throws Exception { context.getRouteDefinition("main").adviceWith(context, new AdviceWithRouteBuilder() { @Override public void configure() throws Exception { // we substitute all actual queues with mock endpoints mockEndpointsAndSkip("activemq:queue:bar"); mockEndpointsAndSkip("activemq:queue:dessertStation"); mockEndpointsAndSkip("activemq:queue:hotMealStation"); mockEndpointsAndSkip("activemq:queue:coldMealStation"); mockEndpointsAndSkip("activemq:queue:others"); // and replace the route's source with test endpoint replaceFromWith("file://testInbox"); } }); } @Test public void testSyncInteraction() throws InterruptedException { String testJson = "{\"id\": 1, \"order\": [{\"id\": 1, \"name\": \"Americano\", \"type\": \"Drink\", \"qty\": \"1\"}, {\"id\": 2, \"name\": \"French Omelette\", \"type\": \"Hot Meal\", \"qty\": \"1\"}, {\"id\": 3, \"name\": \"Lasagna\", \"type\": \"Hot Meal\", \"qty\": \"1\"}, {\"id\": 4, \"name\": \"Rice Balls\", \"type\": \"Hot Meal\", \"qty\": \"1\"}, {\"id\": 5, \"name\": \"Blueberry Pie\", \"type\": \"Dessert\", \"qty\": \"1\"}]}"; // get mocked endpoint and set an expectation MockEndpoint mockEndpoint = getMockEndpoint("mock:activemq:queue:hotMealStation"); mockEndpoint.expectedMessageCount(3); // simulate putting file in the inbox folder template.sendBodyAndHeader("file://testInbox", testJson, Exchange.FILE_NAME, "test.json"); //checks that expectations were met assertMockEndpointsSatisfied(); } }

Acum rulați teste pentru aplicație cu mvn test . Putem vedea că traseul nostru a fost executat cu succes cu sfaturile de testare. Nu există mesaje trecute prin cozile reale și testele au fost trecute.

 INFO | Route: main started and consuming from: file://testInbox <...> INFO | Incoming File: test.json <...> INFO | Asserting: mock://activemq:queue:hotMealStation is satisfied

Utilizarea Apache Camel cu Kubernetes Cluster

Una dintre problemele de integrare astăzi este că aplicațiile nu mai sunt statice. Într-o infrastructură cloud, avem de-a face cu servicii virtuale care rulează pe mai multe noduri în același timp. Permite arhitectura microserviciilor cu o rețea de servicii mici și ușoare care interacționează între ele. Aceste servicii au o durată de viață nesigură și trebuie să le descoperim dinamic.

Lipirea serviciilor cloud împreună este o sarcină care poate fi rezolvată cu Apache Camel. Este deosebit de interesant din cauza aromei EIP și a faptului că Camel are o mulțime de adaptoare și acceptă o gamă largă de protocoale. Versiunea recentă 2.18 adaugă componenta ServiceCall, care introduce o caracteristică de apelare a unui API și de rezolvare a adresei acestuia prin mecanisme de descoperire a clusterului. În prezent, acceptă Consul, Kubernetes, Ribbon etc. Câteva exemple de cod, în care ServiceCall este configurat cu Consul, pot fi găsite cu ușurință. Vom folosi Kubernetes aici, deoarece este soluția mea de clustering preferată.

Schema de integrare va fi următoarea:

Schemă

Serviciul de Order și serviciul de Inventory vor fi câteva aplicații Spring Boot banale care returnează date statice. Nu suntem legați de o anumită stivă de tehnologie aici. Aceste servicii produc datele pe care dorim să le procesăm.

Controler de service comanda:

 @RestController public class OrderController { private final OrderStorage orderStorage; @Autowired public OrderController(OrderStorage orderStorage) { this.orderStorage = orderStorage; } @RequestMapping("/info") public String info() { return "Order Service UU/orders") public List<Order> getAll() { return orderStorage.getAll(); } @RequestMapping("/orders/{id}") public Order getOne(@PathVariable Integer id) { return orderStorage.getOne(id); } }

Produce date în formatul:

 [{"id":1,"items":[2,3,4]},{"id":2,"items":[5,3]}]

Controlerul serviciului de Inventory este absolut similar cu serviciul de Order :

 @RestController public class InventoryController { private final InventoryStorage inventoryStorage; @Autowired public InventoryController(InventoryStorage inventoryStorage) { this.inventoryStorage = inventoryStorage; } @RequestMapping("/info") public String info() { return "Inventory Service UU/items") public List<InventoryItem> getAll() { return inventoryStorage.getAll(); } @RequestMapping("/items/{id}") public InventoryItem getOne(@PathVariable Integer id) { return inventoryStorage.getOne(id); } }

InventoryStorage este un depozit generic care deține date. În acest exemplu, returnează obiecte statice predefinite, care sunt clasificate în următorul format.

 [{"id":1,"name":"Laptop","description":"Up to 12-hours battery life","price":499.9},{"id":2,"name":"Monitor","description":"27-inch, response time: 7ms","price":200.0},{"id":3,"name":"Headphones","description":"Soft leather ear-cups","price":29.9},{"id":4,"name":"Mouse","description":"Designed for comfort and portability","price":19.0},{"id":5,"name":"Keyboard","description":"Layout: US","price":10.5}]

Să scriem o rută gateway care le conectează, dar fără ServiceCall la acest pas:

 rest("/orders") .get("/").description("Get all orders with details").outType(TestResponse.class) .route() .setHeader("Content-Type", constant("application/json")) .setHeader("Accept", constant("application/json")) .setHeader(Exchange.HTTP_METHOD, constant("GET")) .removeHeaders("CamelHttp*") .to("http4://localhost:8082/orders?bridgeEndpoint=true") .unmarshal(formatOrder) .enrich("direct:enrichFromInventory", new OrderAggregationStrategy()) .to("log:result") .endRest(); from("direct:enrichFromInventory") .transform().simple("${null}") .setHeader("Content-Type", constant("application/json")) .setHeader("Accept", constant("application/json")) .setHeader(Exchange.HTTP_METHOD, constant("GET")) .removeHeaders("CamelHttp*") .to("http4://localhost:8081/items?bridgeEndpoint=true") .unmarshal(formatInventory);

Acum imaginați-vă că fiecare serviciu nu mai este o instanță specifică, ci un nor de instanțe care funcționează ca una. Vom folosi Minikube pentru a încerca clusterul Kubernetes la nivel local.

Configurați rutele de rețea pentru a vedea nodurile Kubernetes local (exemplul dat este pentru un mediu Mac/Linux):

 # remove existing routes sudo route -n delete 10/24 > /dev/null 2>&1 # add routes sudo route -n add 10.0.0.0/24 $(minikube ip) # 172.17.0.0/16 ip range is used by docker in minikube sudo route -n add 172.17.0.0/16 $(minikube ip) ifconfig 'bridge100' | grep member | awk '{print $2}' # use interface name from the output of the previous command # needed for xhyve driver, which I'm using for testing sudo ifconfig bridge100 -hostfilter en5

Împachetați serviciile în containere Docker cu o configurație Dockerfile astfel:

 FROM openjdk:8-jdk-alpine VOLUME /tmp ADD target/order-srv-1.0-SNAPSHOT.jar app.jar ADD target/lib lib ENV JAVA_OPTS="" ENTRYPOINT exec java $JAVA_OPTS -Djava.security.egd=file:/dev/./urandom -jar /app.jar

Creați și împingeți imaginile de serviciu în registrul Docker. Acum rulați nodurile din clusterul local Kubernetes.

Configurația de implementare Kubernetes.yaml:

 apiVersion: extensions/v1beta1 kind: Deployment metadata: name: inventory spec: replicas: 3 selector: matchLabels: app: inventory template: metadata: labels: app: inventory spec: containers: - name: inventory image: inventory-srv:latest imagePullPolicy: Never ports: - containerPort: 8081

Expuneți aceste implementări ca servicii în cluster:

 kubectl expose deployment order-srv --type=NodePort kubectl expose deployment inventory-srv --type=NodePort

Acum putem verifica dacă cererile sunt servite de noduri alese aleatoriu din cluster. Rulați curl -X http://192.168.99.100:30517/info secvenţial de mai multe ori pentru a accesa minikube NodePort pentru serviciul expus (folosind gazda și portul). În rezultat, vedem că am realizat echilibrarea cererilor.

 Inventory Service UUID = 22f8ca6b-f56b-4984-927b-cbf9fcf81da5 Inventory Service UUID = b7a4d326-1e76-4051-a0a6-1016394fafda Inventory Service UUID = b7a4d326-1e76-4051-a0a6-1016394fafda Inventory Service UUID = 22f8ca6b-f56b-4984-927b-cbf9fcf81da5 Inventory Service UUID = 50323ddb-3ace-4424-820a-6b4e85775af4

Adăugați camel-kubernetes și camel-netty4-http la pom.xml proiectului. Apoi configurați componenta ServiceCall pentru a utiliza descoperirea nodului principal Kubernetes partajată pentru toate apelurile de serviciu dintre definițiile rutei:

 KubernetesConfiguration kubernetesConfiguration = new KubernetesConfiguration(); kubernetesConfiguration.setMasterUrl("https://192.168.64.2:8443"); kubernetesConfiguration.setClientCertFile("/Users/antongoncharov/.minikube/client.crt"); kubernetesConfiguration.setClientKeyFile("/Users/antongoncharov/.minikube/client.key"); kubernetesConfiguration.setNamespace("default”); ServiceCallConfigurationDefinition config = new ServiceCallConfigurationDefinition(); config.setServiceDiscovery(new KubernetesClientServiceDiscovery(kubernetesConfiguration)); context.setServiceCallConfiguration(config);

ServiceCall EIP completează bine Spring Boot. Majoritatea opțiunilor pot fi configurate direct în fișierul application.properties .

Îmbunătățiți traseul Camel cu componenta ServiceCall:

 rest("/orders") .get("/").description("Get all orders with details").outType(TestResponse.class) .route() .hystrix() .setHeader("Content-Type", constant("application/json")) .setHeader("Accept", constant("application/json")) .setHeader(Exchange.HTTP_METHOD, constant("GET")) .removeHeaders("CamelHttp*") .serviceCall("customer-srv","http4:customer-deployment?bridgeEndpoint=true") .unmarshal(formatOrder) .enrich("direct:enrichFromInventory", new OrderAggregationStrategy()) .to("log:result") .endRest(); from("direct:enrichFromInventory") .transform().simple("${null}") .setHeader("Content-Type", constant("application/json")) .setHeader("Accept", constant("application/json")) .setHeader(Exchange.HTTP_METHOD, constant("GET")) .removeHeaders("CamelHttp*") .serviceCall("order-srv","http4:order-srv?bridgeEndpoint=true") .unmarshal(formatInventory);

Am activat și Circuit Breaker în traseu. Este un cârlig de integrare care permite întreruperea apelurilor de sistem de la distanță în caz de erori de livrare sau indisponibilitate a destinatarului. Acesta este conceput pentru a evita defecțiunea sistemului în cascadă. Componenta Hystrix ajută la realizarea acestui lucru prin implementarea modelului Circuit Breaker.

Să-l rulăm și să trimitem o cerere de testare; vom obține răspunsul agregat de la ambele servicii.

 [{"id":1,"items":[{"id":2,"name":"Monitor","description":"27-inch, response time: 7ms","price":200.0},{"id":3,"name":"Headphones","description":"Soft leather ear-cups","price":29.9},{"id":4,"name":"Mouse","description":"Designed for comfort and portability","price":19.0}]},{"id":2,"items":[{"id":5,"name":"Keyboard","description":"Layout: US","price":10.5},{"id":3,"name":"Headphones","description":"Soft leather ear-cups","price":29.9}]}]

Rezultatul este cel așteptat.

Alte cazuri de utilizare

Am arătat cum Apache Camel poate integra microservicii într-un cluster. Care sunt alte utilizări ale acestui cadru? În general, este util în orice loc în care rutarea bazată pe reguli poate fi o soluție. For instance, Apache Camel can be a middleware for the Internet of Things with the Eclipse Kura adapter. It can handle monitoring by ferrying log signals from various components and services, like in the CERN system. It can also be an integration framework for enterprise SOA or be a pipeline for batch data processing, although it doesn't compete well with Apache Spark in this area.

Concluzie

You can see that systems integration isn't an easy process. We're lucky because a lot of experience has been gathered. It's important to apply it correctly to build flexible and fault-tolerant solutions.

To ensure correct application, I recommend having a checklist of important integration aspects. Must-have items include:

  • Is there a separate integration layer?
  • Are there tests for integration?
  • Do we know the expected peak data intensity?
  • Do we know the expected data delivery time?
  • Does message correlation matter? What if a sequence breaks?
  • Should we do it in a synchronous or asynchronous way?
  • Where do formats and routing rules change more frequently?
  • Do we have ways to monitor the process?

In this article, we tried Apache Camel, a lightweight integration framework, which helps save time and effort when solving integration problems. As we showed, it can serve as a tool, supporting the relevant microservice architecture by taking full responsibility for data exchange between microservices.

If you're interested in learning more about Apache Camel, I highly recommend the book “Camel in Action” by the framework's creator, Claus Ibsen. Official documentation is available at camel.apache.org.