Spring Batch 教程:使用 Spring 輕鬆進行批處理

已發表: 2022-03-11

批處理——以面向批量的、非交互式的、經常長時間運行的後台執行為代表——幾乎在每個行業中都廣泛使用,並應用於各種任務。 批處理可以是數據密集型或計算密集型,按順序或併行執行,並且可以通過各種調用模型(包括自組織、調度和按需)啟動。

本 Spring Batch 教程總體上解釋了批處理應用程序的編程模型和領域語言,特別是展示了使用當前Spring Batch 3.0.7版本設計和開發批處理應用程序的一些有用方法。

什麼是春季批次?

Spring Batch 是一個輕量級的綜合框架,旨在促進健壯的批處理應用程序的開發。 它還提供更先進的技術服務和功能,通過其優化和分區技術支持極高容量和高性能的批處理作業。 Spring Batch 建立在 Spring Framework 的基於 POJO 的開發方法之上,所有有經驗的 Spring 開發人員都熟悉這種方法。

作為示例,本文考慮了一個示例項目的源代碼,該項目加載一個 XML 格式的客戶文件,按各種屬性過濾客戶,並將過濾後的條目輸出到一個文本文件。 我們的 Spring Batch 示例(使用 Lombok 註釋)的源代碼可在 GitHub 上獲得,並且需要 Java SE 8 和 Maven。

什麼是批處理? 關鍵概念和術語

對於任何批處理開發人員來說,熟悉和熟悉批處理的主要概念非常重要。 下圖是批處理參考架構的簡化版本,經過數十年在許多不同平台上的實施證明。 它介紹了 Spring Batch 使用的與批處理相關的關鍵概念和術語。

Spring Batch 教程:關鍵概念和術語

如我們的批處理示例所示,批處理通常由包含多個StepJob封裝。 每個Step通常有一個ItemReaderItemProcessorItemWriterJobJobLauncher執行,有關已配置和執行的作業的元數據存儲在JobRepository中。

每個Job可能與多個JobInstance相關聯,每個 JobInstance 由用於啟動批處理作業的特定JobParameters唯一定義。 JobExecution的每次運行都稱為JobInstance 。 每個JobExecution通常會跟踪運行期間發生的情況,例如當前和退出狀態、開始和結束時間等。

Step是批處理Job的一個獨立的特定階段,因此每個Job都由一個或多個Step組成。 與Job類似, Step具有單獨的StepExecution ,表示執行Step的單次嘗試。 StepExecution存儲有關當前和退出狀態、開始和結束時間等信息,以及對其對應StepJobExecution實例的引用。

ExecutionContext是一組鍵值對,其中包含範圍為StepExecutionJobExecution的信息。 Spring Batch 持久化ExecutionContext ,這在您想要重新啟動批處理運行的情況下(例如,當發生致命錯誤時等)時會有所幫助。 所需要的只是將要在步驟之間共享的任何對象放入上下文中,框架將負責其餘的工作。 重新啟動後,先前ExecutionContext中的值將從數據庫中恢復並應用。

JobRepository是 Spring Batch 中使所有這些持久性成為可能的機制。 它為JobLauncherJobStep實例提供 CRUD 操作。 啟動Job後,會從存儲庫中獲取JobExecution ,並且在執行過程中, StepExecutionJobExecution實例將持久保存到存儲庫。

Spring Batch 框架入門

Spring Batch 的優點之一是項目依賴項最小,這使得更容易快速啟動和運行。 確實存在的少數依賴項在項目的pom.xml中明確指定和解釋,可以在此處訪問。

應用程序的實際啟動發生在如下所示的類中:

 @EnableBatchProcessing @SpringBootApplication public class BatchApplication { public static void main(String[] args) { prepareTestData(1000); SpringApplication.run(BatchApplication.class, args); } }

@EnableBatchProcessing註解啟用 Spring Batch 功能並提供用於設置批處理作業的基本配置。

@SpringBootApplication註解來自提供獨立、生產就緒、基於 Spring 的應用程序的 Spring Boot 項目。 它指定一個配置類,該類聲明一個或多個 Spring bean,並觸發自動配置和 Spring 的組件掃描。

我們的示例項目只有一個由CustomerReportJobConfig配置的作業,其中註入了JobBuilderFactoryStepBuilderFactory 。 最小作業配置可以在CustomerReportJobConfig中定義如下:

 @Configuration public class CustomerReportJobConfig { @Autowired private JobBuilderFactory jobBuilders; @Autowired private StepBuilderFactory stepBuilders; @Bean public Job customerReportJob() { return jobBuilders.get("customerReportJob") .start(taskletStep()) .next(chunkStep()) .build(); } @Bean public Step taskletStep() { return stepBuilders.get("taskletStep") .tasklet(tasklet()) .build(); } @Bean public Tasklet tasklet() { return (contribution, chunkContext) -> { return RepeatStatus.FINISHED; }; } }

構建步驟有兩種主要方法。

如上例所示,一種方法是基於 tasklet 的Tasklet支持一個簡單的接口,它只有一個方法execute() ,該方法被重複調用,直到它返回RepeatStatus.FINISHED或拋出異常以表示失敗。 對Tasklet的每次調用都包裝在一個事務中。

另一種方法,面向塊的處理,是指順序讀取數據並創建將在事務邊界內寫出的“塊”。 每個單獨的項目都從ItemReader讀入,交給ItemProcessor並聚合。 一旦讀取的項目數等於提交間隔,就會通過ItemWriter寫出整個塊,然後提交事務。 面向塊的步驟可以配置如下:

 @Bean public Job customerReportJob() { return jobBuilders.get("customerReportJob") .start(taskletStep()) .next(chunkStep()) .build(); } @Bean public Step chunkStep() { return stepBuilders.get("chunkStep") .<Customer, Customer>chunk(20) .reader(reader()) .processor(processor()) .writer(writer()) .build(); }

chunk()方法構建了一個步驟,以提供的大小處理塊中的項目,然後將每個塊傳遞給指定的讀取器、處理器和寫入器。 這些方法將在本文的下一部分中更詳細地討論。

自定義閱讀器

對於我們的 Spring Batch 示例應用程序,為了從 XML 文件中讀取客戶列表,我們需要提供接口org.springframework.batch.item.ItemReader的實現:

 public interface ItemReader<T> { T read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException; }

ItemReader提供數據並且應該是有狀態的。 它通常為每個批次調用多次,每次調用read()返回下一個值,最後在所有輸入數據用完時返回null

Spring Batch 提供了一些開箱即用的ItemReader實現,可用於讀取集合、文件、集成 JMS 和 JDBC 以及多個源等多種用途。

在我們的示例應用程序中, CustomerItemReader類將實際的read()調用委託給IteratorItemReader類的延遲初始化實例:

 public class CustomerItemReader implements ItemReader<Customer> { private final String filename; private ItemReader<Customer> delegate; public CustomerItemReader(final String filename) { this.filename = filename; } @Override public Customer read() throws Exception { if (delegate == null) { delegate = new IteratorItemReader<>(customers()); } return delegate.read(); } private List<Customer> customers() throws FileNotFoundException { try (XMLDecoder decoder = new XMLDecoder(new FileInputStream(filename))) { return (List<Customer>) decoder.readObject(); } } }

此實現的 Spring bean 使用@Component@StepScope註釋創建,讓 Spring 知道該類是一個步進範圍的 Spring 組件,並且將在每個步驟執行時創建一次,如下所示:

 @StepScope @Bean public ItemReader<Customer> reader() { return new CustomerItemReader(XML_FILE); }

定制處理器

ItemProcessors轉換輸入項目並在面向項目的處理場景中引入業務邏輯。 他們必須提供接口org.springframework.batch.item.ItemProcessor的實現:

 public interface ItemProcessor<I, O> { O process(I item) throws Exception; }

方法process()接受I類的一個實例,並且可能會或可能不會返回相同類型的實例。 返回null表示不應繼續處理該項目。 像往常一樣,Spring 提供了很少的標準處理器,例如CompositeItemProcessor ,它通過一系列注入的ItemProcessor和一個驗證輸入的ValidatingItemProcessor傳遞項目。

在我們的示例應用程序中,處理器用於按以下要求過濾客戶:

  • 客戶必須在當月出生(例如,標記生日特價等)
  • 客戶必須完成少於五次的交易(例如,識別新客戶)

“當前月份”要求是通過自定義ItemProcessor實現的:

 public class BirthdayFilterProcessor implements ItemProcessor<Customer, Customer> { @Override public Customer process(final Customer item) throws Exception { if (new GregorianCalendar().get(Calendar.MONTH) == item.getBirthday().get(Calendar.MONTH)) { return item; } return null; } }

“交易數量有限”的要求被實現為ValidatingItemProcessor

 public class TransactionValidatingProcessor extends ValidatingItemProcessor<Customer> { public TransactionValidatingProcessor(final int limit) { super( item -> { if (item.getTransactions() >= limit) { throw new ValidationException("Customer has less than " + limit + " transactions"); } } ); setFilter(true); } }

然後將這對處理器封裝在實現委託模式的CompositeItemProcessor中:

 @StepScope @Bean public ItemProcessor<Customer, Customer> processor() { final CompositeItemProcessor<Customer, Customer> processor = new CompositeItemProcessor<>(); processor.setDelegates(Arrays.asList(new BirthdayFilterProcessor(), new TransactionValidatingProcessor(5))); return processor; }

自定義作家

為了輸出數據,Spring Batch 提供接口org.springframework.batch.item.ItemWriter用於根據需要序列化對象:

 public interface ItemWriter<T> { void write(List<? extends T> items) throws Exception; }

write()方法負責確保刷新任何內部緩衝區。 如果事務處於活動狀態,通常還需要在後續回滾時丟棄輸出。 寫入器向其發送數據的資源通常應該能夠自行處理。 有標準實現,例如CompositeItemWriterJdbcBatchItemWriterJmsItemWriterJpaItemWriterSimpleMailMessageItemWriter等。

在我們的示例應用程序中,過濾後的客戶列表如下所示:

 public class CustomerItemWriter implements ItemWriter<Customer>, Closeable { private final PrintWriter writer; public CustomerItemWriter() { OutputStream out; try { out = new FileOutputStream("output.txt"); } catch (FileNotFoundException e) { out = System.out; } this.writer = new PrintWriter(out); } @Override public void write(final List<? extends Customer> items) throws Exception { for (Customer item : items) { writer.println(item.toString()); } } @PreDestroy @Override public void close() throws IOException { writer.close(); } }

調度 Spring Batch 作業

默認情況下,Spring Batch 在啟動時執行它可以找到的所有作業(即在CustomerReportJobConfig中配置的作業)。 要更改此行為,請在啟動時通過將以下屬性添加到application.properties來禁用作業執行:

 spring.batch.job.enabled=false

然後通過將@EnableScheduling註釋添加到配置類並將@Scheduled註釋添加到執行作業本身的方法來實現實際調度。 可以使用延遲、速率或 cron 表達式來配置調度:

 // run every 5000 msec (ie, every 5 secs) @Scheduled(fixedRate = 5000) public void run() throws Exception { JobExecution execution = jobLauncher.run( customerReportJob(), new JobParametersBuilder().toJobParameters() ); }

上面的例子有一個問題。 在運行時,作業只會在第一次成功。 當它第二次啟動時(即 5 秒後),它將在日誌中生成以下消息(請注意,在 Spring Batch 的早期版本中會拋出JobInstanceAlreadyCompleteException ):

 INFO 36988 --- [pool-2-thread-1] osbclsupport.SimpleJobLauncher : Job: [SimpleJob: [name=customerReportJob]] launched with the following parameters: [{}] INFO 36988 --- [pool-2-thread-1] osbatch.core.job.SimpleStepHandler : Step already complete or not restartable, so no action to execute: StepExecution: id=1, version=3, name=taskletStep, status=COMPLETED, exitStatus=COMPLETED, readCount=0, filterCount=0, writeCount=0 readSkipCount=0, writeSkipCount=0, processSkipCount=0, commitCount=1, rollbackCount=0, exitDescription= INFO 36988 --- [pool-2-thread-1] osbatch.core.job.SimpleStepHandler : Step already complete or not restartable, so no action to execute: StepExecution: id=2, version=53, name=chunkStep, status=COMPLETED, exitStatus=COMPLETED, readCount=1000, filterCount=982, writeCount=18 readSkipCount=0, writeSkipCount=0, processSkipCount=0, commitCount=51, rollbackCount=0, exitDescription=

發生這種情況是因為只能創建和執行唯一的JobInstance ,並且 Spring Batch 無法區分第一個和第二個JobInstance

安排批處理作業時,有兩種方法可以避免此問題。

一是確保為每個作業引入一個或多個唯一參數(例如,以納秒為單位的實際開始時間):

 @Scheduled(fixedRate = 5000) public void run() throws Exception { jobLauncher.run( customerReportJob(), new JobParametersBuilder().addLong("uniqueness", System.nanoTime()).toJobParameters() ); }

或者,您可以使用SimpleJobOperator.startNextInstance()附加到指定作業的JobInstance確定的JobParametersIncrementer序列中啟動下一個作業:

 @Autowired private JobOperator operator; @Autowired private JobExplorer jobs; @Scheduled(fixedRate = 5000) public void run() throws Exception { List<JobInstance> lastInstances = jobs.getJobInstances(JOB_NAME, 0, 1); if (lastInstances.isEmpty()) { jobLauncher.run(customerReportJob(), new JobParameters()); } else { operator.startNextInstance(JOB_NAME); } }

Spring Batch 單元測試

通常,要在 Spring Boot 應用程序中運行單元測試,框架必須加載相應的ApplicationContext 。 為此使用了兩個註釋:

 @RunWith(SpringRunner.class) @ContextConfiguration(classes = {...})

有一個實用程序類org.springframework.batch.test.JobLauncherTestUtils來測試批處理作業。 它提供了啟動整個作業的方法,並允許對各個步驟進行端到端測試,而無需運行作業中的每個步驟。 它必須聲明為 Spring bean:

 @Configuration public class BatchTestConfiguration { @Bean public JobLauncherTestUtils jobLauncherTestUtils() { return new JobLauncherTestUtils(); } }

作業和步驟的典型測試如下所示(也可以使用任何模擬框架):

 @RunWith(SpringRunner.class) @ContextConfiguration(classes = {BatchApplication.class, BatchTestConfiguration.class}) public class CustomerReportJobConfigTest { @Autowired private JobLauncherTestUtils testUtils; @Autowired private CustomerReportJobConfig config; @Test public void testEntireJob() throws Exception { final JobExecution result = testUtils.getJobLauncher().run(config.customerReportJob(), testUtils.getUniqueJobParameters()); Assert.assertNotNull(result); Assert.assertEquals(BatchStatus.COMPLETED, result.getStatus()); } @Test public void testSpecificStep() { Assert.assertEquals(BatchStatus.COMPLETED, testUtils.launchStep("taskletStep").getStatus()); } }

Spring Batch 為步驟和作業上下文引入了額外的範圍。 這些範圍內的對象使用 Spring 容器作為對象工廠,因此每個執行步驟或作業只有一個此類 bean 的實例。 此外,還支持從StepContextJobContext訪問的引用的後期綁定。 在運行時配置為步進或作業範圍的組件很難作為獨立組件進行測試,除非您有辦法將上下文設置為就好像它們在步驟或作業執行中一樣。 這就是 Spring Batch 中的org.springframework.batch.test.StepScopeTestExecutionListenerorg.springframework.batch.test.StepScopeTestUtils組件以及JobScopeTestExecutionListenerJobScopeTestUtils的目標。

TestExecutionListeners在類級別聲明,它的工作是為每個測試方法創建一個步驟執行上下文。 例如:

 @RunWith(SpringRunner.class) @TestExecutionListeners({DependencyInjectionTestExecutionListener.class, StepScopeTestExecutionListener.class}) @ContextConfiguration(classes = {BatchApplication.class, BatchTestConfiguration.class}) public class BirthdayFilterProcessorTest { @Autowired private BirthdayFilterProcessor processor; public StepExecution getStepExecution() { return MetaDataInstanceFactory.createStepExecution(); } @Test public void filter() throws Exception { final Customer customer = new Customer(); customer.setId(1); customer.setName("name"); customer.setBirthday(new GregorianCalendar()); Assert.assertNotNull(processor.process(customer)); } }

有兩個TestExecutionListener 。 一種來自常規的 Spring Test 框架,並處理來自配置的應用程序上下文的依賴注入。 另一個是 Spring Batch StepScopeTestExecutionListener ,它為單元測試中的依賴注入設置步進範圍上下文。 StepContext在測試方法的持續時間內創建,並可供任何注入的依賴項使用。 默認行為只是創建具有固定屬性的StepExecution 。 或者,測試用例可以提供StepContext作為返回正確類型的工廠方法。

另一種方法是基於StepScopeTestUtils實用程序類。 此類用於在單元測試中以更靈活的方式創建和操作StepScope ,而無需使用依賴注入。 例如,讀取上面處理器過濾的客戶 ID 可以如下完成:

 @Test public void filterId() throws Exception { final Customer customer = new Customer(); customer.setId(1); customer.setName("name"); customer.setBirthday(new GregorianCalendar()); final int id = StepScopeTestUtils.doInStepScope( getStepExecution(), () -> processor.process(customer).getId() ); Assert.assertEquals(1, id); }

準備好使用高級 Spring Batch 了嗎?

本文介紹了 Spring Batch 應用程序設計和開發的一些基礎知識。 但是,還有許多更高級的主題和功能——例如縮放、並行處理、偵聽器等——本文沒有涉及。 希望本文為入門提供了有用的基礎。

然後可以在 Spring Batch 的官方 Spring Back 文檔中找到有關這些更高級主題的信息。