Spring Batch Tutorial: Stapelverarbeitung leicht gemacht mit Spring
Veröffentlicht: 2022-03-11Die Batch-Verarbeitung – typisch für die massenorientierte, nicht interaktive und häufig lang andauernde Ausführung im Hintergrund – ist in praktisch allen Branchen weit verbreitet und wird auf eine Vielzahl von Aufgaben angewendet. Die Stapelverarbeitung kann daten- oder rechenintensiv sein, sequentiell oder parallel ausgeführt werden und kann durch verschiedene Aufrufmodelle initiiert werden, einschließlich Ad-hoc, geplant und auf Anfrage.
Dieses Spring Batch-Tutorial erläutert das Programmiermodell und die Domänensprache von Batch-Anwendungen im Allgemeinen und zeigt im Besonderen einige nützliche Ansätze für das Design und die Entwicklung von Batch-Anwendungen mit der aktuellen Version Spring Batch 3.0.7 .
Was ist Spring Batch?
Spring Batch ist ein leichtes, umfassendes Framework, das entwickelt wurde, um die Entwicklung robuster Batch-Anwendungen zu erleichtern. Es bietet auch fortschrittlichere technische Dienste und Funktionen, die Batch-Jobs mit extrem hohem Volumen und hoher Leistung durch seine Optimierungs- und Partitionierungstechniken unterstützen. Spring Batch baut auf dem POJO-basierten Entwicklungsansatz des Spring Framework auf, der allen erfahrenen Spring-Entwicklern vertraut ist.
Als Beispiel betrachtet dieser Artikel Quellcode aus einem Beispielprojekt, das eine Kundendatei im XML-Format lädt, Kunden nach verschiedenen Attributen filtert und die gefilterten Einträge in eine Textdatei ausgibt. Der Quellcode für unser Spring Batch-Beispiel (das Lombok-Anmerkungen verwendet) ist hier auf GitHub verfügbar und erfordert Java SE 8 und Maven.
Was ist Stapelverarbeitung? Schlüsselkonzepte und Terminologie
Es ist wichtig, dass jeder Batch-Entwickler mit den Hauptkonzepten der Batch-Verarbeitung vertraut und vertraut ist. Das folgende Diagramm ist eine vereinfachte Version der Batch-Referenzarchitektur, die sich in jahrzehntelangen Implementierungen auf vielen verschiedenen Plattformen bewährt hat. Es stellt die wichtigsten Konzepte und Begriffe vor, die für die Stapelverarbeitung relevant sind, wie sie von Spring Batch verwendet werden.
Wie in unserem Batch-Verarbeitungsbeispiel gezeigt, wird ein Batch-Prozess typischerweise durch einen Job
gekapselt, der aus mehreren Step
besteht. Jeder Step
hat normalerweise einen einzelnen ItemReader
, ItemProcessor
und ItemWriter
. Ein Job
wird von einem JobLauncher
ausgeführt, und Metadaten über konfigurierte und ausgeführte Jobs werden in einem JobRepository
gespeichert.
Jeder Job
kann mehreren JobInstance
werden, von denen jede eindeutig durch ihre speziellen JobParameters
definiert ist, die zum Starten eines Batch-Jobs verwendet werden. Jede Ausführung einer JobInstance
wird als JobExecution
bezeichnet. Jede JobExecution
verfolgt in der Regel, was während einer Ausführung passiert ist, wie z. B. aktuelle und Beendigungsstatus, Start- und Endzeiten usw.
Ein Step
ist eine unabhängige, spezifische Phase eines Batch- Job
, sodass jeder Job
aus einem oder mehreren Step
besteht. Ähnlich wie ein Job
hat ein Step
eine individuelle StepExecution
, die einen einzelnen Versuch darstellt, einen Step
auszuführen. StepExecution
speichert die Informationen über aktuelle und Beendigungsstatus, Start- und Endzeiten usw. sowie Verweise auf die entsprechenden Step
und JobExecution
Instanzen.
Ein ExecutionContext
ist ein Satz von Schlüssel-Wert-Paaren, die Informationen enthalten, die entweder StepExecution
oder JobExecution
. Spring Batch behält den ExecutionContext
bei, was in Fällen hilfreich ist, in denen Sie einen Batchlauf neu starten möchten (z. B. wenn ein schwerwiegender Fehler aufgetreten ist usw.). Alles, was benötigt wird, ist, jedes Objekt, das zwischen den Schritten geteilt werden soll, in den Kontext zu stellen, und das Framework kümmert sich um den Rest. Nach dem Neustart werden die Werte aus dem vorherigen ExecutionContext
aus der Datenbank wiederhergestellt und angewendet.
JobRepository
ist der Mechanismus in Spring Batch, der all diese Persistenz ermöglicht. Es stellt CRUD-Operationen für JobLauncher
, Job
und Step
-Instanziierungen bereit. Sobald ein Job
gestartet wird, wird eine JobExecution
aus dem Repository abgerufen, und während der Ausführung werden StepExecution
und JobExecution
Instanzen im Repository gespeichert.
Erste Schritte mit Spring Batch Framework
Einer der Vorteile von Spring Batch ist, dass die Projektabhängigkeiten minimal sind, was es einfacher macht, schnell loszulegen. Die wenigen existierenden Abhängigkeiten sind in der pom.xml
des Projekts, auf die hier zugegriffen werden kann, klar spezifiziert und erklärt.
Der eigentliche Start der Anwendung erfolgt in einer Klasse, die etwa wie folgt aussieht:
@EnableBatchProcessing @SpringBootApplication public class BatchApplication { public static void main(String[] args) { prepareTestData(1000); SpringApplication.run(BatchApplication.class, args); } }
Die Annotation @EnableBatchProcessing
Spring Batch-Funktionen und bietet eine Basiskonfiguration zum Einrichten von Batch-Jobs.
Die Annotation @SpringBootApplication
stammt aus dem Spring Boot-Projekt, das eigenständige, produktionsbereite, auf Spring basierende Anwendungen bereitstellt. Es gibt eine Konfigurationsklasse an, die eine oder mehrere Spring-Beans deklariert und auch die automatische Konfiguration und das Scannen von Spring-Komponenten auslöst.
Unser Beispielprojekt hat nur einen Job, der von CustomerReportJobConfig
mit einer JobBuilderFactory
und StepBuilderFactory
konfiguriert wird. Die minimale Jobkonfiguration kann in CustomerReportJobConfig
wie folgt definiert werden:
@Configuration public class CustomerReportJobConfig { @Autowired private JobBuilderFactory jobBuilders; @Autowired private StepBuilderFactory stepBuilders; @Bean public Job customerReportJob() { return jobBuilders.get("customerReportJob") .start(taskletStep()) .next(chunkStep()) .build(); } @Bean public Step taskletStep() { return stepBuilders.get("taskletStep") .tasklet(tasklet()) .build(); } @Bean public Tasklet tasklet() { return (contribution, chunkContext) -> { return RepeatStatus.FINISHED; }; } }
Es gibt zwei Hauptansätze zum Erstellen einer Stufe.
Ein Ansatz, wie im obigen Beispiel gezeigt, ist Tasklet-basiert . Ein Tasklet
unterstützt eine einfache Schnittstelle mit nur einer Methode, execute()
, die wiederholt aufgerufen wird, bis sie entweder RepeatStatus.FINISHED
oder eine Ausnahme auslöst, um einen Fehler zu signalisieren. Jeder Aufruf des Tasklet
wird in eine Transaktion eingeschlossen.
Ein anderer Ansatz, die Chunk-orientierte Verarbeitung , bezieht sich auf das sequentielle Lesen der Daten und das Erstellen von „Chunks“, die innerhalb einer Transaktionsgrenze ausgeschrieben werden. Jedes einzelne Element wird von einem ItemReader
, an einen ItemProcessor
übergeben und aggregiert. Sobald die Anzahl der gelesenen Elemente dem Commit-Intervall entspricht, wird der gesamte Chunk über den ItemWriter
geschrieben, und dann wird die Transaktion festgeschrieben. Ein Chunk-orientierter Schritt kann wie folgt konfiguriert werden:
@Bean public Job customerReportJob() { return jobBuilders.get("customerReportJob") .start(taskletStep()) .next(chunkStep()) .build(); } @Bean public Step chunkStep() { return stepBuilders.get("chunkStep") .<Customer, Customer>chunk(20) .reader(reader()) .processor(processor()) .writer(writer()) .build(); }
Die Methode chunk()
erstellt einen Schritt, der Elemente in Blöcken mit der bereitgestellten Größe verarbeitet, wobei jeder Block dann an den angegebenen Reader, Prozessor und Writer übergeben wird. Diese Methoden werden in den nächsten Abschnitten dieses Artikels ausführlicher besprochen.
Benutzerdefinierter Leser
Für unsere Spring Batch-Beispielanwendung müssen wir zum Lesen einer Kundenliste aus einer XML-Datei eine Implementierung der Schnittstelle org.springframework.batch.item.ItemReader
:
public interface ItemReader<T> { T read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException; }
Ein ItemReader
stellt die Daten bereit und sollte zustandsbehaftet sein. Es wird normalerweise für jeden Stapel mehrmals aufgerufen, wobei jeder Aufruf von read()
den nächsten Wert zurückgibt und schließlich null
, wenn alle Eingabedaten erschöpft sind.
Spring Batch bietet einige sofort einsatzbereite Implementierungen von ItemReader
, die für eine Vielzahl von Zwecken verwendet werden können, z. B. zum Lesen von Sammlungen, Dateien, zum Integrieren von JMS und JDBC sowie mehrerer Quellen und so weiter.
In unserer Beispielanwendung delegiert die CustomerItemReader
-Klasse tatsächliche read()
Aufrufe an eine verzögert initialisierte Instanz der IteratorItemReader
-Klasse:
public class CustomerItemReader implements ItemReader<Customer> { private final String filename; private ItemReader<Customer> delegate; public CustomerItemReader(final String filename) { this.filename = filename; } @Override public Customer read() throws Exception { if (delegate == null) { delegate = new IteratorItemReader<>(customers()); } return delegate.read(); } private List<Customer> customers() throws FileNotFoundException { try (XMLDecoder decoder = new XMLDecoder(new FileInputStream(filename))) { return (List<Customer>) decoder.readObject(); } } }
Eine Spring-Bean für diese Implementierung wird mit den Annotationen @Component
und @StepScope
, die Spring wissen lässt, dass diese Klasse eine schrittbezogene Spring-Komponente ist und einmal pro Schrittausführung wie folgt erstellt wird:
@StepScope @Bean public ItemReader<Customer> reader() { return new CustomerItemReader(XML_FILE); }
Benutzerdefinierte Prozessoren
ItemProcessors
wandeln Eingabeelemente um und führen Geschäftslogik in ein elementorientiertes Verarbeitungsszenario ein. Sie müssen eine Implementierung der Schnittstelle org.springframework.batch.item.ItemProcessor
:
public interface ItemProcessor<I, O> { O process(I item) throws Exception; }
Die Methode process()
akzeptiert eine Instanz der I
-Klasse und kann eine Instanz desselben Typs zurückgeben oder auch nicht. Die Rückgabe von null
gibt an, dass das Element nicht weiter verarbeitet werden soll. Wie üblich bietet Spring einige Standardprozessoren, wie z. B. CompositeItemProcessor
, der das Element durch eine Sequenz von ItemProcessor
s und einen ValidatingItemProcessor
leitet, der die Eingabe validiert.
Bei unserer Beispielanwendung werden Auftragsverarbeiter verwendet, um Kunden nach folgenden Anforderungen zu filtern:
- Ein Kunde muss im aktuellen Monat geboren sein (z. B. um für Geburtstagsangebote usw. zu kennzeichnen).
- Ein Kunde muss weniger als fünf abgeschlossene Transaktionen haben (z. B. um neuere Kunden zu identifizieren)
Die Anforderung „aktueller Monat“ wird über einen benutzerdefinierten ItemProcessor
:

public class BirthdayFilterProcessor implements ItemProcessor<Customer, Customer> { @Override public Customer process(final Customer item) throws Exception { if (new GregorianCalendar().get(Calendar.MONTH) == item.getBirthday().get(Calendar.MONTH)) { return item; } return null; } }
Die Anforderung „begrenzte Anzahl von Transaktionen“ wird als ValidatingItemProcessor
implementiert:
public class TransactionValidatingProcessor extends ValidatingItemProcessor<Customer> { public TransactionValidatingProcessor(final int limit) { super( item -> { if (item.getTransactions() >= limit) { throw new ValidationException("Customer has less than " + limit + " transactions"); } } ); setFilter(true); } }
Dieses Prozessorpaar wird dann in einem CompositeItemProcessor
gekapselt, der das Delegate-Muster implementiert:
@StepScope @Bean public ItemProcessor<Customer, Customer> processor() { final CompositeItemProcessor<Customer, Customer> processor = new CompositeItemProcessor<>(); processor.setDelegates(Arrays.asList(new BirthdayFilterProcessor(), new TransactionValidatingProcessor(5))); return processor; }
Benutzerdefinierte Autoren
Für die Ausgabe der Daten stellt Spring Batch die Schnittstelle org.springframework.batch.item.ItemWriter
zur Verfügung, um Objekte bei Bedarf zu serialisieren:
public interface ItemWriter<T> { void write(List<? extends T> items) throws Exception; }
Die Methode write()
ist dafür verantwortlich, sicherzustellen, dass alle internen Puffer geleert werden. Wenn eine Transaktion aktiv ist, wird es normalerweise auch notwendig sein, die Ausgabe bei einem nachfolgenden Rollback zu verwerfen. Die Ressource, an die der Writer Daten sendet, sollte dies normalerweise selbst handhaben können. Es gibt Standardimplementierungen wie CompositeItemWriter
, JdbcBatchItemWriter
, JmsItemWriter
, JpaItemWriter
, SimpleMailMessageItemWriter
und andere.
In unserer Beispielanwendung wird die Liste der gefilterten Kunden wie folgt geschrieben:
public class CustomerItemWriter implements ItemWriter<Customer>, Closeable { private final PrintWriter writer; public CustomerItemWriter() { OutputStream out; try { out = new FileOutputStream("output.txt"); } catch (FileNotFoundException e) { out = System.out; } this.writer = new PrintWriter(out); } @Override public void write(final List<? extends Customer> items) throws Exception { for (Customer item : items) { writer.println(item.toString()); } } @PreDestroy @Override public void close() throws IOException { writer.close(); } }
Planen von Spring-Batch-Jobs
Standardmäßig führt Spring Batch beim Start alle Jobs aus, die es finden kann (dh die wie in CustomerReportJobConfig
konfiguriert sind). Um dieses Verhalten zu ändern, deaktivieren Sie die Auftragsausführung beim Start, indem Sie die folgende Eigenschaft zu application.properties
hinzufügen:
spring.batch.job.enabled=false
Die eigentliche Planung erfolgt dann durch Hinzufügen der Annotation @EnableScheduling
zu einer Konfigurationsklasse und der Annotation @Scheduled
zu der Methode, die den Job selbst ausführt. Die Planung kann mit Verzögerung, Raten oder Cron-Ausdrücken konfiguriert werden:
// run every 5000 msec (ie, every 5 secs) @Scheduled(fixedRate = 5000) public void run() throws Exception { JobExecution execution = jobLauncher.run( customerReportJob(), new JobParametersBuilder().toJobParameters() ); }
Es gibt jedoch ein Problem mit dem obigen Beispiel. Zur Laufzeit ist der Job nur beim ersten Mal erfolgreich. Beim zweiten Start (d. h. nach fünf Sekunden) werden die folgenden Meldungen in den Protokollen generiert (beachten Sie, dass in früheren Versionen von Spring Batch eine JobInstanceAlreadyCompleteException
wäre):
INFO 36988 --- [pool-2-thread-1] osbclsupport.SimpleJobLauncher : Job: [SimpleJob: [name=customerReportJob]] launched with the following parameters: [{}] INFO 36988 --- [pool-2-thread-1] osbatch.core.job.SimpleStepHandler : Step already complete or not restartable, so no action to execute: StepExecution: id=1, version=3, name=taskletStep, status=COMPLETED, exitStatus=COMPLETED, readCount=0, filterCount=0, writeCount=0 readSkipCount=0, writeSkipCount=0, processSkipCount=0, commitCount=1, rollbackCount=0, exitDescription= INFO 36988 --- [pool-2-thread-1] osbatch.core.job.SimpleStepHandler : Step already complete or not restartable, so no action to execute: StepExecution: id=2, version=53, name=chunkStep, status=COMPLETED, exitStatus=COMPLETED, readCount=1000, filterCount=982, writeCount=18 readSkipCount=0, writeSkipCount=0, processSkipCount=0, commitCount=51, rollbackCount=0, exitDescription=
Dies geschieht, weil nur eindeutige JobInstance
s erstellt und ausgeführt werden können und Spring Batch keine Möglichkeit hat, zwischen der ersten und der zweiten JobInstance
zu unterscheiden.
Es gibt zwei Möglichkeiten, dieses Problem zu vermeiden, wenn Sie einen Batch-Job planen.
Man muss sicherstellen, dass jeder Job einen oder mehrere eindeutige Parameter (z. B. tatsächliche Startzeit in Nanosekunden) einführt:
@Scheduled(fixedRate = 5000) public void run() throws Exception { jobLauncher.run( customerReportJob(), new JobParametersBuilder().addLong("uniqueness", System.nanoTime()).toJobParameters() ); }
Alternativ können Sie den nächsten Job in einer Sequenz von JobInstance
s starten, die durch den JobParametersIncrementer
bestimmt werden, der mit SimpleJobOperator.startNextInstance()
an den angegebenen Job angehängt ist:
@Autowired private JobOperator operator; @Autowired private JobExplorer jobs; @Scheduled(fixedRate = 5000) public void run() throws Exception { List<JobInstance> lastInstances = jobs.getJobInstances(JOB_NAME, 0, 1); if (lastInstances.isEmpty()) { jobLauncher.run(customerReportJob(), new JobParameters()); } else { operator.startNextInstance(JOB_NAME); } }
Spring Batch Unit Testing
Normalerweise muss das Framework zum Ausführen von Komponententests in einer Spring Boot-Anwendung einen entsprechenden ApplicationContext
laden. Dazu werden zwei Annotationen verwendet:
@RunWith(SpringRunner.class) @ContextConfiguration(classes = {...})
Es gibt eine Utility-Klasse org.springframework.batch.test.JobLauncherTestUtils
zum Testen von Batch-Jobs. Es bietet Methoden zum Starten eines gesamten Jobs sowie zum End-to-End-Testen einzelner Schritte, ohne dass jeder Schritt im Job ausgeführt werden muss. Sie muss als Frühlingsbohne deklariert werden:
@Configuration public class BatchTestConfiguration { @Bean public JobLauncherTestUtils jobLauncherTestUtils() { return new JobLauncherTestUtils(); } }
Ein typischer Test für einen Job und einen Schritt sieht wie folgt aus (und kann auch beliebige mocking frameworks verwenden):
@RunWith(SpringRunner.class) @ContextConfiguration(classes = {BatchApplication.class, BatchTestConfiguration.class}) public class CustomerReportJobConfigTest { @Autowired private JobLauncherTestUtils testUtils; @Autowired private CustomerReportJobConfig config; @Test public void testEntireJob() throws Exception { final JobExecution result = testUtils.getJobLauncher().run(config.customerReportJob(), testUtils.getUniqueJobParameters()); Assert.assertNotNull(result); Assert.assertEquals(BatchStatus.COMPLETED, result.getStatus()); } @Test public void testSpecificStep() { Assert.assertEquals(BatchStatus.COMPLETED, testUtils.launchStep("taskletStep").getStatus()); } }
Spring Batch führt zusätzliche Bereiche für Schritt- und Jobkontexte ein. Objekte in diesen Bereichen verwenden den Spring-Container als Objektfabrik, sodass es nur eine Instanz jeder dieser Beans pro Ausführungsschritt oder Job gibt. Darüber hinaus wird die späte Bindung von Referenzen unterstützt, auf die über StepContext
oder JobContext
. Die Komponenten, die zur Laufzeit schritt- oder auftragsbezogen konfiguriert werden, sind schwierig als eigenständige Komponenten zu testen, es sei denn, Sie haben eine Möglichkeit, den Kontext so festzulegen, als befänden sie sich in einer Schritt- oder Auftragsausführung. Das ist das Ziel der Komponenten org.springframework.batch.test.StepScopeTestExecutionListener
und org.springframework.batch.test.StepScopeTestUtils
in Spring Batch sowie JobScopeTestExecutionListener
und JobScopeTestUtils
.
Die TestExecutionListeners
werden auf Klassenebene deklariert, und ihre Aufgabe besteht darin, einen Schrittausführungskontext für jede Testmethode zu erstellen. Zum Beispiel:
@RunWith(SpringRunner.class) @TestExecutionListeners({DependencyInjectionTestExecutionListener.class, StepScopeTestExecutionListener.class}) @ContextConfiguration(classes = {BatchApplication.class, BatchTestConfiguration.class}) public class BirthdayFilterProcessorTest { @Autowired private BirthdayFilterProcessor processor; public StepExecution getStepExecution() { return MetaDataInstanceFactory.createStepExecution(); } @Test public void filter() throws Exception { final Customer customer = new Customer(); customer.setId(1); customer.setName("name"); customer.setBirthday(new GregorianCalendar()); Assert.assertNotNull(processor.process(customer)); } }
Es gibt zwei TestExecutionListener
s. Einer stammt aus dem regulären Spring Test-Framework und verarbeitet Abhängigkeitsinjektionen aus dem konfigurierten Anwendungskontext. Der andere ist der Spring Batch StepScopeTestExecutionListener
, der den Step-Scope-Kontext für die Abhängigkeitsinjektion in Komponententests einrichtet. Ein StepContext
wird für die Dauer einer Testmethode erstellt und allen eingefügten Abhängigkeiten zur Verfügung gestellt. Das Standardverhalten besteht lediglich darin, eine StepExecution
mit festen Eigenschaften zu erstellen. Alternativ kann der StepContext
vom Testfall als Factory-Methode bereitgestellt werden, die den richtigen Typ zurückgibt.
Ein weiterer Ansatz basiert auf der Utility-Klasse StepScopeTestUtils
. Diese Klasse wird verwendet, um StepScope
in Komponententests flexibler zu erstellen und zu manipulieren, ohne Abhängigkeitsinjektion zu verwenden. Zum Beispiel könnte das Lesen der ID des Kunden, die durch den oben genannten Bearbeiter gefiltert wurde, wie folgt erfolgen:
@Test public void filterId() throws Exception { final Customer customer = new Customer(); customer.setId(1); customer.setName("name"); customer.setBirthday(new GregorianCalendar()); final int id = StepScopeTestUtils.doInStepScope( getStepExecution(), () -> processor.process(customer).getId() ); Assert.assertEquals(1, id); }
Bereit für Advanced Spring Batch?
Dieser Artikel stellt einige der Grundlagen des Designs und der Entwicklung von Spring Batch-Anwendungen vor. Es gibt jedoch viele weiterführende Themen und Funktionen – wie z. B. Skalierung, Parallelverarbeitung, Listener und mehr – die in diesem Artikel nicht behandelt werden. Hoffentlich bietet dieser Artikel eine nützliche Grundlage für den Einstieg.
Informationen zu diesen weiterführenden Themen finden Sie dann in der offiziellen Spring Back-Dokumentation für Spring Batch.