Samouczek Spring Batch: Łatwe przetwarzanie wsadowe dzięki Spring

Opublikowany: 2022-03-11

Przetwarzanie wsadowe — charakteryzujące się masowym, nieinteraktywnym i często długotrwałym wykonywaniem w tle — jest szeroko stosowane w praktycznie każdej branży i jest stosowane do różnorodnych zadań. Przetwarzanie wsadowe może wymagać dużej ilości danych lub obliczeń, może być wykonywane sekwencyjnie lub równolegle i może być inicjowane za pomocą różnych modeli wywołań, w tym ad hoc, zaplanowanego i na żądanie.

Ten samouczek Spring Batch ogólnie wyjaśnia model programowania i język domeny aplikacji wsadowych, aw szczególności pokazuje kilka przydatnych podejść do projektowania i tworzenia aplikacji wsadowych przy użyciu aktualnej wersji Spring Batch 3.0.7 .

Co to jest partia wiosenna?

Spring Batch to lekka, kompleksowa platforma zaprojektowana w celu ułatwienia tworzenia solidnych aplikacji wsadowych. Zapewnia również bardziej zaawansowane usługi techniczne i funkcje, które obsługują wyjątkowo duże i wydajne zadania wsadowe dzięki optymalizacji i technikom partycjonowania. Spring Batch opiera się na opartym na POJO podejściu programistycznym Spring Framework, znanym wszystkim doświadczonym programistom Spring.

Na przykład w tym artykule omówiono kod źródłowy z przykładowego projektu, który ładuje plik klienta w formacie XML, filtruje klientów według różnych atrybutów i wyprowadza przefiltrowane wpisy do pliku tekstowego. Kod źródłowy naszego przykładu Spring Batch (który wykorzystuje adnotacje Lombok) jest dostępny tutaj na GitHub i wymaga Java SE 8 i Maven.

Co to jest przetwarzanie wsadowe? Kluczowe pojęcia i terminologia

Ważne jest, aby każdy programista wsadowy był zaznajomiony i zaznajomiony z głównymi koncepcjami przetwarzania wsadowego. Poniższy diagram to uproszczona wersja architektury referencyjnej partii, która została sprawdzona przez dziesięciolecia implementacji na wielu różnych platformach. Wprowadza kluczowe pojęcia i terminy dotyczące przetwarzania wsadowego, używane przez Spring Batch.

Samouczek Spring Batch: Kluczowe pojęcia i terminologia

Jak pokazano w naszym przykładzie przetwarzania wsadowego, proces wsadowy jest zwykle zawarty w Job składającym się z wielu Step . Każdy Step ma zazwyczaj jeden ItemReader , ItemProcessor i ItemWriter . Job jest wykonywane przez JobLauncher , a metadane dotyczące skonfigurowanych i wykonanych zadań są przechowywane w JobRepository .

Każde Job może być powiązane z wieloma JobInstance , z których każdy jest jednoznacznie zdefiniowany przez jego konkretne JobParameters , które są używane do uruchamiania zadania wsadowego. Każde uruchomienie JobInstance jest określane jako JobExecution . Każde JobExecution zazwyczaj śledzi to, co wydarzyło się podczas uruchomienia, takie jak status bieżący i status wyjścia, czas rozpoczęcia i zakończenia itp.

Step to niezależna, specyficzna faza Job wsadowego, tak że każde Job składa się z jednego lub więcej Step . Podobnie do Job , Step ma indywidualne StepExecution , które reprezentuje pojedynczą próbę wykonania Step . StepExecution przechowuje informacje o bieżącym i końcowym statusie, czasie rozpoczęcia i zakończenia itd., a także odniesienia do odpowiednich instancji Step i JobExecution .

ExecutionContext to zestaw par klucz-wartość zawierający informacje, których zakres StepExecution lub JobExecution . Spring Batch utrzymuje ExecutionContext , który pomaga w przypadkach, gdy chcesz ponownie uruchomić uruchomienie wsadowe (np. gdy wystąpił błąd krytyczny itp.). Wszystko, co jest potrzebne, to umieszczenie dowolnego obiektu, który ma być dzielony między etapami, w kontekście, a ramy zajmą się resztą. Po ponownym uruchomieniu wartości z poprzedniego ExecutionContext są przywracane z bazy danych i stosowane.

JobRepository to mechanizm w Spring Batch, który umożliwia to wszystko. Zapewnia operacje CRUD dla JobLauncher , Job i Step . Po uruchomieniu Job JobExecution jest pozyskiwane z repozytorium i podczas wykonywania instancje StepExecution i JobExecution są utrwalane w repozytorium.

Pierwsze kroki z Spring Batch Framework

Jedną z zalet Spring Batch jest to, że zależności projektu są minimalne, co ułatwia szybkie uruchomienie i uruchomienie. Kilka istniejących zależności jest jasno określonych i wyjaśnionych w pom.xml projektu, do którego można uzyskać dostęp tutaj.

Rzeczywiste uruchomienie aplikacji odbywa się w klasie wyglądającej mniej więcej tak:

 @EnableBatchProcessing @SpringBootApplication public class BatchApplication { public static void main(String[] args) { prepareTestData(1000); SpringApplication.run(BatchApplication.class, args); } }

Adnotacja @EnableBatchProcessing włącza funkcje Spring Batch i zapewnia podstawową konfigurację do konfigurowania zadań wsadowych.

Adnotacja @SpringBootApplication pochodzi z projektu Spring Boot, który zapewnia samodzielne, gotowe do produkcji aplikacje oparte na Spring. Określa klasę konfiguracyjną, która deklaruje co najmniej jeden Spring beans, a także uruchamia autokonfigurację i skanowanie komponentów Springa.

Nasz przykładowy projekt ma tylko jedno zadanie, które jest konfigurowane przez CustomerReportJobConfig z wstrzykniętym JobBuilderFactory i StepBuilderFactory . Minimalną konfigurację zadania można zdefiniować w CustomerReportJobConfig w następujący sposób:

 @Configuration public class CustomerReportJobConfig { @Autowired private JobBuilderFactory jobBuilders; @Autowired private StepBuilderFactory stepBuilders; @Bean public Job customerReportJob() { return jobBuilders.get("customerReportJob") .start(taskletStep()) .next(chunkStep()) .build(); } @Bean public Step taskletStep() { return stepBuilders.get("taskletStep") .tasklet(tasklet()) .build(); } @Bean public Tasklet tasklet() { return (contribution, chunkContext) -> { return RepeatStatus.FINISHED; }; } }

Istnieją dwa główne podejścia do budowania kroku.

Jedno podejście, jak pokazano w powyższym przykładzie, jest oparte na zadaniach . Tasklet obsługuje prosty interfejs, który ma tylko jedną metodę execute() , która jest wywoływana wielokrotnie, dopóki nie zwróci RepeatStatus.FINISHED lub zgłosi wyjątek sygnalizujący niepowodzenie. Każde wywołanie Tasklet jest zawarte w transakcji.

Inne podejście, przetwarzanie zorientowane na porcje, odnosi się do sekwencyjnego odczytywania danych i tworzenia „kawałków”, które zostaną zapisane w granicach transakcji. Każdy pojedynczy element jest wczytywany z ItemReader , przekazywany do ItemProcessor i agregowany. Gdy liczba odczytanych elementów jest równa interwałowi zatwierdzenia, cała porcja jest zapisywana za pośrednictwem ItemWriter , a następnie transakcja zostaje zatwierdzona. Krok zorientowany na porcje można skonfigurować w następujący sposób:

 @Bean public Job customerReportJob() { return jobBuilders.get("customerReportJob") .start(taskletStep()) .next(chunkStep()) .build(); } @Bean public Step chunkStep() { return stepBuilders.get("chunkStep") .<Customer, Customer>chunk(20) .reader(reader()) .processor(processor()) .writer(writer()) .build(); }

Metoda chunk() buduje krok, który przetwarza elementy w porcjach o podanym rozmiarze, przy czym każda porcja jest następnie przekazywana do określonego czytnika, procesora i programu piszącego. Metody te omówiono bardziej szczegółowo w kolejnych sekcjach tego artykułu.

Czytnik niestandardowy

Dla naszej przykładowej aplikacji Spring Batch, w celu odczytania listy klientów z pliku XML, musimy dostarczyć implementację interfejsu org.springframework.batch.item.ItemReader :

 public interface ItemReader<T> { T read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException; }

ItemReader dostarcza dane i oczekuje się, że będzie stanowy. Zwykle jest wywoływany wiele razy dla każdej partii, przy czym każde wywołanie read() zwraca następną wartość i ostatecznie zwraca null , gdy wszystkie dane wejściowe zostaną wyczerpane.

Spring Batch udostępnia kilka gotowych implementacji ItemReader , których można używać do różnych celów, takich jak odczytywanie kolekcji, plików, integracja JMS i JDBC, a także wielu źródeł i tak dalej.

W naszej przykładowej aplikacji klasa CustomerItemReader deleguje rzeczywiste wywołania read() do leniwie zainicjowanej instancji klasy IteratorItemReader :

 public class CustomerItemReader implements ItemReader<Customer> { private final String filename; private ItemReader<Customer> delegate; public CustomerItemReader(final String filename) { this.filename = filename; } @Override public Customer read() throws Exception { if (delegate == null) { delegate = new IteratorItemReader<>(customers()); } return delegate.read(); } private List<Customer> customers() throws FileNotFoundException { try (XMLDecoder decoder = new XMLDecoder(new FileInputStream(filename))) { return (List<Customer>) decoder.readObject(); } } }

Fasolka Spring dla tej implementacji jest tworzona z adnotacjami @Component i @StepScope , informując Spring, że ta klasa jest komponentem Spring o zakresie krokowym i zostanie utworzona raz na wykonanie kroku w następujący sposób:

 @StepScope @Bean public ItemReader<Customer> reader() { return new CustomerItemReader(XML_FILE); }

Procesory niestandardowe

ItemProcessors przekształcają elementy wejściowe i wprowadzają logikę biznesową w scenariuszu przetwarzania zorientowanym na elementy. Muszą zapewnić implementację interfejsu org.springframework.batch.item.ItemProcessor :

 public interface ItemProcessor<I, O> { O process(I item) throws Exception; }

Metoda process() akceptuje jedną instancję klasy I i może zwracać instancję tego samego typu lub nie. Zwrócenie null wskazuje, że element nie powinien być dalej przetwarzany. Jak zwykle, Spring udostępnia kilka standardowych procesorów, takich jak CompositeItemProcessor , który przekazuje element przez sekwencję wstrzykiwanych ItemProcessor oraz ValidatingItemProcessor , który sprawdza poprawność danych wejściowych.

W przypadku naszej przykładowej aplikacji procesory służą do filtrowania klientów według następujących wymagań:

  • Klient musi urodzić się w bieżącym miesiącu (np. aby zgłosić ofertę urodzinową itp.)
  • Klient musi mieć mniej niż pięć zakończonych transakcji (np. w celu identyfikacji nowszych klientów)

Wymóg „bieżącego miesiąca” jest implementowany za pomocą niestandardowego ItemProcessor :

 public class BirthdayFilterProcessor implements ItemProcessor<Customer, Customer> { @Override public Customer process(final Customer item) throws Exception { if (new GregorianCalendar().get(Calendar.MONTH) == item.getBirthday().get(Calendar.MONTH)) { return item; } return null; } }

Wymóg „ograniczonej liczby transakcji” jest zaimplementowany jako ValidatingItemProcessor :

 public class TransactionValidatingProcessor extends ValidatingItemProcessor<Customer> { public TransactionValidatingProcessor(final int limit) { super( item -> { if (item.getTransactions() >= limit) { throw new ValidationException("Customer has less than " + limit + " transactions"); } } ); setFilter(true); } }

Ta para procesorów jest następnie hermetyzowana w CompositeItemProcessor , który implementuje wzorzec delegata:

 @StepScope @Bean public ItemProcessor<Customer, Customer> processor() { final CompositeItemProcessor<Customer, Customer> processor = new CompositeItemProcessor<>(); processor.setDelegates(Arrays.asList(new BirthdayFilterProcessor(), new TransactionValidatingProcessor(5))); return processor; }

Niestandardowi pisarze

Do wyprowadzania danych Spring Batch udostępnia interfejs org.springframework.batch.item.ItemWriter do serializacji obiektów w razie potrzeby:

 public interface ItemWriter<T> { void write(List<? extends T> items) throws Exception; }

Metoda write() jest odpowiedzialna za upewnienie się, że wszystkie wewnętrzne bufory są opróżniane. Jeśli transakcja jest aktywna, zwykle konieczne będzie odrzucenie danych wyjściowych przy kolejnym wycofaniu. Zasób, do którego program zapisujący wysyła dane, zwykle powinien sam sobie z tym poradzić. Istnieją standardowe implementacje, takie jak CompositeItemWriter , JdbcBatchItemWriter , JmsItemWriter , JpaItemWriter , SimpleMailMessageItemWriter i inne.

W naszej przykładowej aplikacji lista przefiltrowanych klientów jest zapisana w następujący sposób:

 public class CustomerItemWriter implements ItemWriter<Customer>, Closeable { private final PrintWriter writer; public CustomerItemWriter() { OutputStream out; try { out = new FileOutputStream("output.txt"); } catch (FileNotFoundException e) { out = System.out; } this.writer = new PrintWriter(out); } @Override public void write(final List<? extends Customer> items) throws Exception { for (Customer item : items) { writer.println(item.toString()); } } @PreDestroy @Override public void close() throws IOException { writer.close(); } }

Planowanie wiosennych zadań wsadowych

Domyślnie Spring Batch wykonuje wszystkie zadania, które może znaleźć (tj. skonfigurowane tak, jak w CustomerReportJobConfig ) podczas uruchamiania. Aby zmienić to zachowanie, wyłącz wykonywanie zadania podczas uruchamiania, dodając następującą właściwość do application.properties :

 spring.batch.job.enabled=false

Rzeczywiste planowanie jest następnie osiągane przez dodanie adnotacji @EnableScheduling do klasy konfiguracji i adnotacji @Scheduled do metody, która wykonuje samo zadanie. Planowanie można skonfigurować za pomocą opóźnień, stawek lub wyrażeń cron:

 // run every 5000 msec (ie, every 5 secs) @Scheduled(fixedRate = 5000) public void run() throws Exception { JobExecution execution = jobLauncher.run( customerReportJob(), new JobParametersBuilder().toJobParameters() ); }

Z powyższym przykładem jest jednak problem. W czasie wykonywania zadanie powiedzie się tylko za pierwszym razem. Gdy uruchomi się po raz drugi (tj. po pięciu sekundach), wygeneruje w dziennikach następujące komunikaty (zauważ, że w poprzednich wersjach Spring Batch zostałby JobInstanceAlreadyCompleteException ):

 INFO 36988 --- [pool-2-thread-1] osbclsupport.SimpleJobLauncher : Job: [SimpleJob: [name=customerReportJob]] launched with the following parameters: [{}] INFO 36988 --- [pool-2-thread-1] osbatch.core.job.SimpleStepHandler : Step already complete or not restartable, so no action to execute: StepExecution: id=1, version=3, name=taskletStep, status=COMPLETED, exitStatus=COMPLETED, readCount=0, filterCount=0, writeCount=0 readSkipCount=0, writeSkipCount=0, processSkipCount=0, commitCount=1, rollbackCount=0, exitDescription= INFO 36988 --- [pool-2-thread-1] osbatch.core.job.SimpleStepHandler : Step already complete or not restartable, so no action to execute: StepExecution: id=2, version=53, name=chunkStep, status=COMPLETED, exitStatus=COMPLETED, readCount=1000, filterCount=982, writeCount=18 readSkipCount=0, writeSkipCount=0, processSkipCount=0, commitCount=51, rollbackCount=0, exitDescription=

Dzieje się tak, ponieważ można tworzyć i wykonywać tylko unikalne JobInstance , a Spring Batch nie ma możliwości odróżnienia pierwszego i drugiego JobInstance .

Istnieją dwa sposoby uniknięcia tego problemu podczas planowania zadania wsadowego.

Jednym z nich jest wprowadzenie jednego lub więcej unikalnych parametrów (np. rzeczywisty czas rozpoczęcia w nanosekundach) do każdego zadania:

 @Scheduled(fixedRate = 5000) public void run() throws Exception { jobLauncher.run( customerReportJob(), new JobParametersBuilder().addLong("uniqueness", System.nanoTime()).toJobParameters() ); }

Alternatywnie możesz uruchomić następne zadanie w sekwencji JobInstance określonej przez JobParametersIncrementer dołączony do określonego zadania za pomocą SimpleJobOperator.startNextInstance() :

 @Autowired private JobOperator operator; @Autowired private JobExplorer jobs; @Scheduled(fixedRate = 5000) public void run() throws Exception { List<JobInstance> lastInstances = jobs.getJobInstances(JOB_NAME, 0, 1); if (lastInstances.isEmpty()) { jobLauncher.run(customerReportJob(), new JobParameters()); } else { operator.startNextInstance(JOB_NAME); } }

Testowanie jednostkowe wsadów wiosennych

Zwykle, aby uruchomić testy jednostkowe w aplikacji Spring Boot, framework musi załadować odpowiedni ApplicationContext . W tym celu wykorzystywane są dwie adnotacje:

 @RunWith(SpringRunner.class) @ContextConfiguration(classes = {...})

Istnieje klasa narzędziowa org.springframework.batch.test.JobLauncherTestUtils do testowania zadań wsadowych. Zapewnia metody uruchamiania całego zadania, a także umożliwia kompleksowe testowanie poszczególnych kroków bez konieczności uruchamiania każdego kroku w zadaniu. Musi być zadeklarowana jako fasolka szparagowa:

 @Configuration public class BatchTestConfiguration { @Bean public JobLauncherTestUtils jobLauncherTestUtils() { return new JobLauncherTestUtils(); } }

Typowy test dla zadania i kroku wygląda następująco (może również korzystać z dowolnych frameworków do mockingu):

 @RunWith(SpringRunner.class) @ContextConfiguration(classes = {BatchApplication.class, BatchTestConfiguration.class}) public class CustomerReportJobConfigTest { @Autowired private JobLauncherTestUtils testUtils; @Autowired private CustomerReportJobConfig config; @Test public void testEntireJob() throws Exception { final JobExecution result = testUtils.getJobLauncher().run(config.customerReportJob(), testUtils.getUniqueJobParameters()); Assert.assertNotNull(result); Assert.assertEquals(BatchStatus.COMPLETED, result.getStatus()); } @Test public void testSpecificStep() { Assert.assertEquals(BatchStatus.COMPLETED, testUtils.launchStep("taskletStep").getStatus()); } }

Spring Batch wprowadza dodatkowe zakresy dla kontekstów kroków i zadań. Obiekty w tych zakresach używają kontenera Spring jako fabryki obiektów, więc istnieje tylko jedna instancja każdego takiego komponentu bean na krok wykonania lub zadanie. Ponadto zapewniona jest obsługa późnego wiązania odwołań dostępnych z StepContext lub JobContext . Komponenty, które są skonfigurowane w czasie wykonywania, aby były ograniczone do kroku lub zadania, trudno jest przetestować jako samodzielne komponenty, chyba że istnieje sposób na ustawienie kontekstu tak, jakby były w wykonaniu kroku lub zadania. Taki jest cel komponentów JobScopeTestUtils i org.springframework.batch.test.StepScopeTestUtils w Spring Batch, a także org.springframework.batch.test.StepScopeTestExecutionListener i JobScopeTestExecutionListener .

TestExecutionListeners są deklarowane na poziomie klasy, a ich zadaniem jest utworzenie kontekstu wykonania kroku dla każdej metody testowej. Na przykład:

 @RunWith(SpringRunner.class) @TestExecutionListeners({DependencyInjectionTestExecutionListener.class, StepScopeTestExecutionListener.class}) @ContextConfiguration(classes = {BatchApplication.class, BatchTestConfiguration.class}) public class BirthdayFilterProcessorTest { @Autowired private BirthdayFilterProcessor processor; public StepExecution getStepExecution() { return MetaDataInstanceFactory.createStepExecution(); } @Test public void filter() throws Exception { final Customer customer = new Customer(); customer.setId(1); customer.setName("name"); customer.setBirthday(new GregorianCalendar()); Assert.assertNotNull(processor.process(customer)); } }

Istnieją dwie TestExecutionListener . Jeden pochodzi ze standardowego frameworka Spring Test i obsługuje wstrzykiwanie zależności ze skonfigurowanego kontekstu aplikacji. Drugi to Spring Batch StepScopeTestExecutionListener , który konfiguruje kontekst zakresu kroku dla wstrzykiwania zależności do testów jednostkowych. StepContext jest tworzony na czas trwania metody testowej i udostępniany wszelkim zależnościom, które są wstrzykiwane. Domyślnym zachowaniem jest po prostu utworzenie StepExecution ze stałymi właściwościami. Alternatywnie StepContext może być dostarczony przez przypadek testowy jako metoda fabryki zwracająca poprawny typ.

Inne podejście opiera się na klasie narzędziowej StepScopeTestUtils . Ta klasa służy do tworzenia i manipulowania StepScope w testach jednostkowych w bardziej elastyczny sposób bez używania iniekcji zależności. Na przykład odczytanie identyfikatora klienta filtrowanego przez procesor powyżej można wykonać w następujący sposób:

 @Test public void filterId() throws Exception { final Customer customer = new Customer(); customer.setId(1); customer.setName("name"); customer.setBirthday(new GregorianCalendar()); final int id = StepScopeTestUtils.doInStepScope( getStepExecution(), () -> processor.process(customer).getId() ); Assert.assertEquals(1, id); }

Gotowy na zaawansowaną wiosenną partię?

W tym artykule przedstawiono niektóre z podstaw projektowania i tworzenia aplikacji Spring Batch. Jednak istnieje wiele bardziej zaawansowanych tematów i możliwości — takich jak skalowanie, przetwarzanie równoległe, detektory i inne — które nie zostały omówione w tym artykule. Mamy nadzieję, że ten artykuł stanowi przydatną podstawę do rozpoczęcia pracy.

Informacje na te bardziej zaawansowane tematy można znaleźć w oficjalnej dokumentacji Spring Back dla Spring Batch.