Usprawnij integrację oprogramowania: samouczek Apache Camel

Opublikowany: 2022-03-11

Oprogramowanie rzadko, jeśli w ogóle, istnieje w informacyjnej próżni. Przynajmniej takie jest założenie, które my inżynierowie oprogramowania możemy przyjąć dla większości tworzonych przez nas aplikacji.

W każdej skali każde oprogramowanie – w taki czy inny sposób – komunikuje się z innym oprogramowaniem z różnych powodów: aby skądś uzyskać dane referencyjne, aby wysyłać sygnały monitorujące, aby być w kontakcie z innymi usługami będąc częścią rozproszonego system i nie tylko.

W tym samouczku dowiesz się, jakie są niektóre z największych wyzwań związanych z integracją dużego oprogramowania i jak Apache Camel z łatwością je rozwiązuje.

Problem: projektowanie architektury do integracji systemów

Być może zrobiłeś co najmniej raz w życiu inżyniera oprogramowania:

  • Zidentyfikuj fragment logiki biznesowej, który powinien zainicjować wysyłanie danych.
  • W tej samej warstwie aplikacji zapisz przekształcenia danych zgodnie z oczekiwaniami odbiorcy.
  • Owiń dane w strukturę odpowiednią do przesyłania i routingu w sieci.
  • Otwórz połączenie z aplikacją docelową za pomocą odpowiedniego sterownika lub klienckiego pakietu SDK.
  • Wyślij dane i obsłuż odpowiedź.

Dlaczego jest to zła linia postępowania?

Chociaż masz tylko kilka tego rodzaju połączeń, pozostaje to możliwe do opanowania. Wraz z rosnącą liczbą relacji między systemami, logika biznesowa aplikacji miesza się z logiką integracji, która polega na adaptacji danych, skompensowaniu różnic technologicznych między dwoma systemami i przeniesieniu danych do systemu zewnętrznego za pomocą SOAP, REST lub bardziej egzotycznych żądań .

Gdybyś integrował kilka aplikacji, byłoby niezwykle trudno odtworzyć cały obraz zależności w takim kodzie: Gdzie są wytwarzane dane i jakie usługi je konsumują? Na dodatek będziesz mieć wiele miejsc, w których logika integracji jest zduplikowana.

Przy takim podejściu, mimo że zadanie jest wykonane technicznie, mamy ogromne problemy z konserwowalnością i skalowalnością integracji. Szybka reorganizacja przepływów danych w tym systemie jest prawie niemożliwa, nie wspominając o głębszych kwestiach, takich jak brak monitoringu, przerwy w obwodzie, pracochłonne odzyskiwanie danych itp.

To wszystko jest szczególnie ważne przy integracji oprogramowania w ramach dość dużego przedsiębiorstwa. Zajmowanie się integracją przedsiębiorstwa oznacza pracę z zestawem aplikacji, które działają na szerokiej gamie platform i znajdują się w różnych lokalizacjach. Wymiana danych w takim środowisku oprogramowania jest dość wymagająca. Musi spełniać wysokie standardy bezpieczeństwa w branży i zapewniać niezawodny sposób przesyłania danych. W środowisku korporacyjnym integracja systemów wymaga osobnego, dokładnie opracowanego projektu architektury.

Ten artykuł wprowadzi Cię w unikalne trudności napotykane w integracji oprogramowania, a także przedstawi niektóre oparte na doświadczeniu rozwiązania dla zadań integracyjnych. Zapoznamy się z Apache Camel, użytecznym frameworkiem, który może złagodzić najgorsze bóle głowy programistów integracyjnych. Podążymy za przykładem, jak Camel może pomóc w nawiązaniu komunikacji w klastrze mikrousług obsługiwanych przez Kubernetes.

Trudności w integracji

Szeroko stosowanym podejściem do rozwiązania tego problemu jest oddzielenie warstwy integracji w aplikacji. Może istnieć w tej samej aplikacji lub jako niezależnie działające dedykowane oprogramowanie — w tym drugim przypadku zwane oprogramowaniem pośredniczącym.

Jakie problemy zazwyczaj napotykasz podczas tworzenia i obsługi oprogramowania pośredniczącego? Ogólnie rzecz biorąc, masz następujące kluczowe elementy:

  • Wszystkie kanały danych są do pewnego stopnia zawodne. Problemy wynikające z tej zawodności mogą nie wystąpić, gdy intensywność danych jest niska lub umiarkowana. Każdy poziom przechowywania, od pamięci aplikacji po niższe pamięci podręczne i znajdujące się pod nim urządzenia, może ulec awarii. Niektóre rzadkie błędy pojawiają się tylko przy ogromnych ilościach danych. Nawet dojrzałe, gotowe do produkcji produkty dostawców mają nierozwiązane problemy ze śledzeniem błędów związane z utratą danych. System oprogramowania pośredniczącego powinien być w stanie na czas informować użytkownika o tych stratach danych i ponownie dostarczać wiadomości.
  • Aplikacje korzystają z różnych protokołów i formatów danych. Oznacza to, że system integracyjny jest kurtyną dla transformacji danych i adapterów dla innych uczestników i wykorzystuje różnorodne technologie. Mogą one obejmować zwykłe wywołania interfejsu API REST, ale mogą również dotyczyć dostępu do brokera kolejek, wysyłania zamówień CSV przez FTP lub zbiorczego pobierania danych do tabeli bazy danych. To długa lista i nigdy nie będzie krótsza.
  • Zmiany w formatach danych i regułach routingu są nieuniknione. Każdy krok w procesie tworzenia aplikacji, który zmienia strukturę danych, zwykle prowadzi do zmian w integracyjnych formatach danych i przekształceniach. Czasami konieczne są zmiany infrastruktury z reorganizacją przepływów danych przedsiębiorstwa. Na przykład zmiany te mogą wystąpić podczas wprowadzania pojedynczego punktu walidacji danych referencyjnych, który musi przetwarzać wszystkie wpisy danych podstawowych w całej firmie. W przypadku N systemów możemy w efekcie mieć maksymalnie prawie N^2 połączeń między nimi, więc liczba miejsc, w których należy wprowadzić zmiany, rośnie dość szybko. Będzie jak lawina. Aby utrzymać łatwość konserwacji, warstwa oprogramowania pośredniego musi zapewniać jasny obraz zależności z wszechstronnym routingiem i transformacją danych.

O tych pomysłach należy pamiętać przy projektowaniu integracji i wyborze najodpowiedniejszego rozwiązania oprogramowania pośredniczącego. Jednym z możliwych sposobów radzenia sobie z tym jest wykorzystanie magistrali usług przedsiębiorstwa (ESB). Ale ESB dostarczane przez głównych dostawców są generalnie zbyt ciężkie i często sprawiają więcej kłopotów, niż są warte: Szybki start z ESB jest prawie niemożliwy, ma dość stromą krzywą uczenia się, a jej elastyczność jest poświęcona długiej liście funkcji i wbudowanych narzędzi. Moim zdaniem lekkie rozwiązania integracyjne typu open source są znacznie lepsze — są bardziej elastyczne, łatwe do wdrożenia w chmurze i łatwe do skalowania.

Integracja oprogramowania nie jest łatwa do zrobienia. Dzisiaj, kiedy budujemy architektury mikroserwisów i zajmujemy się rojami małych usług, mamy również wysokie oczekiwania co do tego, jak skutecznie powinny się komunikować.

Wzorce integracji przedsiębiorstw

Jak można się spodziewać, podobnie jak w przypadku tworzenia oprogramowania, rozwój routingu i transformacji danych obejmuje powtarzalne operacje. Doświadczenia w tym zakresie zostały podsumowane i usystematyzowane przez profesjonalistów, którzy od dłuższego czasu zajmują się problemami integracyjnymi. W rezultacie istnieje zestaw wyodrębnionych szablonów zwanych wzorcami integracji przedsiębiorstwa, używanych do projektowania przepływów danych. Te metody integracji zostały opisane w książce o tym samym tytule, którą napisali Gregor Hophe i Bobby Wolfe, która jest bardzo podobna do znaczącej książki Gang of Four, ale dotyczy oprogramowania do klejenia.

Aby podać przykład, wzorzec normalizatora wprowadza składnik, który mapuje semantycznie równe wiadomości, które mają różne formaty danych, na pojedynczy model kanoniczny lub agregatorem jest EIP, który łączy sekwencję wiadomości w jeden.

Ponieważ są one ustalonymi abstrakcjami niezależnymi od technologii, używanymi do rozwiązywania problemów architektonicznych, EIP pomagają w pisaniu projektu architektury, który nie zagłębia się w poziom kodu, ale opisuje przepływy danych z wystarczającą szczegółowością. Taka notacja opisująca ścieżki integracji nie tylko sprawia, że ​​projekt jest zwięzły, ale także wyznacza wspólną nomenklaturę i wspólny język, które są bardzo ważne w kontekście rozwiązywania zadania integracyjnego z członkami zespołu z różnych obszarów biznesowych.

Przedstawiamy Apache Camel

Kilka lat temu budowałem integrację przedsiębiorstwa w ogromnej sieci sklepów spożywczych ze sklepami w bardzo rozproszonych lokalizacjach. Zacząłem od autorskiego rozwiązania ESB, które okazało się zbyt kłopotliwe w utrzymaniu. Następnie nasz zespół natrafił na Apache Camel i po wykonaniu pewnych prac związanych z „weryfikacją koncepcji”, szybko przepisaliśmy wszystkie nasze przepływy danych w trasach Camel.

Apache Camel można określić mianem „routera mediacyjnego”, zorientowanego na komunikaty frameworka pośredniczącego implementującego listę EIP, z którym się zapoznałem. Wykorzystuje te wzorce, obsługuje wszystkie popularne protokoły transportowe i zawiera szeroki zestaw przydatnych adapterów. Camel umożliwia obsługę wielu procedur integracyjnych bez konieczności pisania własnego kodu.

Oprócz tego wyróżniłbym następujące cechy Apache Camel:

  • Trasy integracji są zapisywane jako potoki złożone z bloków. Tworzy całkowicie przejrzysty obraz, który pomaga śledzić przepływy danych.
  • Camel posiada adaptery dla wielu popularnych API. Na przykład pozyskiwanie danych z Apache Kafka, monitorowanie instancji AWS EC2, integracja z Salesforce – wszystkie te zadania można rozwiązać przy użyciu gotowych komponentów.

Trasy Apache Camel można pisać w Javie lub Scala DSL. (Konfiguracja XML jest również dostępna, ale staje się zbyt szczegółowa i ma gorsze możliwości debugowania.) Nie nakłada ograniczeń na stos technologiczny usług komunikacyjnych, ale jeśli piszesz w Javie lub Scali, możesz zamiast tego osadzić Camel w aplikacji samodzielnego uruchamiania.

Notację routingu używaną przez Camel można opisać następującym prostym pseudokodem:

 from(Source) .transform(Transformer) .to(Destination)

Source , Transformer i Destination to punkty końcowe odwołujące się do składników implementacji przez ich identyfikatory URI.

Co umożliwia Camelowi rozwiązanie problemów integracyjnych, które opisałem wcześniej? Spójrzmy. Po pierwsze, logika routingu i transformacji działa teraz tylko w dedykowanej konfiguracji Apache Camel. Po drugie, poprzez zwięzłe i naturalne łącze DSL w połączeniu z wykorzystaniem EIP, pojawia się obraz zależności między systemami. Składa się ze zrozumiałych abstrakcji, a logikę routingu można łatwo dostosować. I wreszcie, nie musimy pisać stosów kodu transformacji, ponieważ odpowiednie adaptery prawdopodobnie zostaną już dołączone.

Integracje

Powinienem dodać, że Apache Camel jest dojrzałym frameworkiem i otrzymuje regularne aktualizacje. Ma świetną społeczność i znaczną skumulowaną bazę wiedzy.

Ma swoje wady. Camel nie powinien być traktowany jako złożony pakiet integracyjny. Jest to zestaw narzędzi bez funkcji wysokiego poziomu, takich jak narzędzia do zarządzania procesami biznesowymi czy monitory aktywności, ale można go wykorzystać do tworzenia takiego oprogramowania.

Alternatywnymi systemami mogą być np. Spring Integration czy Mule ESB. Z mojego doświadczenia wynika, że ​​w przypadku Spring Integration, choć uważa się go za lekki, składanie go razem i pisanie wielu plików konfiguracyjnych XML może okazać się nieoczekiwanie skomplikowane i nie jest łatwym wyjściem. Mule ESB to solidny i bardzo funkcjonalny zestaw narzędzi, ale jak sama nazwa wskazuje, jest to magistrala usług dla przedsiębiorstw, więc należy do innej kategorii wagowej. Mule można porównać z Fuse ESB, podobnym produktem opartym na Apache Camel z bogatym zestawem funkcji. Dla mnie używanie Apache Camel do usług klejenia jest dziś oczywiste. Jest łatwy w użyciu i zapewnia przejrzysty opis tego, gdzie trafia, a jednocześnie jest wystarczająco funkcjonalny, aby tworzyć złożone integracje.

Pisanie przykładowej trasy

Zacznijmy pisać kod. Zaczniemy od synchronicznego przepływu danych, który kieruje wiadomości z jednego źródła do listy adresatów. Reguły routingu zostaną napisane w Java DSL.

Użyjemy Mavena do zbudowania projektu. Najpierw dodaj następującą zależność do pom.xml :

 <dependencies> ... <dependency> <groupId>org.apache.camel</groupId> <artifactId>camel-core</artifactId> <version>2.20.0</version> </dependency> </dependencies>

Alternatywnie, aplikację można zbudować na camel-archetype-java .

Definicje tras Camel są deklarowane w metodzie RouteBuilder.configure .

 public void configure() { errorHandler(defaultErrorHandler().maximumRedeliveries(0)); from("file:orders?noop=true").routeId("main") .log("Incoming File: ${file:onlyname}") .unmarshal().json(JsonLibrary.Jackson, Order.class) // unmarshal JSON to Order class containing List<OrderItem> .split().simple("body.items") // split list to process one by one .to("log:inputOrderItem") .choice() .when().simple("${body.type} == 'Drink'") .to("direct:bar") .when().simple("${body.type} == 'Dessert'") .to("direct:dessertStation") .when().simple("${body.type} == 'Hot Meal'") .to("direct:hotMealStation") .when().simple("${body.type} == 'Cold Meal'") .to("direct:coldMealStation") .otherwise() .to("direct:others"); from("direct:bar").routeId("bar").log("Handling Drink"); from("direct:dessertStation").routeId("dessertStation").log("Handling Dessert"); from("direct:hotMealStation").routeId("hotMealStation").log("Handling Hot Meal"); from("direct:coldMealStation").routeId("coldMealStation").log("Handling Cold Meal"); from("direct:others").routeId("others").log("Handling Something Other"); }

W tej definicji tworzymy trasę, która pobiera rekordy z pliku JSON, dzieli je na elementy i kieruje do zestawu funkcji obsługi na podstawie zawartości wiadomości.

Uruchommy to na przygotowanych danych testowych. Otrzymamy wynik:

 INFO | Total 6 routes, of which 6 are started INFO | Apache Camel 2.20.0 (CamelContext: camel-1) started in 10.716 seconds INFO | Incoming File: order1.json INFO | Exchange[ExchangePattern: InOnly, BodyType: com.antongoncharov.camel.example.model.OrderItem, Body: OrderItem{, type='Drink', name='Americano', qty='1'}] INFO | Handling Drink INFO | Exchange[ExchangePattern: InOnly, BodyType: com.antongoncharov.camel.example.model.OrderItem, Body: OrderItem{, type='Hot Meal', name='French Omelette', qty='1'}] INFO | Handling Hot Meal INFO | Exchange[ExchangePattern: InOnly, BodyType: com.antongoncharov.camel.example.model.OrderItem, Body: OrderItem{, type='Hot Meal', name='Lasagna', qty='1'}] INFO | Handling Hot Meal INFO | Exchange[ExchangePattern: InOnly, BodyType: com.antongoncharov.camel.example.model.OrderItem, Body: OrderItem{, type='Hot Meal', name='Rice Balls', qty='1'}] INFO | Handling Hot Meal INFO | Exchange[ExchangePattern: InOnly, BodyType: com.antongoncharov.camel.example.model.OrderItem, Body: OrderItem{, type='Dessert', name='Blueberry Pie', qty='1'}] INFO | Handling Dessert

Zgodnie z oczekiwaniami Camel kierował wiadomości do miejsc docelowych.

Opcje transferu danych

W powyższym przykładzie interakcja między komponentami jest synchroniczna i odbywa się za pośrednictwem pamięci aplikacji. Istnieje jednak wiele innych sposobów komunikacji, gdy mamy do czynienia z oddzielnymi aplikacjami, które nie współdzielą pamięci:

  • Wymiana plików. Jedna aplikacja tworzy pliki współdzielonych danych, które druga może wykorzystać. Tam mieszka duch starej szkoły. Ta metoda komunikacji ma wiele konsekwencji: brak transakcji i spójności, słaba wydajność i izolowana koordynacja między systemami. Wielu programistów skończyło na pisaniu domowych rozwiązań integracyjnych, aby proces był łatwiejszy w zarządzaniu.
  • Wspólna baza danych. Niech aplikacje przechowują dane, które chcą udostępnić, we wspólnym schemacie pojedynczej bazy danych. Projektowanie ujednoliconego schematu i obsługa równoczesnego dostępu do tabel to najważniejsze wyzwania tego podejścia. Podobnie jak w przypadku wymiany plików, łatwo jest stać się trwałym wąskim gardłem.
  • Zdalne wywołanie API. Zapewnij interfejs, aby umożliwić aplikacji interakcję z inną uruchomioną aplikacją, jak typowe wywołanie metody. Aplikacje współdzielą funkcje za pośrednictwem wywołań API, ale to ściśle łączy je w procesie.
  • Wiadomości. Połącz każdą aplikację ze wspólnym systemem przesyłania wiadomości i wymieniaj dane oraz wywołuj zachowanie asynchronicznie za pomocą komunikatów. Ani nadawca, ani odbiorca nie muszą być jednocześnie uruchomieni, aby wiadomość została dostarczona.

Istnieje więcej sposobów interakcji, ale powinniśmy pamiętać, że ogólnie mówiąc, istnieją dwa rodzaje interakcji: synchroniczna i asynchroniczna. Pierwszy przypomina wywołanie funkcji w kodzie — przepływ wykonania będzie czekał, aż zostanie wykonana i zwróci wartość. W podejściu asynchronicznym te same dane są wysyłane za pośrednictwem pośredniej kolejki komunikatów lub tematu subskrypcji. Asynchroniczne zdalne wywołanie funkcji można zaimplementować jako EIP żądanie-odpowiedź.

Jednak wiadomości asynchroniczne nie są panaceum; wiąże się z pewnymi ograniczeniami. Rzadko widzisz interfejsy API do przesyłania wiadomości w Internecie; Synchroniczne usługi REST są znacznie bardziej popularne. Jednak oprogramowanie pośredniczące do przesyłania wiadomości jest szeroko stosowane w intranecie przedsiębiorstwa lub infrastrukturze zaplecza systemu rozproszonego.

Korzystanie z kolejek wiadomości

Zróbmy nasz przykład asynchroniczny. System oprogramowania, który zarządza kolejkami i tematami subskrypcji, nazywany jest brokerem komunikatów. To jak RDBMS dla tabel i kolumn. Kolejki służą jako integracja punkt-punkt, podczas gdy tematy służą do komunikacji publikuj-subskrybuj z wieloma odbiorcami. Użyjemy Apache ActiveMQ jako brokera komunikatów JMS, ponieważ jest solidny i możliwy do osadzenia.

Dodaj następującą zależność. Czasami dodanie activemq-all , która zawiera wszystkie pliki jar ActiveMQ do projektu, jest przesadą, ale zachowamy nieskomplikowane zależności naszej aplikacji.

 <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-all</artifactId> <version>5.15.2</version> </dependency>

Następnie uruchom broker programowo. W Spring Boot otrzymujemy do tego autokonfigurację, podłączając zależność spring-boot-starter-activemq Maven.

Uruchom nowego brokera komunikatów za pomocą następujących poleceń, określając tylko punkt końcowy łącznika:

 BrokerService broker = new BrokerService(); broker.addConnector("tcp://localhost:61616"); broker.start();

I dodaj następujący fragment konfiguracji do treści metody configure :

 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616"); this.getContext().addComponent("activemq", ActiveMQComponent.jmsComponent(connectionFactory));

Teraz możemy zaktualizować poprzedni przykład za pomocą kolejek wiadomości. Kolejki zostaną utworzone automatycznie po dostarczeniu wiadomości.

 public void configure() { errorHandler(defaultErrorHandler().maximumRedeliveries(0)); ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616"); this.getContext().addComponent("activemq", ActiveMQComponent.jmsComponent(connectionFactory)); from("file:orders?noop=true").routeId("main") .log("Incoming File: ${file:onlyname}") .unmarshal().json(JsonLibrary.Jackson, Order.class) // unmarshal JSON to Order class containing List<OrderItem> .split().simple("body.items") // split list to process one by one .to("log:inputOrderItem") .choice() .when().simple("${body.type} == 'Drink'") .to("activemq:queue:bar") .when().simple("${body.type} == 'Dessert'") .to("activemq:queue:dessertStation") .when().simple("${body.type} == 'Hot Meal'") .to("activemq:queue:hotMealStation") .when().simple("${body.type} == 'Cold Meal'") .to("activemq:queue:coldMealStation") .otherwise() .to("activemq:queue:others"); from("activemq:queue:bar").routeId("barAsync").log("Drinks"); from("activemq:queue:dessertStation").routeId("dessertAsync").log("Dessert"); from("activemq:queue:hotMealStation").routeId("hotMealAsync").log("Hot Meals"); from("activemq:queue:coldMealStation").routeId("coldMealAsync").log("Cold Meals"); from("activemq:queue:others").routeId("othersAsync").log("Others"); }

W porządku, teraz interakcja stała się asynchroniczna. Potencjalni konsumenci tych danych mogą uzyskać do nich dostęp, gdy będą na to gotowi. To przykład luźnego sprzężenia, które staramy się osiągnąć w architekturze reaktywnej. Niedostępność jednej z usług nie blokuje pozostałych. Ponadto konsument może równolegle skalować i czytać z kolejki. Sama kolejka może być skalowana i podzielona na partycje. Trwałe kolejki mogą przechowywać dane na dysku, czekając na przetworzenie, nawet gdy wszyscy uczestnicy zejdą. W konsekwencji system ten jest bardziej odporny na błędy.

Zadziwiającym faktem jest to, że CERN wykorzystuje Apache Camel i ActiveMQ do monitorowania systemów Wielkiego Zderzacza Hadronów (LHC). Jest też ciekawa praca magisterska wyjaśniająca wybór odpowiedniego rozwiązania middleware do tego zadania. Tak więc, jak mówią w przemówieniu: „Bez JMS — bez fizyki cząstek!”

Monitorowanie

W poprzednim przykładzie utworzyliśmy kanał danych między dwiema usługami. Jest to dodatkowy potencjalny punkt awarii w architekturze, więc musimy o niego dbać. Przyjrzyjmy się, jakie funkcje monitorowania zapewnia Apache Camel. Zasadniczo udostępnia informacje statystyczne o swoich trasach za pośrednictwem komponentów MBean, do których dostęp zapewnia JMX. ActiveMQ udostępnia statystyki kolejki w ten sam sposób.

Włączmy serwer JMX w aplikacji, aby umożliwić jego uruchamianie z opcjami wiersza poleceń:

 -Dorg.apache.camel.jmx.createRmiConnector=true -Dorg.apache.camel.jmx.mbeanObjectDomainName=org.apache.camel -Dorg.apache.camel.jmx.rmiConnector.registryPort=1099 -Dorg.apache.camel.jmx.serviceUrlPath=camel

Teraz uruchom aplikację, aby trasa wykonała swoje zadanie. Otwórz standardowe narzędzie jconsole i połącz się z procesem aplikacji. Połącz się z service:jmx:rmi:///jndi/rmi://localhost:1099/camel . Przejdź do domeny org.apache.camel w drzewie MBeans.

Zrzut ekranu 1

Widzimy, że wszystko w routingu jest pod kontrolą. Mamy liczbę wiadomości w locie, liczbę błędów i liczbę wiadomości w kolejkach. Informacje te mogą być przesyłane potokowo do niektórych zestawów narzędzi do monitorowania z bogatą funkcjonalnością, takich jak Graphana lub Kibana. Możesz to zrobić, wdrażając dobrze znany stos ELK.

Dostępna jest również podłączana i rozszerzalna konsola internetowa, która zapewnia interfejs użytkownika do zarządzania Camel, ActiveMQ i wieloma innymi, o nazwie hawt.io.

Zrzut ekranu 2

Testowanie tras

Apache Camel ma dosyć szeroką funkcjonalność do pisania tras testowych z komponentami makiety. To potężne narzędzie, ale pisanie oddzielnych tras tylko do testowania to czasochłonny proces. Wydajniejsze byłoby przeprowadzanie testów na trasach produkcyjnych bez modyfikowania ich potoku. Camel ma tę funkcję i może być zaimplementowany za pomocą komponentu AdviceWith.

Włączmy logikę testów w naszym przykładzie i uruchommy przykładowy test.

 <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.11</version> <scope>test</scope> </dependency> <dependency> <groupId>org.apache.camel</groupId> <artifactId>camel-test</artifactId> <version>2.20.0</version> <scope>test</scope> </dependency>

Klasa testowa to:

 public class AsyncRouteTest extends CamelTestSupport { @Override protected RouteBuilder createRouteBuilder() throws Exception { return new AsyncRouteBuilder(); } @Before public void mockEndpoints() throws Exception { context.getRouteDefinition("main").adviceWith(context, new AdviceWithRouteBuilder() { @Override public void configure() throws Exception { // we substitute all actual queues with mock endpoints mockEndpointsAndSkip("activemq:queue:bar"); mockEndpointsAndSkip("activemq:queue:dessertStation"); mockEndpointsAndSkip("activemq:queue:hotMealStation"); mockEndpointsAndSkip("activemq:queue:coldMealStation"); mockEndpointsAndSkip("activemq:queue:others"); // and replace the route's source with test endpoint replaceFromWith("file://testInbox"); } }); } @Test public void testSyncInteraction() throws InterruptedException { String testJson = "{\"id\": 1, \"order\": [{\"id\": 1, \"name\": \"Americano\", \"type\": \"Drink\", \"qty\": \"1\"}, {\"id\": 2, \"name\": \"French Omelette\", \"type\": \"Hot Meal\", \"qty\": \"1\"}, {\"id\": 3, \"name\": \"Lasagna\", \"type\": \"Hot Meal\", \"qty\": \"1\"}, {\"id\": 4, \"name\": \"Rice Balls\", \"type\": \"Hot Meal\", \"qty\": \"1\"}, {\"id\": 5, \"name\": \"Blueberry Pie\", \"type\": \"Dessert\", \"qty\": \"1\"}]}"; // get mocked endpoint and set an expectation MockEndpoint mockEndpoint = getMockEndpoint("mock:activemq:queue:hotMealStation"); mockEndpoint.expectedMessageCount(3); // simulate putting file in the inbox folder template.sendBodyAndHeader("file://testInbox", testJson, Exchange.FILE_NAME, "test.json"); //checks that expectations were met assertMockEndpointsSatisfied(); } }

Teraz uruchom testy aplikacji za pomocą mvn test . Widzimy, że nasza trasa została pomyślnie wykonana dzięki poradom testowym. Żadne wiadomości nie przeszły przez rzeczywiste kolejki, a testy zostały zakończone.

 INFO | Route: main started and consuming from: file://testInbox <...> INFO | Incoming File: test.json <...> INFO | Asserting: mock://activemq:queue:hotMealStation is satisfied

Używanie Apache Camel z klastrem Kubernetes

Jednym z problemów związanych z integracją jest to, że aplikacje nie są już statyczne. W infrastrukturze chmurowej mamy do czynienia z usługami wirtualnymi, które działają na wielu węzłach jednocześnie. Umożliwia architekturze mikrousług z siecią małych, lekkich usług współdziałających między sobą. Usługi te mają zawodną żywotność i musimy je dynamicznie odkrywać.

Łączenie usług w chmurze to zadanie, które można rozwiązać za pomocą Apache Camel. Jest to szczególnie interesujące ze względu na smak EIP oraz fakt, że Camel ma wiele adapterów i obsługuje szeroką gamę protokołów. Najnowsza wersja 2.18 dodaje komponent ServiceCall, który wprowadza funkcję wywoływania API i rozwiązywania jego adresu za pomocą mechanizmów wykrywania klastrów. Obecnie obsługuje Consul, Kubernetes, Ribbon itp. Niektóre przykłady kodu, w których ServiceCall jest skonfigurowany z Consul, można łatwo znaleźć. Będziemy tutaj używać Kubernetes, ponieważ jest to moje ulubione rozwiązanie do klastrowania.

Schemat integracji będzie wyglądał następująco:

Schemat

Usługa Order i usługa Inventory będą kilkoma trywialnymi aplikacjami Spring Boot zwracającymi dane statyczne. Nie jesteśmy tutaj przywiązani do konkretnego stosu technologii. Usługi te wytwarzają dane, które chcemy przetwarzać.

Zamówienie kontrolera usługi:

 @RestController public class OrderController { private final OrderStorage orderStorage; @Autowired public OrderController(OrderStorage orderStorage) { this.orderStorage = orderStorage; } @RequestMapping("/info") public String info() { return "Order Service UU/orders") public List<Order> getAll() { return orderStorage.getAll(); } @RequestMapping("/orders/{id}") public Order getOne(@PathVariable Integer id) { return orderStorage.getOne(id); } }

Produkuje dane w formacie:

 [{"id":1,"items":[2,3,4]},{"id":2,"items":[5,3]}]

Kontroler usługi Inventory jest absolutnie podobny do usługi Order :

 @RestController public class InventoryController { private final InventoryStorage inventoryStorage; @Autowired public InventoryController(InventoryStorage inventoryStorage) { this.inventoryStorage = inventoryStorage; } @RequestMapping("/info") public String info() { return "Inventory Service UU/items") public List<InventoryItem> getAll() { return inventoryStorage.getAll(); } @RequestMapping("/items/{id}") public InventoryItem getOne(@PathVariable Integer id) { return inventoryStorage.getOne(id); } }

InventoryStorage to ogólne repozytorium przechowujące dane. W tym przykładzie zwraca statyczne, predefiniowane obiekty, które są zorganizowane w następującym formacie.

 [{"id":1,"name":"Laptop","description":"Up to 12-hours battery life","price":499.9},{"id":2,"name":"Monitor","description":"27-inch, response time: 7ms","price":200.0},{"id":3,"name":"Headphones","description":"Soft leather ear-cups","price":29.9},{"id":4,"name":"Mouse","description":"Designed for comfort and portability","price":19.0},{"id":5,"name":"Keyboard","description":"Layout: US","price":10.5}]

Napiszmy trasę bramy łączącą je, ale bez ServiceCall na tym etapie:

 rest("/orders") .get("/").description("Get all orders with details").outType(TestResponse.class) .route() .setHeader("Content-Type", constant("application/json")) .setHeader("Accept", constant("application/json")) .setHeader(Exchange.HTTP_METHOD, constant("GET")) .removeHeaders("CamelHttp*") .to("http4://localhost:8082/orders?bridgeEndpoint=true") .unmarshal(formatOrder) .enrich("direct:enrichFromInventory", new OrderAggregationStrategy()) .to("log:result") .endRest(); from("direct:enrichFromInventory") .transform().simple("${null}") .setHeader("Content-Type", constant("application/json")) .setHeader("Accept", constant("application/json")) .setHeader(Exchange.HTTP_METHOD, constant("GET")) .removeHeaders("CamelHttp*") .to("http4://localhost:8081/items?bridgeEndpoint=true") .unmarshal(formatInventory);

Teraz wyobraź sobie, że każda usługa nie jest już konkretną instancją, ale chmurą instancji działającą jako jedna. Użyjemy Minikube, aby wypróbować klaster Kubernetes lokalnie.

Skonfiguruj trasy sieciowe, aby zobaczyć lokalnie węzły Kubernetes (podany przykład dotyczy środowiska Mac/Linux):

 # remove existing routes sudo route -n delete 10/24 > /dev/null 2>&1 # add routes sudo route -n add 10.0.0.0/24 $(minikube ip) # 172.17.0.0/16 ip range is used by docker in minikube sudo route -n add 172.17.0.0/16 $(minikube ip) ifconfig 'bridge100' | grep member | awk '{print $2}' # use interface name from the output of the previous command # needed for xhyve driver, which I'm using for testing sudo ifconfig bridge100 -hostfilter en5

Opakuj usługi w kontenerach Docker z konfiguracją Dockerfile w następujący sposób:

 FROM openjdk:8-jdk-alpine VOLUME /tmp ADD target/order-srv-1.0-SNAPSHOT.jar app.jar ADD target/lib lib ENV JAVA_OPTS="" ENTRYPOINT exec java $JAVA_OPTS -Djava.security.egd=file:/dev/./urandom -jar /app.jar

Kompiluj i wypychaj obrazy usługi do rejestru platformy Docker. Teraz uruchom węzły w lokalnym klastrze Kubernetes.

Konfiguracja wdrożenia Kubernetes.yaml:

 apiVersion: extensions/v1beta1 kind: Deployment metadata: name: inventory spec: replicas: 3 selector: matchLabels: app: inventory template: metadata: labels: app: inventory spec: containers: - name: inventory image: inventory-srv:latest imagePullPolicy: Never ports: - containerPort: 8081

Udostępnij te wdrożenia jako usługi w klastrze:

 kubectl expose deployment order-srv --type=NodePort kubectl expose deployment inventory-srv --type=NodePort

Teraz możemy sprawdzić, czy żądania są obsługiwane przez losowo wybrane węzły z klastra. Uruchom curl -X http://192.168.99.100:30517/info sekwencyjnie kilka razy, aby uzyskać dostęp do minikube NodePort w celu udostępnienia usługi (przy użyciu hosta i portu). W danych wyjściowych widzimy, że osiągnęliśmy równoważenie żądań.

 Inventory Service UUID = 22f8ca6b-f56b-4984-927b-cbf9fcf81da5 Inventory Service UUID = b7a4d326-1e76-4051-a0a6-1016394fafda Inventory Service UUID = b7a4d326-1e76-4051-a0a6-1016394fafda Inventory Service UUID = 22f8ca6b-f56b-4984-927b-cbf9fcf81da5 Inventory Service UUID = 50323ddb-3ace-4424-820a-6b4e85775af4

Dodaj camel-kubernetes i camel-netty4-http do pom.xml projektu. Następnie skonfiguruj komponent ServiceCall tak, aby używał wykrywania węzłów głównych Kubernetes współdzielonych dla wszystkich wywołań usług w definicjach tras:

 KubernetesConfiguration kubernetesConfiguration = new KubernetesConfiguration(); kubernetesConfiguration.setMasterUrl("https://192.168.64.2:8443"); kubernetesConfiguration.setClientCertFile("/Users/antongoncharov/.minikube/client.crt"); kubernetesConfiguration.setClientKeyFile("/Users/antongoncharov/.minikube/client.key"); kubernetesConfiguration.setNamespace("default”); ServiceCallConfigurationDefinition config = new ServiceCallConfigurationDefinition(); config.setServiceDiscovery(new KubernetesClientServiceDiscovery(kubernetesConfiguration)); context.setServiceCallConfiguration(config);

ServiceCall EIP dobrze uzupełnia Spring Boot. Większość opcji można skonfigurować bezpośrednio w pliku application.properties .

Wzmocnij trasę Camel za pomocą komponentu ServiceCall:

 rest("/orders") .get("/").description("Get all orders with details").outType(TestResponse.class) .route() .hystrix() .setHeader("Content-Type", constant("application/json")) .setHeader("Accept", constant("application/json")) .setHeader(Exchange.HTTP_METHOD, constant("GET")) .removeHeaders("CamelHttp*") .serviceCall("customer-srv","http4:customer-deployment?bridgeEndpoint=true") .unmarshal(formatOrder) .enrich("direct:enrichFromInventory", new OrderAggregationStrategy()) .to("log:result") .endRest(); from("direct:enrichFromInventory") .transform().simple("${null}") .setHeader("Content-Type", constant("application/json")) .setHeader("Accept", constant("application/json")) .setHeader(Exchange.HTTP_METHOD, constant("GET")) .removeHeaders("CamelHttp*") .serviceCall("order-srv","http4:order-srv?bridgeEndpoint=true") .unmarshal(formatInventory);

Uruchomiliśmy również wyłącznik na trasie. Jest to hak integracyjny, który umożliwia wstrzymanie zdalnych wywołań systemowych w przypadku błędów w dostawie lub niedostępności odbiorcy. Ma to na celu uniknięcie awarii systemu kaskadowego. Komponent Hystrix pomaga to osiągnąć poprzez wdrożenie wzorca wyłącznika.

Uruchommy go i wyślijmy zapytanie testowe; otrzymamy odpowiedź zagregowaną z obu usług.

 [{"id":1,"items":[{"id":2,"name":"Monitor","description":"27-inch, response time: 7ms","price":200.0},{"id":3,"name":"Headphones","description":"Soft leather ear-cups","price":29.9},{"id":4,"name":"Mouse","description":"Designed for comfort and portability","price":19.0}]},{"id":2,"items":[{"id":5,"name":"Keyboard","description":"Layout: US","price":10.5},{"id":3,"name":"Headphones","description":"Soft leather ear-cups","price":29.9}]}]

Wynik jest zgodny z oczekiwaniami.

Inne przypadki użycia

Pokazałem, jak Apache Camel może integrować mikroserwisy w klastrze. Jakie są inne zastosowania tego frameworka? Ogólnie jest to przydatne wszędzie tam, gdzie rozwiązaniem może być routing oparty na regułach. For instance, Apache Camel can be a middleware for the Internet of Things with the Eclipse Kura adapter. It can handle monitoring by ferrying log signals from various components and services, like in the CERN system. It can also be an integration framework for enterprise SOA or be a pipeline for batch data processing, although it doesn't compete well with Apache Spark in this area.

Wniosek

You can see that systems integration isn't an easy process. We're lucky because a lot of experience has been gathered. It's important to apply it correctly to build flexible and fault-tolerant solutions.

To ensure correct application, I recommend having a checklist of important integration aspects. Must-have items include:

  • Is there a separate integration layer?
  • Are there tests for integration?
  • Do we know the expected peak data intensity?
  • Do we know the expected data delivery time?
  • Does message correlation matter? What if a sequence breaks?
  • Should we do it in a synchronous or asynchronous way?
  • Where do formats and routing rules change more frequently?
  • Do we have ways to monitor the process?

In this article, we tried Apache Camel, a lightweight integration framework, which helps save time and effort when solving integration problems. As we showed, it can serve as a tool, supporting the relevant microservice architecture by taking full responsibility for data exchange between microservices.

If you're interested in learning more about Apache Camel, I highly recommend the book “Camel in Action” by the framework's creator, Claus Ibsen. Official documentation is available at camel.apache.org.