Semplificare l'integrazione del software: un tutorial Apache Camel

Pubblicato: 2022-03-11

Il software raramente, se non del tutto, esiste in un vuoto informativo. Almeno, questo è il presupposto che noi ingegneri del software possiamo fare per la maggior parte delle applicazioni che sviluppiamo.

A qualsiasi scala, ogni pezzo di software, in un modo o nell'altro, comunica con qualche altro software per vari motivi: per ottenere dati di riferimento da qualche parte, per inviare segnali di monitoraggio, per essere in contatto con altri servizi pur facendo parte di un sistema distribuito sistema e altro ancora.

In questo tutorial imparerai quali sono alcune delle maggiori sfide dell'integrazione di software di grandi dimensioni e come Apache Camel le risolve con facilità.

Il problema: progettazione architettonica per l'integrazione di sistemi

Potresti aver fatto quanto segue almeno una volta nella tua vita di ingegneria del software:

  • Identifica un frammento della tua logica aziendale che dovrebbe avviare l'invio dei dati.
  • Nello stesso livello dell'applicazione, scrivi le trasformazioni dei dati in base a ciò che si aspetta il destinatario.
  • Avvolgi i dati in una struttura adatta per il trasferimento e l'instradamento su una rete.
  • Aprire una connessione a un'applicazione di destinazione utilizzando un driver appropriato o un SDK client.
  • Invia i dati e gestisci la risposta.

Perché questa è una cattiva linea d'azione?

Sebbene tu abbia solo poche connessioni di questo tipo, rimane gestibile. Con un numero crescente di relazioni tra sistemi, la logica di business dell'applicazione si mescola con la logica di integrazione, che consiste nell'adattare i dati, compensare le differenze tecnologiche tra due sistemi e trasferire i dati al sistema esterno con SOAP, REST o richieste più esotiche .

Se dovessi integrare più applicazioni, sarebbe incredibilmente difficile ricostruire l'intero quadro delle dipendenze in tale codice: dove vengono prodotti i dati e quali servizi li consumano? Avrai molti posti in cui la logica di integrazione è duplicata, per l'avvio.

Con un tale approccio, sebbene il compito sia tecnicamente portato a termine, finiamo con enormi problemi con la manutenibilità e la scalabilità dell'integrazione. Una rapida riorganizzazione dei flussi di dati in questo sistema è quasi impossibile, per non parlare di problemi più profondi come la mancanza di monitoraggio, l'interruzione del circuito, il laborioso recupero dei dati, ecc.

Tutto ciò è particolarmente importante quando si integra il software nell'ambito di un'impresa di notevoli dimensioni. Affrontare l'integrazione aziendale significa lavorare con un insieme di applicazioni, che operano su un'ampia gamma di piattaforme ed esistono in luoghi diversi. Lo scambio di dati in un tale panorama software è piuttosto impegnativo. Deve soddisfare gli elevati standard di sicurezza del settore e fornire un modo affidabile per trasferire i dati. In un ambiente aziendale, l'integrazione dei sistemi richiede una progettazione dell'architettura separata e accuratamente elaborata.

Questo articolo ti introdurrà alle difficoltà uniche incontrate nell'integrazione del software e fornirà alcune soluzioni basate sull'esperienza per le attività di integrazione. Acquisiremo familiarità con Apache Camel, un framework utile che può alleviare i peggiori mal di testa di uno sviluppatore di integrazione. Seguiremo con un esempio di come Camel può aiutare a stabilire la comunicazione in un cluster di microservizi basati su Kubernetes.

Difficoltà di integrazione

Un approccio ampiamente utilizzato per risolvere il problema consiste nel disaccoppiare un livello di integrazione nell'applicazione. Può esistere all'interno della stessa applicazione o come un software dedicato che esegue indipendentemente, in quest'ultimo caso chiamato middleware.

Quali problemi incontri in genere durante lo sviluppo e il supporto del middleware? In generale, hai i seguenti elementi chiave:

  • Tutti i canali di dati sono in una certa misura inaffidabili. I problemi derivanti da questa inaffidabilità potrebbero non verificarsi mentre l'intensità dei dati è da bassa a moderata. Ogni livello di archiviazione, dalla memoria dell'applicazione alle cache inferiori e alle apparecchiature sottostanti, è soggetto a potenziali guasti. Alcuni errori rari si verificano solo con enormi volumi di dati. Anche i prodotti di fornitori maturi pronti per la produzione hanno problemi di bug tracker irrisolti relativi alla perdita di dati. Un sistema middleware dovrebbe essere in grado di informarti di queste perdite di dati e fornire la riconsegna dei messaggi in modo tempestivo.
  • Le applicazioni utilizzano protocolli e formati di dati diversi. Ciò significa che un sistema di integrazione è una tenda per le trasformazioni di dati e adattatori per altri partecipanti e utilizza una varietà di tecnologie. Questi possono includere semplici chiamate API REST, ma potrebbero anche accedere a un broker di code, inviare ordini CSV su FTP o estrarre dati in batch in una tabella di database. Questa è una lunga lista e non sarà mai più breve.
  • I cambiamenti nei formati dei dati e nelle regole di routing sono inevitabili. Ogni fase del processo di sviluppo di un'applicazione, che modifica la struttura dei dati, di solito porta a cambiamenti nei formati e nelle trasformazioni dei dati di integrazione. A volte sono necessarie modifiche all'infrastruttura con flussi di dati aziendali riorganizzati. Ad esempio, queste modifiche potrebbero verificarsi quando si introduce un unico punto di convalida dei dati di riferimento che deve elaborare tutte le voci di dati anagrafici in tutta l'azienda. Con N sistemi, potremmo finire per avere un massimo di quasi N^2 connessioni tra di loro, quindi il numero di punti in cui devono essere applicate le modifiche cresce abbastanza velocemente. Sarà come una valanga. Per sostenere la manutenibilità, un livello middleware deve fornire un quadro chiaro delle dipendenze con routing versatile e trasformazione dei dati.

Queste idee dovrebbero essere tenute a mente quando si progetta l'integrazione e si sceglie la soluzione middleware più adatta. Uno dei modi possibili per gestirlo è sfruttare un bus di servizio aziendale (ESB). Ma gli ESB forniti dai principali fornitori sono generalmente troppo pesanti e spesso creano più problemi di quanto valgano: è quasi impossibile iniziare rapidamente con un ESB, ha una curva di apprendimento piuttosto ripida e la sua flessibilità viene sacrificata a un lungo elenco di funzionalità e strumenti integrati. A mio parere, le soluzioni di integrazione open source leggere sono di gran lunga superiori: sono più elastiche, facili da distribuire nel cloud e facili da scalare.

L'integrazione del software non è facile. Oggi, mentre costruiamo architetture di microservizi e gestiamo sciami di piccoli servizi, abbiamo anche grandi aspettative sull'efficienza con cui dovrebbero comunicare.

Modelli di integrazione aziendale

Come ci si potrebbe aspettare, come lo sviluppo del software in generale, lo sviluppo dell'instradamento e della trasformazione dei dati comporta operazioni ripetitive. L'esperienza in questo campo è stata sintetizzata e sistematizzata da professionisti che da tempo gestiscono problemi di integrazione. Nel risultato, c'è una serie di modelli estratti chiamati modelli di integrazione aziendale utilizzati per la progettazione dei flussi di dati. Questi metodi di integrazione sono stati descritti nel libro omonimo di Gregor Hophe e Bobby Wolfe, che è molto simile al significativo libro di Gang of Four ma nell'area dell'incollaggio del software.

Per fare un esempio, il modello normalizzatore introduce un componente che mappa messaggi semanticamente uguali che hanno formati di dati diversi su un unico modello canonico, oppure l'aggregatore è un EIP che combina una sequenza di messaggi in uno solo.

Poiché sono astrazioni consolidate indipendenti dalla tecnologia utilizzate per risolvere problemi di architettura, gli EIP aiutano a scrivere un progetto di architettura, che non approfondisce il livello di codice ma descrive i flussi di dati in modo sufficientemente dettagliato. Tale notazione per descrivere i percorsi di integrazione non solo rende il progetto conciso, ma stabilisce anche una nomenclatura comune e un linguaggio comune, che sono molto importanti nel contesto della risoluzione di un compito di integrazione con i membri del team di varie aree aziendali.

Presentazione di Apache Camel

Diversi anni fa, stavo costruendo un'integrazione aziendale in un'enorme rete di vendita al dettaglio di generi alimentari con negozi in località ampiamente distribuite. Ho iniziato con una soluzione ESB proprietaria, che si è rivelata eccessivamente macchinosa da mantenere. Quindi, il nostro team si è imbattuto in Apache Camel e, dopo aver svolto un lavoro di "prova di concetto", abbiamo rapidamente riscritto tutti i nostri flussi di dati nelle rotte Camel.

Apache Camel può essere descritto come un "router di mediazione", un framework middleware orientato ai messaggi che implementa l'elenco di EIP, con cui ho familiarizzato. Fa uso di questi modelli, supporta tutti i protocolli di trasporto comuni e include un vasto set di adattatori utili. Camel consente la gestione di una serie di routine di integrazione senza la necessità di scrivere il proprio codice.

Oltre a questo, vorrei individuare le seguenti caratteristiche di Apache Camel:

  • I percorsi di integrazione sono scritti come pipeline composte da blocchi. Crea un'immagine totalmente trasparente per aiutare a rintracciare i flussi di dati.
  • Camel ha adattatori per molte API popolari. Ad esempio, ottenere dati da Apache Kafka, monitorare istanze AWS EC2, integrarsi con Salesforce: tutte queste attività possono essere risolte utilizzando componenti disponibili immediatamente.

Le rotte Apache Camel possono essere scritte in Java o Scala DSL. (È disponibile anche una configurazione XML, ma diventa troppo dettagliata e ha capacità di debug peggiori.) Non impone restrizioni allo stack tecnologico dei servizi di comunicazione, ma se scrivi in ​​Java o Scala, puoi invece incorporare Camel in un'applicazione di eseguirlo autonomamente.

La notazione di instradamento utilizzata da Camel può essere descritta con il seguente semplice pseudocodice:

 from(Source) .transform(Transformer) .to(Destination)

Source , Transformer e Destination sono endpoint che fanno riferimento ai componenti di implementazione tramite i relativi URI.

Cosa consente a Camel di risolvere i problemi di integrazione descritti in precedenza? Diamo un'occhiata. In primo luogo, la logica di instradamento e trasformazione ora vive solo in una configurazione Apache Camel dedicata. In secondo luogo, attraverso la succinta e naturale DSL in combinazione con l'uso di EIP, appare un quadro delle dipendenze tra i sistemi. È fatto di astrazioni comprensibili e la logica di routing è facilmente regolabile. Infine, non è necessario scrivere un mucchio di codice di trasformazione perché è probabile che gli adattatori appropriati siano già inclusi.

Integrazioni

Dovrei aggiungere, Apache Camel è un framework maturo e riceve aggiornamenti regolari. Ha una grande comunità e una considerevole base di conoscenze cumulative.

Ha i suoi svantaggi. Camel non dovrebbe essere considerato una suite di integrazione complessa. È una cassetta degli attrezzi senza funzionalità di alto livello come strumenti di gestione dei processi aziendali o monitor delle attività, ma può essere utilizzata per creare tale software.

Sistemi alternativi potrebbero essere, ad esempio, Spring Integration o Mule ESB. Per Spring Integration, sebbene sia considerato leggero, secondo la mia esperienza, metterlo insieme e scrivere molti file di configurazione XML può rivelarsi inaspettatamente complicato e non è certo una via d'uscita facile. Mule ESB è un set di strumenti robusto e molto funzionale, ma come suggerisce il nome, è un bus di servizio aziendale, quindi appartiene a una categoria di peso diversa. Mule può essere paragonato a Fuse ESB, un prodotto simile basato su Apache Camel con un ricco set di funzionalità. Per me, utilizzare Apache Camel per i servizi di incollaggio è un gioco da ragazzi oggi. È facile da usare e produce una descrizione chiara di cosa va dove, allo stesso tempo è sufficientemente funzionale per creare integrazioni complesse.

Scrivere un percorso di esempio

Iniziamo a scrivere il codice. Inizieremo da un flusso di dati sincrono che instrada i messaggi da un'unica origine a un elenco di destinatari. Le regole di instradamento saranno scritte in Java DSL.

Useremo Maven per costruire il progetto. Innanzitutto aggiungi la seguente dipendenza a pom.xml :

 <dependencies> ... <dependency> <groupId>org.apache.camel</groupId> <artifactId>camel-core</artifactId> <version>2.20.0</version> </dependency> </dependencies>

In alternativa, l'applicazione può essere costruita sopra l' camel-archetype-java .

Le definizioni del percorso del cammello sono dichiarate nel metodo RouteBuilder.configure .

 public void configure() { errorHandler(defaultErrorHandler().maximumRedeliveries(0)); from("file:orders?noop=true").routeId("main") .log("Incoming File: ${file:onlyname}") .unmarshal().json(JsonLibrary.Jackson, Order.class) // unmarshal JSON to Order class containing List<OrderItem> .split().simple("body.items") // split list to process one by one .to("log:inputOrderItem") .choice() .when().simple("${body.type} == 'Drink'") .to("direct:bar") .when().simple("${body.type} == 'Dessert'") .to("direct:dessertStation") .when().simple("${body.type} == 'Hot Meal'") .to("direct:hotMealStation") .when().simple("${body.type} == 'Cold Meal'") .to("direct:coldMealStation") .otherwise() .to("direct:others"); from("direct:bar").routeId("bar").log("Handling Drink"); from("direct:dessertStation").routeId("dessertStation").log("Handling Dessert"); from("direct:hotMealStation").routeId("hotMealStation").log("Handling Hot Meal"); from("direct:coldMealStation").routeId("coldMealStation").log("Handling Cold Meal"); from("direct:others").routeId("others").log("Handling Something Other"); }

In questa definizione, creiamo un percorso che recupera i record dal file JSON, li divide in elementi e li instrada a un insieme di gestori in base al contenuto del messaggio.

Eseguiamolo su dati di test preparati. Otterremo l'output:

 INFO | Total 6 routes, of which 6 are started INFO | Apache Camel 2.20.0 (CamelContext: camel-1) started in 10.716 seconds INFO | Incoming File: order1.json INFO | Exchange[ExchangePattern: InOnly, BodyType: com.antongoncharov.camel.example.model.OrderItem, Body: OrderItem{, type='Drink', name='Americano', qty='1'}] INFO | Handling Drink INFO | Exchange[ExchangePattern: InOnly, BodyType: com.antongoncharov.camel.example.model.OrderItem, Body: OrderItem{, type='Hot Meal', name='French Omelette', qty='1'}] INFO | Handling Hot Meal INFO | Exchange[ExchangePattern: InOnly, BodyType: com.antongoncharov.camel.example.model.OrderItem, Body: OrderItem{, type='Hot Meal', name='Lasagna', qty='1'}] INFO | Handling Hot Meal INFO | Exchange[ExchangePattern: InOnly, BodyType: com.antongoncharov.camel.example.model.OrderItem, Body: OrderItem{, type='Hot Meal', name='Rice Balls', qty='1'}] INFO | Handling Hot Meal INFO | Exchange[ExchangePattern: InOnly, BodyType: com.antongoncharov.camel.example.model.OrderItem, Body: OrderItem{, type='Dessert', name='Blueberry Pie', qty='1'}] INFO | Handling Dessert

Come previsto, Camel ha instradato i messaggi alle destinazioni.

Scelte di trasferimento dei dati

Nell'esempio sopra, l'interazione tra i componenti è sincrona ed eseguita attraverso la memoria dell'applicazione. Tuttavia, ci sono molti altri modi per comunicare quando abbiamo a che fare con applicazioni separate che non condividono la memoria:

  • Scambio di file. Un'applicazione produce file di dati condivisi da utilizzare per l'altra. È dove vive lo spirito della vecchia scuola. Questo metodo di comunicazione ha una miriade di conseguenze: mancanza di transazioni e coerenza, prestazioni scadenti e coordinamento isolato tra i sistemi. Molti sviluppatori hanno finito per scrivere soluzioni di integrazione fatte in casa per rendere il processo più o meno gestibile.
  • Banca dati comune. Fai in modo che le applicazioni memorizzino i dati che desiderano condividere in uno schema comune di un unico database. La progettazione di uno schema unificato e la gestione dell'accesso simultaneo alle tabelle sono le sfide più importanti di questo approccio. Come per lo scambio di file, è facile che questo diventi un collo di bottiglia permanente.
  • Chiamata API remota. Fornire un'interfaccia per consentire a un'applicazione di interagire con un'altra applicazione in esecuzione, come una tipica chiamata al metodo. Le applicazioni condividono le funzionalità tramite chiamate API, ma le accoppiano strettamente nel processo.
  • Messaggistica. Connettere ogni applicazione a un sistema di messaggistica comune, scambiare dati e invocare il comportamento in modo asincrono utilizzando i messaggi. Né il mittente né il destinatario devono essere attivi e operativi contemporaneamente per ricevere il messaggio.

Esistono più modi per interagire, ma dobbiamo tenere presente che, in linea di massima, esistono due tipi di interazione: sincrona e asincrona. Il primo è come chiamare una funzione nel codice: il flusso di esecuzione attenderà l'esecuzione e restituirà un valore. Con un approccio asincrono, gli stessi dati vengono inviati tramite una coda di messaggi intermedia o un argomento di sottoscrizione. Una chiamata di funzione remota asincrona può essere implementata come EIP richiesta-risposta.

Tuttavia, la messaggistica asincrona non è una panacea; comporta alcune restrizioni. Raramente vedi API di messaggistica sul Web; i servizi REST sincroni sono molto più popolari. Ma il middleware di messaggistica è ampiamente utilizzato nell'intranet aziendale o nell'infrastruttura di back-end del sistema distribuito.

Utilizzo delle code di messaggi

Rendiamo il nostro esempio asincrono. Un sistema software che gestisce le code e gli argomenti di sottoscrizione è chiamato broker di messaggi. È come un RDBMS per tabelle e colonne. Le code servono come integrazione punto a punto mentre gli argomenti servono per la comunicazione pubblicazione-sottoscrizione con molti destinatari. Useremo Apache ActiveMQ come broker di messaggi JMS perché è solido e integrabile.

Aggiungi la seguente dipendenza. A volte è eccessivo aggiungere activemq-all , che contiene tutti i jar ActiveMQ, al progetto, ma manterremo semplici le dipendenze della nostra applicazione.

 <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-all</artifactId> <version>5.15.2</version> </dependency>

Quindi avviare il broker a livello di codice. In Spring Boot, otteniamo un'autoconfigurazione per questo collegando la dipendenza spring-boot-starter-activemq Maven.

Esegui un nuovo broker di messaggi con i seguenti comandi, specificando solo l'endpoint del connettore:

 BrokerService broker = new BrokerService(); broker.addConnector("tcp://localhost:61616"); broker.start();

E aggiungi il seguente frammento di configure al corpo del metodo di configurazione:

 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616"); this.getContext().addComponent("activemq", ActiveMQComponent.jmsComponent(connectionFactory));

Ora possiamo aggiornare l'esempio precedente usando le code di messaggi. Le code verranno create automaticamente alla consegna del messaggio.

 public void configure() { errorHandler(defaultErrorHandler().maximumRedeliveries(0)); ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616"); this.getContext().addComponent("activemq", ActiveMQComponent.jmsComponent(connectionFactory)); from("file:orders?noop=true").routeId("main") .log("Incoming File: ${file:onlyname}") .unmarshal().json(JsonLibrary.Jackson, Order.class) // unmarshal JSON to Order class containing List<OrderItem> .split().simple("body.items") // split list to process one by one .to("log:inputOrderItem") .choice() .when().simple("${body.type} == 'Drink'") .to("activemq:queue:bar") .when().simple("${body.type} == 'Dessert'") .to("activemq:queue:dessertStation") .when().simple("${body.type} == 'Hot Meal'") .to("activemq:queue:hotMealStation") .when().simple("${body.type} == 'Cold Meal'") .to("activemq:queue:coldMealStation") .otherwise() .to("activemq:queue:others"); from("activemq:queue:bar").routeId("barAsync").log("Drinks"); from("activemq:queue:dessertStation").routeId("dessertAsync").log("Dessert"); from("activemq:queue:hotMealStation").routeId("hotMealAsync").log("Hot Meals"); from("activemq:queue:coldMealStation").routeId("coldMealAsync").log("Cold Meals"); from("activemq:queue:others").routeId("othersAsync").log("Others"); }

Va bene, ora l'interazione è diventata asincrona. I potenziali consumatori di questi dati possono accedervi quando sono pronti. Questo è un esempio di accoppiamento libero, che cerchiamo di ottenere in un'architettura reattiva. L'indisponibilità di uno dei servizi non bloccherà gli altri. Inoltre, un consumatore può ridimensionare e leggere dalla coda in parallelo. La coda stessa può essere ridimensionata ed essere partizionata. Le code persistenti possono archiviare i dati sul disco, in attesa di essere elaborati, anche quando tutti i partecipanti sono scesi. Di conseguenza, questo sistema è più tollerante ai guasti.

Un fatto sorprendente è che il CERN utilizza Apache Camel e ActiveMQ per monitorare i sistemi del Large Hadron Collider (LHC). C'è anche un'interessante tesi di laurea che spiega la scelta di una soluzione middleware appropriata per questo compito. Quindi, come si dice nel keynote, "No JMS, no fisica delle particelle!"

Monitoraggio

Nell'esempio precedente, abbiamo creato il canale dati tra due servizi. È un ulteriore potenziale punto di errore in un'architettura, quindi dobbiamo prenderci cura di esso. Diamo un'occhiata alle funzionalità di monitoraggio fornite da Apache Camel. Fondamentalmente, espone informazioni statistiche sui suoi percorsi attraverso gli MBean, accessibili da JMX. ActiveMQ espone le statistiche della coda allo stesso modo.

Accendiamo il server JMX nell'applicazione, per consentirne l'esecuzione con le opzioni della riga di comando:

 -Dorg.apache.camel.jmx.createRmiConnector=true -Dorg.apache.camel.jmx.mbeanObjectDomainName=org.apache.camel -Dorg.apache.camel.jmx.rmiConnector.registryPort=1099 -Dorg.apache.camel.jmx.serviceUrlPath=camel

Ora esegui l'applicazione in modo che il percorso abbia svolto il suo lavoro. Aprire lo strumento jconsole standard e connettersi al processo dell'applicazione. Collegarsi al service:jmx:rmi:///jndi/rmi://localhost:1099/camel . Vai al dominio org.apache.camel nell'albero MBeans.

Schermata 1

Possiamo vedere che tutto ciò che riguarda il routing è sotto controllo. Abbiamo il numero di messaggi in corso, il conteggio degli errori e il conteggio dei messaggi nelle code. Queste informazioni possono essere inviate ad alcuni set di strumenti di monitoraggio con funzionalità avanzate come Graphana o Kibana. Puoi farlo implementando il noto stack ELK.

C'è anche una console Web collegabile ed estensibile che fornisce un'interfaccia utente per la gestione di Camel, ActiveMQ e molti altri, chiamata hawt.io.

Schermata 2

Percorsi di prova

Apache Camel ha funzionalità piuttosto ampie per la scrittura di percorsi di prova con componenti fittizi. È uno strumento potente, ma scrivere percorsi separati solo per i test è un processo che richiede tempo. Sarebbe più efficiente eseguire test sui percorsi di produzione senza modificare la loro pipeline. Camel ha questa funzionalità e può essere implementato utilizzando il componente AdviceWith.

Abilitiamo la logica di test nel nostro esempio ed eseguiamo un test di esempio.

 <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.11</version> <scope>test</scope> </dependency> <dependency> <groupId>org.apache.camel</groupId> <artifactId>camel-test</artifactId> <version>2.20.0</version> <scope>test</scope> </dependency>

La classe di prova è:

 public class AsyncRouteTest extends CamelTestSupport { @Override protected RouteBuilder createRouteBuilder() throws Exception { return new AsyncRouteBuilder(); } @Before public void mockEndpoints() throws Exception { context.getRouteDefinition("main").adviceWith(context, new AdviceWithRouteBuilder() { @Override public void configure() throws Exception { // we substitute all actual queues with mock endpoints mockEndpointsAndSkip("activemq:queue:bar"); mockEndpointsAndSkip("activemq:queue:dessertStation"); mockEndpointsAndSkip("activemq:queue:hotMealStation"); mockEndpointsAndSkip("activemq:queue:coldMealStation"); mockEndpointsAndSkip("activemq:queue:others"); // and replace the route's source with test endpoint replaceFromWith("file://testInbox"); } }); } @Test public void testSyncInteraction() throws InterruptedException { String testJson = "{\"id\": 1, \"order\": [{\"id\": 1, \"name\": \"Americano\", \"type\": \"Drink\", \"qty\": \"1\"}, {\"id\": 2, \"name\": \"French Omelette\", \"type\": \"Hot Meal\", \"qty\": \"1\"}, {\"id\": 3, \"name\": \"Lasagna\", \"type\": \"Hot Meal\", \"qty\": \"1\"}, {\"id\": 4, \"name\": \"Rice Balls\", \"type\": \"Hot Meal\", \"qty\": \"1\"}, {\"id\": 5, \"name\": \"Blueberry Pie\", \"type\": \"Dessert\", \"qty\": \"1\"}]}"; // get mocked endpoint and set an expectation MockEndpoint mockEndpoint = getMockEndpoint("mock:activemq:queue:hotMealStation"); mockEndpoint.expectedMessageCount(3); // simulate putting file in the inbox folder template.sendBodyAndHeader("file://testInbox", testJson, Exchange.FILE_NAME, "test.json"); //checks that expectations were met assertMockEndpointsSatisfied(); } }

Ora esegui i test per l'applicazione con mvn test . Possiamo vedere che il nostro percorso è stato eseguito con successo con il consiglio di test. Non ci sono messaggi passati attraverso le code effettive e i test sono stati superati.

 INFO | Route: main started and consuming from: file://testInbox <...> INFO | Incoming File: test.json <...> INFO | Asserting: mock://activemq:queue:hotMealStation is satisfied

Utilizzo di Apache Camel con Kubernetes Cluster

Uno dei problemi di integrazione oggi è che le applicazioni non sono più statiche. In un'infrastruttura cloud, ci occupiamo di servizi virtuali eseguiti su più nodi contemporaneamente. Abilita l'architettura dei microservizi con una rete di servizi piccoli e leggeri che interagiscono tra loro. Questi servizi hanno una durata inaffidabile e dobbiamo scoprirli in modo dinamico.

L'incollaggio di servizi cloud insieme è un compito che può essere risolto con Apache Camel. È particolarmente interessante a causa del sapore EIP e del fatto che Camel ha molti adattatori e supporta un'ampia gamma di protocolli. La recente versione 2.18 aggiunge il componente ServiceCall, che introduce una funzionalità per chiamare un'API e risolverne l'indirizzo tramite meccanismi di rilevamento del cluster. Attualmente supporta Consul, Kubernetes, Ribbon, ecc. Alcuni esempi di codice, in cui ServiceCall è configurato con Consul, possono essere trovati facilmente. Useremo Kubernetes qui perché è la mia soluzione di clustering preferita.

Lo schema di integrazione sarà il seguente:

Schema

Il servizio Order e il servizio Inventory saranno un paio di banali applicazioni Spring Boot che restituiscono dati statici. Non siamo legati a un particolare stack tecnologico qui. Questi servizi stanno producendo i dati che vogliamo elaborare.

Controllore del servizio ordini:

 @RestController public class OrderController { private final OrderStorage orderStorage; @Autowired public OrderController(OrderStorage orderStorage) { this.orderStorage = orderStorage; } @RequestMapping("/info") public String info() { return "Order Service UU/orders") public List<Order> getAll() { return orderStorage.getAll(); } @RequestMapping("/orders/{id}") public Order getOne(@PathVariable Integer id) { return orderStorage.getOne(id); } }

Produce dati nel formato:

 [{"id":1,"items":[2,3,4]},{"id":2,"items":[5,3]}]

Il controller del servizio di Inventory è assolutamente simile al servizio di Order :

 @RestController public class InventoryController { private final InventoryStorage inventoryStorage; @Autowired public InventoryController(InventoryStorage inventoryStorage) { this.inventoryStorage = inventoryStorage; } @RequestMapping("/info") public String info() { return "Inventory Service UU/items") public List<InventoryItem> getAll() { return inventoryStorage.getAll(); } @RequestMapping("/items/{id}") public InventoryItem getOne(@PathVariable Integer id) { return inventoryStorage.getOne(id); } }

InventoryStorage è un repository generico che contiene dati. In questo esempio, restituisce oggetti statici predefiniti, di cui viene eseguito il marshalling nel formato seguente.

 [{"id":1,"name":"Laptop","description":"Up to 12-hours battery life","price":499.9},{"id":2,"name":"Monitor","description":"27-inch, response time: 7ms","price":200.0},{"id":3,"name":"Headphones","description":"Soft leather ear-cups","price":29.9},{"id":4,"name":"Mouse","description":"Designed for comfort and portability","price":19.0},{"id":5,"name":"Keyboard","description":"Layout: US","price":10.5}]

Scriviamo un gateway route che li collega, ma senza ServiceCall in questo passaggio:

 rest("/orders") .get("/").description("Get all orders with details").outType(TestResponse.class) .route() .setHeader("Content-Type", constant("application/json")) .setHeader("Accept", constant("application/json")) .setHeader(Exchange.HTTP_METHOD, constant("GET")) .removeHeaders("CamelHttp*") .to("http4://localhost:8082/orders?bridgeEndpoint=true") .unmarshal(formatOrder) .enrich("direct:enrichFromInventory", new OrderAggregationStrategy()) .to("log:result") .endRest(); from("direct:enrichFromInventory") .transform().simple("${null}") .setHeader("Content-Type", constant("application/json")) .setHeader("Accept", constant("application/json")) .setHeader(Exchange.HTTP_METHOD, constant("GET")) .removeHeaders("CamelHttp*") .to("http4://localhost:8081/items?bridgeEndpoint=true") .unmarshal(formatInventory);

Ora immagina che ogni servizio non sia più un'istanza specifica ma un cloud di istanze che operano come una. Useremo Minikube per provare localmente il cluster Kubernetes.

Configura i percorsi di rete per vedere i nodi Kubernetes localmente (l'esempio fornito è per un ambiente Mac/Linux):

 # remove existing routes sudo route -n delete 10/24 > /dev/null 2>&1 # add routes sudo route -n add 10.0.0.0/24 $(minikube ip) # 172.17.0.0/16 ip range is used by docker in minikube sudo route -n add 172.17.0.0/16 $(minikube ip) ifconfig 'bridge100' | grep member | awk '{print $2}' # use interface name from the output of the previous command # needed for xhyve driver, which I'm using for testing sudo ifconfig bridge100 -hostfilter en5

Avvolgi i servizi nei contenitori Docker con una configurazione Dockerfile in questo modo:

 FROM openjdk:8-jdk-alpine VOLUME /tmp ADD target/order-srv-1.0-SNAPSHOT.jar app.jar ADD target/lib lib ENV JAVA_OPTS="" ENTRYPOINT exec java $JAVA_OPTS -Djava.security.egd=file:/dev/./urandom -jar /app.jar

Crea e invia le immagini del servizio al registro Docker. Ora esegui i nodi nel cluster Kubernetes locale.

Configurazione della distribuzione di Kubernetes.yaml:

 apiVersion: extensions/v1beta1 kind: Deployment metadata: name: inventory spec: replicas: 3 selector: matchLabels: app: inventory template: metadata: labels: app: inventory spec: containers: - name: inventory image: inventory-srv:latest imagePullPolicy: Never ports: - containerPort: 8081

Esponi queste distribuzioni come servizi nel cluster:

 kubectl expose deployment order-srv --type=NodePort kubectl expose deployment inventory-srv --type=NodePort

Ora possiamo verificare se le richieste sono servite da nodi scelti casualmente dal cluster. Esegui curl -X http://192.168.99.100:30517/info in sequenza più volte per accedere a minikube NodePort per il servizio esposto (usando l'host e la porta). Nell'output, stiamo vedendo che abbiamo raggiunto il bilanciamento delle richieste.

 Inventory Service UUID = 22f8ca6b-f56b-4984-927b-cbf9fcf81da5 Inventory Service UUID = b7a4d326-1e76-4051-a0a6-1016394fafda Inventory Service UUID = b7a4d326-1e76-4051-a0a6-1016394fafda Inventory Service UUID = 22f8ca6b-f56b-4984-927b-cbf9fcf81da5 Inventory Service UUID = 50323ddb-3ace-4424-820a-6b4e85775af4

Aggiungi camel-kubernetes e camel-netty4-http al pom.xml del progetto. Quindi configura il componente ServiceCall per utilizzare il rilevamento del nodo master Kubernetes condiviso per tutte le chiamate di servizio tra le definizioni di instradamento:

 KubernetesConfiguration kubernetesConfiguration = new KubernetesConfiguration(); kubernetesConfiguration.setMasterUrl("https://192.168.64.2:8443"); kubernetesConfiguration.setClientCertFile("/Users/antongoncharov/.minikube/client.crt"); kubernetesConfiguration.setClientKeyFile("/Users/antongoncharov/.minikube/client.key"); kubernetesConfiguration.setNamespace("default”); ServiceCallConfigurationDefinition config = new ServiceCallConfigurationDefinition(); config.setServiceDiscovery(new KubernetesClientServiceDiscovery(kubernetesConfiguration)); context.setServiceCallConfiguration(config);

ServiceCall EIP integra bene Spring Boot. La maggior parte delle opzioni può essere configurata direttamente nel file application.properties .

Potenzia il percorso Camel con il componente ServiceCall:

 rest("/orders") .get("/").description("Get all orders with details").outType(TestResponse.class) .route() .hystrix() .setHeader("Content-Type", constant("application/json")) .setHeader("Accept", constant("application/json")) .setHeader(Exchange.HTTP_METHOD, constant("GET")) .removeHeaders("CamelHttp*") .serviceCall("customer-srv","http4:customer-deployment?bridgeEndpoint=true") .unmarshal(formatOrder) .enrich("direct:enrichFromInventory", new OrderAggregationStrategy()) .to("log:result") .endRest(); from("direct:enrichFromInventory") .transform().simple("${null}") .setHeader("Content-Type", constant("application/json")) .setHeader("Accept", constant("application/json")) .setHeader(Exchange.HTTP_METHOD, constant("GET")) .removeHeaders("CamelHttp*") .serviceCall("order-srv","http4:order-srv?bridgeEndpoint=true") .unmarshal(formatInventory);

Abbiamo anche attivato Circuit Breaker nel percorso. È un hook di integrazione che consente di sospendere le chiamate di sistema remote in caso di errori di consegna o indisponibilità del destinatario. Questo è progettato per evitare guasti al sistema in cascata. Il componente Hystrix aiuta a raggiungere questo obiettivo implementando il modello Circuit Breaker.

Eseguiamolo e inviamo una richiesta di test; otterremo la risposta aggregata da entrambi i servizi.

 [{"id":1,"items":[{"id":2,"name":"Monitor","description":"27-inch, response time: 7ms","price":200.0},{"id":3,"name":"Headphones","description":"Soft leather ear-cups","price":29.9},{"id":4,"name":"Mouse","description":"Designed for comfort and portability","price":19.0}]},{"id":2,"items":[{"id":5,"name":"Keyboard","description":"Layout: US","price":10.5},{"id":3,"name":"Headphones","description":"Soft leather ear-cups","price":29.9}]}]

Il risultato è come previsto.

Altri casi d'uso

Ho mostrato come Apache Camel può integrare i microservizi in un cluster. Quali sono gli altri usi di questo framework? In generale, è utile in qualsiasi luogo in cui il routing basato su regole può essere una soluzione. For instance, Apache Camel can be a middleware for the Internet of Things with the Eclipse Kura adapter. It can handle monitoring by ferrying log signals from various components and services, like in the CERN system. It can also be an integration framework for enterprise SOA or be a pipeline for batch data processing, although it doesn't compete well with Apache Spark in this area.

Conclusione

You can see that systems integration isn't an easy process. We're lucky because a lot of experience has been gathered. It's important to apply it correctly to build flexible and fault-tolerant solutions.

To ensure correct application, I recommend having a checklist of important integration aspects. Must-have items include:

  • Is there a separate integration layer?
  • Are there tests for integration?
  • Do we know the expected peak data intensity?
  • Do we know the expected data delivery time?
  • Does message correlation matter? What if a sequence breaks?
  • Should we do it in a synchronous or asynchronous way?
  • Where do formats and routing rules change more frequently?
  • Do we have ways to monitor the process?

In this article, we tried Apache Camel, a lightweight integration framework, which helps save time and effort when solving integration problems. As we showed, it can serve as a tool, supporting the relevant microservice architecture by taking full responsibility for data exchange between microservices.

If you're interested in learning more about Apache Camel, I highly recommend the book “Camel in Action” by the framework's creator, Claus Ibsen. Official documentation is available at camel.apache.org.