Spring Batch 教程:使用 Spring 轻松进行批处理

已发表: 2022-03-11

批处理——以面向批量的、非交互式的、经常长时间运行的后台执行为代表——几乎在每个行业中都广泛使用,并应用于各种任务。 批处理可以是数据密集型或计算密集型,按顺序或并行执行,并且可以通过各种调用模型(包括自组织、调度和按需)启动。

本 Spring Batch 教程总体上解释了批处理应用程序的编程模型和领域语言,特别是展示了使用当前Spring Batch 3.0.7版本设计和开发批处理应用程序的一些有用方法。

什么是春季批次?

Spring Batch 是一个轻量级的综合框架,旨在促进健壮的批处理应用程序的开发。 它还提供更先进的技术服务和功能,通过其优化和分区技术支持极高容量和高性能的批处理作业。 Spring Batch 建立在 Spring Framework 的基于 POJO 的开发方法之上,所有有经验的 Spring 开发人员都熟悉这种方法。

作为示例,本文考虑了一个示例项目的源代码,该项目加载一个 XML 格式的客户文件,按各种属性过滤客户,并将过滤后的条目输出到一个文本文件。 我们的 Spring Batch 示例(使用 Lombok 注释)的源代码可在 GitHub 上获得,并且需要 Java SE 8 和 Maven。

什么是批处理? 关键概念和术语

对于任何批处理开发人员来说,熟悉和熟悉批处理的主要概念非常重要。 下图是批处理参考架构的简化版本,经过数十年在许多不同平台上的实施证明。 它介绍了 Spring Batch 使用的与批处理相关的关键概念和术语。

Spring Batch 教程:关键概念和术语

如我们的批处理示例所示,批处理通常由包含多个StepJob封装。 每个Step通常有一个ItemReaderItemProcessorItemWriterJobJobLauncher执行,有关已配置和执行的作业的元数据存储在JobRepository中。

每个Job可能与多个JobInstance相关联,每个 JobInstance 由用于启动批处理作业的特定JobParameters唯一定义。 JobExecution的每次运行都称为JobInstance 。 每个JobExecution通常会跟踪运行期间发生的情况,例如当前和退出状态、开始和结束时间等。

Step是批处理Job的一个独立的特定阶段,因此每个Job都由一个或多个Step组成。 与Job类似, Step具有单独的StepExecution ,表示执行Step的单次尝试。 StepExecution存储有关当前和退出状态、开始和结束时间等信息,以及对其对应StepJobExecution实例的引用。

ExecutionContext是一组键值对,其中包含范围为StepExecutionJobExecution的信息。 Spring Batch 持久化ExecutionContext ,这在您想要重新启动批处理运行的情况下(例如,当发生致命错误时等)时会有所帮助。 所需要的只是将要在步骤之间共享的任何对象放入上下文中,框架将负责其余的工作。 重新启动后,先前ExecutionContext中的值将从数据库中恢复并应用。

JobRepository是 Spring Batch 中使所有这些持久性成为可能的机制。 它为JobLauncherJobStep实例提供 CRUD 操作。 启动Job后,会从存储库中获取JobExecution ,并且在执行过程中, StepExecutionJobExecution实例将持久保存到存储库。

Spring Batch 框架入门

Spring Batch 的优点之一是项目依赖项最小,这使得更容易快速启动和运行。 确实存在的少数依赖项在项目的pom.xml中明确指定和解释,可以在此处访问。

应用程序的实际启动发生在如下所示的类中:

 @EnableBatchProcessing @SpringBootApplication public class BatchApplication { public static void main(String[] args) { prepareTestData(1000); SpringApplication.run(BatchApplication.class, args); } }

@EnableBatchProcessing注解启用 Spring Batch 功能并提供用于设置批处理作业的基本配置。

@SpringBootApplication注解来自提供独立、生产就绪、基于 Spring 的应用程序的 Spring Boot 项目。 它指定一个配置类,该类声明一个或多个 Spring bean,并触发自动配置和 Spring 的组件扫描。

我们的示例项目只有一个由CustomerReportJobConfig配置的作业,其中注入了JobBuilderFactoryStepBuilderFactory 。 最小作业配置可以在CustomerReportJobConfig中定义如下:

 @Configuration public class CustomerReportJobConfig { @Autowired private JobBuilderFactory jobBuilders; @Autowired private StepBuilderFactory stepBuilders; @Bean public Job customerReportJob() { return jobBuilders.get("customerReportJob") .start(taskletStep()) .next(chunkStep()) .build(); } @Bean public Step taskletStep() { return stepBuilders.get("taskletStep") .tasklet(tasklet()) .build(); } @Bean public Tasklet tasklet() { return (contribution, chunkContext) -> { return RepeatStatus.FINISHED; }; } }

构建步骤有两种主要方法。

如上例所示,一种方法是基于 tasklet 的Tasklet支持一个简单的接口,它只有一个方法execute() ,该方法被重复调用,直到它返回RepeatStatus.FINISHED或抛出异常以表示失败。 对Tasklet的每次调用都包装在一个事务中。

另一种方法,面向块的处理,是指顺序读取数据并创建将在事务边界内写出的“块”。 每个单独的项目都从ItemReader读入,交给ItemProcessor并聚合。 一旦读取的项目数等于提交间隔,就会通过ItemWriter写出整个块,然后提交事务。 面向块的步骤可以配置如下:

 @Bean public Job customerReportJob() { return jobBuilders.get("customerReportJob") .start(taskletStep()) .next(chunkStep()) .build(); } @Bean public Step chunkStep() { return stepBuilders.get("chunkStep") .<Customer, Customer>chunk(20) .reader(reader()) .processor(processor()) .writer(writer()) .build(); }

chunk()方法构建了一个步骤,以提供的大小处理块中的项目,然后将每个块传递给指定的读取器、处理器和写入器。 这些方法将在本文的下一部分中更详细地讨论。

自定义阅读器

对于我们的 Spring Batch 示例应用程序,为了从 XML 文件中读取客户列表,我们需要提供接口org.springframework.batch.item.ItemReader的实现:

 public interface ItemReader<T> { T read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException; }

ItemReader提供数据并且应该是有状态的。 它通常为每个批次调用多次,每次调用read()返回下一个值,最后在所有输入数据用完时返回null

Spring Batch 提供了一些开箱即用的ItemReader实现,可用于读取集合、文件、集成 JMS 和 JDBC 以及多个源等多种用途。

在我们的示例应用程序中, CustomerItemReader类将实际的read()调用委托给IteratorItemReader类的延迟初始化实例:

 public class CustomerItemReader implements ItemReader<Customer> { private final String filename; private ItemReader<Customer> delegate; public CustomerItemReader(final String filename) { this.filename = filename; } @Override public Customer read() throws Exception { if (delegate == null) { delegate = new IteratorItemReader<>(customers()); } return delegate.read(); } private List<Customer> customers() throws FileNotFoundException { try (XMLDecoder decoder = new XMLDecoder(new FileInputStream(filename))) { return (List<Customer>) decoder.readObject(); } } }

此实现的 Spring bean 使用@Component@StepScope注释创建,让 Spring 知道该类是一个步进范围的 Spring 组件,并且将在每个步骤执行时创建一次,如下所示:

 @StepScope @Bean public ItemReader<Customer> reader() { return new CustomerItemReader(XML_FILE); }

定制处理器

ItemProcessors转换输入项目并在面向项目的处理场景中引入业务逻辑。 他们必须提供接口org.springframework.batch.item.ItemProcessor的实现:

 public interface ItemProcessor<I, O> { O process(I item) throws Exception; }

方法process()接受I类的一个实例,并且可能会或可能不会返回相同类型的实例。 返回null表示不应继续处理该项目。 像往常一样,Spring 提供了很少的标准处理器,例如CompositeItemProcessor ,它通过一系列注入的ItemProcessor和一个验证输入的ValidatingItemProcessor传递项目。

在我们的示例应用程序中,处理器用于按以下要求过滤客户:

  • 客户必须在当月出生(例如,标记生日特价等)
  • 客户必须完成少于五次的交易(例如,识别新客户)

“当前月份”要求是通过自定义ItemProcessor实现的:

 public class BirthdayFilterProcessor implements ItemProcessor<Customer, Customer> { @Override public Customer process(final Customer item) throws Exception { if (new GregorianCalendar().get(Calendar.MONTH) == item.getBirthday().get(Calendar.MONTH)) { return item; } return null; } }

“交易数量有限”的要求被实现为ValidatingItemProcessor

 public class TransactionValidatingProcessor extends ValidatingItemProcessor<Customer> { public TransactionValidatingProcessor(final int limit) { super( item -> { if (item.getTransactions() >= limit) { throw new ValidationException("Customer has less than " + limit + " transactions"); } } ); setFilter(true); } }

然后将这对处理器封装在实现委托模式的CompositeItemProcessor中:

 @StepScope @Bean public ItemProcessor<Customer, Customer> processor() { final CompositeItemProcessor<Customer, Customer> processor = new CompositeItemProcessor<>(); processor.setDelegates(Arrays.asList(new BirthdayFilterProcessor(), new TransactionValidatingProcessor(5))); return processor; }

自定义作家

为了输出数据,Spring Batch 提供接口org.springframework.batch.item.ItemWriter用于根据需要序列化对象:

 public interface ItemWriter<T> { void write(List<? extends T> items) throws Exception; }

write()方法负责确保刷新任何内部缓冲区。 如果事务处于活动状态,通常还需要在后续回滚时丢弃输出。 写入器向其发送数据的资源通常应该能够自行处理。 有标准实现,例如CompositeItemWriterJdbcBatchItemWriterJmsItemWriterJpaItemWriterSimpleMailMessageItemWriter等。

在我们的示例应用程序中,过滤后的客户列表如下所示:

 public class CustomerItemWriter implements ItemWriter<Customer>, Closeable { private final PrintWriter writer; public CustomerItemWriter() { OutputStream out; try { out = new FileOutputStream("output.txt"); } catch (FileNotFoundException e) { out = System.out; } this.writer = new PrintWriter(out); } @Override public void write(final List<? extends Customer> items) throws Exception { for (Customer item : items) { writer.println(item.toString()); } } @PreDestroy @Override public void close() throws IOException { writer.close(); } }

调度 Spring Batch 作业

默认情况下,Spring Batch 在启动时执行它可以找到的所有作业(即在CustomerReportJobConfig中配置的作业)。 要更改此行为,请在启动时通过将以下属性添加到application.properties来禁用作业执行:

 spring.batch.job.enabled=false

然后通过将@EnableScheduling注释添加到配置类并将@Scheduled注释添加到执行作业本身的方法来实现实际调度。 可以使用延迟、速率或 cron 表达式来配​​置调度:

 // run every 5000 msec (ie, every 5 secs) @Scheduled(fixedRate = 5000) public void run() throws Exception { JobExecution execution = jobLauncher.run( customerReportJob(), new JobParametersBuilder().toJobParameters() ); }

上面的例子有一个问题。 在运行时,作业只会在第一次成功。 当它第二次启动时(即 5 秒后),它将在日志中生成以下消息(请注意,在 Spring Batch 的早期版本中会抛出JobInstanceAlreadyCompleteException ):

 INFO 36988 --- [pool-2-thread-1] osbclsupport.SimpleJobLauncher : Job: [SimpleJob: [name=customerReportJob]] launched with the following parameters: [{}] INFO 36988 --- [pool-2-thread-1] osbatch.core.job.SimpleStepHandler : Step already complete or not restartable, so no action to execute: StepExecution: id=1, version=3, name=taskletStep, status=COMPLETED, exitStatus=COMPLETED, readCount=0, filterCount=0, writeCount=0 readSkipCount=0, writeSkipCount=0, processSkipCount=0, commitCount=1, rollbackCount=0, exitDescription= INFO 36988 --- [pool-2-thread-1] osbatch.core.job.SimpleStepHandler : Step already complete or not restartable, so no action to execute: StepExecution: id=2, version=53, name=chunkStep, status=COMPLETED, exitStatus=COMPLETED, readCount=1000, filterCount=982, writeCount=18 readSkipCount=0, writeSkipCount=0, processSkipCount=0, commitCount=51, rollbackCount=0, exitDescription=

发生这种情况是因为只能创建和执行唯一的JobInstance ,并且 Spring Batch 无法区分第一个和第二个JobInstance

安排批处理作业时,有两种方法可以避免此问题。

一是确保为每个作业引入一个或多个唯一参数(例如,以纳秒为单位的实际开始时间):

 @Scheduled(fixedRate = 5000) public void run() throws Exception { jobLauncher.run( customerReportJob(), new JobParametersBuilder().addLong("uniqueness", System.nanoTime()).toJobParameters() ); }

或者,您可以使用SimpleJobOperator.startNextInstance()附加到指定作业的JobInstance确定的JobParametersIncrementer序列中启动下一个作业:

 @Autowired private JobOperator operator; @Autowired private JobExplorer jobs; @Scheduled(fixedRate = 5000) public void run() throws Exception { List<JobInstance> lastInstances = jobs.getJobInstances(JOB_NAME, 0, 1); if (lastInstances.isEmpty()) { jobLauncher.run(customerReportJob(), new JobParameters()); } else { operator.startNextInstance(JOB_NAME); } }

Spring Batch 单元测试

通常,要在 Spring Boot 应用程序中运行单元测试,框架必须加载相应的ApplicationContext 。 为此使用了两个注释:

 @RunWith(SpringRunner.class) @ContextConfiguration(classes = {...})

有一个实用程序类org.springframework.batch.test.JobLauncherTestUtils来测试批处理作业。 它提供了启动整个作业的方法,并允许对各个步骤进行端到端测试,而无需运行作业中的每个步骤。 它必须声明为 Spring bean:

 @Configuration public class BatchTestConfiguration { @Bean public JobLauncherTestUtils jobLauncherTestUtils() { return new JobLauncherTestUtils(); } }

作业和步骤的典型测试如下所示(也可以使用任何模拟框架):

 @RunWith(SpringRunner.class) @ContextConfiguration(classes = {BatchApplication.class, BatchTestConfiguration.class}) public class CustomerReportJobConfigTest { @Autowired private JobLauncherTestUtils testUtils; @Autowired private CustomerReportJobConfig config; @Test public void testEntireJob() throws Exception { final JobExecution result = testUtils.getJobLauncher().run(config.customerReportJob(), testUtils.getUniqueJobParameters()); Assert.assertNotNull(result); Assert.assertEquals(BatchStatus.COMPLETED, result.getStatus()); } @Test public void testSpecificStep() { Assert.assertEquals(BatchStatus.COMPLETED, testUtils.launchStep("taskletStep").getStatus()); } }

Spring Batch 为步骤和作业上下文引入了额外的范围。 这些范围内的对象使用 Spring 容器作为对象工厂,因此每个执行步骤或作业只有一个此类 bean 的实例。 此外,还支持从StepContextJobContext访问的引用的后期绑定。 在运行时配置为步进或作业范围的组件很难作为独立组件进行测试,除非您有办法将上下文设置为就好像它们在步骤或作业执行中一样。 这就是 Spring Batch 中的org.springframework.batch.test.StepScopeTestExecutionListenerorg.springframework.batch.test.StepScopeTestUtils组件以及JobScopeTestExecutionListenerJobScopeTestUtils的目标。

TestExecutionListeners在类级别声明,它的工作是为每个测试方法创建一个步骤执行上下文。 例如:

 @RunWith(SpringRunner.class) @TestExecutionListeners({DependencyInjectionTestExecutionListener.class, StepScopeTestExecutionListener.class}) @ContextConfiguration(classes = {BatchApplication.class, BatchTestConfiguration.class}) public class BirthdayFilterProcessorTest { @Autowired private BirthdayFilterProcessor processor; public StepExecution getStepExecution() { return MetaDataInstanceFactory.createStepExecution(); } @Test public void filter() throws Exception { final Customer customer = new Customer(); customer.setId(1); customer.setName("name"); customer.setBirthday(new GregorianCalendar()); Assert.assertNotNull(processor.process(customer)); } }

有两个TestExecutionListener 。 一种来自常规的 Spring Test 框架,并处理来自配置的应用程序上下文的依赖注入。 另一个是 Spring Batch StepScopeTestExecutionListener ,它为单元测试中的依赖注入设置步进范围上下文。 StepContext在测试方法的持续时间内创建,并可供任何注入的依赖项使用。 默认行为只是创建具有固定属性的StepExecution 。 或者,测试用例可以提供StepContext作为返回正确类型的工厂方法。

另一种方法是基于StepScopeTestUtils实用程序类。 此类用于在单元测试中以更灵活的方式创建和操作StepScope ,而无需使用依赖注入。 例如,读取上面处理器过滤的客户 ID 可以如下完成:

 @Test public void filterId() throws Exception { final Customer customer = new Customer(); customer.setId(1); customer.setName("name"); customer.setBirthday(new GregorianCalendar()); final int id = StepScopeTestUtils.doInStepScope( getStepExecution(), () -> processor.process(customer).getId() ); Assert.assertEquals(1, id); }

准备好使用高级 Spring Batch 了吗?

本文介绍了 Spring Batch 应用程序设计和开发的一些基础知识。 但是,还有许多更高级的主题和功能——例如缩放、并行处理、侦听器等——本文没有涉及。 希望本文为入门提供了有用的基础。

然后可以在 Spring Batch 的官方 Spring Back 文档中找到有关这些更高级主题的信息。