Spring Batch Tutorial: Procesarea în lot simplificată cu Spring

Publicat: 2022-03-11

Procesarea în loturi – caracterizată prin execuția în fundal orientată în bloc, non-interactiv și frecvent de lungă durată – este utilizată pe scară largă în aproape orice industrie și este aplicată unei game variate de sarcini. Procesarea în loturi poate fi intensă din punct de vedere al datelor sau al calculului, se poate executa secvenţial sau în paralel şi poate fi iniţiată prin diferite modele de invocare, inclusiv ad-hoc, programate şi la cerere.

Acest tutorial Spring Batch explică modelul de programare și limbajul de domeniu al aplicațiilor batch în general și, în special, arată câteva abordări utile pentru proiectarea și dezvoltarea aplicațiilor batch folosind versiunea curentă Spring Batch 3.0.7 .

Ce este Spring Batch?

Spring Batch este un cadru ușor, cuprinzător, conceput pentru a facilita dezvoltarea aplicațiilor robuste în loturi. De asemenea, oferă servicii tehnice și caracteristici mai avansate care acceptă lucrări de lot de volum extrem de mare și de înaltă performanță prin tehnicile sale de optimizare și partiționare. Spring Batch se bazează pe abordarea de dezvoltare bazată pe POJO a Spring Framework, familiară tuturor dezvoltatorilor Spring cu experiență.

De exemplu, acest articol ia în considerare codul sursă dintr-un proiect exemplu care încarcă un fișier client în format XML, filtrează clienții după diverse atribute și trimite intrările filtrate într-un fișier text. Codul sursă pentru exemplul nostru Spring Batch (care folosește adnotări Lombok) este disponibil aici pe GitHub și necesită Java SE 8 și Maven.

Ce este procesarea în lot? Concepte cheie și terminologie

Este important ca orice dezvoltator de loturi să fie familiarizat și confortabil cu principalele concepte de procesare în loturi. Diagrama de mai jos este o versiune simplificată a arhitecturii de referință pe lot, care a fost dovedită prin decenii de implementări pe multe platforme diferite. Prezintă conceptele și termenii cheie relevanți pentru procesarea în lot, așa cum sunt utilizați de Spring Batch.

Tutorial Spring Batch: Concepte cheie și terminologie

După cum se arată în exemplul nostru de procesare în loturi, un proces în loturi este în mod obișnuit încapsulat de un Job format din mai multe Step . Fiecare Step are de obicei un singur ItemReader , ItemProcessor și ItemWriter . Un Job este executat de un JobLauncher , iar metadatele despre joburile configurate și executate sunt stocate într-un JobRepository .

Fiecare Job poate fi asociat cu mai multe JobInstance , fiecare dintre acestea fiind definită în mod unic de JobParameters particulari care sunt utilizați pentru a porni un job batch. Fiecare rulare a unui JobInstance este denumită JobExecution . Fiecare JobExecution urmărește de obicei ceea ce s-a întâmplat în timpul unei rulări, cum ar fi stările curente și de ieșire, orele de începere și de sfârșit etc.

Un Step este o fază independentă, specifică a unui lot Job , astfel încât fiecare Job este compus din unul sau mai multe Step . Similar cu un Job , un Step are un StepExecution individual care reprezintă o singură încercare de a executa un Step . StepExecution stochează informații despre stările curente și de ieșire, orele de începere și de sfârșit și așa mai departe, precum și referințe la instanțele Step și JobExecution corespunzătoare.

Un ExecutionContext este un set de perechi cheie-valoare care conțin informații care sunt vizate fie de StepExecution , fie de JobExecution . Spring Batch persistă ExecutionContext , care vă ajută în cazurile în care doriți să reporniți o execuție în lot (de exemplu, când a apărut o eroare fatală etc.). Tot ceea ce este necesar este să puneți în context orice obiect de partajat între pași, iar cadrul se va ocupa de restul. După repornire, valorile din ExecutionContext anterior sunt restaurate din baza de date și aplicate.

JobRepository este mecanismul din Spring Batch care face posibilă toată această persistență. Oferă operațiuni CRUD pentru JobLauncher , Job și Step . Odată ce un Job este lansat, un JobExecution este obținut din depozit și, în timpul execuției, StepExecution și JobExecution sunt persistente în depozit.

Noțiuni introductive cu Spring Batch Framework

Unul dintre avantajele Spring Batch este că dependențele de proiect sunt minime, ceea ce face mai ușor să porniți și să rulați rapid. Puținele dependențe care există sunt clar specificate și explicate în pom.xml al proiectului, care poate fi accesat aici.

Pornirea efectivă a aplicației are loc într-o clasă care arată cam așa:

 @EnableBatchProcessing @SpringBootApplication public class BatchApplication { public static void main(String[] args) { prepareTestData(1000); SpringApplication.run(BatchApplication.class, args); } }

Adnotarea @EnableBatchProcessing activează caracteristicile Spring Batch și oferă o configurație de bază pentru configurarea joburilor batch.

Adnotarea @SpringBootApplication provine din proiectul Spring Boot care oferă aplicații autonome, pregătite pentru producție, bazate pe Spring. Specifică o clasă de configurare care declară unul sau mai multe bean-uri Spring și, de asemenea, declanșează configurarea automată și scanarea componentelor Spring.

Proiectul nostru eșantion are un singur job care este configurat de CustomerReportJobConfig cu un JobBuilderFactory și StepBuilderFactory injectate. Configurația minimă a jobului poate fi definită în CustomerReportJobConfig după cum urmează:

 @Configuration public class CustomerReportJobConfig { @Autowired private JobBuilderFactory jobBuilders; @Autowired private StepBuilderFactory stepBuilders; @Bean public Job customerReportJob() { return jobBuilders.get("customerReportJob") .start(taskletStep()) .next(chunkStep()) .build(); } @Bean public Step taskletStep() { return stepBuilders.get("taskletStep") .tasklet(tasklet()) .build(); } @Bean public Tasklet tasklet() { return (contribution, chunkContext) -> { return RepeatStatus.FINISHED; }; } }

Există două abordări principale pentru construirea unui pas.

O abordare, așa cum se arată în exemplul de mai sus, este bazată pe tasklet . Un Tasklet acceptă o interfață simplă care are o singură metodă, execute() , care este apelată în mod repetat până când fie returnează RepeatStatus.FINISHED , fie aruncă o excepție pentru a semnala un eșec. Fiecare apel către Tasklet este înglobat într-o tranzacție.

O altă abordare, procesarea orientată pe bucăți , se referă la citirea secvențială a datelor și crearea de „bucăți” care vor fi scrise în cadrul unei granițe de tranzacție. Fiecare articol individual este citit dintr-un ItemReader , predat unui ItemProcessor și agregat. Odată ce numărul de articole citite este egal cu intervalul de comitere, întreaga bucată este scrisă prin ItemWriter și apoi tranzacția este comisă. Un pas orientat pe bucăți poate fi configurat după cum urmează:

 @Bean public Job customerReportJob() { return jobBuilders.get("customerReportJob") .start(taskletStep()) .next(chunkStep()) .build(); } @Bean public Step chunkStep() { return stepBuilders.get("chunkStep") .<Customer, Customer>chunk(20) .reader(reader()) .processor(processor()) .writer(writer()) .build(); }

Metoda chunk() construiește un pas care procesează articole în bucăți cu dimensiunea furnizată, fiecare bucată fiind apoi transmisă cititorului, procesorului și scriitorului specificat. Aceste metode sunt discutate mai detaliat în secțiunile următoare ale acestui articol.

Cititor personalizat

Pentru aplicația noastră eșantion Spring Batch, pentru a citi o listă de clienți dintr-un fișier XML, trebuie să oferim o implementare a interfeței org.springframework.batch.item.ItemReader :

 public interface ItemReader<T> { T read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException; }

Un ItemReader furnizează datele și se așteaptă să fie cu stare. În mod obișnuit, este apelat de mai multe ori pentru fiecare lot, fiecare apel la read() returnând următoarea valoare și în final returnând null atunci când toate datele de intrare au fost epuizate.

Spring Batch furnizează câteva implementări precoce ale ItemReader , care pot fi utilizate pentru o varietate de scopuri, cum ar fi citirea colecțiilor, fișierele, integrarea JMS și JDBC, precum și a mai multor surse și așa mai departe.

În aplicația noastră exemplu, clasa CustomerItemReader delegă apelurile reale read() unei instanțe inițializate leneș a clasei IteratorItemReader :

 public class CustomerItemReader implements ItemReader<Customer> { private final String filename; private ItemReader<Customer> delegate; public CustomerItemReader(final String filename) { this.filename = filename; } @Override public Customer read() throws Exception { if (delegate == null) { delegate = new IteratorItemReader<>(customers()); } return delegate.read(); } private List<Customer> customers() throws FileNotFoundException { try (XMLDecoder decoder = new XMLDecoder(new FileInputStream(filename))) { return (List<Customer>) decoder.readObject(); } } }

Un bean Spring pentru această implementare este creat cu adnotările @Component și @StepScope , anunțându-l pe Spring că această clasă este o componentă Spring cu pas și va fi creată o dată pe execuție de pas, după cum urmează:

 @StepScope @Bean public ItemReader<Customer> reader() { return new CustomerItemReader(XML_FILE); }

Procesoare personalizate

ItemProcessors transformă elementele de intrare și introduc logica de afaceri într-un scenariu de procesare orientat pe articole. Acestea trebuie să ofere o implementare a interfeței org.springframework.batch.item.ItemProcessor :

 public interface ItemProcessor<I, O> { O process(I item) throws Exception; }

Metoda process() acceptă o instanță din clasa I și poate returna sau nu o instanță de același tip. Returnarea null indică faptul că articolul nu ar trebui să fie procesat în continuare. Ca de obicei, Spring oferă puține procesoare standard, cum ar fi CompositeItemProcessor care trece elementul printr-o secvență de ItemProcessor injectate și un ValidatingItemProcessor care validează intrarea.

În cazul aplicației noastre eșantion, procesoarele sunt folosite pentru a filtra clienții după următoarele cerințe:

  • Un client trebuie să se nască în luna curentă (de exemplu, pentru a semnala oferte speciale pentru ziua de naștere etc.)
  • Un client trebuie să aibă mai puțin de cinci tranzacții finalizate (de exemplu, pentru a identifica clienții mai noi)

Cerința „lună curentă” este implementată printr-un ItemProcessor personalizat:

 public class BirthdayFilterProcessor implements ItemProcessor<Customer, Customer> { @Override public Customer process(final Customer item) throws Exception { if (new GregorianCalendar().get(Calendar.MONTH) == item.getBirthday().get(Calendar.MONTH)) { return item; } return null; } }

Cerința „număr limitat de tranzacții” este implementată ca ValidatingItemProcessor :

 public class TransactionValidatingProcessor extends ValidatingItemProcessor<Customer> { public TransactionValidatingProcessor(final int limit) { super( item -> { if (item.getTransactions() >= limit) { throw new ValidationException("Customer has less than " + limit + " transactions"); } } ); setFilter(true); } }

Această pereche de procesoare este apoi încapsulată într-un CompositeItemProcessor care implementează modelul delegat:

 @StepScope @Bean public ItemProcessor<Customer, Customer> processor() { final CompositeItemProcessor<Customer, Customer> processor = new CompositeItemProcessor<>(); processor.setDelegates(Arrays.asList(new BirthdayFilterProcessor(), new TransactionValidatingProcessor(5))); return processor; }

Scriitori personalizați

Pentru ieșirea datelor, Spring Batch oferă interfața org.springframework.batch.item.ItemWriter pentru serializarea obiectelor după cum este necesar:

 public interface ItemWriter<T> { void write(List<? extends T> items) throws Exception; }

Metoda write() este responsabilă pentru a se asigura că orice buffer-uri interne sunt spălate. Dacă o tranzacție este activă, va fi, de obicei, necesar să renunțați la ieșire la o derulare ulterioară. Resursa către care scriitorul trimite date ar trebui, în mod normal, să poată gestiona aceasta singură. Există implementări standard, cum ar fi CompositeItemWriter , JdbcBatchItemWriter , JmsItemWriter , JpaItemWriter , SimpleMailMessageItemWriter și altele.

În aplicația noastră exemplu, lista clienților filtrati este scrisă după cum urmează:

 public class CustomerItemWriter implements ItemWriter<Customer>, Closeable { private final PrintWriter writer; public CustomerItemWriter() { OutputStream out; try { out = new FileOutputStream("output.txt"); } catch (FileNotFoundException e) { out = System.out; } this.writer = new PrintWriter(out); } @Override public void write(final List<? extends Customer> items) throws Exception { for (Customer item : items) { writer.println(item.toString()); } } @PreDestroy @Override public void close() throws IOException { writer.close(); } }

Programarea lucrărilor în lot de primăvară

În mod implicit, Spring Batch execută toate joburile pe care le poate găsi (adică, care sunt configurate ca în CustomerReportJobConfig ) la pornire. Pentru a schimba acest comportament, dezactivați execuția jobului la pornire adăugând următoarea proprietate la application.properties :

 spring.batch.job.enabled=false

Planificarea reală este apoi realizată prin adăugarea adnotării @EnableScheduling la o clasă de configurare și a adnotării @Scheduled la metoda care execută lucrarea în sine. Programarea poate fi configurată cu întârziere, rate sau expresii cron:

 // run every 5000 msec (ie, every 5 secs) @Scheduled(fixedRate = 5000) public void run() throws Exception { JobExecution execution = jobLauncher.run( customerReportJob(), new JobParametersBuilder().toJobParameters() ); }

Există totuși o problemă cu exemplul de mai sus. În timpul execuției, lucrarea va reuși numai prima dată. Când se lansează a doua oară (adică după cinci secunde), va genera următoarele mesaje în jurnale (rețineți că în versiunile anterioare de Spring Batch ar fi fost lansată o JobInstanceAlreadyCompleteException ):

 INFO 36988 --- [pool-2-thread-1] osbclsupport.SimpleJobLauncher : Job: [SimpleJob: [name=customerReportJob]] launched with the following parameters: [{}] INFO 36988 --- [pool-2-thread-1] osbatch.core.job.SimpleStepHandler : Step already complete or not restartable, so no action to execute: StepExecution: id=1, version=3, name=taskletStep, status=COMPLETED, exitStatus=COMPLETED, readCount=0, filterCount=0, writeCount=0 readSkipCount=0, writeSkipCount=0, processSkipCount=0, commitCount=1, rollbackCount=0, exitDescription= INFO 36988 --- [pool-2-thread-1] osbatch.core.job.SimpleStepHandler : Step already complete or not restartable, so no action to execute: StepExecution: id=2, version=53, name=chunkStep, status=COMPLETED, exitStatus=COMPLETED, readCount=1000, filterCount=982, writeCount=18 readSkipCount=0, writeSkipCount=0, processSkipCount=0, commitCount=51, rollbackCount=0, exitDescription=

Acest lucru se întâmplă deoarece numai JobInstance unice pot fi create și executate și Spring Batch nu are nicio modalitate de a distinge între prima și a doua JobInstance .

Există două moduri de a evita această problemă atunci când programați o lucrare în lot.

Una este să vă asigurați că introduceți unul sau mai mulți parametri unici (de exemplu, timpul real de pornire în nanosecunde) pentru fiecare job:

 @Scheduled(fixedRate = 5000) public void run() throws Exception { jobLauncher.run( customerReportJob(), new JobParametersBuilder().addLong("uniqueness", System.nanoTime()).toJobParameters() ); }

Alternativ, puteți lansa următorul job într-o secvență de JobInstance determinată de JobParametersIncrementer atașat jobului specificat cu SimpleJobOperator.startNextInstance() :

 @Autowired private JobOperator operator; @Autowired private JobExplorer jobs; @Scheduled(fixedRate = 5000) public void run() throws Exception { List<JobInstance> lastInstances = jobs.getJobInstances(JOB_NAME, 0, 1); if (lastInstances.isEmpty()) { jobLauncher.run(customerReportJob(), new JobParameters()); } else { operator.startNextInstance(JOB_NAME); } }

Testarea unității de lot de primăvară

De obicei, pentru a rula teste unitare într-o aplicație Spring Boot, cadrul trebuie să încarce un ApplicationContext corespunzător. În acest scop sunt folosite două adnotări:

 @RunWith(SpringRunner.class) @ContextConfiguration(classes = {...})

Există o clasă de utilitate org.springframework.batch.test.JobLauncherTestUtils pentru a testa joburile batch. Oferă metode pentru lansarea unui întreg job și permite testarea de la capăt la capăt a pașilor individuali, fără a fi nevoie să rulați fiecare pas al jobului. Trebuie declarată fasole de primăvară:

 @Configuration public class BatchTestConfiguration { @Bean public JobLauncherTestUtils jobLauncherTestUtils() { return new JobLauncherTestUtils(); } }

Un test tipic pentru o slujbă și un pas arată după cum urmează (și poate folosi și orice cadru batjocoritor):

 @RunWith(SpringRunner.class) @ContextConfiguration(classes = {BatchApplication.class, BatchTestConfiguration.class}) public class CustomerReportJobConfigTest { @Autowired private JobLauncherTestUtils testUtils; @Autowired private CustomerReportJobConfig config; @Test public void testEntireJob() throws Exception { final JobExecution result = testUtils.getJobLauncher().run(config.customerReportJob(), testUtils.getUniqueJobParameters()); Assert.assertNotNull(result); Assert.assertEquals(BatchStatus.COMPLETED, result.getStatus()); } @Test public void testSpecificStep() { Assert.assertEquals(BatchStatus.COMPLETED, testUtils.launchStep("taskletStep").getStatus()); } }

Spring Batch introduce domenii suplimentare pentru contextele de pas și job. Obiectele din aceste domenii folosesc containerul Spring ca o fabrică de obiecte, deci există o singură instanță a fiecărui astfel de bean pentru fiecare pas de execuție sau job. În plus, se oferă suport pentru legarea tardivă a referințelor accesibile din StepContext sau JobContext . Componentele care sunt configurate în timpul execuției pentru a fi în funcție de pas sau de job sunt dificil de testat ca componente autonome, cu excepția cazului în care aveți o modalitate de a seta contextul ca și cum ar fi într-o execuție a unui pas sau a unui job. Acesta este scopul componentelor org.springframework.batch.test.StepScopeTestExecutionListener și org.springframework.batch.test.StepScopeTestUtils din Spring Batch, precum și JobScopeTestExecutionListener și JobScopeTestUtils .

TestExecutionListeners sunt declarate la nivel de clasă, iar sarcina sa este de a crea un context de execuție a pasului pentru fiecare metodă de testare. De exemplu:

 @RunWith(SpringRunner.class) @TestExecutionListeners({DependencyInjectionTestExecutionListener.class, StepScopeTestExecutionListener.class}) @ContextConfiguration(classes = {BatchApplication.class, BatchTestConfiguration.class}) public class BirthdayFilterProcessorTest { @Autowired private BirthdayFilterProcessor processor; public StepExecution getStepExecution() { return MetaDataInstanceFactory.createStepExecution(); } @Test public void filter() throws Exception { final Customer customer = new Customer(); customer.setId(1); customer.setName("name"); customer.setBirthday(new GregorianCalendar()); Assert.assertNotNull(processor.process(customer)); } }

Există două TestExecutionListener . Unul este din cadrul obișnuit Spring Test și se ocupă de injectarea dependenței din contextul aplicației configurate. Celălalt este Spring Batch StepScopeTestExecutionListener care setează contextul pasului pentru injectarea dependenței în testele unitare. Un StepContext este creat pe durata unei metode de testare și pus la dispoziție oricăror dependențe care sunt injectate. Comportamentul implicit este doar de a crea o StepExecution cu proprietăți fixe. În mod alternativ, StepContext poate fi furnizat de cazul de testare ca metodă din fabrică care returnează tipul corect.

O altă abordare se bazează pe clasa de utilitate StepScopeTestUtils . Această clasă este folosită pentru a crea și manipula StepScope în testele unitare într-un mod mai flexibil, fără a utiliza injecția de dependență. De exemplu, citirea ID-ului clientului filtrat de procesorul de mai sus se poate face după cum urmează:

 @Test public void filterId() throws Exception { final Customer customer = new Customer(); customer.setId(1); customer.setName("name"); customer.setBirthday(new GregorianCalendar()); final int id = StepScopeTestUtils.doInStepScope( getStepExecution(), () -> processor.process(customer).getId() ); Assert.assertEquals(1, id); }

Sunteți gata pentru lotul de primăvară avansat?

Acest articol prezintă câteva dintre elementele de bază ale proiectării și dezvoltării aplicațiilor Spring Batch. Cu toate acestea, există multe subiecte și capabilități mai avansate, cum ar fi scalarea, procesarea paralelă, ascultătorii și multe altele, care nu sunt abordate în acest articol. Sperăm că acest articol oferă o bază utilă pentru început.

Informații despre aceste subiecte mai avansate pot fi apoi găsite în documentația oficială Spring Back pentru Spring Batch.