Rationaliser l'intégration logicielle : un didacticiel Apache Camel
Publié: 2022-03-11Les logiciels existent rarement, voire pas du tout, dans un vide informationnel. C'est du moins l'hypothèse que les ingénieurs logiciels peuvent faire pour la plupart des applications que nous développons.
Quelle que soit l'échelle, chaque logiciel, d'une manière ou d'une autre, communique avec un autre logiciel pour diverses raisons : pour obtenir des données de référence quelque part, pour envoyer des signaux de surveillance, pour être en contact avec d'autres services tout en faisant partie d'un réseau distribué. système, et plus encore.
Dans ce didacticiel, vous apprendrez quels sont certains des plus grands défis de l'intégration de gros logiciels et comment Apache Camel les résout facilement.
Le problème : conception d'architecture pour l'intégration de systèmes
Vous avez peut-être fait ce qui suit au moins une fois dans votre vie d'ingénieur logiciel :
- Identifiez un fragment de votre logique métier qui doit initier l'envoi des données.
- Dans la même couche application, écrivez les transformations de données conformément à ce que le destinataire attend.
- Enveloppez les données dans une structure adaptée au transfert et au routage sur un réseau.
- Ouvrez une connexion à une application cible à l'aide d'un pilote approprié ou d'un SDK client.
- Envoyez les données et gérez la réponse.
Pourquoi est-ce une mauvaise ligne de conduite ?
Même si vous n'avez que quelques connexions de ce genre, cela reste gérable. Avec un nombre croissant de relations entre les systèmes, la logique métier de l'application se mélange à la logique d'intégration, qui consiste à adapter les données, à compenser les différences technologiques entre deux systèmes et à transférer les données vers le système externe avec SOAP, REST ou des requêtes plus exotiques. .
Si vous intégriez plusieurs applications, il serait incroyablement difficile de retracer l'ensemble des dépendances dans un tel code : où sont produites les données et quels services les consomment ? Vous aurez de nombreux endroits où la logique d'intégration est dupliquée, pour démarrer.
Avec une telle approche, bien que la tâche soit techniquement accomplie, nous nous retrouvons avec d'énormes problèmes de maintenabilité et d'évolutivité de l'intégration. Une réorganisation rapide des flux de données dans ce système est quasiment impossible, sans parler des problèmes plus profonds tels que le manque de surveillance, les coupures de circuit, la récupération laborieuse des données, etc.
Ceci est particulièrement important lors de l'intégration d'un logiciel dans le cadre d'une très grande entreprise. Faire face à l'intégration d'entreprise signifie travailler avec un ensemble d'applications, qui fonctionnent sur une large gamme de plates-formes et existent dans différents endroits. L'échange de données dans un tel paysage logiciel est assez exigeant. Il doit répondre aux normes de sécurité élevées de l'industrie et fournir un moyen fiable de transférer des données. Dans un environnement d'entreprise, l'intégration de systèmes nécessite une conception d'architecture distincte et minutieusement élaborée.
Cet article vous présentera les difficultés uniques rencontrées dans l'intégration de logiciels et vous fournira des solutions basées sur l'expérience pour les tâches d'intégration. Nous nous familiariserons avec Apache Camel, un cadre utile qui peut atténuer les pires maux de tête d'un développeur d'intégration. Nous suivrons avec un exemple de la façon dont Camel peut aider à établir la communication dans un cluster de microservices alimenté par Kubernetes.
Difficultés d'intégration
Une approche largement utilisée pour résoudre le problème consiste à découpler une couche d'intégration dans votre application. Il peut exister au sein de la même application ou sous la forme d'un logiciel dédié fonctionnant indépendamment, dans ce dernier cas appelé middleware.
À quels problèmes êtes-vous généralement confronté lors du développement et de la prise en charge d'intergiciels ? En général, vous disposez des éléments clés suivants :
- Tous les canaux de données ne sont pas fiables dans une certaine mesure. Les problèmes découlant de ce manque de fiabilité peuvent ne pas se produire lorsque l'intensité des données est faible à modérée. Chaque niveau de stockage, de la mémoire d'application aux caches inférieurs et à l'équipement en dessous, est sujet à une défaillance potentielle. Certaines erreurs rares ne surviennent qu'avec d'énormes volumes de données. Même les produits de fournisseurs matures et prêts pour la production ont des problèmes de suivi de bogues non résolus liés à la perte de données. Un système middleware devrait être en mesure de vous informer de ces pertes de données et de fournir une nouvelle livraison des messages en temps opportun.
- Les applications utilisent différents protocoles et formats de données. Cela signifie qu'un système d'intégration est un rideau pour les transformations de données et les adaptateurs vers d'autres participants et utilise une variété de technologies. Ceux-ci peuvent inclure des appels d'API REST simples, mais peuvent également accéder à un courtier de file d'attente, envoyer des commandes CSV via FTP ou extraire des données par lots vers une table de base de données. C'est une longue liste et elle ne sera jamais plus courte.
- Les modifications des formats de données et des règles de routage sont inévitables. Chaque étape du processus de développement d'une application, qui modifie la structure des données, entraîne généralement des changements dans les formats et les transformations des données d'intégration. Parfois, des changements d'infrastructure avec des flux de données d'entreprise réorganisés sont nécessaires. Par exemple, ces changements peuvent se produire lors de l'introduction d'un point unique de validation des données de référence qui doit traiter toutes les entrées de données de base dans toute l'entreprise. Avec
N
systèmes, nous pouvons finir par avoir un maximum de près deN^2
connexions entre eux, de sorte que le nombre d'endroits où les changements doivent être appliqués augmente assez rapidement. Ce sera comme une avalanche. Pour maintenir la maintenabilité, une couche middleware doit fournir une image claire des dépendances avec un routage polyvalent et une transformation des données.
Ces idées doivent être gardées à l'esprit lors de la conception de l'intégration et du choix de la solution middleware la plus appropriée. L'un des moyens possibles de le gérer consiste à tirer parti d'un bus de service d'entreprise (ESB). Mais les ESB fournis par les principaux fournisseurs sont généralement trop lourds et posent souvent plus de problèmes qu'ils n'en valent la peine : il est presque impossible de démarrer rapidement avec un ESB, il a une courbe d'apprentissage assez abrupte et sa flexibilité est sacrifiée à une longue liste. de fonctionnalités et d'outils intégrés. À mon avis, les solutions d'intégration open source légères sont de loin supérieures : elles sont plus élastiques, faciles à déployer dans le cloud et faciles à mettre à l'échelle.
L'intégration du logiciel n'est pas facile à faire. Aujourd'hui, alors que nous construisons des architectures de microservices et traitons avec des essaims de petits services, nous avons également des attentes élevées quant à l'efficacité avec laquelle ils doivent communiquer.
Modèles d'intégration d'entreprise
Comme on pouvait s'y attendre, comme le développement de logiciels en général, le développement du routage et de la transformation des données implique des opérations répétitives. L'expérience dans ce domaine a été résumée et systématisée par des professionnels qui traitent des problèmes d'intégration depuis un certain temps. Dans le résultat, il existe un ensemble de modèles extraits appelés modèles d'intégration d'entreprise utilisés pour concevoir des flux de données. Ces méthodes d'intégration ont été décrites dans le livre du même nom de Gregor Hophe et Bobby Wolfe, qui ressemble beaucoup au livre important de Gang of Four mais dans le domaine des logiciels de collage.
Pour donner un exemple, le modèle de normalisation introduit un composant qui mappe des messages sémantiquement égaux qui ont des formats de données différents à un seul modèle canonique, ou l'agrégateur est un EIP qui combine une séquence de messages en un seul.
Puisqu'il s'agit d'abstractions établies et indépendantes de la technologie utilisées pour résoudre les problèmes d'architecture, les EIP aident à écrire une conception d'architecture, qui ne plonge pas dans le niveau du code mais décrit les flux de données de manière suffisamment détaillée. Une telle notation pour décrire les voies d'intégration rend non seulement la conception concise, mais définit également une nomenclature commune et un langage commun, qui sont très importants dans le contexte de la résolution d'une tâche d'intégration avec des membres d'équipe de divers domaines d'activité.
Présentation d'Apache Camel
Il y a plusieurs années, je construisais une intégration d'entreprise dans un vaste réseau de vente au détail d'épiceries avec des magasins dans des emplacements largement distribués. J'ai commencé avec une solution ESB propriétaire, qui s'est avérée trop lourde à maintenir. Ensuite, notre équipe est tombée sur Apache Camel, et après avoir fait un travail de « preuve de concept », nous avons rapidement réécrit tous nos flux de données dans les routes Camel.
Apache Camel peut être décrit comme un "routeur de médiation", un framework middleware orienté message implémentant la liste des EIP, avec laquelle je me suis familiarisé. Il utilise ces modèles, prend en charge tous les protocoles de transport courants et comprend un vaste ensemble d'adaptateurs utiles. Camel permet de gérer un certain nombre de routines d'intégration sans avoir à écrire votre propre code.
En dehors de cela, je distinguerais les fonctionnalités suivantes d'Apache Camel :
- Les routes d'intégration sont écrites sous forme de pipelines constitués de blocs. Il crée une image totalement transparente pour aider à suivre les flux de données.
- Camel a des adaptateurs pour de nombreuses API populaires. Par exemple, obtenir des données d'Apache Kafka, surveiller les instances AWS EC2, s'intégrer à Salesforce, toutes ces tâches peuvent être résolues à l'aide de composants prêts à l'emploi.
Les routes Apache Camel peuvent être écrites en Java ou Scala DSL. (Une configuration XML est également disponible mais devient trop détaillée et a de moins bonnes capacités de débogage.) Elle n'impose pas de restrictions sur la pile technologique des services communicants, mais si vous écrivez en Java ou Scala, vous pouvez à la place intégrer Camel dans une application. de le faire fonctionner de manière autonome.
La notation de routage utilisée par Camel peut être décrite avec le pseudocode simple suivant :
from(Source) .transform(Transformer) .to(Destination)
Source
, Transformer
et Destination
sont des points de terminaison faisant référence aux composants d'implémentation par leurs URI.
Qu'est-ce qui permet à Camel de résoudre les problèmes d'intégration que j'ai décrits précédemment ? Regardons. Premièrement, la logique de routage et de transformation ne vit désormais que dans une configuration Apache Camel dédiée. Deuxièmement, à travers le DSL succinct et naturel en conjonction avec l'utilisation des EIP, une image des dépendances entre les systèmes apparaît. Il est fait d'abstractions compréhensibles et la logique de routage est facilement ajustable. Et enfin, nous n'avons pas à écrire des tas de code de transformation car les adaptateurs appropriés sont probablement déjà inclus.
Je dois ajouter qu'Apache Camel est un framework mature et reçoit des mises à jour régulières. Il a une grande communauté et une base de connaissances cumulative considérable.
Il a ses propres inconvénients. Camel ne doit pas être considéré comme une suite d'intégration complexe. Il s'agit d'une boîte à outils sans fonctionnalités de haut niveau telles que des outils de gestion des processus métier ou des moniteurs d'activité, mais elle peut être utilisée pour créer de tels logiciels.
Les systèmes alternatifs pourraient être, par exemple, Spring Integration ou Mule ESB. Pour Spring Integration, bien qu'il soit considéré comme léger, d'après mon expérience, le rassembler et écrire de nombreux fichiers de configuration XML peut s'avérer étonnamment compliqué et n'est pas une solution facile. Mule ESB est un ensemble d'outils robuste et très fonctionnel, mais comme son nom l'indique, il s'agit d'un bus de services d'entreprise, il appartient donc à une catégorie de poids différente. Mule peut être comparé à Fuse ESB, un produit similaire basé sur Apache Camel avec un riche ensemble de fonctionnalités. Pour moi, utiliser Apache Camel pour les services de collage est une évidence aujourd'hui. Il est facile à utiliser et produit une description claire de ce qui va où. En même temps, il est suffisamment fonctionnel pour créer des intégrations complexes.
Écrire un exemple de route
Commençons à écrire le code. Nous partirons d'un flux de données synchrone qui achemine les messages d'une source unique vers une liste de destinataires. Les règles de routage seront écrites en Java DSL.
Nous utiliserons Maven pour construire le projet. Ajoutez d'abord la dépendance suivante au pom.xml
:
<dependencies> ... <dependency> <groupId>org.apache.camel</groupId> <artifactId>camel-core</artifactId> <version>2.20.0</version> </dependency> </dependencies>
Alternativement, l'application peut être construite sur l' camel-archetype-java
.
Les définitions de route Camel sont déclarées dans la méthode RouteBuilder.configure
.
public void configure() { errorHandler(defaultErrorHandler().maximumRedeliveries(0)); from("file:orders?noop=true").routeId("main") .log("Incoming File: ${file:onlyname}") .unmarshal().json(JsonLibrary.Jackson, Order.class) // unmarshal JSON to Order class containing List<OrderItem> .split().simple("body.items") // split list to process one by one .to("log:inputOrderItem") .choice() .when().simple("${body.type} == 'Drink'") .to("direct:bar") .when().simple("${body.type} == 'Dessert'") .to("direct:dessertStation") .when().simple("${body.type} == 'Hot Meal'") .to("direct:hotMealStation") .when().simple("${body.type} == 'Cold Meal'") .to("direct:coldMealStation") .otherwise() .to("direct:others"); from("direct:bar").routeId("bar").log("Handling Drink"); from("direct:dessertStation").routeId("dessertStation").log("Handling Dessert"); from("direct:hotMealStation").routeId("hotMealStation").log("Handling Hot Meal"); from("direct:coldMealStation").routeId("coldMealStation").log("Handling Cold Meal"); from("direct:others").routeId("others").log("Handling Something Other"); }
Dans cette définition, nous créons une route qui récupère les enregistrements du fichier JSON, les divise en éléments et les route vers un ensemble de gestionnaires en fonction du contenu du message.
Exécutons-le sur des données de test préparées. Nous obtiendrons la sortie :
INFO | Total 6 routes, of which 6 are started INFO | Apache Camel 2.20.0 (CamelContext: camel-1) started in 10.716 seconds INFO | Incoming File: order1.json INFO | Exchange[ExchangePattern: InOnly, BodyType: com.antongoncharov.camel.example.model.OrderItem, Body: OrderItem{, type='Drink', name='Americano', qty='1'}] INFO | Handling Drink INFO | Exchange[ExchangePattern: InOnly, BodyType: com.antongoncharov.camel.example.model.OrderItem, Body: OrderItem{, type='Hot Meal', name='French Omelette', qty='1'}] INFO | Handling Hot Meal INFO | Exchange[ExchangePattern: InOnly, BodyType: com.antongoncharov.camel.example.model.OrderItem, Body: OrderItem{, type='Hot Meal', name='Lasagna', qty='1'}] INFO | Handling Hot Meal INFO | Exchange[ExchangePattern: InOnly, BodyType: com.antongoncharov.camel.example.model.OrderItem, Body: OrderItem{, type='Hot Meal', name='Rice Balls', qty='1'}] INFO | Handling Hot Meal INFO | Exchange[ExchangePattern: InOnly, BodyType: com.antongoncharov.camel.example.model.OrderItem, Body: OrderItem{, type='Dessert', name='Blueberry Pie', qty='1'}] INFO | Handling Dessert
Comme prévu, Camel a acheminé les messages vers les destinations.
Choix de transfert de données
Dans l'exemple ci-dessus, l'interaction entre les composants est synchrone et effectuée via la mémoire de l'application. Cependant, il existe de nombreuses autres façons de communiquer lorsque nous traitons avec des applications distinctes qui ne partagent pas de mémoire :
- Echange de fichiers. Une application produit des fichiers de données partagées que l'autre utilise. C'est là que vit l'esprit de la vieille école. Ce mode de communication a une multitude de conséquences : manque de transactions et de cohérence, performances médiocres et coordination isolée entre les systèmes. De nombreux développeurs ont fini par écrire des solutions d'intégration maison pour rendre le processus plus ou moins gérable.
- Base de données commune. Demandez aux applications de stocker les données qu'elles souhaitent partager dans un schéma commun d'une seule base de données. La conception d'un schéma unifié et la gestion de l'accès simultané aux tables sont les défis les plus importants de cette approche. Comme pour l'échange de fichiers, il est facile que cela devienne un goulot d'étranglement permanent.
- Appel d'API à distance. Fournir une interface pour permettre à une application d'interagir avec une autre application en cours d'exécution, comme un appel de méthode typique. Les applications partagent des fonctionnalités via des appels d'API, mais cela les couple étroitement dans le processus.
- Messagerie. Demandez à chaque application de se connecter à un système de messagerie commun, d'échanger des données et d'invoquer un comportement de manière asynchrone à l'aide de messages. Ni l'expéditeur ni le destinataire ne doivent être opérationnels en même temps pour que le message soit livré.
Il existe d'autres façons d'interagir, mais nous devons garder à l'esprit que, d'une manière générale, il existe deux types d'interaction : synchrone et asynchrone. Le premier est comme appeler une fonction dans votre code - le flux d'exécution attendra jusqu'à ce qu'il s'exécute et renvoie une valeur. Avec une approche asynchrone, les mêmes données sont envoyées via une file d'attente de messages intermédiaire ou un sujet d'abonnement. Un appel de fonction distant asynchrone peut être implémenté en tant qu'EIP de demande-réponse.
La messagerie asynchrone n'est cependant pas une panacée ; cela implique certaines restrictions. Vous voyez rarement des API de messagerie sur le Web ; les services REST synchrones sont beaucoup plus populaires. Mais le middleware de messagerie est largement utilisé dans l'intranet d'entreprise ou l'infrastructure back-end de système distribué.
Utilisation des files d'attente de messages
Rendons notre exemple asynchrone. Un système logiciel qui gère les files d'attente et les rubriques d'abonnement est appelé courtier de messages. C'est comme un SGBDR pour les tables et les colonnes. Les files d'attente servent d'intégration point à point tandis que les rubriques sont destinées à la communication de publication-abonnement avec de nombreux destinataires. Nous utiliserons Apache ActiveMQ comme courtier de messages JMS car il est solide et intégrable.
Ajoutez la dépendance suivante. Parfois, il est excessif d'ajouter activemq-all
, qui contient tous les jars ActiveMQ, au projet, mais nous garderons les dépendances de notre application simples.
<dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-all</artifactId> <version>5.15.2</version> </dependency>
Ensuite, démarrez le courtier par programme. Dans Spring Boot, nous obtenons une configuration automatique pour cela en connectant la dépendance Maven spring-boot-starter-activemq
.
Exécutez un nouveau courtier de messages avec les commandes suivantes, en spécifiant uniquement le point de terminaison du connecteur :
BrokerService broker = new BrokerService(); broker.addConnector("tcp://localhost:61616"); broker.start();
Et ajoutez l'extrait de configuration suivant au corps de la méthode configure
:
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616"); this.getContext().addComponent("activemq", ActiveMQComponent.jmsComponent(connectionFactory));
Nous pouvons maintenant mettre à jour l'exemple précédent en utilisant des files d'attente de messages. Les files d'attente seront automatiquement créées lors de la livraison du message.

public void configure() { errorHandler(defaultErrorHandler().maximumRedeliveries(0)); ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616"); this.getContext().addComponent("activemq", ActiveMQComponent.jmsComponent(connectionFactory)); from("file:orders?noop=true").routeId("main") .log("Incoming File: ${file:onlyname}") .unmarshal().json(JsonLibrary.Jackson, Order.class) // unmarshal JSON to Order class containing List<OrderItem> .split().simple("body.items") // split list to process one by one .to("log:inputOrderItem") .choice() .when().simple("${body.type} == 'Drink'") .to("activemq:queue:bar") .when().simple("${body.type} == 'Dessert'") .to("activemq:queue:dessertStation") .when().simple("${body.type} == 'Hot Meal'") .to("activemq:queue:hotMealStation") .when().simple("${body.type} == 'Cold Meal'") .to("activemq:queue:coldMealStation") .otherwise() .to("activemq:queue:others"); from("activemq:queue:bar").routeId("barAsync").log("Drinks"); from("activemq:queue:dessertStation").routeId("dessertAsync").log("Dessert"); from("activemq:queue:hotMealStation").routeId("hotMealAsync").log("Hot Meals"); from("activemq:queue:coldMealStation").routeId("coldMealAsync").log("Cold Meals"); from("activemq:queue:others").routeId("othersAsync").log("Others"); }
Très bien, maintenant l'interaction est devenue asynchrone. Les consommateurs potentiels de ces données peuvent y accéder lorsqu'ils sont prêts à le faire. Ceci est un exemple de couplage lâche, que nous essayons de réaliser dans une architecture réactive. L'indisponibilité d'un des services ne bloquera pas les autres. De plus, un consommateur peut mettre à l'échelle et lire à partir de la file d'attente en parallèle. La file d'attente elle-même peut évoluer et être partitionnée. Les files d'attente persistantes peuvent stocker les données sur le disque, en attente de traitement, même lorsque tous les participants sont tombés en panne. Par conséquent, ce système est plus tolérant aux pannes.
Fait étonnant, le CERN utilise Apache Camel et ActiveMQ pour surveiller les systèmes du Large Hadron Collider (LHC). Il existe également un mémoire de maîtrise intéressant expliquant le choix d'une solution middleware appropriée pour cette tâche. Donc, comme ils le disent dans le discours d'ouverture, « Pas de JMS, pas de physique des particules ! »
Surveillance
Dans l'exemple précédent, nous avons créé le canal de données entre deux services. C'est un point de défaillance potentiel supplémentaire dans une architecture, nous devons donc en prendre soin. Examinons les fonctionnalités de surveillance fournies par Apache Camel. Fondamentalement, il expose des informations statistiques sur ses routes à travers les MBeans, accessibles par JMX. ActiveMQ expose les statistiques de file d'attente de la même manière.
Allumons le serveur JMX dans l'application, pour lui permettre de s'exécuter avec les options de ligne de commande :
-Dorg.apache.camel.jmx.createRmiConnector=true -Dorg.apache.camel.jmx.mbeanObjectDomainName=org.apache.camel -Dorg.apache.camel.jmx.rmiConnector.registryPort=1099 -Dorg.apache.camel.jmx.serviceUrlPath=camel
Exécutez maintenant l'application afin que la route ait fait son travail. Ouvrez l'outil standard jconsole
et connectez-vous au processus de candidature. Connectez-vous à l'URL service:jmx:rmi:///jndi/rmi://localhost:1099/camel
. Accédez au domaine org.apache.camel dans l'arborescence MBeans.
Nous pouvons voir que tout ce qui concerne le routage est sous contrôle. Nous avons le nombre de messages en cours, le nombre d'erreurs et le nombre de messages dans les files d'attente. Ces informations peuvent être transmises à certains outils de surveillance dotés de fonctionnalités riches telles que Graphana ou Kibana. Vous pouvez le faire en implémentant la pile ELK bien connue.
Il existe également une console Web enfichable et extensible qui fournit une interface utilisateur pour gérer Camel, ActiveMQ et bien d'autres, appelée hawt.io.
Itinéraires de test
Apache Camel a des fonctionnalités assez larges pour écrire des itinéraires de test avec des composants fictifs. C'est un outil puissant, mais écrire des routes séparées uniquement pour les tests est un processus qui prend du temps. Il serait plus efficace de faire des tests sur les routes de production sans modifier leur pipeline. Camel possède cette fonctionnalité et peut être implémenté à l'aide du composant AdviceWith.
Activons la logique de test dans notre exemple et exécutons un exemple de test.
<dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.11</version> <scope>test</scope> </dependency> <dependency> <groupId>org.apache.camel</groupId> <artifactId>camel-test</artifactId> <version>2.20.0</version> <scope>test</scope> </dependency>
La classe de test est :
public class AsyncRouteTest extends CamelTestSupport { @Override protected RouteBuilder createRouteBuilder() throws Exception { return new AsyncRouteBuilder(); } @Before public void mockEndpoints() throws Exception { context.getRouteDefinition("main").adviceWith(context, new AdviceWithRouteBuilder() { @Override public void configure() throws Exception { // we substitute all actual queues with mock endpoints mockEndpointsAndSkip("activemq:queue:bar"); mockEndpointsAndSkip("activemq:queue:dessertStation"); mockEndpointsAndSkip("activemq:queue:hotMealStation"); mockEndpointsAndSkip("activemq:queue:coldMealStation"); mockEndpointsAndSkip("activemq:queue:others"); // and replace the route's source with test endpoint replaceFromWith("file://testInbox"); } }); } @Test public void testSyncInteraction() throws InterruptedException { String testJson = "{\"id\": 1, \"order\": [{\"id\": 1, \"name\": \"Americano\", \"type\": \"Drink\", \"qty\": \"1\"}, {\"id\": 2, \"name\": \"French Omelette\", \"type\": \"Hot Meal\", \"qty\": \"1\"}, {\"id\": 3, \"name\": \"Lasagna\", \"type\": \"Hot Meal\", \"qty\": \"1\"}, {\"id\": 4, \"name\": \"Rice Balls\", \"type\": \"Hot Meal\", \"qty\": \"1\"}, {\"id\": 5, \"name\": \"Blueberry Pie\", \"type\": \"Dessert\", \"qty\": \"1\"}]}"; // get mocked endpoint and set an expectation MockEndpoint mockEndpoint = getMockEndpoint("mock:activemq:queue:hotMealStation"); mockEndpoint.expectedMessageCount(3); // simulate putting file in the inbox folder template.sendBodyAndHeader("file://testInbox", testJson, Exchange.FILE_NAME, "test.json"); //checks that expectations were met assertMockEndpointsSatisfied(); } }
Exécutez maintenant des tests pour l'application avec mvn test
. Nous pouvons voir que notre itinéraire a été exécuté avec succès avec les conseils de test. Aucun message n'est passé par les files d'attente réelles et les tests ont été réussis.
INFO | Route: main started and consuming from: file://testInbox <...> INFO | Incoming File: test.json <...> INFO | Asserting: mock://activemq:queue:hotMealStation is satisfied
Utiliser Apache Camel avec le cluster Kubernetes
L'un des problèmes d'intégration aujourd'hui est que les applications ne sont plus statiques. Dans une infrastructure cloud, nous traitons des services virtuels qui s'exécutent sur plusieurs nœuds en même temps. Il permet l'architecture de microservices avec un réseau de petits services légers interagissant entre eux. Ces services ont une durée de vie peu fiable et nous devons les découvrir de manière dynamique.
Le collage de services cloud est une tâche qui peut être résolue avec Apache Camel. C'est particulièrement intéressant en raison de la saveur EIP et du fait que Camel dispose de nombreux adaptateurs et prend en charge une large gamme de protocoles. La récente version 2.18 ajoute le composant ServiceCall, qui introduit une fonctionnalité d'appel d'une API et de résolution de son adresse via des mécanismes de découverte de cluster. Actuellement, il prend en charge Consul, Kubernetes, Ribbon, etc. Quelques exemples de code, où ServiceCall est configuré avec Consul, peuvent être trouvés facilement. Nous utiliserons Kubernetes ici car c'est ma solution de clustering préférée.
Le schéma d'intégration sera le suivant :
Le service de Order
et le service d' Inventory
seront deux applications Spring Boot triviales renvoyant des données statiques. Nous ne sommes pas liés à une pile technologique particulière ici. Ces services produisent les données que nous voulons traiter.
Contrôleur de service de commande :
@RestController public class OrderController { private final OrderStorage orderStorage; @Autowired public OrderController(OrderStorage orderStorage) { this.orderStorage = orderStorage; } @RequestMapping("/info") public String info() { return "Order Service UU/orders") public List<Order> getAll() { return orderStorage.getAll(); } @RequestMapping("/orders/{id}") public Order getOne(@PathVariable Integer id) { return orderStorage.getOne(id); } }
Il produit des données au format :
[{"id":1,"items":[2,3,4]},{"id":2,"items":[5,3]}]
Le contrôleur du service d' Inventory
est absolument similaire à celui du service de Order
:
@RestController public class InventoryController { private final InventoryStorage inventoryStorage; @Autowired public InventoryController(InventoryStorage inventoryStorage) { this.inventoryStorage = inventoryStorage; } @RequestMapping("/info") public String info() { return "Inventory Service UU/items") public List<InventoryItem> getAll() { return inventoryStorage.getAll(); } @RequestMapping("/items/{id}") public InventoryItem getOne(@PathVariable Integer id) { return inventoryStorage.getOne(id); } }
InventoryStorage
est un référentiel générique qui contient des données. Dans cet exemple, il renvoie des objets prédéfinis statiques, qui sont marshalés au format suivant.
[{"id":1,"name":"Laptop","description":"Up to 12-hours battery life","price":499.9},{"id":2,"name":"Monitor","description":"27-inch, response time: 7ms","price":200.0},{"id":3,"name":"Headphones","description":"Soft leather ear-cups","price":29.9},{"id":4,"name":"Mouse","description":"Designed for comfort and portability","price":19.0},{"id":5,"name":"Keyboard","description":"Layout: US","price":10.5}]
Écrivons une route de passerelle les connectant, mais sans ServiceCall à cette étape :
rest("/orders") .get("/").description("Get all orders with details").outType(TestResponse.class) .route() .setHeader("Content-Type", constant("application/json")) .setHeader("Accept", constant("application/json")) .setHeader(Exchange.HTTP_METHOD, constant("GET")) .removeHeaders("CamelHttp*") .to("http4://localhost:8082/orders?bridgeEndpoint=true") .unmarshal(formatOrder) .enrich("direct:enrichFromInventory", new OrderAggregationStrategy()) .to("log:result") .endRest(); from("direct:enrichFromInventory") .transform().simple("${null}") .setHeader("Content-Type", constant("application/json")) .setHeader("Accept", constant("application/json")) .setHeader(Exchange.HTTP_METHOD, constant("GET")) .removeHeaders("CamelHttp*") .to("http4://localhost:8081/items?bridgeEndpoint=true") .unmarshal(formatInventory);
Imaginez maintenant que chaque service n'est plus une instance spécifique mais un nuage d'instances fonctionnant comme une seule. Nous utiliserons Minikube pour essayer le cluster Kubernetes localement.
Configurez les routes réseau pour voir les nœuds Kubernetes localement (l'exemple donné est pour un environnement Mac/Linux) :
# remove existing routes sudo route -n delete 10/24 > /dev/null 2>&1 # add routes sudo route -n add 10.0.0.0/24 $(minikube ip) # 172.17.0.0/16 ip range is used by docker in minikube sudo route -n add 172.17.0.0/16 $(minikube ip) ifconfig 'bridge100' | grep member | awk '{print $2}' # use interface name from the output of the previous command # needed for xhyve driver, which I'm using for testing sudo ifconfig bridge100 -hostfilter en5
Enveloppez les services dans des conteneurs Docker avec une configuration Dockerfile comme celle-ci :
FROM openjdk:8-jdk-alpine VOLUME /tmp ADD target/order-srv-1.0-SNAPSHOT.jar app.jar ADD target/lib lib ENV JAVA_OPTS="" ENTRYPOINT exec java $JAVA_OPTS -Djava.security.egd=file:/dev/./urandom -jar /app.jar
Créez et transférez les images de service dans le registre Docker. Exécutez maintenant les nœuds dans le cluster Kubernetes local.
Configuration du déploiement Kubernetes.yaml :
apiVersion: extensions/v1beta1 kind: Deployment metadata: name: inventory spec: replicas: 3 selector: matchLabels: app: inventory template: metadata: labels: app: inventory spec: containers: - name: inventory image: inventory-srv:latest imagePullPolicy: Never ports: - containerPort: 8081
Exposez ces déploiements en tant que services dans le cluster :
kubectl expose deployment order-srv --type=NodePort kubectl expose deployment inventory-srv --type=NodePort
Nous pouvons maintenant vérifier si les requêtes sont servies par des nœuds choisis au hasard dans le cluster. Exécutez curl -X http://192.168.99.100:30517/info
plusieurs fois de manière séquentielle pour accéder à minikube NodePort pour le service exposé (en utilisant votre hôte et votre port). Dans la sortie, nous constatons que nous avons atteint l'équilibrage des demandes.
Inventory Service UUID = 22f8ca6b-f56b-4984-927b-cbf9fcf81da5 Inventory Service UUID = b7a4d326-1e76-4051-a0a6-1016394fafda Inventory Service UUID = b7a4d326-1e76-4051-a0a6-1016394fafda Inventory Service UUID = 22f8ca6b-f56b-4984-927b-cbf9fcf81da5 Inventory Service UUID = 50323ddb-3ace-4424-820a-6b4e85775af4
Ajoutez camel-kubernetes
et camel-netty4-http
au pom.xml
du projet. Configurez ensuite le composant ServiceCall pour utiliser la découverte de nœud maître Kubernetes partagée pour tous les appels de service parmi les définitions de route :
KubernetesConfiguration kubernetesConfiguration = new KubernetesConfiguration(); kubernetesConfiguration.setMasterUrl("https://192.168.64.2:8443"); kubernetesConfiguration.setClientCertFile("/Users/antongoncharov/.minikube/client.crt"); kubernetesConfiguration.setClientKeyFile("/Users/antongoncharov/.minikube/client.key"); kubernetesConfiguration.setNamespace("default”); ServiceCallConfigurationDefinition config = new ServiceCallConfigurationDefinition(); config.setServiceDiscovery(new KubernetesClientServiceDiscovery(kubernetesConfiguration)); context.setServiceCallConfiguration(config);
L'EIP ServiceCall complète bien Spring Boot. La plupart des options peuvent être configurées directement dans le fichier application.properties
.
Renforcez la route Camel avec le composant ServiceCall :
rest("/orders") .get("/").description("Get all orders with details").outType(TestResponse.class) .route() .hystrix() .setHeader("Content-Type", constant("application/json")) .setHeader("Accept", constant("application/json")) .setHeader(Exchange.HTTP_METHOD, constant("GET")) .removeHeaders("CamelHttp*") .serviceCall("customer-srv","http4:customer-deployment?bridgeEndpoint=true") .unmarshal(formatOrder) .enrich("direct:enrichFromInventory", new OrderAggregationStrategy()) .to("log:result") .endRest(); from("direct:enrichFromInventory") .transform().simple("${null}") .setHeader("Content-Type", constant("application/json")) .setHeader("Accept", constant("application/json")) .setHeader(Exchange.HTTP_METHOD, constant("GET")) .removeHeaders("CamelHttp*") .serviceCall("order-srv","http4:order-srv?bridgeEndpoint=true") .unmarshal(formatInventory);
Nous avons également activé Circuit Breaker dans l'itinéraire. Il s'agit d'un crochet d'intégration qui permet de suspendre les appels système distants en cas d'erreur de livraison ou d'indisponibilité du destinataire. Ceci est conçu pour éviter une défaillance du système en cascade. Le composant Hystrix permet d'atteindre cet objectif en implémentant le modèle Circuit Breaker.
Exécutons-le et envoyons une demande de test ; nous obtiendrons la réponse agrégée des deux services.
[{"id":1,"items":[{"id":2,"name":"Monitor","description":"27-inch, response time: 7ms","price":200.0},{"id":3,"name":"Headphones","description":"Soft leather ear-cups","price":29.9},{"id":4,"name":"Mouse","description":"Designed for comfort and portability","price":19.0}]},{"id":2,"items":[{"id":5,"name":"Keyboard","description":"Layout: US","price":10.5},{"id":3,"name":"Headphones","description":"Soft leather ear-cups","price":29.9}]}]
Le résultat est comme prévu.
Autres cas d'utilisation
J'ai montré comment Apache Camel peut intégrer des microservices dans un cluster. Quelles sont les autres utilisations de ce framework ? En général, il est utile partout où le routage basé sur des règles peut être une solution. For instance, Apache Camel can be a middleware for the Internet of Things with the Eclipse Kura adapter. It can handle monitoring by ferrying log signals from various components and services, like in the CERN system. It can also be an integration framework for enterprise SOA or be a pipeline for batch data processing, although it doesn't compete well with Apache Spark in this area.
Conclusion
You can see that systems integration isn't an easy process. We're lucky because a lot of experience has been gathered. It's important to apply it correctly to build flexible and fault-tolerant solutions.
To ensure correct application, I recommend having a checklist of important integration aspects. Must-have items include:
- Is there a separate integration layer?
- Are there tests for integration?
- Do we know the expected peak data intensity?
- Do we know the expected data delivery time?
- Does message correlation matter? What if a sequence breaks?
- Should we do it in a synchronous or asynchronous way?
- Where do formats and routing rules change more frequently?
- Do we have ways to monitor the process?
In this article, we tried Apache Camel, a lightweight integration framework, which helps save time and effort when solving integration problems. As we showed, it can serve as a tool, supporting the relevant microservice architecture by taking full responsibility for data exchange between microservices.
If you're interested in learning more about Apache Camel, I highly recommend the book “Camel in Action” by the framework's creator, Claus Ibsen. Official documentation is available at camel.apache.org.