Spring Batchチュートリアル:Springでバッチ処理が簡単に
公開: 2022-03-11バッチ処理(バルク指向、非対話型、頻繁に長時間実行されるバックグラウンド実行に代表される)は、事実上すべての業界で広く使用されており、さまざまなタスクに適用されます。 バッチ処理は、データまたは計算集約型であり、順次または並行して実行され、アドホック、スケジュール、およびオンデマンドを含むさまざまな呼び出しモデルを介して開始される場合があります。
このSpringBatchチュートリアルでは、一般的なバッチアプリケーションのプログラミングモデルとドメイン言語について説明し、特に、現在のSpringBatch3.0.7バージョンを使用したバッチアプリケーションの設計と開発に対するいくつかの有用なアプローチを示します。
SpringBatchとは何ですか?
Spring Batchは、堅牢なバッチアプリケーションの開発を容易にするために設計された軽量で包括的なフレームワークです。 また、最適化とパーティション化の手法を通じて、非常に大量で高性能のバッチジョブをサポートする、より高度な技術サービスと機能を提供します。 Spring Batchは、経験豊富なすべてのSpring開発者に馴染みのあるSpringFrameworkのPOJOベースの開発アプローチに基づいて構築されています。
例として、この記事では、XML形式の顧客ファイルをロードし、さまざまな属性で顧客をフィルタリングし、フィルタリングされたエントリをテキストファイルに出力するサンプルプロジェクトのソースコードについて検討します。 Spring Batchの例(Lombokアノテーションを使用)のソースコードは、GitHubで入手でき、JavaSE8とMavenが必要です。
バッチ処理とは何ですか? 重要な概念と用語
バッチ開発者は、バッチ処理の主要な概念に精通し、慣れていることが重要です。 次の図は、多くの異なるプラットフォームでの数十年にわたる実装を通じて証明されたバッチ参照アーキテクチャの簡略版です。 SpringBatchで使用されるバッチ処理に関連する主要な概念と用語を紹介します。
バッチ処理の例に示されているように、バッチプロセスは通常、複数のStep
で構成されるJob
によってカプセル化されます。 各Step
には通常、単一のItemReader
、 ItemProcessor
、およびItemWriter
があります。 Job
はJobLauncher
によって実行され、構成および実行されたジョブに関するメタデータはJobRepository
に格納されます。
各Job
は複数のJobInstance
に関連付けることができ、各JobInstanceは、バッチジョブの開始に使用される特定のJobParameters
によって一意に定義されます。 JobInstanceの各実行は、 JobInstance
と呼ばれJobExecution
。 各JobExecution
は通常、現在および終了ステータス、開始時間と終了時間など、実行中に発生したことを追跡します。
Step
は、バッチJob
の独立した特定のフェーズであり、すべてのJob
が1つ以上のStep
で構成されます。 Job
と同様に、 Step
には、 Step
を実行する1回の試行を表す個別のStepExecution
があります。 StepExecution
は、現在および終了ステータス、開始時刻と終了時刻などに関する情報、および対応するStep
インスタンスとJobExecution
インスタンスへの参照を格納します。
ExecutionContext
は、 StepExecution
またはJobExecution
のいずれかにスコープされる情報を含むキーと値のペアのセットです。 Spring BatchはExecutionContext
を永続化します。これは、バッチ実行を再開する場合(たとえば、致命的なエラーが発生した場合など)に役立ちます。 必要なのは、ステップ間で共有されるオブジェクトをコンテキストに配置することだけで、残りはフレームワークが処理します。 再起動後、前のExecutionContext
の値がデータベースから復元され、適用されます。
JobRepository
は、このすべての永続性を可能にするSpringBatchのメカニズムです。 JobLauncher
、 Job
、およびStep
のインスタンス化のためのCRUD操作を提供します。 Job
が起動されると、リポジトリからJobExecution
が取得され、実行中、 StepExecution
インスタンスとJobExecution
インスタンスがリポジトリに保持されます。
SpringBatchFramework入門
Spring Batchの利点の1つは、プロジェクトの依存関係が最小限に抑えられることです。これにより、すばやく起動して実行することが容易になります。 存在するいくつかの依存関係は、プロジェクトのpom.xml
で明確に指定および説明されており、ここからアクセスできます。
アプリケーションの実際の起動は、次のようなクラスで行われます。
@EnableBatchProcessing @SpringBootApplication public class BatchApplication { public static void main(String[] args) { prepareTestData(1000); SpringApplication.run(BatchApplication.class, args); } }
@EnableBatchProcessing
アノテーションは、Spring Batch機能を有効にし、バッチジョブを設定するための基本構成を提供します。
@SpringBootApplication
アノテーションは、スタンドアロンの本番環境に対応したSpringベースのアプリケーションを提供するSpringBootプロジェクトから取得されます。 これは、1つ以上のSpring Beanを宣言し、自動構成とSpringのコンポーネントスキャンをトリガーする構成クラスを指定します。
サンプルプロジェクトには、 JobBuilderFactory
とStepBuilderFactory
が挿入されたCustomerReportJobConfig
によって構成されたジョブが1つだけあります。 最小限のジョブ構成は、 CustomerReportJobConfig
で次のように定義できます。
@Configuration public class CustomerReportJobConfig { @Autowired private JobBuilderFactory jobBuilders; @Autowired private StepBuilderFactory stepBuilders; @Bean public Job customerReportJob() { return jobBuilders.get("customerReportJob") .start(taskletStep()) .next(chunkStep()) .build(); } @Bean public Step taskletStep() { return stepBuilders.get("taskletStep") .tasklet(tasklet()) .build(); } @Bean public Tasklet tasklet() { return (contribution, chunkContext) -> { return RepeatStatus.FINISHED; }; } }
ステップを構築するには、主に2つのアプローチがあります。
上記の例に示されているように、1つのアプローチはタスクレットベースです。 タスクレットは、 execute()
という1つのメソッドのみを持つ単純なインターフェイスをサポートします。このメソッドは、 Tasklet
をRepeatStatus.FINISHED
か、失敗を通知するために例外をスローするまで繰り返し呼び出されます。 Tasklet
への各呼び出しは、トランザクションにラップされます。
別のアプローチであるチャンク指向の処理では、データを順番に読み取り、トランザクション境界内で書き出される「チャンク」を作成します。 個々のアイテムは、 ItemProcessor
から読み込まれ、 ItemReader
に渡され、集約されます。 読み取られたアイテムの数がコミット間隔と等しくなると、チャンク全体がItemWriter
を介して書き出され、トランザクションがコミットされます。 チャンク指向のステップは、次のように構成できます。
@Bean public Job customerReportJob() { return jobBuilders.get("customerReportJob") .start(taskletStep()) .next(chunkStep()) .build(); } @Bean public Step chunkStep() { return stepBuilders.get("chunkStep") .<Customer, Customer>chunk(20) .reader(reader()) .processor(processor()) .writer(writer()) .build(); }
chunk()
メソッドは、指定されたサイズのチャンクでアイテムを処理するステップを構築し、各チャンクは指定されたリーダー、プロセッサー、およびライターに渡されます。 これらの方法については、この記事の次のセクションで詳しく説明します。
カスタムリーダー
Spring Batchサンプルアプリケーションの場合、XMLファイルから顧客のリストを読み取るには、インターフェースorg.springframework.batch.item.ItemReader
の実装を提供する必要があります。
public interface ItemReader<T> { T read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException; }
ItemReader
はデータを提供し、ステートフルであることが期待されます。 これは通常、バッチごとに複数回呼び出され、 read()
を呼び出すたびに次の値が返され、すべての入力データが使い果たされると最後にnull
が返されます。
Spring Batchは、 ItemReader
のすぐに使用できる実装をいくつか提供します。これは、コレクション、ファイルの読み取り、JMSとJDBC、および複数のソースの統合など、さまざまな目的に使用できます。
サンプルアプリケーションでは、 CustomerItemReader
クラスが実際のread()
呼び出しをIteratorItemReader
クラスの遅延初期化されたインスタンスに委任します。
public class CustomerItemReader implements ItemReader<Customer> { private final String filename; private ItemReader<Customer> delegate; public CustomerItemReader(final String filename) { this.filename = filename; } @Override public Customer read() throws Exception { if (delegate == null) { delegate = new IteratorItemReader<>(customers()); } return delegate.read(); } private List<Customer> customers() throws FileNotFoundException { try (XMLDecoder decoder = new XMLDecoder(new FileInputStream(filename))) { return (List<Customer>) decoder.readObject(); } } }
この実装の@Component
は、@ Componentアノテーションと@StepScope
アノテーションを使用して作成され、このクラスがステップスコープのSpringコンポーネントであり、次のようにステップ実行ごとに1回作成されることをSpringに通知します。
@StepScope @Bean public ItemReader<Customer> reader() { return new CustomerItemReader(XML_FILE); }
カスタムプロセッサ
ItemProcessors
は、入力アイテムを変換し、アイテム指向の処理シナリオでビジネスロジックを導入します。 それらは、インターフェースorg.springframework.batch.item.ItemProcessor
の実装を提供する必要があります。
public interface ItemProcessor<I, O> { O process(I item) throws Exception; }
メソッドprocess()
は、 I
クラスの1つのインスタンスを受け入れ、同じタイプのインスタンスを返す場合と返さない場合があります。 null
を返す場合は、アイテムの処理を続行しないことを示します。 いつものように、Springは、注入されたItemProcessor
のシーケンスを介してアイテムを渡すCompositeItemProcessor
や、入力を検証するValidatingItemProcessor
など、いくつかの標準プロセッサを提供します。
サンプルアプリケーションの場合、プロセッサは次の要件によって顧客をフィルタリングするために使用されます。
- 顧客は当月に生まれる必要があります(たとえば、誕生日スペシャルのフラグを立てるなど)。
- 顧客は、完了したトランザクションが5つ未満である必要があります(たとえば、新しい顧客を識別するため)。
「当月」の要件は、カスタムItemProcessor
を介して実装されます。
public class BirthdayFilterProcessor implements ItemProcessor<Customer, Customer> { @Override public Customer process(final Customer item) throws Exception { if (new GregorianCalendar().get(Calendar.MONTH) == item.getBirthday().get(Calendar.MONTH)) { return item; } return null; } }
「トランザクションの制限数」要件は、 ValidatingItemProcessor
として実装されます。
public class TransactionValidatingProcessor extends ValidatingItemProcessor<Customer> { public TransactionValidatingProcessor(final int limit) { super( item -> { if (item.getTransactions() >= limit) { throw new ValidationException("Customer has less than " + limit + " transactions"); } } ); setFilter(true); } }
次に、このプロセッサのペアは、デリゲートパターンを実装するCompositeItemProcessor
内にカプセル化されます。

@StepScope @Bean public ItemProcessor<Customer, Customer> processor() { final CompositeItemProcessor<Customer, Customer> processor = new CompositeItemProcessor<>(); processor.setDelegates(Arrays.asList(new BirthdayFilterProcessor(), new TransactionValidatingProcessor(5))); return processor; }
カスタムライター
データを出力するために、SpringBatchは必要に応じてオブジェクトをシリアル化するためのインターフェースorg.springframework.batch.item.ItemWriter
を提供します。
public interface ItemWriter<T> { void write(List<? extends T> items) throws Exception; }
write()
メソッドは、内部バッファーがフラッシュされていることを確認する役割を果たします。 トランザクションがアクティブな場合、通常、後続のロールバックで出力を破棄する必要もあります。 ライターがデータを送信するリソースは、通常、これ自体を処理できる必要があります。 CompositeItemWriter
、 JdbcBatchItemWriter
、 JmsItemWriter
、 JpaItemWriter
、 SimpleMailMessageItemWriter
などの標準実装があります。
サンプルアプリケーションでは、フィルタリングされた顧客のリストは次のように書き出されます。
public class CustomerItemWriter implements ItemWriter<Customer>, Closeable { private final PrintWriter writer; public CustomerItemWriter() { OutputStream out; try { out = new FileOutputStream("output.txt"); } catch (FileNotFoundException e) { out = System.out; } this.writer = new PrintWriter(out); } @Override public void write(final List<? extends Customer> items) throws Exception { for (Customer item : items) { writer.println(item.toString()); } } @PreDestroy @Override public void close() throws IOException { writer.close(); } }
SpringBatchジョブのスケジューリング
デフォルトでは、Spring Batchは、起動時に検出できるすべてのジョブ(つまり、 CustomerReportJobConfig
のように構成されているジョブ)を実行します。 この動作を変更するには、 application.properties
に次のプロパティを追加して、起動時にジョブの実行を無効にします。
spring.batch.job.enabled=false
次に、実際のスケジューリングは、 @EnableScheduling
アノテーションを構成クラスに追加し、@ @Scheduled
アノテーションをジョブ自体を実行するメソッドに追加することによって実現されます。 スケジューリングは、遅延、レート、またはcron式で構成できます。
// run every 5000 msec (ie, every 5 secs) @Scheduled(fixedRate = 5000) public void run() throws Exception { JobExecution execution = jobLauncher.run( customerReportJob(), new JobParametersBuilder().toJobParameters() ); }
ただし、上記の例には問題があります。 実行時に、ジョブは最初にのみ成功します。 2回目(つまり5秒後)に起動すると、ログに次のメッセージが生成されます(以前のバージョンのSpring Batchでは、 JobInstanceAlreadyCompleteException
がスローされていたことに注意してください)。
INFO 36988 --- [pool-2-thread-1] osbclsupport.SimpleJobLauncher : Job: [SimpleJob: [name=customerReportJob]] launched with the following parameters: [{}] INFO 36988 --- [pool-2-thread-1] osbatch.core.job.SimpleStepHandler : Step already complete or not restartable, so no action to execute: StepExecution: id=1, version=3, name=taskletStep, status=COMPLETED, exitStatus=COMPLETED, readCount=0, filterCount=0, writeCount=0 readSkipCount=0, writeSkipCount=0, processSkipCount=0, commitCount=1, rollbackCount=0, exitDescription= INFO 36988 --- [pool-2-thread-1] osbatch.core.job.SimpleStepHandler : Step already complete or not restartable, so no action to execute: StepExecution: id=2, version=53, name=chunkStep, status=COMPLETED, exitStatus=COMPLETED, readCount=1000, filterCount=982, writeCount=18 readSkipCount=0, writeSkipCount=0, processSkipCount=0, commitCount=51, rollbackCount=0, exitDescription=
これは、一意のJobInstance
のみを作成および実行でき、SpringBatchには最初と2番目のJobInstance
を区別する方法がないために発生します。
バッチジョブをスケジュールするときにこの問題を回避するには、2つの方法があります。
1つは、各ジョブに1つ以上の固有のパラメーター(たとえば、ナノ秒単位の実際の開始時間)を必ず導入することです。
@Scheduled(fixedRate = 5000) public void run() throws Exception { jobLauncher.run( customerReportJob(), new JobParametersBuilder().addLong("uniqueness", System.nanoTime()).toJobParameters() ); }
または、 SimpleJobOperator.startNextInstance()
を使用して、指定されたジョブにアタッチされたJobParametersIncrementer
によって決定された一連のJobInstance
で次のジョブを起動できます。
@Autowired private JobOperator operator; @Autowired private JobExplorer jobs; @Scheduled(fixedRate = 5000) public void run() throws Exception { List<JobInstance> lastInstances = jobs.getJobInstances(JOB_NAME, 0, 1); if (lastInstances.isEmpty()) { jobLauncher.run(customerReportJob(), new JobParameters()); } else { operator.startNextInstance(JOB_NAME); } }
SpringBatchユニットテスト
通常、Spring Bootアプリケーションで単体テストを実行するには、フレームワークが対応するApplicationContext
をロードする必要があります。 この目的のために2つの注釈が使用されます。
@RunWith(SpringRunner.class) @ContextConfiguration(classes = {...})
バッチジョブをテストするためのユーティリティクラスorg.springframework.batch.test.JobLauncherTestUtils
があります。 これは、ジョブ全体を起動するためのメソッドを提供するだけでなく、ジョブのすべてのステップを実行することなく、個々のステップのエンドツーエンドのテストを可能にします。 SpringBeanとして宣言する必要があります。
@Configuration public class BatchTestConfiguration { @Bean public JobLauncherTestUtils jobLauncherTestUtils() { return new JobLauncherTestUtils(); } }
ジョブとステップの一般的なテストは次のようになります(また、任意のモックフレームワークを使用できます)。
@RunWith(SpringRunner.class) @ContextConfiguration(classes = {BatchApplication.class, BatchTestConfiguration.class}) public class CustomerReportJobConfigTest { @Autowired private JobLauncherTestUtils testUtils; @Autowired private CustomerReportJobConfig config; @Test public void testEntireJob() throws Exception { final JobExecution result = testUtils.getJobLauncher().run(config.customerReportJob(), testUtils.getUniqueJobParameters()); Assert.assertNotNull(result); Assert.assertEquals(BatchStatus.COMPLETED, result.getStatus()); } @Test public void testSpecificStep() { Assert.assertEquals(BatchStatus.COMPLETED, testUtils.launchStep("taskletStep").getStatus()); } }
Spring Batchでは、ステップとジョブのコンテキストに追加のスコープが導入されています。 これらのスコープ内のオブジェクトは、Springコンテナをオブジェクトファクトリとして使用するため、実行ステップまたはジョブごとに、そのような各Beanのインスタンスは1つだけです。 さらに、 StepContext
またはJobContext
からアクセス可能な参照の遅延バインディングのサポートが提供されます。 ステップスコープまたはジョブスコープになるように実行時に構成されたコンポーネントは、ステップまたはジョブの実行にあるかのようにコンテキストを設定する方法がない限り、スタンドアロンコンポーネントとしてテストするのは難しいです。 これが、Spring Batchのorg.springframework.batch.test.StepScopeTestExecutionListener
コンポーネントとorg.springframework.batch.test.StepScopeTestUtils
コンポーネント、およびJobScopeTestExecutionListener
とJobScopeTestUtils
の目標です。
TestExecutionListeners
はクラスレベルで宣言され、その役割は各テストメソッドのステップ実行コンテキストを作成することです。 例えば:
@RunWith(SpringRunner.class) @TestExecutionListeners({DependencyInjectionTestExecutionListener.class, StepScopeTestExecutionListener.class}) @ContextConfiguration(classes = {BatchApplication.class, BatchTestConfiguration.class}) public class BirthdayFilterProcessorTest { @Autowired private BirthdayFilterProcessor processor; public StepExecution getStepExecution() { return MetaDataInstanceFactory.createStepExecution(); } @Test public void filter() throws Exception { final Customer customer = new Customer(); customer.setId(1); customer.setName("name"); customer.setBirthday(new GregorianCalendar()); Assert.assertNotNull(processor.process(customer)); } }
TestExecutionListener
は2つあります。 1つは通常のSpringTestフレームワークからのものであり、構成されたアプリケーションコンテキストからの依存性注入を処理します。 もう1つは、単体テストへの依存性注入のためのステップスコープコンテキストを設定するStepScopeTestExecutionListener
です。 StepContext
は、テストメソッドの期間中に作成され、注入されたすべての依存関係で使用できるようになります。 デフォルトの動作は、固定プロパティを使用してStepExecution
を作成することです。 または、 StepContext
は、正しい型を返すファクトリメソッドとしてテストケースによって提供されます。
別のアプローチは、 StepScopeTestUtils
ユーティリティクラスに基づいています。 このクラスは、依存性注入を使用せずに、より柔軟な方法で単体テストでStepScope
を作成および操作するために使用されます。 たとえば、上記のプロセッサによってフィルタリングされた顧客のIDの読み取りは、次のように実行できます。
@Test public void filterId() throws Exception { final Customer customer = new Customer(); customer.setId(1); customer.setName("name"); customer.setBirthday(new GregorianCalendar()); final int id = StepScopeTestUtils.doInStepScope( getStepExecution(), () -> processor.process(customer).getId() ); Assert.assertEquals(1, id); }
Advanced Spring Batchの準備はできていますか?
この記事では、SpringBatchアプリケーションの設計と開発の基本のいくつかを紹介します。 ただし、スケーリング、並列処理、リスナーなど、この記事では取り上げていない、より高度なトピックや機能が多数あります。 うまくいけば、この記事が始めるための有用な基盤を提供します。
これらのより高度なトピックに関する情報は、SpringBatchの公式SpringBackドキュメントに記載されています。