Spring Batch Tutorial: Stapelverarbeitung leicht gemacht mit Spring

Veröffentlicht: 2022-03-11

Die Batch-Verarbeitung – typisch für die massenorientierte, nicht interaktive und häufig lang andauernde Ausführung im Hintergrund – ist in praktisch allen Branchen weit verbreitet und wird auf eine Vielzahl von Aufgaben angewendet. Die Stapelverarbeitung kann daten- oder rechenintensiv sein, sequentiell oder parallel ausgeführt werden und kann durch verschiedene Aufrufmodelle initiiert werden, einschließlich Ad-hoc, geplant und auf Anfrage.

Dieses Spring Batch-Tutorial erläutert das Programmiermodell und die Domänensprache von Batch-Anwendungen im Allgemeinen und zeigt im Besonderen einige nützliche Ansätze für das Design und die Entwicklung von Batch-Anwendungen mit der aktuellen Version Spring Batch 3.0.7 .

Was ist Spring Batch?

Spring Batch ist ein leichtes, umfassendes Framework, das entwickelt wurde, um die Entwicklung robuster Batch-Anwendungen zu erleichtern. Es bietet auch fortschrittlichere technische Dienste und Funktionen, die Batch-Jobs mit extrem hohem Volumen und hoher Leistung durch seine Optimierungs- und Partitionierungstechniken unterstützen. Spring Batch baut auf dem POJO-basierten Entwicklungsansatz des Spring Framework auf, der allen erfahrenen Spring-Entwicklern vertraut ist.

Als Beispiel betrachtet dieser Artikel Quellcode aus einem Beispielprojekt, das eine Kundendatei im XML-Format lädt, Kunden nach verschiedenen Attributen filtert und die gefilterten Einträge in eine Textdatei ausgibt. Der Quellcode für unser Spring Batch-Beispiel (das Lombok-Anmerkungen verwendet) ist hier auf GitHub verfügbar und erfordert Java SE 8 und Maven.

Was ist Stapelverarbeitung? Schlüsselkonzepte und Terminologie

Es ist wichtig, dass jeder Batch-Entwickler mit den Hauptkonzepten der Batch-Verarbeitung vertraut und vertraut ist. Das folgende Diagramm ist eine vereinfachte Version der Batch-Referenzarchitektur, die sich in jahrzehntelangen Implementierungen auf vielen verschiedenen Plattformen bewährt hat. Es stellt die wichtigsten Konzepte und Begriffe vor, die für die Stapelverarbeitung relevant sind, wie sie von Spring Batch verwendet werden.

Spring Batch Tutorial: Schlüsselkonzepte und Terminologie

Wie in unserem Batch-Verarbeitungsbeispiel gezeigt, wird ein Batch-Prozess typischerweise durch einen Job gekapselt, der aus mehreren Step besteht. Jeder Step hat normalerweise einen einzelnen ItemReader , ItemProcessor und ItemWriter . Ein Job wird von einem JobLauncher ausgeführt, und Metadaten über konfigurierte und ausgeführte Jobs werden in einem JobRepository gespeichert.

Jeder Job kann mehreren JobInstance werden, von denen jede eindeutig durch ihre speziellen JobParameters definiert ist, die zum Starten eines Batch-Jobs verwendet werden. Jede Ausführung einer JobInstance wird als JobExecution bezeichnet. Jede JobExecution verfolgt in der Regel, was während einer Ausführung passiert ist, wie z. B. aktuelle und Beendigungsstatus, Start- und Endzeiten usw.

Ein Step ist eine unabhängige, spezifische Phase eines Batch- Job , sodass jeder Job aus einem oder mehreren Step besteht. Ähnlich wie ein Job hat ein Step eine individuelle StepExecution , die einen einzelnen Versuch darstellt, einen Step auszuführen. StepExecution speichert die Informationen über aktuelle und Beendigungsstatus, Start- und Endzeiten usw. sowie Verweise auf die entsprechenden Step und JobExecution Instanzen.

Ein ExecutionContext ist ein Satz von Schlüssel-Wert-Paaren, die Informationen enthalten, die entweder StepExecution oder JobExecution . Spring Batch behält den ExecutionContext bei, was in Fällen hilfreich ist, in denen Sie einen Batchlauf neu starten möchten (z. B. wenn ein schwerwiegender Fehler aufgetreten ist usw.). Alles, was benötigt wird, ist, jedes Objekt, das zwischen den Schritten geteilt werden soll, in den Kontext zu stellen, und das Framework kümmert sich um den Rest. Nach dem Neustart werden die Werte aus dem vorherigen ExecutionContext aus der Datenbank wiederhergestellt und angewendet.

JobRepository ist der Mechanismus in Spring Batch, der all diese Persistenz ermöglicht. Es stellt CRUD-Operationen für JobLauncher , Job und Step -Instanziierungen bereit. Sobald ein Job gestartet wird, wird eine JobExecution aus dem Repository abgerufen, und während der Ausführung werden StepExecution und JobExecution Instanzen im Repository gespeichert.

Erste Schritte mit Spring Batch Framework

Einer der Vorteile von Spring Batch ist, dass die Projektabhängigkeiten minimal sind, was es einfacher macht, schnell loszulegen. Die wenigen existierenden Abhängigkeiten sind in der pom.xml des Projekts, auf die hier zugegriffen werden kann, klar spezifiziert und erklärt.

Der eigentliche Start der Anwendung erfolgt in einer Klasse, die etwa wie folgt aussieht:

 @EnableBatchProcessing @SpringBootApplication public class BatchApplication { public static void main(String[] args) { prepareTestData(1000); SpringApplication.run(BatchApplication.class, args); } }

Die Annotation @EnableBatchProcessing Spring Batch-Funktionen und bietet eine Basiskonfiguration zum Einrichten von Batch-Jobs.

Die Annotation @SpringBootApplication stammt aus dem Spring Boot-Projekt, das eigenständige, produktionsbereite, auf Spring basierende Anwendungen bereitstellt. Es gibt eine Konfigurationsklasse an, die eine oder mehrere Spring-Beans deklariert und auch die automatische Konfiguration und das Scannen von Spring-Komponenten auslöst.

Unser Beispielprojekt hat nur einen Job, der von CustomerReportJobConfig mit einer JobBuilderFactory und StepBuilderFactory konfiguriert wird. Die minimale Jobkonfiguration kann in CustomerReportJobConfig wie folgt definiert werden:

 @Configuration public class CustomerReportJobConfig { @Autowired private JobBuilderFactory jobBuilders; @Autowired private StepBuilderFactory stepBuilders; @Bean public Job customerReportJob() { return jobBuilders.get("customerReportJob") .start(taskletStep()) .next(chunkStep()) .build(); } @Bean public Step taskletStep() { return stepBuilders.get("taskletStep") .tasklet(tasklet()) .build(); } @Bean public Tasklet tasklet() { return (contribution, chunkContext) -> { return RepeatStatus.FINISHED; }; } }

Es gibt zwei Hauptansätze zum Erstellen einer Stufe.

Ein Ansatz, wie im obigen Beispiel gezeigt, ist Tasklet-basiert . Ein Tasklet unterstützt eine einfache Schnittstelle mit nur einer Methode, execute() , die wiederholt aufgerufen wird, bis sie entweder RepeatStatus.FINISHED oder eine Ausnahme auslöst, um einen Fehler zu signalisieren. Jeder Aufruf des Tasklet wird in eine Transaktion eingeschlossen.

Ein anderer Ansatz, die Chunk-orientierte Verarbeitung , bezieht sich auf das sequentielle Lesen der Daten und das Erstellen von „Chunks“, die innerhalb einer Transaktionsgrenze ausgeschrieben werden. Jedes einzelne Element wird von einem ItemReader , an einen ItemProcessor übergeben und aggregiert. Sobald die Anzahl der gelesenen Elemente dem Commit-Intervall entspricht, wird der gesamte Chunk über den ItemWriter geschrieben, und dann wird die Transaktion festgeschrieben. Ein Chunk-orientierter Schritt kann wie folgt konfiguriert werden:

 @Bean public Job customerReportJob() { return jobBuilders.get("customerReportJob") .start(taskletStep()) .next(chunkStep()) .build(); } @Bean public Step chunkStep() { return stepBuilders.get("chunkStep") .<Customer, Customer>chunk(20) .reader(reader()) .processor(processor()) .writer(writer()) .build(); }

Die Methode chunk() erstellt einen Schritt, der Elemente in Blöcken mit der bereitgestellten Größe verarbeitet, wobei jeder Block dann an den angegebenen Reader, Prozessor und Writer übergeben wird. Diese Methoden werden in den nächsten Abschnitten dieses Artikels ausführlicher besprochen.

Benutzerdefinierter Leser

Für unsere Spring Batch-Beispielanwendung müssen wir zum Lesen einer Kundenliste aus einer XML-Datei eine Implementierung der Schnittstelle org.springframework.batch.item.ItemReader :

 public interface ItemReader<T> { T read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException; }

Ein ItemReader stellt die Daten bereit und sollte zustandsbehaftet sein. Es wird normalerweise für jeden Stapel mehrmals aufgerufen, wobei jeder Aufruf von read() den nächsten Wert zurückgibt und schließlich null , wenn alle Eingabedaten erschöpft sind.

Spring Batch bietet einige sofort einsatzbereite Implementierungen von ItemReader , die für eine Vielzahl von Zwecken verwendet werden können, z. B. zum Lesen von Sammlungen, Dateien, zum Integrieren von JMS und JDBC sowie mehrerer Quellen und so weiter.

In unserer Beispielanwendung delegiert die CustomerItemReader -Klasse tatsächliche read() Aufrufe an eine verzögert initialisierte Instanz der IteratorItemReader -Klasse:

 public class CustomerItemReader implements ItemReader<Customer> { private final String filename; private ItemReader<Customer> delegate; public CustomerItemReader(final String filename) { this.filename = filename; } @Override public Customer read() throws Exception { if (delegate == null) { delegate = new IteratorItemReader<>(customers()); } return delegate.read(); } private List<Customer> customers() throws FileNotFoundException { try (XMLDecoder decoder = new XMLDecoder(new FileInputStream(filename))) { return (List<Customer>) decoder.readObject(); } } }

Eine Spring-Bean für diese Implementierung wird mit den Annotationen @Component und @StepScope , die Spring wissen lässt, dass diese Klasse eine schrittbezogene Spring-Komponente ist und einmal pro Schrittausführung wie folgt erstellt wird:

 @StepScope @Bean public ItemReader<Customer> reader() { return new CustomerItemReader(XML_FILE); }

Benutzerdefinierte Prozessoren

ItemProcessors wandeln Eingabeelemente um und führen Geschäftslogik in ein elementorientiertes Verarbeitungsszenario ein. Sie müssen eine Implementierung der Schnittstelle org.springframework.batch.item.ItemProcessor :

 public interface ItemProcessor<I, O> { O process(I item) throws Exception; }

Die Methode process() akzeptiert eine Instanz der I -Klasse und kann eine Instanz desselben Typs zurückgeben oder auch nicht. Die Rückgabe von null gibt an, dass das Element nicht weiter verarbeitet werden soll. Wie üblich bietet Spring einige Standardprozessoren, wie z. B. CompositeItemProcessor , der das Element durch eine Sequenz von ItemProcessor s und einen ValidatingItemProcessor leitet, der die Eingabe validiert.

Bei unserer Beispielanwendung werden Auftragsverarbeiter verwendet, um Kunden nach folgenden Anforderungen zu filtern:

  • Ein Kunde muss im aktuellen Monat geboren sein (z. B. um für Geburtstagsangebote usw. zu kennzeichnen).
  • Ein Kunde muss weniger als fünf abgeschlossene Transaktionen haben (z. B. um neuere Kunden zu identifizieren)

Die Anforderung „aktueller Monat“ wird über einen benutzerdefinierten ItemProcessor :

 public class BirthdayFilterProcessor implements ItemProcessor<Customer, Customer> { @Override public Customer process(final Customer item) throws Exception { if (new GregorianCalendar().get(Calendar.MONTH) == item.getBirthday().get(Calendar.MONTH)) { return item; } return null; } }

Die Anforderung „begrenzte Anzahl von Transaktionen“ wird als ValidatingItemProcessor implementiert:

 public class TransactionValidatingProcessor extends ValidatingItemProcessor<Customer> { public TransactionValidatingProcessor(final int limit) { super( item -> { if (item.getTransactions() >= limit) { throw new ValidationException("Customer has less than " + limit + " transactions"); } } ); setFilter(true); } }

Dieses Prozessorpaar wird dann in einem CompositeItemProcessor gekapselt, der das Delegate-Muster implementiert:

 @StepScope @Bean public ItemProcessor<Customer, Customer> processor() { final CompositeItemProcessor<Customer, Customer> processor = new CompositeItemProcessor<>(); processor.setDelegates(Arrays.asList(new BirthdayFilterProcessor(), new TransactionValidatingProcessor(5))); return processor; }

Benutzerdefinierte Autoren

Für die Ausgabe der Daten stellt Spring Batch die Schnittstelle org.springframework.batch.item.ItemWriter zur Verfügung, um Objekte bei Bedarf zu serialisieren:

 public interface ItemWriter<T> { void write(List<? extends T> items) throws Exception; }

Die Methode write() ist dafür verantwortlich, sicherzustellen, dass alle internen Puffer geleert werden. Wenn eine Transaktion aktiv ist, wird es normalerweise auch notwendig sein, die Ausgabe bei einem nachfolgenden Rollback zu verwerfen. Die Ressource, an die der Writer Daten sendet, sollte dies normalerweise selbst handhaben können. Es gibt Standardimplementierungen wie CompositeItemWriter , JdbcBatchItemWriter , JmsItemWriter , JpaItemWriter , SimpleMailMessageItemWriter und andere.

In unserer Beispielanwendung wird die Liste der gefilterten Kunden wie folgt geschrieben:

 public class CustomerItemWriter implements ItemWriter<Customer>, Closeable { private final PrintWriter writer; public CustomerItemWriter() { OutputStream out; try { out = new FileOutputStream("output.txt"); } catch (FileNotFoundException e) { out = System.out; } this.writer = new PrintWriter(out); } @Override public void write(final List<? extends Customer> items) throws Exception { for (Customer item : items) { writer.println(item.toString()); } } @PreDestroy @Override public void close() throws IOException { writer.close(); } }

Planen von Spring-Batch-Jobs

Standardmäßig führt Spring Batch beim Start alle Jobs aus, die es finden kann (dh die wie in CustomerReportJobConfig konfiguriert sind). Um dieses Verhalten zu ändern, deaktivieren Sie die Auftragsausführung beim Start, indem Sie die folgende Eigenschaft zu application.properties hinzufügen:

 spring.batch.job.enabled=false

Die eigentliche Planung erfolgt dann durch Hinzufügen der Annotation @EnableScheduling zu einer Konfigurationsklasse und der Annotation @Scheduled zu der Methode, die den Job selbst ausführt. Die Planung kann mit Verzögerung, Raten oder Cron-Ausdrücken konfiguriert werden:

 // run every 5000 msec (ie, every 5 secs) @Scheduled(fixedRate = 5000) public void run() throws Exception { JobExecution execution = jobLauncher.run( customerReportJob(), new JobParametersBuilder().toJobParameters() ); }

Es gibt jedoch ein Problem mit dem obigen Beispiel. Zur Laufzeit ist der Job nur beim ersten Mal erfolgreich. Beim zweiten Start (d. h. nach fünf Sekunden) werden die folgenden Meldungen in den Protokollen generiert (beachten Sie, dass in früheren Versionen von Spring Batch eine JobInstanceAlreadyCompleteException wäre):

 INFO 36988 --- [pool-2-thread-1] osbclsupport.SimpleJobLauncher : Job: [SimpleJob: [name=customerReportJob]] launched with the following parameters: [{}] INFO 36988 --- [pool-2-thread-1] osbatch.core.job.SimpleStepHandler : Step already complete or not restartable, so no action to execute: StepExecution: id=1, version=3, name=taskletStep, status=COMPLETED, exitStatus=COMPLETED, readCount=0, filterCount=0, writeCount=0 readSkipCount=0, writeSkipCount=0, processSkipCount=0, commitCount=1, rollbackCount=0, exitDescription= INFO 36988 --- [pool-2-thread-1] osbatch.core.job.SimpleStepHandler : Step already complete or not restartable, so no action to execute: StepExecution: id=2, version=53, name=chunkStep, status=COMPLETED, exitStatus=COMPLETED, readCount=1000, filterCount=982, writeCount=18 readSkipCount=0, writeSkipCount=0, processSkipCount=0, commitCount=51, rollbackCount=0, exitDescription=

Dies geschieht, weil nur eindeutige JobInstance s erstellt und ausgeführt werden können und Spring Batch keine Möglichkeit hat, zwischen der ersten und der zweiten JobInstance zu unterscheiden.

Es gibt zwei Möglichkeiten, dieses Problem zu vermeiden, wenn Sie einen Batch-Job planen.

Man muss sicherstellen, dass jeder Job einen oder mehrere eindeutige Parameter (z. B. tatsächliche Startzeit in Nanosekunden) einführt:

 @Scheduled(fixedRate = 5000) public void run() throws Exception { jobLauncher.run( customerReportJob(), new JobParametersBuilder().addLong("uniqueness", System.nanoTime()).toJobParameters() ); }

Alternativ können Sie den nächsten Job in einer Sequenz von JobInstance s starten, die durch den JobParametersIncrementer bestimmt werden, der mit SimpleJobOperator.startNextInstance() an den angegebenen Job angehängt ist:

 @Autowired private JobOperator operator; @Autowired private JobExplorer jobs; @Scheduled(fixedRate = 5000) public void run() throws Exception { List<JobInstance> lastInstances = jobs.getJobInstances(JOB_NAME, 0, 1); if (lastInstances.isEmpty()) { jobLauncher.run(customerReportJob(), new JobParameters()); } else { operator.startNextInstance(JOB_NAME); } }

Spring Batch Unit Testing

Normalerweise muss das Framework zum Ausführen von Komponententests in einer Spring Boot-Anwendung einen entsprechenden ApplicationContext laden. Dazu werden zwei Annotationen verwendet:

 @RunWith(SpringRunner.class) @ContextConfiguration(classes = {...})

Es gibt eine Utility-Klasse org.springframework.batch.test.JobLauncherTestUtils zum Testen von Batch-Jobs. Es bietet Methoden zum Starten eines gesamten Jobs sowie zum End-to-End-Testen einzelner Schritte, ohne dass jeder Schritt im Job ausgeführt werden muss. Sie muss als Frühlingsbohne deklariert werden:

 @Configuration public class BatchTestConfiguration { @Bean public JobLauncherTestUtils jobLauncherTestUtils() { return new JobLauncherTestUtils(); } }

Ein typischer Test für einen Job und einen Schritt sieht wie folgt aus (und kann auch beliebige mocking frameworks verwenden):

 @RunWith(SpringRunner.class) @ContextConfiguration(classes = {BatchApplication.class, BatchTestConfiguration.class}) public class CustomerReportJobConfigTest { @Autowired private JobLauncherTestUtils testUtils; @Autowired private CustomerReportJobConfig config; @Test public void testEntireJob() throws Exception { final JobExecution result = testUtils.getJobLauncher().run(config.customerReportJob(), testUtils.getUniqueJobParameters()); Assert.assertNotNull(result); Assert.assertEquals(BatchStatus.COMPLETED, result.getStatus()); } @Test public void testSpecificStep() { Assert.assertEquals(BatchStatus.COMPLETED, testUtils.launchStep("taskletStep").getStatus()); } }

Spring Batch führt zusätzliche Bereiche für Schritt- und Jobkontexte ein. Objekte in diesen Bereichen verwenden den Spring-Container als Objektfabrik, sodass es nur eine Instanz jeder dieser Beans pro Ausführungsschritt oder Job gibt. Darüber hinaus wird die späte Bindung von Referenzen unterstützt, auf die über StepContext oder JobContext . Die Komponenten, die zur Laufzeit schritt- oder auftragsbezogen konfiguriert werden, sind schwierig als eigenständige Komponenten zu testen, es sei denn, Sie haben eine Möglichkeit, den Kontext so festzulegen, als befänden sie sich in einer Schritt- oder Auftragsausführung. Das ist das Ziel der Komponenten org.springframework.batch.test.StepScopeTestExecutionListener und org.springframework.batch.test.StepScopeTestUtils in Spring Batch sowie JobScopeTestExecutionListener und JobScopeTestUtils .

Die TestExecutionListeners werden auf Klassenebene deklariert, und ihre Aufgabe besteht darin, einen Schrittausführungskontext für jede Testmethode zu erstellen. Zum Beispiel:

 @RunWith(SpringRunner.class) @TestExecutionListeners({DependencyInjectionTestExecutionListener.class, StepScopeTestExecutionListener.class}) @ContextConfiguration(classes = {BatchApplication.class, BatchTestConfiguration.class}) public class BirthdayFilterProcessorTest { @Autowired private BirthdayFilterProcessor processor; public StepExecution getStepExecution() { return MetaDataInstanceFactory.createStepExecution(); } @Test public void filter() throws Exception { final Customer customer = new Customer(); customer.setId(1); customer.setName("name"); customer.setBirthday(new GregorianCalendar()); Assert.assertNotNull(processor.process(customer)); } }

Es gibt zwei TestExecutionListener s. Einer stammt aus dem regulären Spring Test-Framework und verarbeitet Abhängigkeitsinjektionen aus dem konfigurierten Anwendungskontext. Der andere ist der Spring Batch StepScopeTestExecutionListener , der den Step-Scope-Kontext für die Abhängigkeitsinjektion in Komponententests einrichtet. Ein StepContext wird für die Dauer einer Testmethode erstellt und allen eingefügten Abhängigkeiten zur Verfügung gestellt. Das Standardverhalten besteht lediglich darin, eine StepExecution mit festen Eigenschaften zu erstellen. Alternativ kann der StepContext vom Testfall als Factory-Methode bereitgestellt werden, die den richtigen Typ zurückgibt.

Ein weiterer Ansatz basiert auf der Utility-Klasse StepScopeTestUtils . Diese Klasse wird verwendet, um StepScope in Komponententests flexibler zu erstellen und zu manipulieren, ohne Abhängigkeitsinjektion zu verwenden. Zum Beispiel könnte das Lesen der ID des Kunden, die durch den oben genannten Bearbeiter gefiltert wurde, wie folgt erfolgen:

 @Test public void filterId() throws Exception { final Customer customer = new Customer(); customer.setId(1); customer.setName("name"); customer.setBirthday(new GregorianCalendar()); final int id = StepScopeTestUtils.doInStepScope( getStepExecution(), () -> processor.process(customer).getId() ); Assert.assertEquals(1, id); }

Bereit für Advanced Spring Batch?

Dieser Artikel stellt einige der Grundlagen des Designs und der Entwicklung von Spring Batch-Anwendungen vor. Es gibt jedoch viele weiterführende Themen und Funktionen – wie z. B. Skalierung, Parallelverarbeitung, Listener und mehr – die in diesem Artikel nicht behandelt werden. Hoffentlich bietet dieser Artikel eine nützliche Grundlage für den Einstieg.

Informationen zu diesen weiterführenden Themen finden Sie dann in der offiziellen Spring Back-Dokumentation für Spring Batch.