Tutorial Spring Batch: l'elaborazione batch semplificata con Spring

Pubblicato: 2022-03-11

L'elaborazione batch, caratterizzata da un'esecuzione in background orientata alla massa, non interattiva e spesso di lunga durata, è ampiamente utilizzata praticamente in ogni settore e viene applicata a una vasta gamma di attività. L'elaborazione batch può richiedere dati o computazione intensiva, essere eseguita in sequenza o in parallelo e può essere avviata tramite vari modelli di chiamata, inclusi ad hoc, programmati e su richiesta.

Questo tutorial di Spring Batch spiega il modello di programmazione e il linguaggio di dominio delle applicazioni batch in generale e, in particolare, mostra alcuni utili approcci alla progettazione e allo sviluppo di applicazioni batch utilizzando l'attuale versione Spring Batch 3.0.7 .

Cos'è il lotto di primavera?

Spring Batch è un framework leggero e completo progettato per facilitare lo sviluppo di robuste applicazioni batch. Fornisce inoltre servizi tecnici e funzionalità più avanzati che supportano lavori batch a volume estremamente elevato e prestazioni elevate attraverso le sue tecniche di ottimizzazione e partizionamento. Spring Batch si basa sull'approccio di sviluppo basato su POJO di Spring Framework, familiare a tutti gli sviluppatori Spring esperti.

A titolo di esempio, in questo articolo viene preso in considerazione il codice sorgente di un progetto di esempio che carica un file del cliente in formato XML, filtra i clienti in base a vari attributi e restituisce le voci filtrate in un file di testo. Il codice sorgente per il nostro esempio Spring Batch (che utilizza le annotazioni Lombok) è disponibile qui su GitHub e richiede Java SE 8 e Maven.

Che cos'è l'elaborazione batch? Concetti chiave e terminologia

È importante che qualsiasi sviluppatore batch abbia familiarità e si senta a proprio agio con i concetti principali dell'elaborazione batch. Il diagramma seguente è una versione semplificata dell'architettura di riferimento batch che è stata dimostrata in decenni di implementazioni su molte piattaforme diverse. Introduce i concetti chiave ei termini relativi all'elaborazione batch, come utilizzati da Spring Batch.

Tutorial Spring Batch: concetti chiave e terminologia

Come mostrato nel nostro esempio di elaborazione batch, un processo batch è in genere incapsulato da un Job costituito da più Step . Ogni Step ha in genere un singolo ItemReader , ItemProcessor e ItemWriter . Un Job viene eseguito da un JobLauncher ei metadati sui lavori configurati ed eseguiti vengono archiviati in un JobRepository .

Ciascun Job può essere associato a più JobInstance , ognuno dei quali è definito in modo univoco dai relativi JobParameters utilizzati per avviare un lavoro batch. Ogni esecuzione di un JobInstance viene definita JobExecution . Ogni JobExecution in genere tiene traccia di ciò che è accaduto durante un'esecuzione, come lo stato corrente e di uscita, l'ora di inizio e di fine, ecc.

Un Step è una fase specifica e indipendente di un Job batch, in modo tale che ogni Job sia composto da uno o più Step . Simile a un Job , un Step ha una singola StepExecution che rappresenta un singolo tentativo di eseguire un Step . StepExecution archivia le informazioni sullo stato corrente e di uscita, l'ora di inizio e di fine e così via, nonché i riferimenti alle istanze Step e JobExecution corrispondenti.

Un ExecutionContext è un insieme di coppie chiave-valore contenenti informazioni con ambito StepExecution o JobExecution . Spring Batch mantiene ExecutionContext , che aiuta nei casi in cui si desidera riavviare un'esecuzione batch (ad esempio, quando si è verificato un errore irreversibile, ecc.). Tutto ciò che serve è inserire qualsiasi oggetto da condividere tra i passaggi nel contesto e la struttura si occuperà del resto. Dopo il riavvio, i valori del precedente ExecutionContext vengono ripristinati dal database e applicati.

JobRepository è il meccanismo in Spring Batch che rende possibile tutta questa persistenza. Fornisce operazioni CRUD per le JobLauncher , Job e Step . Una volta avviato un Job , si ottiene un JobExecution dal repository e, durante il corso dell'esecuzione, le istanze StepExecution e JobExecution vengono mantenute nel repository.

Introduzione a Spring Batch Framework

Uno dei vantaggi di Spring Batch è che le dipendenze del progetto sono minime, il che rende più facile iniziare a funzionare rapidamente. Le poche dipendenze che esistono sono chiaramente specificate e spiegate nel pom.xml del progetto, a cui è possibile accedere qui.

L'avvio effettivo dell'applicazione avviene in una classe simile alla seguente:

 @EnableBatchProcessing @SpringBootApplication public class BatchApplication { public static void main(String[] args) { prepareTestData(1000); SpringApplication.run(BatchApplication.class, args); } }

L'annotazione @EnableBatchProcessing abilita le funzionalità Spring Batch e fornisce una configurazione di base per l'impostazione di lavori batch.

L'annotazione @SpringBootApplication deriva dal progetto Spring Boot che fornisce applicazioni standalone, pronte per la produzione, basate su Spring. Specifica una classe di configurazione che dichiara uno o più bean Spring e attiva anche la configurazione automatica e la scansione dei componenti di Spring.

Il nostro progetto di esempio ha un solo lavoro configurato da CustomerReportJobConfig con JobBuilderFactory e StepBuilderFactory iniettati. La configurazione minima del lavoro può essere definita in CustomerReportJobConfig come segue:

 @Configuration public class CustomerReportJobConfig { @Autowired private JobBuilderFactory jobBuilders; @Autowired private StepBuilderFactory stepBuilders; @Bean public Job customerReportJob() { return jobBuilders.get("customerReportJob") .start(taskletStep()) .next(chunkStep()) .build(); } @Bean public Step taskletStep() { return stepBuilders.get("taskletStep") .tasklet(tasklet()) .build(); } @Bean public Tasklet tasklet() { return (contribution, chunkContext) -> { return RepeatStatus.FINISHED; }; } }

Ci sono due approcci principali per costruire un gradino.

Un approccio, come mostrato nell'esempio precedente, è basato su tasklet . Un Tasklet supporta un'interfaccia semplice che ha un solo metodo, execute() , che viene chiamato ripetutamente finché non restituisce RepeatStatus.FINISHED o genera un'eccezione per segnalare un errore. Ogni chiamata al Tasklet è racchiusa in una transazione.

Un altro approccio, l'elaborazione orientata al blocco , si riferisce alla lettura dei dati in sequenza e alla creazione di "blocchi" che verranno scritti all'interno di un limite di transazione. Ogni singolo elemento viene letto da un ItemReader , consegnato a un ItemProcessor e aggregato. Una volta che il numero di elementi letti è uguale all'intervallo di commit, l'intero blocco viene scritto tramite ItemWriter e quindi viene eseguito il commit della transazione. Un passaggio orientato al blocco può essere configurato come segue:

 @Bean public Job customerReportJob() { return jobBuilders.get("customerReportJob") .start(taskletStep()) .next(chunkStep()) .build(); } @Bean public Step chunkStep() { return stepBuilders.get("chunkStep") .<Customer, Customer>chunk(20) .reader(reader()) .processor(processor()) .writer(writer()) .build(); }

Il metodo chunk() crea un passaggio che elabora gli elementi in blocchi con la dimensione fornita, con ogni blocco che viene quindi passato al lettore, processore e scrittore specificato. Questi metodi sono discussi in modo più dettagliato nelle sezioni successive di questo articolo.

Lettore personalizzato

Per la nostra applicazione di esempio Spring Batch, per leggere un elenco di clienti da un file XML, dobbiamo fornire un'implementazione dell'interfaccia org.springframework.batch.item.ItemReader :

 public interface ItemReader<T> { T read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException; }

Un ItemReader fornisce i dati e dovrebbe essere con stato. In genere viene chiamato più volte per ogni batch, con ogni chiamata a read() che restituisce il valore successivo e infine restituisce null quando tutti i dati di input sono stati esauriti.

Spring Batch fornisce alcune implementazioni pronte all'uso di ItemReader , che possono essere utilizzate per una varietà di scopi come la lettura di raccolte, file, integrazione di JMS e JDBC, nonché più origini e così via.

Nella nostra applicazione di esempio, la classe CustomerItemReader delega le chiamate read() effettive a un'istanza inizializzata in modo pigro della classe IteratorItemReader :

 public class CustomerItemReader implements ItemReader<Customer> { private final String filename; private ItemReader<Customer> delegate; public CustomerItemReader(final String filename) { this.filename = filename; } @Override public Customer read() throws Exception { if (delegate == null) { delegate = new IteratorItemReader<>(customers()); } return delegate.read(); } private List<Customer> customers() throws FileNotFoundException { try (XMLDecoder decoder = new XMLDecoder(new FileInputStream(filename))) { return (List<Customer>) decoder.readObject(); } } }

Un bean Spring per questa implementazione viene creato con le annotazioni @Component e @StepScope , facendo sapere a Spring che questa classe è un componente Spring con ambito step e verrà creata una volta per esecuzione del passaggio come segue:

 @StepScope @Bean public ItemReader<Customer> reader() { return new CustomerItemReader(XML_FILE); }

Processori personalizzati

ItemProcessors trasforma gli elementi di input e introducono la logica aziendale in uno scenario di elaborazione orientato agli elementi. Devono fornire un'implementazione dell'interfaccia org.springframework.batch.item.ItemProcessor :

 public interface ItemProcessor<I, O> { O process(I item) throws Exception; }

Il metodo process() accetta un'istanza della classe I e può restituire o meno un'istanza dello stesso tipo. La restituzione di null indica che l'articolo non deve continuare a essere elaborato. Come al solito, Spring fornisce alcuni processori standard, come CompositeItemProcessor che passa l'elemento attraverso una sequenza di ItemProcessor iniettati e un ValidatingItemProcessor che convalida l'input.

Nel caso della nostra applicazione di esempio, i processori vengono utilizzati per filtrare i clienti in base ai seguenti requisiti:

  • Un cliente deve essere nato nel mese corrente (ad es. per segnalare offerte speciali di compleanno, ecc.)
  • Un cliente deve avere meno di cinque transazioni completate (ad esempio, per identificare nuovi clienti)

Il requisito del "mese corrente" viene implementato tramite un ItemProcessor personalizzato:

 public class BirthdayFilterProcessor implements ItemProcessor<Customer, Customer> { @Override public Customer process(final Customer item) throws Exception { if (new GregorianCalendar().get(Calendar.MONTH) == item.getBirthday().get(Calendar.MONTH)) { return item; } return null; } }

Il requisito del "numero limitato di transazioni" è implementato come ValidatingItemProcessor :

 public class TransactionValidatingProcessor extends ValidatingItemProcessor<Customer> { public TransactionValidatingProcessor(final int limit) { super( item -> { if (item.getTransactions() >= limit) { throw new ValidationException("Customer has less than " + limit + " transactions"); } } ); setFilter(true); } }

Questa coppia di processori viene quindi incapsulata all'interno di un CompositeItemProcessor che implementa il modello delegato:

 @StepScope @Bean public ItemProcessor<Customer, Customer> processor() { final CompositeItemProcessor<Customer, Customer> processor = new CompositeItemProcessor<>(); processor.setDelegates(Arrays.asList(new BirthdayFilterProcessor(), new TransactionValidatingProcessor(5))); return processor; }

Scrittori personalizzati

Per l'output dei dati, Spring Batch fornisce l'interfaccia org.springframework.batch.item.ItemWriter per serializzare gli oggetti secondo necessità:

 public interface ItemWriter<T> { void write(List<? extends T> items) throws Exception; }

Il metodo write() è responsabile di assicurarsi che tutti i buffer interni vengano svuotati. Se una transazione è attiva, in genere sarà anche necessario eliminare l'output in un successivo rollback. La risorsa a cui lo scrittore sta inviando i dati dovrebbe normalmente essere in grado di gestirlo da sola. Esistono implementazioni standard come CompositeItemWriter , JdbcBatchItemWriter , JmsItemWriter , SimpleMailMessageItemWriter , JpaItemWriter e altri.

Nella nostra applicazione di esempio, l'elenco dei clienti filtrati è scritto come segue:

 public class CustomerItemWriter implements ItemWriter<Customer>, Closeable { private final PrintWriter writer; public CustomerItemWriter() { OutputStream out; try { out = new FileOutputStream("output.txt"); } catch (FileNotFoundException e) { out = System.out; } this.writer = new PrintWriter(out); } @Override public void write(final List<? extends Customer> items) throws Exception { for (Customer item : items) { writer.println(item.toString()); } } @PreDestroy @Override public void close() throws IOException { writer.close(); } }

Programmazione dei lavori in lotti primaverili

Per impostazione predefinita, Spring Batch esegue tutti i lavori che riesce a trovare (ovvero, che sono configurati come in CustomerReportJobConfig ) all'avvio. Per modificare questo comportamento, disabilitare l'esecuzione del lavoro all'avvio aggiungendo la seguente proprietà a application.properties :

 spring.batch.job.enabled=false

La pianificazione effettiva viene quindi ottenuta aggiungendo l'annotazione @EnableScheduling a una classe di configurazione e l'annotazione @Scheduled al metodo che esegue il lavoro stesso. La pianificazione può essere configurata con espressioni delay, rate o cron:

 // run every 5000 msec (ie, every 5 secs) @Scheduled(fixedRate = 5000) public void run() throws Exception { JobExecution execution = jobLauncher.run( customerReportJob(), new JobParametersBuilder().toJobParameters() ); }

C'è però un problema con l'esempio sopra. In fase di esecuzione, il processo avrà esito positivo solo la prima volta. Quando si avvia la seconda volta (cioè dopo cinque secondi), genererà i seguenti messaggi nei log (notare che nelle versioni precedenti di Spring Batch sarebbe stata generata JobInstanceAlreadyCompleteException ):

 INFO 36988 --- [pool-2-thread-1] osbclsupport.SimpleJobLauncher : Job: [SimpleJob: [name=customerReportJob]] launched with the following parameters: [{}] INFO 36988 --- [pool-2-thread-1] osbatch.core.job.SimpleStepHandler : Step already complete or not restartable, so no action to execute: StepExecution: id=1, version=3, name=taskletStep, status=COMPLETED, exitStatus=COMPLETED, readCount=0, filterCount=0, writeCount=0 readSkipCount=0, writeSkipCount=0, processSkipCount=0, commitCount=1, rollbackCount=0, exitDescription= INFO 36988 --- [pool-2-thread-1] osbatch.core.job.SimpleStepHandler : Step already complete or not restartable, so no action to execute: StepExecution: id=2, version=53, name=chunkStep, status=COMPLETED, exitStatus=COMPLETED, readCount=1000, filterCount=982, writeCount=18 readSkipCount=0, writeSkipCount=0, processSkipCount=0, commitCount=51, rollbackCount=0, exitDescription=

Ciò accade perché è possibile creare ed eseguire solo JobInstance s univoci e Spring Batch non ha modo di distinguere tra la prima e la seconda JobInstance .

Esistono due modi per evitare questo problema quando si pianifica un processo batch.

Uno è assicurarsi di introdurre uno o più parametri univoci (ad es. tempo di inizio effettivo in nanosecondi) a ciascun lavoro:

 @Scheduled(fixedRate = 5000) public void run() throws Exception { jobLauncher.run( customerReportJob(), new JobParametersBuilder().addLong("uniqueness", System.nanoTime()).toJobParameters() ); }

In alternativa, puoi avviare il lavoro successivo in una sequenza di JobInstance determinata da JobParametersIncrementer allegato al lavoro specificato con SimpleJobOperator.startNextInstance() :

 @Autowired private JobOperator operator; @Autowired private JobExplorer jobs; @Scheduled(fixedRate = 5000) public void run() throws Exception { List<JobInstance> lastInstances = jobs.getJobInstances(JOB_NAME, 0, 1); if (lastInstances.isEmpty()) { jobLauncher.run(customerReportJob(), new JobParameters()); } else { operator.startNextInstance(JOB_NAME); } }

Test di unità in lotti primaverili

Di solito, per eseguire unit test in un'applicazione Spring Boot, il framework deve caricare un ApplicationContext corrispondente. A questo scopo vengono utilizzate due annotazioni:

 @RunWith(SpringRunner.class) @ContextConfiguration(classes = {...})

Esiste una classe di utilità org.springframework.batch.test.JobLauncherTestUtils per testare i lavori batch. Fornisce metodi per avviare un intero lavoro e consente il test end-to-end di singoli passaggi senza dover eseguire ogni passaggio del lavoro. Deve essere dichiarato come chicco di primavera:

 @Configuration public class BatchTestConfiguration { @Bean public JobLauncherTestUtils jobLauncherTestUtils() { return new JobLauncherTestUtils(); } }

Un test tipico per un lavoro e un passaggio ha il seguente aspetto (e può utilizzare anche qualsiasi framework derisorio):

 @RunWith(SpringRunner.class) @ContextConfiguration(classes = {BatchApplication.class, BatchTestConfiguration.class}) public class CustomerReportJobConfigTest { @Autowired private JobLauncherTestUtils testUtils; @Autowired private CustomerReportJobConfig config; @Test public void testEntireJob() throws Exception { final JobExecution result = testUtils.getJobLauncher().run(config.customerReportJob(), testUtils.getUniqueJobParameters()); Assert.assertNotNull(result); Assert.assertEquals(BatchStatus.COMPLETED, result.getStatus()); } @Test public void testSpecificStep() { Assert.assertEquals(BatchStatus.COMPLETED, testUtils.launchStep("taskletStep").getStatus()); } }

Spring Batch introduce ambiti aggiuntivi per i contesti dei passaggi e dei lavori. Gli oggetti in questi ambiti usano il contenitore Spring come una fabbrica di oggetti, quindi c'è solo un'istanza di ciascuno di questi bean per fase di esecuzione o lavoro. Inoltre, viene fornito supporto per l'associazione tardiva di riferimenti accessibili da StepContext o JobContext . I componenti configurati in fase di esecuzione per essere nell'ambito di un passaggio o di un lavoro sono difficili da testare come componenti autonomi a meno che non si disponga di un modo per impostare il contesto come se si trovassero in un passaggio o nell'esecuzione di un processo. Questo è l'obiettivo dei componenti org.springframework.batch.test.StepScopeTestExecutionListener e org.springframework.batch.test.StepScopeTestUtils in Spring Batch, nonché JobScopeTestExecutionListener e JobScopeTestUtils .

I TestExecutionListeners sono dichiarati a livello di classe e il loro compito è creare un contesto di esecuzione del passaggio per ogni metodo di test. Per esempio:

 @RunWith(SpringRunner.class) @TestExecutionListeners({DependencyInjectionTestExecutionListener.class, StepScopeTestExecutionListener.class}) @ContextConfiguration(classes = {BatchApplication.class, BatchTestConfiguration.class}) public class BirthdayFilterProcessorTest { @Autowired private BirthdayFilterProcessor processor; public StepExecution getStepExecution() { return MetaDataInstanceFactory.createStepExecution(); } @Test public void filter() throws Exception { final Customer customer = new Customer(); customer.setId(1); customer.setName("name"); customer.setBirthday(new GregorianCalendar()); Assert.assertNotNull(processor.process(customer)); } }

Esistono due TestExecutionListener s. Uno proviene dal normale framework Spring Test e gestisce l'inserimento delle dipendenze dal contesto dell'applicazione configurato. L'altro è lo Spring Batch StepScopeTestExecutionListener che imposta il contesto dell'ambito del passaggio per l'inserimento delle dipendenze negli unit test. Viene creato uno StepContext per la durata di un metodo di test e reso disponibile a tutte le dipendenze che vengono iniettate. Il comportamento predefinito consiste solo nel creare una StepExecution con proprietà fisse. In alternativa, StepContext può essere fornito dal test case come metodo di fabbrica che restituisce il tipo corretto.

Un altro approccio è basato sulla classe di utilità StepScopeTestUtils . Questa classe viene utilizzata per creare e manipolare StepScope negli unit test in modo più flessibile senza utilizzare l'iniezione delle dipendenze. Ad esempio, la lettura dell'ID del cliente filtrato dal processore di cui sopra potrebbe essere eseguita come segue:

 @Test public void filterId() throws Exception { final Customer customer = new Customer(); customer.setId(1); customer.setName("name"); customer.setBirthday(new GregorianCalendar()); final int id = StepScopeTestUtils.doInStepScope( getStepExecution(), () -> processor.process(customer).getId() ); Assert.assertEquals(1, id); }

Pronto per Advanced Spring Batch?

Questo articolo introduce alcune delle nozioni di base per la progettazione e lo sviluppo di applicazioni Spring Batch. Tuttavia, ci sono molti argomenti e funzionalità più avanzati, come il ridimensionamento, l'elaborazione parallela, i listener e altro, che non sono trattati in questo articolo. Si spera che questo articolo fornisca una base utile per iniziare.

Le informazioni su questi argomenti più avanzati possono quindi essere trovate nella documentazione ufficiale di Spring Back per Spring Batch.