Учебное пособие по Spring Batch: пакетная обработка с помощью Spring стала проще

Опубликовано: 2022-03-11

Пакетная обработка, характеризующаяся массовым, неинтерактивным и часто длительным фоновым выполнением, широко используется практически во всех отраслях и применяется для решения широкого круга задач. Пакетная обработка может требовать больших объемов данных или вычислений, выполняться последовательно или параллельно и может быть инициирована с помощью различных моделей вызова, включая специальные, запланированные и по требованию.

В этом учебном пособии по Spring Batch объясняется модель программирования и предметный язык пакетных приложений в целом и, в частности, показаны некоторые полезные подходы к проектированию и разработке пакетных приложений с использованием текущей версии Spring Batch 3.0.7 .

Что такое весенняя партия?

Spring Batch — это легкая комплексная платформа, предназначенная для облегчения разработки надежных пакетных приложений. Он также предоставляет более продвинутые технические услуги и функции, которые поддерживают чрезвычайно большие объемы и высокопроизводительные пакетные задания благодаря своим методам оптимизации и разделения. Spring Batch основывается на подходе Spring Framework к разработке на основе POJO, знакомом всем опытным разработчикам Spring.

В качестве примера в этой статье рассматривается исходный код примера проекта, который загружает файл клиентов в формате XML, фильтрует клиентов по различным атрибутам и выводит отфильтрованные записи в текстовый файл. Исходный код нашего примера Spring Batch (в котором используются аннотации Lombok) доступен здесь, на GitHub, и для него требуются Java SE 8 и Maven.

Что такое пакетная обработка? Ключевые понятия и терминология

Для любого пакетного разработчика важно быть знакомым с основными концепциями пакетной обработки. На приведенной ниже диаграмме представлена ​​упрощенная версия эталонной пакетной архитектуры, проверенной десятилетиями реализации на многих различных платформах. Он знакомит с ключевыми понятиями и терминами, относящимися к пакетной обработке, используемой Spring Batch.

Учебное пособие по Spring Batch: ключевые понятия и терминология

Как показано в нашем примере пакетной обработки, пакетный процесс обычно инкапсулируется Job , состоящим из нескольких Step . Каждый Step обычно имеет один ItemReader , ItemProcessor и ItemWriter . Job выполняется с помощью JobLauncher , а метаданные о сконфигурированных и выполненных заданиях хранятся в JobRepository .

Каждое Job может быть связано с несколькими JobInstance , каждый из которых определяется уникальными JobParameters , которые используются для запуска пакетного задания. Каждый запуск JobInstance называется JobExecution . Каждый JobExecution обычно отслеживает, что произошло во время выполнения, например текущий статус и статус выхода, время начала и окончания и т. д.

Step — это независимая, определенная фаза пакетного Job , так что каждое Job состоит из одного или нескольких Step . Как и в случае с Job , у Step есть отдельная StepExecution , которая представляет собой одну попытку выполнить Step . StepExecution хранит информацию о текущем статусе и статусе выхода, времени начала и окончания и т. д., а также ссылки на соответствующие экземпляры Step и JobExecution .

ExecutionContext — это набор пар ключ-значение, содержащих информацию, относящуюся либо к StepExecution , либо к JobExecution . Spring Batch сохраняет ExecutionContext , что помогает в случаях, когда вы хотите перезапустить пакетный запуск (например, когда произошла фатальная ошибка и т. д.). Все, что нужно, — это поместить любой объект, который будет использоваться совместно между шагами, в контекст, а фреймворк позаботится обо всем остальном. После перезапуска значения из предыдущего ExecutionContext восстанавливаются из базы данных и применяются.

JobRepository — это механизм в Spring Batch, который делает возможным все это постоянство. Он предоставляет операции CRUD для создания экземпляров JobLauncher , Job и Step . После запуска Job из репозитория извлекается экземпляр JobExecution , и во время выполнения StepExecution и JobExecution сохраняются в репозитории.

Начало работы с Spring Batch Framework

Одним из преимуществ Spring Batch является то, что зависимости проекта минимальны, что облегчает быструю настройку и запуск. Несколько существующих зависимостей четко указаны и объяснены в pom.xml проекта, доступ к которому можно получить здесь.

Фактический запуск приложения происходит в классе, который выглядит примерно так:

 @EnableBatchProcessing @SpringBootApplication public class BatchApplication { public static void main(String[] args) { prepareTestData(1000); SpringApplication.run(BatchApplication.class, args); } }

Аннотация @EnableBatchProcessing включает функции Spring Batch и предоставляет базовую конфигурацию для настройки пакетных заданий.

Аннотация @SpringBootApplication исходит из проекта Spring Boot, который предоставляет автономные, готовые к работе приложения на основе Spring. Он определяет класс конфигурации, который объявляет один или несколько компонентов Spring, а также запускает автоматическую настройку и сканирование компонентов Spring.

В нашем примере проекта есть только одно задание, настроенное CustomerReportJobConfig с введенными JobBuilderFactory и StepBuilderFactory . Минимальную конфигурацию задания можно определить в CustomerReportJobConfig следующим образом:

 @Configuration public class CustomerReportJobConfig { @Autowired private JobBuilderFactory jobBuilders; @Autowired private StepBuilderFactory stepBuilders; @Bean public Job customerReportJob() { return jobBuilders.get("customerReportJob") .start(taskletStep()) .next(chunkStep()) .build(); } @Bean public Step taskletStep() { return stepBuilders.get("taskletStep") .tasklet(tasklet()) .build(); } @Bean public Tasklet tasklet() { return (contribution, chunkContext) -> { return RepeatStatus.FINISHED; }; } }

Существует два основных подхода к построению ступени.

Один из подходов, как показано в приведенном выше примере, основан на тасклетах . Tasklet поддерживает простой интерфейс, который имеет только один метод, execute() , который вызывается многократно, пока он либо не вернет RepeatStatus.FINISHED , либо не выдаст исключение, сигнализирующее об ошибке. Каждый вызов Tasklet заключен в транзакцию.

Другой подход, обработка по частям, относится к последовательному чтению данных и созданию «кусков», которые будут записываться в пределах границ транзакции. Каждый отдельный элемент считывается из ItemReader , передается в ItemProcessor и агрегируется. Как только количество прочитанных элементов сравняется с интервалом фиксации, весь фрагмент записывается через ItemWriter , а затем транзакция фиксируется. Шаг, ориентированный на фрагменты, можно настроить следующим образом:

 @Bean public Job customerReportJob() { return jobBuilders.get("customerReportJob") .start(taskletStep()) .next(chunkStep()) .build(); } @Bean public Step chunkStep() { return stepBuilders.get("chunkStep") .<Customer, Customer>chunk(20) .reader(reader()) .processor(processor()) .writer(writer()) .build(); }

Метод chunk() строит шаг, который обрабатывает элементы порциями заданного размера, при этом каждая порция затем передается указанному читателю, процессору и записывающему устройству. Эти методы более подробно обсуждаются в следующих разделах этой статьи.

Пользовательский считыватель

Для нашего примера приложения Spring Batch, чтобы прочитать список клиентов из файла XML, нам нужно предоставить реализацию интерфейса org.springframework.batch.item.ItemReader :

 public interface ItemReader<T> { T read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException; }

ItemReader предоставляет данные и, как ожидается, будет иметь состояние. Обычно он вызывается несколько раз для каждого пакета, при этом каждый вызов read() возвращает следующее значение и, наконец, возвращает null , когда все входные данные исчерпаны.

Spring Batch предоставляет несколько готовых реализаций ItemReader , которые можно использовать для различных целей, таких как чтение коллекций, файлов, интеграция JMS и JDBC, а также нескольких источников и т. д.

В нашем примере приложения класс CustomerItemReader делегирует фактические вызовы read() лениво инициализированному экземпляру класса IteratorItemReader :

 public class CustomerItemReader implements ItemReader<Customer> { private final String filename; private ItemReader<Customer> delegate; public CustomerItemReader(final String filename) { this.filename = filename; } @Override public Customer read() throws Exception { if (delegate == null) { delegate = new IteratorItemReader<>(customers()); } return delegate.read(); } private List<Customer> customers() throws FileNotFoundException { try (XMLDecoder decoder = new XMLDecoder(new FileInputStream(filename))) { return (List<Customer>) decoder.readObject(); } } }

Компонент Spring для этой реализации создается с аннотациями @Component и @StepScope , сообщая Spring, что этот класс является компонентом Spring с пошаговой областью действия и будет создаваться один раз за выполнение шага следующим образом:

 @StepScope @Bean public ItemReader<Customer> reader() { return new CustomerItemReader(XML_FILE); }

Пользовательские процессоры

ItemProcessors преобразуют входные элементы и вводят бизнес-логику в сценарий обработки, ориентированный на элементы. Они должны предоставить реализацию интерфейса org.springframework.batch.item.ItemProcessor :

 public interface ItemProcessor<I, O> { O process(I item) throws Exception; }

Метод process() принимает один экземпляр класса I и может возвращать или не возвращать экземпляр того же типа. Возврат null указывает, что элемент не должен продолжать обрабатываться. Как обычно, Spring предоставляет несколько стандартных процессоров, таких как CompositeItemProcessor , который пропускает элемент через последовательность ItemProcessor s, и ValidatingItemProcessor , который проверяет входные данные.

В случае нашего примера приложения процессоры используются для фильтрации клиентов по следующим требованиям:

  • Клиент должен родиться в текущем месяце (например, чтобы отметить специальные предложения ко дню рождения и т. д.)
  • Клиент должен иметь менее пяти завершенных транзакций (например, для идентификации новых клиентов).

Требование «текущий месяц» реализуется через пользовательский ItemProcessor :

 public class BirthdayFilterProcessor implements ItemProcessor<Customer, Customer> { @Override public Customer process(final Customer item) throws Exception { if (new GregorianCalendar().get(Calendar.MONTH) == item.getBirthday().get(Calendar.MONTH)) { return item; } return null; } }

Требование «ограниченное количество транзакций» реализовано как ValidatingItemProcessor :

 public class TransactionValidatingProcessor extends ValidatingItemProcessor<Customer> { public TransactionValidatingProcessor(final int limit) { super( item -> { if (item.getTransactions() >= limit) { throw new ValidationException("Customer has less than " + limit + " transactions"); } } ); setFilter(true); } }

Затем эта пара процессоров инкапсулируется в CompositeItemProcessor , который реализует шаблон делегата:

 @StepScope @Bean public ItemProcessor<Customer, Customer> processor() { final CompositeItemProcessor<Customer, Customer> processor = new CompositeItemProcessor<>(); processor.setDelegates(Arrays.asList(new BirthdayFilterProcessor(), new TransactionValidatingProcessor(5))); return processor; }

Пользовательские писатели

Для вывода данных Spring Batch предоставляет интерфейс org.springframework.batch.item.ItemWriter для сериализации объектов по мере необходимости:

 public interface ItemWriter<T> { void write(List<? extends T> items) throws Exception; }

Метод write() отвечает за очистку всех внутренних буферов. Если транзакция активна, обычно также необходимо отбросить выходные данные при последующем откате. Ресурс, которому модуль записи отправляет данные, обычно должен быть в состоянии справиться с этим сам. Существуют стандартные реализации, такие как CompositeItemWriter , JdbcBatchItemWriter , JmsItemWriter , JpaItemWriter , SimpleMailMessageItemWriter и другие.

В нашем примере приложения список отфильтрованных клиентов выглядит следующим образом:

 public class CustomerItemWriter implements ItemWriter<Customer>, Closeable { private final PrintWriter writer; public CustomerItemWriter() { OutputStream out; try { out = new FileOutputStream("output.txt"); } catch (FileNotFoundException e) { out = System.out; } this.writer = new PrintWriter(out); } @Override public void write(final List<? extends Customer> items) throws Exception { for (Customer item : items) { writer.println(item.toString()); } } @PreDestroy @Override public void close() throws IOException { writer.close(); } }

Планирование пакетных заданий Spring

По умолчанию Spring Batch выполняет все задания, которые он может найти (т. е. те, которые настроены как в CustomerReportJobConfig ) при запуске. Чтобы изменить это поведение, отключите выполнение задания при запуске, добавив в application.properties следующее свойство:

 spring.batch.job.enabled=false

Затем фактическое планирование достигается путем добавления аннотации @EnableScheduling к классу конфигурации и аннотации @Scheduled к методу, который выполняет само задание. Планирование можно настроить с помощью задержек, ставок или выражений cron:

 // run every 5000 msec (ie, every 5 secs) @Scheduled(fixedRate = 5000) public void run() throws Exception { JobExecution execution = jobLauncher.run( customerReportJob(), new JobParametersBuilder().toJobParameters() ); }

Однако в приведенном выше примере есть проблема. Во время выполнения задание будет успешным только в первый раз. Когда он запускается во второй раз (то есть через пять секунд), он будет генерировать следующие сообщения в журналах (обратите внимание, что в предыдущих версиях Spring Batch было бы выброшено JobInstanceAlreadyCompleteException ):

 INFO 36988 --- [pool-2-thread-1] osbclsupport.SimpleJobLauncher : Job: [SimpleJob: [name=customerReportJob]] launched with the following parameters: [{}] INFO 36988 --- [pool-2-thread-1] osbatch.core.job.SimpleStepHandler : Step already complete or not restartable, so no action to execute: StepExecution: id=1, version=3, name=taskletStep, status=COMPLETED, exitStatus=COMPLETED, readCount=0, filterCount=0, writeCount=0 readSkipCount=0, writeSkipCount=0, processSkipCount=0, commitCount=1, rollbackCount=0, exitDescription= INFO 36988 --- [pool-2-thread-1] osbatch.core.job.SimpleStepHandler : Step already complete or not restartable, so no action to execute: StepExecution: id=2, version=53, name=chunkStep, status=COMPLETED, exitStatus=COMPLETED, readCount=1000, filterCount=982, writeCount=18 readSkipCount=0, writeSkipCount=0, processSkipCount=0, commitCount=51, rollbackCount=0, exitDescription=

Это происходит из-за того, что могут создаваться и выполняться только уникальные JobInstance , а Spring Batch не имеет возможности различить первый и второй JobInstance .

Существует два способа избежать этой проблемы при планировании пакетного задания.

Один из них заключается в том, чтобы ввести один или несколько уникальных параметров (например, фактическое время начала в наносекундах) для каждого задания:

 @Scheduled(fixedRate = 5000) public void run() throws Exception { jobLauncher.run( customerReportJob(), new JobParametersBuilder().addLong("uniqueness", System.nanoTime()).toJobParameters() ); }

Кроме того, вы можете запустить следующее задание в последовательности JobInstance s, определенной JobParametersIncrementer , прикрепленным к указанному заданию с помощью SimpleJobOperator.startNextInstance() :

 @Autowired private JobOperator operator; @Autowired private JobExplorer jobs; @Scheduled(fixedRate = 5000) public void run() throws Exception { List<JobInstance> lastInstances = jobs.getJobInstances(JOB_NAME, 0, 1); if (lastInstances.isEmpty()) { jobLauncher.run(customerReportJob(), new JobParameters()); } else { operator.startNextInstance(JOB_NAME); } }

Весеннее пакетное модульное тестирование

Обычно для запуска модульных тестов в приложении Spring Boot фреймворк должен загрузить соответствующий ApplicationContext . Для этого используются две аннотации:

 @RunWith(SpringRunner.class) @ContextConfiguration(classes = {...})

Существует служебный класс org.springframework.batch.test.JobLauncherTestUtils для тестирования пакетных заданий. Он предоставляет методы для запуска всего задания, а также позволяет сквозное тестирование отдельных шагов без необходимости запускать каждый шаг в задании. Он должен быть объявлен как Spring bean:

 @Configuration public class BatchTestConfiguration { @Bean public JobLauncherTestUtils jobLauncherTestUtils() { return new JobLauncherTestUtils(); } }

Типичный тест для работы и шага выглядит следующим образом (и может использовать любые mock-фреймворки):

 @RunWith(SpringRunner.class) @ContextConfiguration(classes = {BatchApplication.class, BatchTestConfiguration.class}) public class CustomerReportJobConfigTest { @Autowired private JobLauncherTestUtils testUtils; @Autowired private CustomerReportJobConfig config; @Test public void testEntireJob() throws Exception { final JobExecution result = testUtils.getJobLauncher().run(config.customerReportJob(), testUtils.getUniqueJobParameters()); Assert.assertNotNull(result); Assert.assertEquals(BatchStatus.COMPLETED, result.getStatus()); } @Test public void testSpecificStep() { Assert.assertEquals(BatchStatus.COMPLETED, testUtils.launchStep("taskletStep").getStatus()); } }

Spring Batch вводит дополнительные области действия для контекстов шагов и заданий. Объекты в этих областях используют контейнер Spring в качестве фабрики объектов, поэтому существует только один экземпляр каждого такого компонента на шаг выполнения или задание. Кроме того, поддерживается поздняя привязка ссылок, доступных из StepContext или JobContext . Компоненты, настроенные во время выполнения для работы в рамках шага или задания, сложно протестировать как автономные компоненты, если только у вас нет способа установить контекст, как если бы они выполнялись в шаге или задании. Это цель компонентов org.springframework.batch.test.StepScopeTestExecutionListener и org.springframework.batch.test.StepScopeTestUtils в Spring Batch, а также JobScopeTestExecutionListener и JobScopeTestUtils .

TestExecutionListeners объявляются на уровне класса, и их задача заключается в создании контекста выполнения шага для каждого метода теста. Например:

 @RunWith(SpringRunner.class) @TestExecutionListeners({DependencyInjectionTestExecutionListener.class, StepScopeTestExecutionListener.class}) @ContextConfiguration(classes = {BatchApplication.class, BatchTestConfiguration.class}) public class BirthdayFilterProcessorTest { @Autowired private BirthdayFilterProcessor processor; public StepExecution getStepExecution() { return MetaDataInstanceFactory.createStepExecution(); } @Test public void filter() throws Exception { final Customer customer = new Customer(); customer.setId(1); customer.setName("name"); customer.setBirthday(new GregorianCalendar()); Assert.assertNotNull(processor.process(customer)); } }

Есть два TestExecutionListener s. Один из обычных фреймворков Spring Test обрабатывает внедрение зависимостей из настроенного контекста приложения. Другой — Spring Batch StepScopeTestExecutionListener , который устанавливает контекст пошаговой области для внедрения зависимостей в модульные тесты. StepContext создается на время выполнения тестового метода и становится доступным для любых внедренных зависимостей. Поведение по умолчанию — просто создать StepExecution с фиксированными свойствами. В качестве альтернативы StepContext может быть предоставлен тестовым набором в качестве фабричного метода, возвращающего правильный тип.

Другой подход основан на служебном классе StepScopeTestUtils . Этот класс используется для создания и управления StepScope в модульных тестах более гибким способом без использования внедрения зависимостей. Например, чтение идентификатора клиента, отфильтрованного процессором выше, может быть выполнено следующим образом:

 @Test public void filterId() throws Exception { final Customer customer = new Customer(); customer.setId(1); customer.setName("name"); customer.setBirthday(new GregorianCalendar()); final int id = StepScopeTestUtils.doInStepScope( getStepExecution(), () -> processor.process(customer).getId() ); Assert.assertEquals(1, id); }

Готовы к Advanced Spring Batch?

В этой статье представлены некоторые основы проектирования и разработки приложений Spring Batch. Однако есть много более сложных тем и возможностей, таких как масштабирование, параллельная обработка, прослушиватели и т. д., которые не рассматриваются в этой статье. Надеюсь, эта статья станет полезной основой для начала работы.

Информацию по этим более сложным темам можно найти в официальной документации Spring Back для Spring Batch.