Spring Batch Tutorial: Processamento em lote facilitado com Spring

Publicados: 2022-03-11

O processamento em lote – tipificado pela execução em segundo plano orientada em massa, não interativa e frequentemente de longa duração – é amplamente usado em praticamente todos os setores e é aplicado a uma variedade de tarefas. O processamento em lote pode ser intensivo de dados ou computacionalmente, executado sequencialmente ou em paralelo e pode ser iniciado por meio de vários modelos de invocação, incluindo ad hoc, agendado e sob demanda.

Este tutorial do Spring Batch explica o modelo de programação e a linguagem de domínio de aplicativos em lote em geral e, em particular, mostra algumas abordagens úteis para o design e desenvolvimento de aplicativos em lote usando a versão atual do Spring Batch 3.0.7 .

O que é o Lote de Primavera?

O Spring Batch é uma estrutura leve e abrangente projetada para facilitar o desenvolvimento de aplicativos em lote robustos. Ele também fornece serviços e recursos técnicos mais avançados que suportam trabalhos em lote de alto volume e alto desempenho por meio de suas técnicas de otimização e particionamento. O Spring Batch baseia-se na abordagem de desenvolvimento baseada em POJO do Spring Framework, familiar a todos os desenvolvedores Spring experientes.

A título de exemplo, este artigo considera o código-fonte de um projeto de exemplo que carrega um arquivo de cliente formatado em XML, filtra clientes por vários atributos e gera as entradas filtradas em um arquivo de texto. O código-fonte para nosso exemplo Spring Batch (que faz uso de anotações do Lombok) está disponível aqui no GitHub e requer Java SE 8 e Maven.

O que é processamento em lote? Conceitos-chave e Terminologia

É importante para qualquer desenvolvedor de lote estar familiarizado e confortável com os principais conceitos de processamento em lote. O diagrama abaixo é uma versão simplificada da arquitetura de referência em lote que foi comprovada por décadas de implementações em muitas plataformas diferentes. Ele apresenta os principais conceitos e termos relevantes para o processamento em lote, conforme usado pelo Spring Batch.

Tutorial Spring Batch: Conceitos-chave e terminologia

Conforme mostrado em nosso exemplo de processamento em lote, um processo em lote normalmente é encapsulado por um Job que consiste em vários Step s. Cada Step normalmente tem um único ItemReader , ItemProcessor e ItemWriter . Um Job é executado por um JobLauncher e os metadados sobre jobs configurados e executados são armazenados em um JobRepository .

Cada Job pode ser associado a vários JobInstance s, cada um dos quais é definido exclusivamente por seus JobParameters específicos que são usados ​​para iniciar um job em lote. Cada execução de uma JobInstance é chamada de JobExecution . Cada JobExecution normalmente rastreia o que aconteceu durante uma execução, como status atual e de saída, horários de início e término etc.

Um Step é uma fase independente e específica de um batch Job , de modo que cada Job é composto por um ou mais Step s. Semelhante a um Job , um Step tem um StepExecution individual que representa uma única tentativa de executar um Step . StepExecution armazena as informações sobre os status atuais e de saída, horários de início e término e assim por diante, bem como referências às instâncias Step e JobExecution correspondentes.

Um ExecutionContext é um conjunto de pares de valores-chave contendo informações com escopo para StepExecution ou JobExecution . O Spring Batch persiste o ExecutionContext , que ajuda nos casos em que você deseja reiniciar uma execução em lote (por exemplo, quando ocorreu um erro fatal etc.). Tudo o que é necessário é colocar qualquer objeto a ser compartilhado entre as etapas no contexto e a estrutura cuidará do resto. Após a reinicialização, os valores do ExecutionContext anterior são restaurados do banco de dados e aplicados.

JobRepository é o mecanismo no Spring Batch que possibilita toda essa persistência. Ele fornece operações CRUD para JobLauncher , Job e Step . Depois que um Job é iniciado, um JobExecution é obtido do repositório e, durante o curso da execução, as instâncias StepExecution e JobExecution são persistidas no repositório.

Introdução ao Spring Batch Framework

Uma das vantagens do Spring Batch é que as dependências do projeto são mínimas, o que facilita a execução rápida. As poucas dependências que existem estão claramente especificadas e explicadas no pom.xml do projeto, que pode ser acessado aqui.

A inicialização real do aplicativo acontece em uma classe parecida com a seguinte:

 @EnableBatchProcessing @SpringBootApplication public class BatchApplication { public static void main(String[] args) { prepareTestData(1000); SpringApplication.run(BatchApplication.class, args); } }

A anotação @EnableBatchProcessing habilita os recursos do Spring Batch e fornece uma configuração básica para configurar trabalhos em lote.

A anotação @SpringBootApplication vem do projeto Spring Boot que fornece aplicativos independentes, prontos para produção e baseados em Spring. Ele especifica uma classe de configuração que declara um ou mais beans Spring e também aciona a configuração automática e a varredura de componentes do Spring.

Nosso projeto de exemplo tem apenas um trabalho configurado por CustomerReportJobConfig com um JobBuilderFactory e StepBuilderFactory injetados. A configuração mínima do trabalho pode ser definida em CustomerReportJobConfig da seguinte forma:

 @Configuration public class CustomerReportJobConfig { @Autowired private JobBuilderFactory jobBuilders; @Autowired private StepBuilderFactory stepBuilders; @Bean public Job customerReportJob() { return jobBuilders.get("customerReportJob") .start(taskletStep()) .next(chunkStep()) .build(); } @Bean public Step taskletStep() { return stepBuilders.get("taskletStep") .tasklet(tasklet()) .build(); } @Bean public Tasklet tasklet() { return (contribution, chunkContext) -> { return RepeatStatus.FINISHED; }; } }

Existem duas abordagens principais para construir uma etapa.

Uma abordagem, conforme mostrado no exemplo acima, é baseada em tasklet . Um Tasklet suporta uma interface simples que tem apenas um método, execute() , que é chamado repetidamente até retornar RepeatStatus.FINISHED ou lançar uma exceção para sinalizar uma falha. Cada chamada para o Tasklet é encapsulada em uma transação.

Outra abordagem, o processamento orientado a blocos, refere-se à leitura sequencial dos dados e à criação de “pedaços” que serão gravados dentro de um limite de transação. Cada item individual é lido de um ItemReader , entregue a um ItemProcessor e agregado. Uma vez que o número de itens lidos é igual ao intervalo de confirmação, todo o bloco é gravado por meio do ItemWriter e, em seguida, a transação é confirmada. Uma etapa orientada a blocos pode ser configurada da seguinte maneira:

 @Bean public Job customerReportJob() { return jobBuilders.get("customerReportJob") .start(taskletStep()) .next(chunkStep()) .build(); } @Bean public Step chunkStep() { return stepBuilders.get("chunkStep") .<Customer, Customer>chunk(20) .reader(reader()) .processor(processor()) .writer(writer()) .build(); }

O método chunk() cria uma etapa que processa itens em partes com o tamanho fornecido, com cada parte sendo passada para o leitor, processador e gravador especificados. Esses métodos são discutidos com mais detalhes nas próximas seções deste artigo.

Leitor personalizado

Para nosso aplicativo de amostra Spring Batch, para ler uma lista de clientes de um arquivo XML, precisamos fornecer uma implementação da interface org.springframework.batch.item.ItemReader :

 public interface ItemReader<T> { T read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException; }

Um ItemReader fornece os dados e espera-se que seja stateful. Normalmente é chamado várias vezes para cada lote, com cada chamada para read() retornando o próximo valor e finalmente retornando null quando todos os dados de entrada forem esgotados.

O Spring Batch fornece algumas implementações prontas para uso do ItemReader , que podem ser usadas para vários propósitos, como leitura de coleções, arquivos, integração de JMS e JDBC, bem como várias fontes e assim por diante.

Em nosso aplicativo de exemplo, a classe CustomerItemReader delega chamadas read() reais para uma instância inicializada preguiçosamente da classe IteratorItemReader :

 public class CustomerItemReader implements ItemReader<Customer> { private final String filename; private ItemReader<Customer> delegate; public CustomerItemReader(final String filename) { this.filename = filename; } @Override public Customer read() throws Exception { if (delegate == null) { delegate = new IteratorItemReader<>(customers()); } return delegate.read(); } private List<Customer> customers() throws FileNotFoundException { try (XMLDecoder decoder = new XMLDecoder(new FileInputStream(filename))) { return (List<Customer>) decoder.readObject(); } } }

Um bean Spring para esta implementação é criado com as anotações @Component e @StepScope , informando ao Spring que esta classe é um componente Spring com escopo de etapa e será criado uma vez por execução de etapa da seguinte maneira:

 @StepScope @Bean public ItemReader<Customer> reader() { return new CustomerItemReader(XML_FILE); }

Processadores personalizados

ItemProcessors transformam itens de entrada e introduzem lógica de negócios em um cenário de processamento orientado a itens. Eles devem fornecer uma implementação da interface org.springframework.batch.item.ItemProcessor :

 public interface ItemProcessor<I, O> { O process(I item) throws Exception; }

O método process() aceita uma instância da classe I e pode ou não retornar uma instância do mesmo tipo. Retornar null indica que o item não deve continuar a ser processado. Como de costume, o Spring fornece alguns processadores padrão, como CompositeItemProcessor que passa o item por uma sequência de ItemProcessor um ValidatingItemProcessor que valida a entrada.

No caso de nosso aplicativo de amostra, os processadores são usados ​​para filtrar clientes pelos seguintes requisitos:

  • Um cliente deve ter nascido no mês atual (por exemplo, para sinalizar promoções de aniversário, etc.)
  • Um cliente deve ter menos de cinco transações concluídas (por exemplo, para identificar clientes mais novos)

O requisito "mês atual" é implementado por meio de um ItemProcessor personalizado:

 public class BirthdayFilterProcessor implements ItemProcessor<Customer, Customer> { @Override public Customer process(final Customer item) throws Exception { if (new GregorianCalendar().get(Calendar.MONTH) == item.getBirthday().get(Calendar.MONTH)) { return item; } return null; } }

O requisito de “número limitado de transações” é implementado como ValidatingItemProcessor :

 public class TransactionValidatingProcessor extends ValidatingItemProcessor<Customer> { public TransactionValidatingProcessor(final int limit) { super( item -> { if (item.getTransactions() >= limit) { throw new ValidationException("Customer has less than " + limit + " transactions"); } } ); setFilter(true); } }

Esse par de processadores é então encapsulado em um CompositeItemProcessor que implementa o padrão de delegado:

 @StepScope @Bean public ItemProcessor<Customer, Customer> processor() { final CompositeItemProcessor<Customer, Customer> processor = new CompositeItemProcessor<>(); processor.setDelegates(Arrays.asList(new BirthdayFilterProcessor(), new TransactionValidatingProcessor(5))); return processor; }

Gravadores personalizados

Para gerar os dados, o Spring Batch fornece a interface org.springframework.batch.item.ItemWriter para serializar objetos conforme necessário:

 public interface ItemWriter<T> { void write(List<? extends T> items) throws Exception; }

O método write() é responsável por garantir que todos os buffers internos sejam liberados. Se uma transação estiver ativa, geralmente também será necessário descartar a saída em uma reversão subsequente. O recurso para o qual o gravador está enviando dados normalmente deve ser capaz de lidar com isso sozinho. Existem implementações padrão, como CompositeItemWriter , JdbcBatchItemWriter , JmsItemWriter , JpaItemWriter , SimpleMailMessageItemWriter e outras.

Em nosso aplicativo de exemplo, a lista de clientes filtrados é escrita da seguinte forma:

 public class CustomerItemWriter implements ItemWriter<Customer>, Closeable { private final PrintWriter writer; public CustomerItemWriter() { OutputStream out; try { out = new FileOutputStream("output.txt"); } catch (FileNotFoundException e) { out = System.out; } this.writer = new PrintWriter(out); } @Override public void write(final List<? extends Customer> items) throws Exception { for (Customer item : items) { writer.println(item.toString()); } } @PreDestroy @Override public void close() throws IOException { writer.close(); } }

Agendando trabalhos em lote do Spring

Por padrão, o Spring Batch executa todos os trabalhos que pode encontrar (ou seja, que são configurados como em CustomerReportJobConfig ) na inicialização. Para alterar esse comportamento, desative a execução da tarefa na inicialização adicionando a seguinte propriedade a application.properties :

 spring.batch.job.enabled=false

O agendamento real é então obtido adicionando a anotação @EnableScheduling a uma classe de configuração e a anotação @Scheduled ao método que executa o próprio trabalho. O agendamento pode ser configurado com atraso, taxas ou expressões cron:

 // run every 5000 msec (ie, every 5 secs) @Scheduled(fixedRate = 5000) public void run() throws Exception { JobExecution execution = jobLauncher.run( customerReportJob(), new JobParametersBuilder().toJobParameters() ); }

Há um problema com o exemplo acima embora. Em tempo de execução, o trabalho será bem-sucedido apenas na primeira vez. Quando ele for iniciado pela segunda vez (ou seja, após cinco segundos), ele gerará as seguintes mensagens nos logs (observe que em versões anteriores do Spring Batch uma JobInstanceAlreadyCompleteException teria sido lançada):

 INFO 36988 --- [pool-2-thread-1] osbclsupport.SimpleJobLauncher : Job: [SimpleJob: [name=customerReportJob]] launched with the following parameters: [{}] INFO 36988 --- [pool-2-thread-1] osbatch.core.job.SimpleStepHandler : Step already complete or not restartable, so no action to execute: StepExecution: id=1, version=3, name=taskletStep, status=COMPLETED, exitStatus=COMPLETED, readCount=0, filterCount=0, writeCount=0 readSkipCount=0, writeSkipCount=0, processSkipCount=0, commitCount=1, rollbackCount=0, exitDescription= INFO 36988 --- [pool-2-thread-1] osbatch.core.job.SimpleStepHandler : Step already complete or not restartable, so no action to execute: StepExecution: id=2, version=53, name=chunkStep, status=COMPLETED, exitStatus=COMPLETED, readCount=1000, filterCount=982, writeCount=18 readSkipCount=0, writeSkipCount=0, processSkipCount=0, commitCount=51, rollbackCount=0, exitDescription=

Isso acontece porque apenas JobInstance s únicos podem ser criados e executados e o Spring Batch não tem como distinguir entre o primeiro e o segundo JobInstance .

Há duas maneiras de evitar esse problema ao agendar um trabalho em lotes.

Uma é certificar-se de introduzir um ou mais parâmetros exclusivos (por exemplo, tempo de início real em nanossegundos) para cada trabalho:

 @Scheduled(fixedRate = 5000) public void run() throws Exception { jobLauncher.run( customerReportJob(), new JobParametersBuilder().addLong("uniqueness", System.nanoTime()).toJobParameters() ); }

Como alternativa, você pode iniciar o próximo trabalho em uma sequência de JobInstance s determinada pelo JobParametersIncrementer anexado ao trabalho especificado com SimpleJobOperator.startNextInstance() :

 @Autowired private JobOperator operator; @Autowired private JobExplorer jobs; @Scheduled(fixedRate = 5000) public void run() throws Exception { List<JobInstance> lastInstances = jobs.getJobInstances(JOB_NAME, 0, 1); if (lastInstances.isEmpty()) { jobLauncher.run(customerReportJob(), new JobParameters()); } else { operator.startNextInstance(JOB_NAME); } }

Teste de unidade de lote de mola

Normalmente, para executar testes de unidade em um aplicativo Spring Boot, a estrutura deve carregar um ApplicationContext correspondente. Duas anotações são usadas para este propósito:

 @RunWith(SpringRunner.class) @ContextConfiguration(classes = {...})

Há uma classe de utilitário org.springframework.batch.test.JobLauncherTestUtils para testar trabalhos em lote. Ele fornece métodos para iniciar um trabalho inteiro, além de permitir testes de ponta a ponta de etapas individuais sem ter que executar cada etapa do trabalho. Deve ser declarado como um bean Spring:

 @Configuration public class BatchTestConfiguration { @Bean public JobLauncherTestUtils jobLauncherTestUtils() { return new JobLauncherTestUtils(); } }

Um teste típico para um trabalho e uma etapa tem a seguinte aparência (e também pode usar qualquer estrutura de simulação):

 @RunWith(SpringRunner.class) @ContextConfiguration(classes = {BatchApplication.class, BatchTestConfiguration.class}) public class CustomerReportJobConfigTest { @Autowired private JobLauncherTestUtils testUtils; @Autowired private CustomerReportJobConfig config; @Test public void testEntireJob() throws Exception { final JobExecution result = testUtils.getJobLauncher().run(config.customerReportJob(), testUtils.getUniqueJobParameters()); Assert.assertNotNull(result); Assert.assertEquals(BatchStatus.COMPLETED, result.getStatus()); } @Test public void testSpecificStep() { Assert.assertEquals(BatchStatus.COMPLETED, testUtils.launchStep("taskletStep").getStatus()); } }

O Spring Batch apresenta escopos adicionais para contextos de etapa e trabalho. Os objetos nesses escopos usam o contêiner Spring como uma fábrica de objetos, portanto, há apenas uma instância de cada bean por etapa de execução ou tarefa. Além disso, é fornecido suporte para vinculação tardia de referências acessíveis a partir de StepContext ou JobContext . Os componentes que são configurados em tempo de execução para ter escopo de etapa ou tarefa são difíceis de testar como componentes autônomos, a menos que você tenha uma maneira de definir o contexto como se estivessem em uma etapa ou execução de tarefa. Esse é o objetivo dos componentes org.springframework.batch.test.StepScopeTestExecutionListener e org.springframework.batch.test.StepScopeTestUtils no Spring Batch, bem como JobScopeTestExecutionListener e JobScopeTestUtils .

Os TestExecutionListeners são declarados no nível de classe e seu trabalho é criar um contexto de execução de etapa para cada método de teste. Por exemplo:

 @RunWith(SpringRunner.class) @TestExecutionListeners({DependencyInjectionTestExecutionListener.class, StepScopeTestExecutionListener.class}) @ContextConfiguration(classes = {BatchApplication.class, BatchTestConfiguration.class}) public class BirthdayFilterProcessorTest { @Autowired private BirthdayFilterProcessor processor; public StepExecution getStepExecution() { return MetaDataInstanceFactory.createStepExecution(); } @Test public void filter() throws Exception { final Customer customer = new Customer(); customer.setId(1); customer.setName("name"); customer.setBirthday(new GregorianCalendar()); Assert.assertNotNull(processor.process(customer)); } }

Existem dois TestExecutionListener s. Um é da estrutura regular do Spring Test e lida com a injeção de dependência do contexto do aplicativo configurado. O outro é o Spring Batch StepScopeTestExecutionListener que configura o contexto do escopo da etapa para injeção de dependência em testes de unidade. Um StepContext é criado para a duração de um método de teste e disponibilizado para quaisquer dependências injetadas. O comportamento padrão é apenas criar um StepExecution com propriedades fixas. Alternativamente, o StepContext pode ser fornecido pelo caso de teste como um método de fábrica retornando o tipo correto.

Outra abordagem é baseada na classe de utilitário StepScopeTestUtils . Esta classe é usada para criar e manipular StepScope em testes de unidade de forma mais flexível sem usar injeção de dependência. Por exemplo, a leitura do ID do cliente filtrado pelo processador acima pode ser feito da seguinte forma:

 @Test public void filterId() throws Exception { final Customer customer = new Customer(); customer.setId(1); customer.setName("name"); customer.setBirthday(new GregorianCalendar()); final int id = StepScopeTestUtils.doInStepScope( getStepExecution(), () -> processor.process(customer).getId() ); Assert.assertEquals(1, id); }

Pronto para o Advanced Spring Batch?

Este artigo apresenta alguns dos conceitos básicos de design e desenvolvimento de aplicativos Spring Batch. No entanto, há muitos tópicos e recursos mais avançados, como dimensionamento, processamento paralelo, ouvintes e muito mais, que não são abordados neste artigo. Espero que este artigo forneça uma base útil para começar.

Informações sobre esses tópicos mais avançados podem ser encontradas na documentação oficial do Spring Back para o Spring Batch.