Tutorial de Spring Batch: Procesamiento por lotes simplificado con Spring

Publicado: 2022-03-11

El procesamiento por lotes, tipificado por una ejecución en segundo plano, orientada a granel, no interactiva y, con frecuencia, de larga duración, se usa ampliamente en prácticamente todas las industrias y se aplica a una amplia gama de tareas. El procesamiento por lotes puede ser intensivo en datos o computacionalmente, ejecutarse secuencialmente o en paralelo, y puede iniciarse a través de varios modelos de invocación, incluidos ad hoc, programados y bajo demanda.

Este tutorial de Spring Batch explica el modelo de programación y el lenguaje de dominio de las aplicaciones por lotes en general y, en particular, muestra algunos enfoques útiles para el diseño y desarrollo de aplicaciones por lotes utilizando la versión actual de Spring Batch 3.0.7 .

¿Qué es Spring Batch?

Spring Batch es un marco ligero y completo diseñado para facilitar el desarrollo de aplicaciones por lotes sólidas. También proporciona funciones y servicios técnicos más avanzados que admiten trabajos por lotes de alto rendimiento y volumen extremadamente alto a través de sus técnicas de optimización y partición. Spring Batch se basa en el enfoque de desarrollo basado en POJO de Spring Framework, familiar para todos los desarrolladores Spring experimentados.

A modo de ejemplo, este artículo considera el código fuente de un proyecto de muestra que carga un archivo de cliente con formato XML, filtra a los clientes por varios atributos y envía las entradas filtradas a un archivo de texto. El código fuente de nuestro ejemplo de Spring Batch (que utiliza anotaciones de Lombok) está disponible aquí en GitHub y requiere Java SE 8 y Maven.

¿Qué es el procesamiento por lotes? Conceptos clave y terminología

Es importante que cualquier desarrollador de lotes esté familiarizado y se sienta cómodo con los conceptos principales del procesamiento por lotes. El siguiente diagrama es una versión simplificada de la arquitectura de referencia por lotes que se ha probado durante décadas de implementaciones en muchas plataformas diferentes. Presenta los conceptos y términos clave relevantes para el procesamiento por lotes, tal como los utiliza Spring Batch.

Tutorial de Spring Batch: conceptos clave y terminología

Como se muestra en nuestro ejemplo de procesamiento por lotes, un proceso por lotes generalmente se encapsula mediante un Job que consta de varios Step . Cada Step normalmente tiene un solo ItemReader , ItemProcessor y ItemWriter . Un Job ejecuta un JobLauncher y los metadatos sobre los trabajos configurados y ejecutados se almacenan en un JobRepository .

Cada Job puede estar asociado con múltiples JobInstance s, cada uno de los cuales está definido de manera única por sus JobParameters particulares que se usan para iniciar un trabajo por lotes. Cada ejecución de JobInstance se conoce como JobExecution . Cada JobExecution generalmente realiza un seguimiento de lo que sucedió durante una ejecución, como los estados actual y de salida, las horas de inicio y finalización, etc.

Un Step es una fase independiente y específica de un Job por lotes, de modo que cada Job se compone de uno o más Step . Similar a un Job , un Step tiene una StepExecution de Paso individual que representa un único intento de ejecutar un Step . StepExecution almacena la información sobre los estados actual y de salida, las horas de inicio y finalización, etc., así como las referencias a sus instancias correspondientes de Step y JobExecution .

Un ExecutionContext es un conjunto de pares clave-valor que contienen información cuyo ámbito es StepExecution o JobExecution . Spring Batch conserva el ExecutionContext , que ayuda en los casos en los que desea reiniciar una ejecución por lotes (por ejemplo, cuando se produce un error fatal, etc.). Todo lo que se necesita es poner cualquier objeto que se compartirá entre los pasos en el contexto y el marco se encargará del resto. Después del reinicio, los valores del ExecutionContext anterior se restauran desde la base de datos y se aplican.

JobRepository es el mecanismo en Spring Batch que hace posible toda esta persistencia. Proporciona operaciones CRUD para instancias de JobLauncher , Job y Step . Una vez que se inicia un Job , se obtiene un JobExecution del repositorio y, durante el curso de la ejecución, las instancias de StepExecution y JobExecution se conservan en el repositorio.

Introducción a Spring Batch Framework

Una de las ventajas de Spring Batch es que las dependencias del proyecto son mínimas, lo que facilita la puesta en marcha y el funcionamiento rápido. Las pocas dependencias que existen están claramente especificadas y explicadas en el pom.xml del proyecto, al que se puede acceder aquí.

El inicio real de la aplicación ocurre en una clase que se parece a lo siguiente:

 @EnableBatchProcessing @SpringBootApplication public class BatchApplication { public static void main(String[] args) { prepareTestData(1000); SpringApplication.run(BatchApplication.class, args); } }

La anotación @EnableBatchProcessing habilita las características de Spring Batch y proporciona una configuración básica para configurar trabajos por lotes.

La anotación @SpringBootApplication proviene del proyecto Spring Boot que proporciona aplicaciones independientes, listas para producción y basadas en Spring. Especifica una clase de configuración que declara uno o más beans de Spring y también activa la configuración automática y el escaneo de componentes de Spring.

Nuestro proyecto de muestra solo tiene un trabajo configurado por CustomerReportJobConfig con JobBuilderFactory y StepBuilderFactory inyectados. La configuración mínima del trabajo se puede definir en CustomerReportJobConfig de la siguiente manera:

 @Configuration public class CustomerReportJobConfig { @Autowired private JobBuilderFactory jobBuilders; @Autowired private StepBuilderFactory stepBuilders; @Bean public Job customerReportJob() { return jobBuilders.get("customerReportJob") .start(taskletStep()) .next(chunkStep()) .build(); } @Bean public Step taskletStep() { return stepBuilders.get("taskletStep") .tasklet(tasklet()) .build(); } @Bean public Tasklet tasklet() { return (contribution, chunkContext) -> { return RepeatStatus.FINISHED; }; } }

Hay dos enfoques principales para construir un paso.

Un enfoque, como se muestra en el ejemplo anterior, está basado en tareas . Un Tasklet admite una interfaz simple que tiene solo un método, execute() , al que se llama repetidamente hasta que devuelve RepeatStatus.FINISHED o genera una excepción para señalar una falla. Cada llamada al Tasklet está envuelta en una transacción.

Otro enfoque, el procesamiento orientado a fragmentos , se refiere a leer los datos secuencialmente y crear "fragmentos" que se escribirán dentro de un límite de transacción. Cada elemento individual se lee desde un ItemReader , se entrega a un ItemProcessor y se agrega. Una vez que la cantidad de elementos leídos es igual al intervalo de confirmación, el fragmento completo se escribe a través de ItemWriter y luego se confirma la transacción. Un paso orientado a fragmentos se puede configurar de la siguiente manera:

 @Bean public Job customerReportJob() { return jobBuilders.get("customerReportJob") .start(taskletStep()) .next(chunkStep()) .build(); } @Bean public Step chunkStep() { return stepBuilders.get("chunkStep") .<Customer, Customer>chunk(20) .reader(reader()) .processor(processor()) .writer(writer()) .build(); }

El método chunk() crea un paso que procesa elementos en fragmentos con el tamaño proporcionado, y luego cada fragmento se pasa al lector, procesador y escritor especificado. Estos métodos se analizan con más detalle en las siguientes secciones de este artículo.

Lector personalizado

Para nuestra aplicación de muestra Spring Batch, para leer una lista de clientes de un archivo XML, debemos proporcionar una implementación de la interfaz org.springframework.batch.item.ItemReader :

 public interface ItemReader<T> { T read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException; }

Un ItemReader proporciona los datos y se espera que tenga estado. Por lo general, se llama varias veces para cada lote, y cada llamada a read() devuelve el siguiente valor y finalmente devuelve null cuando se han agotado todos los datos de entrada.

Spring Batch proporciona algunas implementaciones listas para usar de ItemReader , que se pueden usar para una variedad de propósitos, como leer colecciones, archivos, integrar JMS y JDBC, así como múltiples fuentes, etc.

En nuestra aplicación de muestra, la clase CustomerItemReader delega las llamadas read() reales a una instancia inicializada de forma diferida de la clase IteratorItemReader :

 public class CustomerItemReader implements ItemReader<Customer> { private final String filename; private ItemReader<Customer> delegate; public CustomerItemReader(final String filename) { this.filename = filename; } @Override public Customer read() throws Exception { if (delegate == null) { delegate = new IteratorItemReader<>(customers()); } return delegate.read(); } private List<Customer> customers() throws FileNotFoundException { try (XMLDecoder decoder = new XMLDecoder(new FileInputStream(filename))) { return (List<Customer>) decoder.readObject(); } } }

Se crea un bean de Spring para esta implementación con las anotaciones @Component y @StepScope , lo que le permite a Spring saber que esta clase es un componente de Spring con ámbito de paso y se creará una vez por ejecución de paso de la siguiente manera:

 @StepScope @Bean public ItemReader<Customer> reader() { return new CustomerItemReader(XML_FILE); }

Procesadores personalizados

ItemProcessors transforma los elementos de entrada e introduce la lógica empresarial en un escenario de procesamiento orientado a elementos. Deben proporcionar una implementación de la interfaz org.springframework.batch.item.ItemProcessor :

 public interface ItemProcessor<I, O> { O process(I item) throws Exception; }

El método process() acepta una instancia de la clase I y puede o no devolver una instancia del mismo tipo. Devolver null indica que el elemento no debe continuar procesándose. Como de costumbre, Spring proporciona pocos procesadores estándar, como CompositeItemProcessor que pasa el elemento a través de una secuencia de ItemProcessor inyectados y un ValidatingItemProcessor que valida la entrada.

En el caso de nuestra aplicación de muestra, los procesadores se utilizan para filtrar a los clientes según los siguientes requisitos:

  • Un cliente debe haber nacido en el mes actual (p. ej., para marcar especiales de cumpleaños, etc.)
  • Un cliente debe tener menos de cinco transacciones completadas (por ejemplo, para identificar clientes más nuevos)

El requisito del "mes actual" se implementa a través de un ItemProcessor personalizado:

 public class BirthdayFilterProcessor implements ItemProcessor<Customer, Customer> { @Override public Customer process(final Customer item) throws Exception { if (new GregorianCalendar().get(Calendar.MONTH) == item.getBirthday().get(Calendar.MONTH)) { return item; } return null; } }

El requisito de "número limitado de transacciones" se implementa como ValidatingItemProcessor :

 public class TransactionValidatingProcessor extends ValidatingItemProcessor<Customer> { public TransactionValidatingProcessor(final int limit) { super( item -> { if (item.getTransactions() >= limit) { throw new ValidationException("Customer has less than " + limit + " transactions"); } } ); setFilter(true); } }

Este par de procesadores luego se encapsula dentro de un CompositeItemProcessor que implementa el patrón de delegado:

 @StepScope @Bean public ItemProcessor<Customer, Customer> processor() { final CompositeItemProcessor<Customer, Customer> processor = new CompositeItemProcessor<>(); processor.setDelegates(Arrays.asList(new BirthdayFilterProcessor(), new TransactionValidatingProcessor(5))); return processor; }

Escritores personalizados

Para generar los datos, Spring Batch proporciona la interfaz org.springframework.batch.item.ItemWriter para serializar objetos según sea necesario:

 public interface ItemWriter<T> { void write(List<? extends T> items) throws Exception; }

El método write() es responsable de asegurarse de que se vacíen los búferes internos. Si una transacción está activa, normalmente también será necesario descartar la salida en una reversión posterior. El recurso al que el escritor está enviando datos normalmente debería poder manejar esto por sí mismo. Hay implementaciones estándar como CompositeItemWriter , JdbcBatchItemWriter , JmsItemWriter , JpaItemWriter , SimpleMailMessageItemWriter y otras.

En nuestra aplicación de muestra, la lista de clientes filtrados se escribe de la siguiente manera:

 public class CustomerItemWriter implements ItemWriter<Customer>, Closeable { private final PrintWriter writer; public CustomerItemWriter() { OutputStream out; try { out = new FileOutputStream("output.txt"); } catch (FileNotFoundException e) { out = System.out; } this.writer = new PrintWriter(out); } @Override public void write(final List<? extends Customer> items) throws Exception { for (Customer item : items) { writer.println(item.toString()); } } @PreDestroy @Override public void close() throws IOException { writer.close(); } }

Programación de trabajos por lotes de Spring

De forma predeterminada, Spring Batch ejecuta todos los trabajos que puede encontrar (es decir, que están configurados como en CustomerReportJobConfig ) al inicio. Para cambiar este comportamiento, deshabilite la ejecución del trabajo al inicio agregando la siguiente propiedad a application.properties :

 spring.batch.job.enabled=false

Luego, la programación real se logra agregando la anotación @EnableScheduling a una clase de configuración y la anotación @Scheduled al método que ejecuta el trabajo en sí. La programación se puede configurar con expresiones de retraso, tasas o cron:

 // run every 5000 msec (ie, every 5 secs) @Scheduled(fixedRate = 5000) public void run() throws Exception { JobExecution execution = jobLauncher.run( customerReportJob(), new JobParametersBuilder().toJobParameters() ); }

Sin embargo, hay un problema con el ejemplo anterior. En tiempo de ejecución, el trabajo solo se realizará correctamente la primera vez. Cuando se inicie por segunda vez (es decir, después de cinco segundos), generará los siguientes mensajes en los registros (tenga en cuenta que en versiones anteriores de Spring Batch se habría lanzado una JobInstanceAlreadyCompleteException ):

 INFO 36988 --- [pool-2-thread-1] osbclsupport.SimpleJobLauncher : Job: [SimpleJob: [name=customerReportJob]] launched with the following parameters: [{}] INFO 36988 --- [pool-2-thread-1] osbatch.core.job.SimpleStepHandler : Step already complete or not restartable, so no action to execute: StepExecution: id=1, version=3, name=taskletStep, status=COMPLETED, exitStatus=COMPLETED, readCount=0, filterCount=0, writeCount=0 readSkipCount=0, writeSkipCount=0, processSkipCount=0, commitCount=1, rollbackCount=0, exitDescription= INFO 36988 --- [pool-2-thread-1] osbatch.core.job.SimpleStepHandler : Step already complete or not restartable, so no action to execute: StepExecution: id=2, version=53, name=chunkStep, status=COMPLETED, exitStatus=COMPLETED, readCount=1000, filterCount=982, writeCount=18 readSkipCount=0, writeSkipCount=0, processSkipCount=0, commitCount=51, rollbackCount=0, exitDescription=

Esto sucede porque solo se pueden crear y ejecutar JobInstance únicos y Spring Batch no tiene forma de distinguir entre el primer y el segundo JobInstance .

Hay dos formas de evitar este problema cuando programa un trabajo por lotes.

Uno es asegurarse de introducir uno o más parámetros únicos (por ejemplo, tiempo de inicio real en nanosegundos) para cada trabajo:

 @Scheduled(fixedRate = 5000) public void run() throws Exception { jobLauncher.run( customerReportJob(), new JobParametersBuilder().addLong("uniqueness", System.nanoTime()).toJobParameters() ); }

Alternativamente, puede iniciar el siguiente trabajo en una secuencia de JobInstance s determinada por JobParametersIncrementer adjunto al trabajo especificado con SimpleJobOperator.startNextInstance() :

 @Autowired private JobOperator operator; @Autowired private JobExplorer jobs; @Scheduled(fixedRate = 5000) public void run() throws Exception { List<JobInstance> lastInstances = jobs.getJobInstances(JOB_NAME, 0, 1); if (lastInstances.isEmpty()) { jobLauncher.run(customerReportJob(), new JobParameters()); } else { operator.startNextInstance(JOB_NAME); } }

Pruebas unitarias por lotes de primavera

Por lo general, para ejecutar pruebas unitarias en una aplicación Spring Boot, el marco debe cargar un ApplicationContext correspondiente. Se utilizan dos anotaciones para este propósito:

 @RunWith(SpringRunner.class) @ContextConfiguration(classes = {...})

Hay una clase de utilidad org.springframework.batch.test.JobLauncherTestUtils para probar trabajos por lotes. Proporciona métodos para iniciar un trabajo completo y permite realizar pruebas de extremo a extremo de pasos individuales sin tener que ejecutar cada paso del trabajo. Debe declararse como Spring bean:

 @Configuration public class BatchTestConfiguration { @Bean public JobLauncherTestUtils jobLauncherTestUtils() { return new JobLauncherTestUtils(); } }

Una prueba típica para un trabajo y un paso tiene el siguiente aspecto (y también puede usar cualquier marco de simulación):

 @RunWith(SpringRunner.class) @ContextConfiguration(classes = {BatchApplication.class, BatchTestConfiguration.class}) public class CustomerReportJobConfigTest { @Autowired private JobLauncherTestUtils testUtils; @Autowired private CustomerReportJobConfig config; @Test public void testEntireJob() throws Exception { final JobExecution result = testUtils.getJobLauncher().run(config.customerReportJob(), testUtils.getUniqueJobParameters()); Assert.assertNotNull(result); Assert.assertEquals(BatchStatus.COMPLETED, result.getStatus()); } @Test public void testSpecificStep() { Assert.assertEquals(BatchStatus.COMPLETED, testUtils.launchStep("taskletStep").getStatus()); } }

Spring Batch introduce alcances adicionales para contextos de pasos y trabajos. Los objetos en estos ámbitos usan el contenedor Spring como una fábrica de objetos, por lo que solo hay una instancia de cada bean por paso de ejecución o trabajo. Además, se proporciona soporte para el enlace tardío de referencias accesibles desde StepContext o JobContext . Los componentes que se configuran en tiempo de ejecución para tener un alcance de paso o de trabajo son difíciles de probar como componentes independientes, a menos que tenga una forma de establecer el contexto como si estuvieran en un paso o en la ejecución de un trabajo. Ese es el objetivo de los componentes JobScopeTestUtils y org.springframework.batch.test.StepScopeTestUtils en Spring Batch, así como org.springframework.batch.test.StepScopeTestExecutionListener y JobScopeTestExecutionListener .

Los TestExecutionListeners se declaran a nivel de clase y su trabajo es crear un contexto de ejecución de pasos para cada método de prueba. Por ejemplo:

 @RunWith(SpringRunner.class) @TestExecutionListeners({DependencyInjectionTestExecutionListener.class, StepScopeTestExecutionListener.class}) @ContextConfiguration(classes = {BatchApplication.class, BatchTestConfiguration.class}) public class BirthdayFilterProcessorTest { @Autowired private BirthdayFilterProcessor processor; public StepExecution getStepExecution() { return MetaDataInstanceFactory.createStepExecution(); } @Test public void filter() throws Exception { final Customer customer = new Customer(); customer.setId(1); customer.setName("name"); customer.setBirthday(new GregorianCalendar()); Assert.assertNotNull(processor.process(customer)); } }

Hay dos TestExecutionListener s. Uno es del marco normal de Spring Test y maneja la inyección de dependencia desde el contexto de la aplicación configurada. El otro es Spring Batch StepScopeTestExecutionListener que configura el contexto de alcance escalonado para la inyección de dependencia en las pruebas unitarias. Se crea un StepContext para la duración de un método de prueba y se pone a disposición de cualquier dependencia que se inyecte. El comportamiento predeterminado es simplemente crear un StepExecution con propiedades fijas. Alternativamente, el StepContext puede ser proporcionado por el caso de prueba como un método de fábrica que devuelve el tipo correcto.

Otro enfoque se basa en la clase de utilidad StepScopeTestUtils . Esta clase se usa para crear y manipular StepScope en pruebas unitarias de una manera más flexible sin usar inyección de dependencia. Por ejemplo, la lectura del ID del cliente filtrado por el procesador anterior podría hacerse de la siguiente manera:

 @Test public void filterId() throws Exception { final Customer customer = new Customer(); customer.setId(1); customer.setName("name"); customer.setBirthday(new GregorianCalendar()); final int id = StepScopeTestUtils.doInStepScope( getStepExecution(), () -> processor.process(customer).getId() ); Assert.assertEquals(1, id); }

¿Listo para el lote avanzado de primavera?

Este artículo presenta algunos de los conceptos básicos de diseño y desarrollo de aplicaciones Spring Batch. Sin embargo, hay muchos temas y capacidades más avanzados, como el escalado, el procesamiento en paralelo, los oyentes y más, que no se abordan en este artículo. Con suerte, este artículo proporciona una base útil para comenzar.

La información sobre estos temas más avanzados se puede encontrar en la documentación oficial de Spring Back para Spring Batch.