Spring Batchチュートリアル:Springでバッチ処理が簡単に

公開: 2022-03-11

バッチ処理(バルク指向、非対話型、頻繁に長時間実行されるバックグラウンド実行に代表される)は、事実上すべての業界で広く使用されており、さまざまなタスクに適用されます。 バッチ処理は、データまたは計算集約型であり、順次または並行して実行され、アドホック、スケジュール、およびオンデマンドを含むさまざまな呼び出しモデルを介して開始される場合があります。

このSpringBatchチュートリアルでは、一般的なバッチアプリケーションのプログラミングモデルとドメイン言語について説明し、特に、現在のSpringBatch3.0.7バージョンを使用したバッチアプリケーションの設計と開発に対するいくつかの有用なアプローチを示します。

SpringBatchとは何ですか?

Spring Batchは、堅牢なバッチアプリケーションの開発を容易にするために設計された軽量で包括的なフレームワークです。 また、最適化とパーティション化の手法を通じて、非常に大量で高性能のバッチジョブをサポートする、より高度な技術サービスと機能を提供します。 Spring Batchは、経験豊富なすべてのSpring開発者に馴染みのあるSpringFrameworkのPOJOベースの開発アプローチに基づいて構築されています。

例として、この記事では、XML形式の顧客ファイルをロードし、さまざまな属性で顧客をフィルタリングし、フィルタリングされたエントリをテキストファイルに出力するサンプルプロジェクトのソースコードについて検討します。 Spring Batchの例(Lombokアノテーションを使用)のソースコードは、GitHubで入手でき、JavaSE8とMavenが必要です。

バッチ処理とは何ですか? 重要な概念と用語

バッチ開発者は、バッチ処理の主要な概念に精通し、慣れていることが重要です。 次の図は、多くの異なるプラットフォームでの数十年にわたる実装を通じて証明されたバッチ参照アーキテクチャの簡略版です。 SpringBatchで使用されるバッチ処理に関連する主要な概念と用語を紹介します。

Spring Batchチュートリアル:重要な概念と用語

バッチ処理の例に示されているように、バッチプロセスは通常、複数のStepで構成されるJobによってカプセル化されます。 各Stepには通常、単一のItemReaderItemProcessor 、およびItemWriterがあります。 JobJobLauncherによって実行され、構成および実行されたジョブに関するメタデータはJobRepositoryに格納されます。

Jobは複数のJobInstanceに関連付けることができ、各JobInstanceは、バッチジョブの開始に使用される特定のJobParametersによって一意に定義されます。 JobInstanceの各実行は、 JobInstanceと呼ばれJobExecution 。 各JobExecutionは通常、現在および終了ステータス、開始時間と終了時間など、実行中に発生したことを追跡します。

Stepは、バッチJobの独立した特定のフェーズであり、すべてのJobが1つ以上のStepで構成されます。 Jobと同様に、 Stepには、 Stepを実行する1回の試行を表す個別のStepExecutionがあります。 StepExecutionは、現在および終了ステータス、開始時刻と終了時刻などに関する情報、および対応するStepインスタンスとJobExecutionインスタンスへの参照を格納します。

ExecutionContextは、 StepExecutionまたはJobExecutionのいずれかにスコープされる情報を含むキーと値のペアのセットです。 Spring BatchはExecutionContextを永続化します。これは、バッチ実行を再開する場合(たとえば、致命的なエラーが発生した場合など)に役立ちます。 必要なのは、ステップ間で共有されるオブジェクトをコンテキストに配置することだけで、残りはフレームワークが処理します。 再起動後、前のExecutionContextの値がデータベースから復元され、適用されます。

JobRepositoryは、このすべての永続性を可能にするSpringBatchのメカニズムです。 JobLauncherJob 、およびStepのインスタンス化のためのCRUD操作を提供します。 Jobが起動されると、リポジトリからJobExecutionが取得され、実行中、 StepExecutionインスタンスとJobExecutionインスタンスがリポジトリに保持されます。

SpringBatchFramework入門

Spring Batchの利点の1つは、プロジェクトの依存関係が最小限に抑えられることです。これにより、すばやく起動して実行することが容易になります。 存在するいくつかの依存関係は、プロジェクトのpom.xmlで明確に指定および説明されており、ここからアクセスできます。

アプリケーションの実際の起動は、次のようなクラスで行われます。

 @EnableBatchProcessing @SpringBootApplication public class BatchApplication { public static void main(String[] args) { prepareTestData(1000); SpringApplication.run(BatchApplication.class, args); } }

@EnableBatchProcessingアノテーションは、Spring Batch機能を有効にし、バッチジョブを設定するための基本構成を提供します。

@SpringBootApplicationアノテーションは、スタンドアロンの本番環境に対応したSpringベースのアプリケーションを提供するSpringBootプロジェクトから取得されます。 これは、1つ以上のSpring Beanを宣言し、自動構成とSpringのコンポーネントスキャンをトリガーする構成クラスを指定します。

サンプルプロジェクトには、 JobBuilderFactoryStepBuilderFactoryが挿入されたCustomerReportJobConfigによって構成されたジョブが1つだけあります。 最小限のジョブ構成は、 CustomerReportJobConfigで次のように定義できます。

 @Configuration public class CustomerReportJobConfig { @Autowired private JobBuilderFactory jobBuilders; @Autowired private StepBuilderFactory stepBuilders; @Bean public Job customerReportJob() { return jobBuilders.get("customerReportJob") .start(taskletStep()) .next(chunkStep()) .build(); } @Bean public Step taskletStep() { return stepBuilders.get("taskletStep") .tasklet(tasklet()) .build(); } @Bean public Tasklet tasklet() { return (contribution, chunkContext) -> { return RepeatStatus.FINISHED; }; } }

ステップを構築するには、主に2つのアプローチがあります。

上記の例に示されているように、1つのアプローチはタスクレットベースです。 タスクレットは、 execute()という1つのメソッドのみを持つ単純なインターフェイスをサポートします。このメソッドは、 TaskletRepeatStatus.FINISHEDか、失敗を通知するために例外をスローするまで繰り返し呼び出されます。 Taskletへの各呼び出しは、トランザクションにラップされます。

別のアプローチであるチャンク指向の処理では、データを順番に読み取り、トランザクション境界内で書き出される「チャンク」を作成します。 個々のアイテムは、 ItemProcessorから読み込まれ、 ItemReaderに渡され、集約されます。 読み取られたアイテムの数がコミット間隔と等しくなると、チャンク全体がItemWriterを介して書き出され、トランザクションがコミットされます。 チャンク指向のステップは、次のように構成できます。

 @Bean public Job customerReportJob() { return jobBuilders.get("customerReportJob") .start(taskletStep()) .next(chunkStep()) .build(); } @Bean public Step chunkStep() { return stepBuilders.get("chunkStep") .<Customer, Customer>chunk(20) .reader(reader()) .processor(processor()) .writer(writer()) .build(); }

chunk()メソッドは、指定されたサイズのチャンクでアイテムを処理するステップを構築し、各チャンクは指定されたリーダー、プロセッサー、およびライターに渡されます。 これらの方法については、この記事の次のセクションで詳しく説明します。

カスタムリーダー

Spring Batchサンプルアプリケーションの場合、XMLファイルから顧客のリストを読み取るには、インターフェースorg.springframework.batch.item.ItemReaderの実装を提供する必要があります。

 public interface ItemReader<T> { T read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException; }

ItemReaderはデータを提供し、ステートフルであることが期待されます。 これは通常、バッチごとに複数回呼び出され、 read()を呼び出すたびに次の値が返され、すべての入力データが使い果たされると最後にnullが返されます。

Spring Batchは、 ItemReaderのすぐに使用できる実装をいくつか提供します。これは、コレクション、ファイルの読み取り、JMSとJDBC、および複数のソースの統合など、さまざまな目的に使用できます。

サンプルアプリケーションでは、 CustomerItemReaderクラスが実際のread()呼び出しをIteratorItemReaderクラスの遅延初期化されたインスタンスに委任します。

 public class CustomerItemReader implements ItemReader<Customer> { private final String filename; private ItemReader<Customer> delegate; public CustomerItemReader(final String filename) { this.filename = filename; } @Override public Customer read() throws Exception { if (delegate == null) { delegate = new IteratorItemReader<>(customers()); } return delegate.read(); } private List<Customer> customers() throws FileNotFoundException { try (XMLDecoder decoder = new XMLDecoder(new FileInputStream(filename))) { return (List<Customer>) decoder.readObject(); } } }

この実装の@Componentは、@ Componentアノテーションと@StepScopeアノテーションを使用して作成され、このクラスがステップスコープのSpringコンポーネントであり、次のようにステップ実行ごとに1回作成されることをSpringに通知します。

 @StepScope @Bean public ItemReader<Customer> reader() { return new CustomerItemReader(XML_FILE); }

カスタムプロセッサ

ItemProcessorsは、入力アイテムを変換し、アイテム指向の処理シナリオでビジネスロジックを導入します。 それらは、インターフェースorg.springframework.batch.item.ItemProcessorの実装を提供する必要があります。

 public interface ItemProcessor<I, O> { O process(I item) throws Exception; }

メソッドprocess()は、 Iクラスの1つのインスタンスを受け入れ、同じタイプのインスタンスを返す場合と返さない場合があります。 nullを返す場合は、アイテムの処理を続行しないことを示します。 いつものように、Springは、注入されたItemProcessorのシーケンスを介してアイテムを渡すCompositeItemProcessorや、入力を検証するValidatingItemProcessorなど、いくつかの標準プロセッサを提供します。

サンプルアプリケーションの場合、プロセッサは次の要件によって顧客をフィルタリングするために使用されます。

  • 顧客は当月に生まれる必要があります(たとえば、誕生日スペシャルのフラグを立てるなど)。
  • 顧客は、完了したトランザクションが5つ未満である必要があります(たとえば、新しい顧客を識別するため)。

「当月」の要件は、カスタムItemProcessorを介して実装されます。

 public class BirthdayFilterProcessor implements ItemProcessor<Customer, Customer> { @Override public Customer process(final Customer item) throws Exception { if (new GregorianCalendar().get(Calendar.MONTH) == item.getBirthday().get(Calendar.MONTH)) { return item; } return null; } }

「トランザクションの制限数」要件は、 ValidatingItemProcessorとして実装されます。

 public class TransactionValidatingProcessor extends ValidatingItemProcessor<Customer> { public TransactionValidatingProcessor(final int limit) { super( item -> { if (item.getTransactions() >= limit) { throw new ValidationException("Customer has less than " + limit + " transactions"); } } ); setFilter(true); } }

次に、このプロセッサのペアは、デリゲートパターンを実装するCompositeItemProcessor内にカプセル化されます。

 @StepScope @Bean public ItemProcessor<Customer, Customer> processor() { final CompositeItemProcessor<Customer, Customer> processor = new CompositeItemProcessor<>(); processor.setDelegates(Arrays.asList(new BirthdayFilterProcessor(), new TransactionValidatingProcessor(5))); return processor; }

カスタムライター

データを出力するために、SpringBatchは必要に応じてオブジェクトをシリアル化するためのインターフェースorg.springframework.batch.item.ItemWriterを提供します。

 public interface ItemWriter<T> { void write(List<? extends T> items) throws Exception; }

write()メソッドは、内部バッファーがフラッシュされていることを確認する役割を果たします。 トランザクションがアクティブな場合、通常、後続のロールバックで出力を破棄する必要もあります。 ライターがデータを送信するリソースは、通常、これ自体を処理できる必要があります。 CompositeItemWriterJdbcBatchItemWriterJmsItemWriterJpaItemWriterSimpleMailMessageItemWriterなどの標準実装があります。

サンプルアプリケーションでは、フィルタリングされた顧客のリストは次のように書き出されます。

 public class CustomerItemWriter implements ItemWriter<Customer>, Closeable { private final PrintWriter writer; public CustomerItemWriter() { OutputStream out; try { out = new FileOutputStream("output.txt"); } catch (FileNotFoundException e) { out = System.out; } this.writer = new PrintWriter(out); } @Override public void write(final List<? extends Customer> items) throws Exception { for (Customer item : items) { writer.println(item.toString()); } } @PreDestroy @Override public void close() throws IOException { writer.close(); } }

SpringBatchジョブのスケジューリング

デフォルトでは、Spring Batchは、起動時に検出できるすべてのジョブ(つまり、 CustomerReportJobConfigのように構成されているジョブ)を実行します。 この動作を変更するには、 application.propertiesに次のプロパティを追加して、起動時にジョブの実行を無効にします。

 spring.batch.job.enabled=false

次に、実際のスケジューリングは、 @EnableSchedulingアノテーションを構成クラスに追加し、@ @Scheduledアノテーションをジョブ自体を実行するメソッドに追加することによって実現されます。 スケジューリングは、遅延、レート、またはcron式で構成できます。

 // run every 5000 msec (ie, every 5 secs) @Scheduled(fixedRate = 5000) public void run() throws Exception { JobExecution execution = jobLauncher.run( customerReportJob(), new JobParametersBuilder().toJobParameters() ); }

ただし、上記の例には問題があります。 実行時に、ジョブは最初にのみ成功します。 2回目(つまり5秒後)に起動すると、ログに次のメッセージが生成されます(以前のバージョンのSpring Batchでは、 JobInstanceAlreadyCompleteExceptionがスローされていたことに注意してください)。

 INFO 36988 --- [pool-2-thread-1] osbclsupport.SimpleJobLauncher : Job: [SimpleJob: [name=customerReportJob]] launched with the following parameters: [{}] INFO 36988 --- [pool-2-thread-1] osbatch.core.job.SimpleStepHandler : Step already complete or not restartable, so no action to execute: StepExecution: id=1, version=3, name=taskletStep, status=COMPLETED, exitStatus=COMPLETED, readCount=0, filterCount=0, writeCount=0 readSkipCount=0, writeSkipCount=0, processSkipCount=0, commitCount=1, rollbackCount=0, exitDescription= INFO 36988 --- [pool-2-thread-1] osbatch.core.job.SimpleStepHandler : Step already complete or not restartable, so no action to execute: StepExecution: id=2, version=53, name=chunkStep, status=COMPLETED, exitStatus=COMPLETED, readCount=1000, filterCount=982, writeCount=18 readSkipCount=0, writeSkipCount=0, processSkipCount=0, commitCount=51, rollbackCount=0, exitDescription=

これは、一意のJobInstanceのみを作成および実行でき、SpringBatchには最初と2番目のJobInstanceを区別する方法がないために発生します。

バッチジョブをスケジュールするときにこの問題を回避するには、2つの方法があります。

1つは、各ジョブに1つ以上の固有のパラメーター(たとえば、ナノ秒単位の実際の開始時間)を必ず導入することです。

 @Scheduled(fixedRate = 5000) public void run() throws Exception { jobLauncher.run( customerReportJob(), new JobParametersBuilder().addLong("uniqueness", System.nanoTime()).toJobParameters() ); }

または、 SimpleJobOperator.startNextInstance()を使用して、指定されたジョブにアタッチされたJobParametersIncrementerによって決定された一連のJobInstanceで次のジョブを起動できます。

 @Autowired private JobOperator operator; @Autowired private JobExplorer jobs; @Scheduled(fixedRate = 5000) public void run() throws Exception { List<JobInstance> lastInstances = jobs.getJobInstances(JOB_NAME, 0, 1); if (lastInstances.isEmpty()) { jobLauncher.run(customerReportJob(), new JobParameters()); } else { operator.startNextInstance(JOB_NAME); } }

SpringBatchユニットテスト

通常、Spring Bootアプリケーションで単体テストを実行するには、フレームワークが対応するApplicationContextをロードする必要があります。 この目的のために2つの注釈が使用されます。

 @RunWith(SpringRunner.class) @ContextConfiguration(classes = {...})

バッチジョブをテストするためのユーティリティクラスorg.springframework.batch.test.JobLauncherTestUtilsがあります。 これは、ジョブ全体を起動するためのメソッドを提供するだけでなく、ジョブのすべてのステップを実行することなく、個々のステップのエンドツーエンドのテストを可能にします。 SpringBeanとして宣言する必要があります。

 @Configuration public class BatchTestConfiguration { @Bean public JobLauncherTestUtils jobLauncherTestUtils() { return new JobLauncherTestUtils(); } }

ジョブとステップの一般的なテストは次のようになります(また、任意のモックフレームワークを使用できます)。

 @RunWith(SpringRunner.class) @ContextConfiguration(classes = {BatchApplication.class, BatchTestConfiguration.class}) public class CustomerReportJobConfigTest { @Autowired private JobLauncherTestUtils testUtils; @Autowired private CustomerReportJobConfig config; @Test public void testEntireJob() throws Exception { final JobExecution result = testUtils.getJobLauncher().run(config.customerReportJob(), testUtils.getUniqueJobParameters()); Assert.assertNotNull(result); Assert.assertEquals(BatchStatus.COMPLETED, result.getStatus()); } @Test public void testSpecificStep() { Assert.assertEquals(BatchStatus.COMPLETED, testUtils.launchStep("taskletStep").getStatus()); } }

Spring Batchでは、ステップとジョブのコンテキストに追加のスコープが導入されています。 これらのスコープ内のオブジェクトは、Springコンテナをオブジェクトファクトリとして使用するため、実行ステップまたはジョブごとに、そのような各Beanのインスタンスは1つだけです。 さらに、 StepContextまたはJobContextからアクセス可能な参照の遅延バインディングのサポートが提供されます。 ステップスコープまたはジョブスコープになるように実行時に構成されたコンポーネントは、ステップまたはジョブの実行にあるかのようにコンテキストを設定する方法がない限り、スタンドアロンコンポーネントとしてテストするのは難しいです。 これが、Spring Batchのorg.springframework.batch.test.StepScopeTestExecutionListenerコンポーネントとorg.springframework.batch.test.StepScopeTestUtilsコンポーネント、およびJobScopeTestExecutionListenerJobScopeTestUtilsの目標です。

TestExecutionListenersはクラスレベルで宣言され、その役割は各テストメソッドのステップ実行コンテキストを作成することです。 例えば:

 @RunWith(SpringRunner.class) @TestExecutionListeners({DependencyInjectionTestExecutionListener.class, StepScopeTestExecutionListener.class}) @ContextConfiguration(classes = {BatchApplication.class, BatchTestConfiguration.class}) public class BirthdayFilterProcessorTest { @Autowired private BirthdayFilterProcessor processor; public StepExecution getStepExecution() { return MetaDataInstanceFactory.createStepExecution(); } @Test public void filter() throws Exception { final Customer customer = new Customer(); customer.setId(1); customer.setName("name"); customer.setBirthday(new GregorianCalendar()); Assert.assertNotNull(processor.process(customer)); } }

TestExecutionListenerは2つあります。 1つは通常のSpringTestフレームワークからのものであり、構成されたアプリケーションコンテキストからの依存性注入を処理します。 もう1つは、単体テストへの依存性注入のためのステップスコープコンテキストを設定するStepScopeTestExecutionListenerです。 StepContextは、テストメソッドの期間中に作成され、注入されたすべての依存関係で使用できるようになります。 デフォルトの動作は、固定プロパティを使用してStepExecutionを作成することです。 または、 StepContextは、正しい型を返すファクトリメソッドとしてテストケースによって提供されます。

別のアプローチは、 StepScopeTestUtilsユーティリティクラスに基づいています。 このクラスは、依存性注入を使用せずに、より柔軟な方法で単体テストでStepScopeを作成および操作するために使用されます。 たとえば、上記のプロセッサによってフィルタリングされた顧客のIDの読み取りは、次のように実行できます。

 @Test public void filterId() throws Exception { final Customer customer = new Customer(); customer.setId(1); customer.setName("name"); customer.setBirthday(new GregorianCalendar()); final int id = StepScopeTestUtils.doInStepScope( getStepExecution(), () -> processor.process(customer).getId() ); Assert.assertEquals(1, id); }

Advanced Spring Batchの準備はできていますか?

この記事では、SpringBatchアプリケーションの設計と開発の基本のいくつかを紹介します。 ただし、スケーリング、並列処理、リスナーなど、この記事では取り上げていない、より高度なトピックや機能が多数あります。 うまくいけば、この記事が始めるための有用な基盤を提供します。

これらのより高度なトピックに関する情報は、SpringBatchの公式SpringBackドキュメントに記載されています。