Tutorial Batch Musim Semi: Pemrosesan Batch Menjadi Mudah dengan Spring

Diterbitkan: 2022-03-11

Pemrosesan batch—ditandai dengan eksekusi latar belakang berorientasi massal, non-interaktif, dan sering berjalan lama—banyak digunakan di hampir setiap industri dan diterapkan pada beragam tugas. Pemrosesan batch dapat berupa data atau komputasi intensif, dijalankan secara berurutan atau paralel, dan dapat dimulai melalui berbagai model pemanggilan, termasuk ad hoc, terjadwal, dan sesuai permintaan.

Tutorial Spring Batch ini menjelaskan model pemrograman dan bahasa domain aplikasi batch secara umum dan, khususnya, menunjukkan beberapa pendekatan yang berguna untuk desain dan pengembangan aplikasi batch menggunakan versi Spring Batch 3.0.7 saat ini.

Apa itu Musim Semi Batch?

Spring Batch adalah kerangka kerja yang ringan dan komprehensif yang dirancang untuk memfasilitasi pengembangan aplikasi batch yang kuat. Ini juga menyediakan layanan dan fitur teknis yang lebih canggih yang mendukung pekerjaan batch dengan volume sangat tinggi dan kinerja tinggi melalui teknik optimasi dan partisi. Spring Batch dibangun di atas pendekatan pengembangan berbasis POJO dari Spring Framework, yang akrab bagi semua pengembang Spring yang berpengalaman.

Sebagai contoh, artikel ini mempertimbangkan kode sumber dari proyek sampel yang memuat file pelanggan berformat XML, memfilter pelanggan menurut berbagai atribut, dan mengeluarkan entri yang difilter ke file teks. Kode sumber untuk contoh Spring Batch kami (yang menggunakan anotasi Lombok) tersedia di sini di GitHub dan memerlukan Java SE 8 dan Maven.

Apa itu Pemrosesan Batch? Konsep Kunci dan Terminologi

Penting bagi setiap pengembang batch untuk terbiasa dan nyaman dengan konsep utama pemrosesan batch. Diagram di bawah ini adalah versi sederhana dari arsitektur referensi batch yang telah terbukti selama beberapa dekade implementasi di banyak platform yang berbeda. Ini memperkenalkan konsep dan istilah kunci yang relevan dengan pemrosesan batch, seperti yang digunakan oleh Spring Batch.

Tutorial Batch Musim Semi: Konsep dan Terminologi Utama

Seperti yang ditunjukkan dalam contoh pemrosesan batch kami, proses batch biasanya dienkapsulasi oleh Job yang terdiri dari beberapa Step s. Setiap Step biasanya memiliki satu ItemReader , ItemProcessor , dan ItemWriter . Job dijalankan oleh JobLauncher , dan metadata tentang pekerjaan yang dikonfigurasi dan dijalankan disimpan dalam JobRepository .

Setiap Job dapat dikaitkan dengan beberapa JobInstance s, yang masing-masing didefinisikan secara unik oleh JobParameters tertentu yang digunakan untuk memulai pekerjaan batch. Setiap menjalankan JobInstance disebut sebagai JobExecution . Setiap JobExecution biasanya melacak apa yang terjadi selama proses, seperti status saat ini dan keluar, waktu mulai dan berakhir, dll.

A Step adalah fase independen dan spesifik dari batch Job , sehingga setiap Job terdiri dari satu atau lebih Step s. Mirip dengan Job , Step memiliki StepExecution individual yang mewakili satu upaya untuk mengeksekusi Step . StepExecution menyimpan informasi tentang status saat ini dan status keluar, waktu mulai dan berakhir, dan seterusnya, serta referensi ke instance Step dan JobExecution .

ExecutionContext adalah kumpulan pasangan nilai kunci yang berisi informasi yang dicakupkan ke StepExecution atau JobExecution . Spring Batch mempertahankan ExecutionContext , yang membantu dalam kasus di mana Anda ingin memulai ulang batch run (misalnya, ketika kesalahan fatal telah terjadi, dll.). Yang diperlukan hanyalah menempatkan objek apa pun untuk dibagikan di antara langkah-langkah ke dalam konteks dan kerangka kerja akan mengurus sisanya. Setelah restart, nilai dari ExecutionContext sebelumnya dipulihkan dari database dan diterapkan.

JobRepository adalah mekanisme di Spring Batch yang memungkinkan semua kegigihan ini. Ini menyediakan operasi CRUD untuk JobLauncher , Job , dan Step . Setelah Job diluncurkan, JobExecution diperoleh dari repositori dan, selama eksekusi, StepExecution dan JobExecution disimpan ke repositori.

Memulai dengan Kerangka Batch Musim Semi

Salah satu keuntungan dari Spring Batch adalah ketergantungan proyek minimal, yang membuatnya lebih mudah untuk bangun dan berjalan dengan cepat. Beberapa dependensi yang ada secara jelas ditentukan dan dijelaskan dalam pom.xml proyek, yang dapat diakses di sini.

Startup aplikasi yang sebenarnya terjadi di kelas yang terlihat seperti berikut:

 @EnableBatchProcessing @SpringBootApplication public class BatchApplication { public static void main(String[] args) { prepareTestData(1000); SpringApplication.run(BatchApplication.class, args); } }

Anotasi @EnableBatchProcessing mengaktifkan fitur Spring Batch dan menyediakan konfigurasi dasar untuk menyiapkan tugas batch.

Anotasi @SpringBootApplication berasal dari proyek Spring Boot yang menyediakan aplikasi mandiri, siap produksi, berbasis Spring. Ini menentukan kelas konfigurasi yang mendeklarasikan satu atau lebih kacang Spring dan juga memicu konfigurasi otomatis dan pemindaian komponen Spring.

Proyek sampel kami hanya memiliki satu pekerjaan yang dikonfigurasi oleh CustomerReportJobConfig dengan JobBuilderFactory dan StepBuilderFactory yang disuntikkan. Konfigurasi pekerjaan minimal dapat ditentukan di CustomerReportJobConfig sebagai berikut:

 @Configuration public class CustomerReportJobConfig { @Autowired private JobBuilderFactory jobBuilders; @Autowired private StepBuilderFactory stepBuilders; @Bean public Job customerReportJob() { return jobBuilders.get("customerReportJob") .start(taskletStep()) .next(chunkStep()) .build(); } @Bean public Step taskletStep() { return stepBuilders.get("taskletStep") .tasklet(tasklet()) .build(); } @Bean public Tasklet tasklet() { return (contribution, chunkContext) -> { return RepeatStatus.FINISHED; }; } }

Ada dua pendekatan utama untuk membangun sebuah langkah.

Salah satu pendekatan, seperti yang ditunjukkan pada contoh di atas, adalah berbasis tugas . Tasklet mendukung antarmuka sederhana yang hanya memiliki satu metode, execute() , yang dipanggil berulang kali hingga mengembalikan RepeatStatus.FINISHED atau melempar pengecualian untuk menandakan kegagalan. Setiap panggilan ke Tasklet dibungkus dalam transaksi.

Pendekatan lain, pemrosesan berorientasi chunk , mengacu pada membaca data secara berurutan dan membuat "potongan" yang akan ditulis dalam batas transaksi. Setiap item individual dibaca dari ItemReader , diserahkan ke ItemProcessor , dan dikumpulkan. Setelah jumlah item yang dibaca sama dengan interval komit, seluruh potongan ditulis melalui ItemWriter , dan kemudian transaksi dilakukan. Langkah berorientasi chunk dapat dikonfigurasi sebagai berikut:

 @Bean public Job customerReportJob() { return jobBuilders.get("customerReportJob") .start(taskletStep()) .next(chunkStep()) .build(); } @Bean public Step chunkStep() { return stepBuilders.get("chunkStep") .<Customer, Customer>chunk(20) .reader(reader()) .processor(processor()) .writer(writer()) .build(); }

Metode chunk() membangun langkah yang memproses item dalam potongan dengan ukuran yang disediakan, dengan setiap potongan kemudian diteruskan ke pembaca, prosesor, dan penulis yang ditentukan. Metode ini dibahas secara lebih rinci di bagian selanjutnya dari artikel ini.

Pembaca Kustom

Untuk aplikasi sampel Spring Batch kami, untuk membaca daftar pelanggan dari file XML, kami perlu menyediakan implementasi antarmuka org.springframework.batch.item.ItemReader :

 public interface ItemReader<T> { T read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException; }

ItemReader menyediakan data dan diharapkan menjadi stateful. Biasanya dipanggil beberapa kali untuk setiap batch, dengan setiap panggilan ke read() mengembalikan nilai berikutnya dan akhirnya mengembalikan null ketika semua data input telah habis.

Spring Batch menyediakan beberapa implementasi ItemReader yang out-of-the-box, yang dapat digunakan untuk berbagai tujuan seperti membaca koleksi, file, mengintegrasikan JMS dan JDBC serta berbagai sumber, dan sebagainya.

Dalam contoh aplikasi kami, kelas CustomerItemReader mendelegasikan panggilan read() aktual ke instance kelas IteratorItemReader yang diinisialisasi dengan malas:

 public class CustomerItemReader implements ItemReader<Customer> { private final String filename; private ItemReader<Customer> delegate; public CustomerItemReader(final String filename) { this.filename = filename; } @Override public Customer read() throws Exception { if (delegate == null) { delegate = new IteratorItemReader<>(customers()); } return delegate.read(); } private List<Customer> customers() throws FileNotFoundException { try (XMLDecoder decoder = new XMLDecoder(new FileInputStream(filename))) { return (List<Customer>) decoder.readObject(); } } }

Kacang Spring untuk implementasi ini dibuat dengan anotasi @Component dan @StepScope , memberi tahu Spring bahwa kelas ini adalah komponen Spring dengan cakupan langkah dan akan dibuat satu kali per langkah eksekusi sebagai berikut:

 @StepScope @Bean public ItemReader<Customer> reader() { return new CustomerItemReader(XML_FILE); }

Prosesor Khusus

ItemProcessors mengubah item input dan memperkenalkan logika bisnis dalam skenario pemrosesan berorientasi item. Mereka harus menyediakan implementasi antarmuka org.springframework.batch.item.ItemProcessor :

 public interface ItemProcessor<I, O> { O process(I item) throws Exception; }

Metode process() menerima satu instance dari kelas I dan mungkin atau mungkin tidak mengembalikan instance dari tipe yang sama. Mengembalikan null menunjukkan bahwa item tidak boleh terus diproses. Seperti biasa, Spring menyediakan beberapa prosesor standar, seperti CompositeItemProcessor yang melewati item melalui urutan ItemProcessor yang disuntikkan dan ValidatingItemProcessor yang memvalidasi input.

Dalam kasus aplikasi sampel kami, pemroses digunakan untuk menyaring pelanggan dengan persyaratan berikut:

  • Pelanggan harus lahir di bulan berjalan (misalnya, untuk menandai spesial ulang tahun, dll.)
  • Pelanggan harus menyelesaikan kurang dari lima transaksi (misalnya, untuk mengidentifikasi pelanggan baru)

Persyaratan "bulan berjalan" diimplementasikan melalui ItemProcessor kustom :

 public class BirthdayFilterProcessor implements ItemProcessor<Customer, Customer> { @Override public Customer process(final Customer item) throws Exception { if (new GregorianCalendar().get(Calendar.MONTH) == item.getBirthday().get(Calendar.MONTH)) { return item; } return null; } }

Persyaratan "jumlah transaksi terbatas" diimplementasikan sebagai ValidatingItemProcessor :

 public class TransactionValidatingProcessor extends ValidatingItemProcessor<Customer> { public TransactionValidatingProcessor(final int limit) { super( item -> { if (item.getTransactions() >= limit) { throw new ValidationException("Customer has less than " + limit + " transactions"); } } ); setFilter(true); } }

Sepasang prosesor ini kemudian dienkapsulasi dalam CompositeItemProcessor yang mengimplementasikan pola delegasi:

 @StepScope @Bean public ItemProcessor<Customer, Customer> processor() { final CompositeItemProcessor<Customer, Customer> processor = new CompositeItemProcessor<>(); processor.setDelegates(Arrays.asList(new BirthdayFilterProcessor(), new TransactionValidatingProcessor(5))); return processor; }

Penulis Kustom

Untuk mengeluarkan data, Spring Batch menyediakan antarmuka org.springframework.batch.item.ItemWriter untuk membuat serial objek yang diperlukan:

 public interface ItemWriter<T> { void write(List<? extends T> items) throws Exception; }

Metode write() bertanggung jawab untuk memastikan bahwa semua buffer internal di-flush. Jika transaksi aktif, biasanya juga perlu membuang output pada rollback berikutnya. Sumber daya tempat penulis mengirimkan data biasanya dapat menangani ini sendiri. Ada implementasi standar seperti CompositeItemWriter , JdbcBatchItemWriter , JmsItemWriter , JpaItemWriter , SimpleMailMessageItemWriter , dan lain-lain.

Dalam aplikasi sampel kami, daftar pelanggan yang disaring ditulis sebagai berikut:

 public class CustomerItemWriter implements ItemWriter<Customer>, Closeable { private final PrintWriter writer; public CustomerItemWriter() { OutputStream out; try { out = new FileOutputStream("output.txt"); } catch (FileNotFoundException e) { out = System.out; } this.writer = new PrintWriter(out); } @Override public void write(final List<? extends Customer> items) throws Exception { for (Customer item : items) { writer.println(item.toString()); } } @PreDestroy @Override public void close() throws IOException { writer.close(); } }

Menjadwalkan Pekerjaan Batch Musim Semi

Secara default, Spring Batch mengeksekusi semua pekerjaan yang dapat ditemukan (yaitu, yang dikonfigurasi seperti di CustomerReportJobConfig ) saat startup. Untuk mengubah perilaku ini, nonaktifkan eksekusi pekerjaan saat startup dengan menambahkan properti berikut ke application.properties :

 spring.batch.job.enabled=false

Penjadwalan sebenarnya kemudian dicapai dengan menambahkan anotasi @EnableScheduling ke kelas konfigurasi dan anotasi @Scheduled ke metode yang menjalankan pekerjaan itu sendiri. Penjadwalan dapat dikonfigurasi dengan penundaan, kecepatan, atau ekspresi cron:

 // run every 5000 msec (ie, every 5 secs) @Scheduled(fixedRate = 5000) public void run() throws Exception { JobExecution execution = jobLauncher.run( customerReportJob(), new JobParametersBuilder().toJobParameters() ); }

Namun ada masalah dengan contoh di atas. Pada saat dijalankan, pekerjaan akan berhasil pertama kali saja. Ketika diluncurkan untuk kedua kalinya (yaitu setelah lima detik), itu akan menghasilkan pesan berikut di log (perhatikan bahwa di versi Spring Batch sebelumnya, JobInstanceAlreadyCompleteException akan dilemparkan):

 INFO 36988 --- [pool-2-thread-1] osbclsupport.SimpleJobLauncher : Job: [SimpleJob: [name=customerReportJob]] launched with the following parameters: [{}] INFO 36988 --- [pool-2-thread-1] osbatch.core.job.SimpleStepHandler : Step already complete or not restartable, so no action to execute: StepExecution: id=1, version=3, name=taskletStep, status=COMPLETED, exitStatus=COMPLETED, readCount=0, filterCount=0, writeCount=0 readSkipCount=0, writeSkipCount=0, processSkipCount=0, commitCount=1, rollbackCount=0, exitDescription= INFO 36988 --- [pool-2-thread-1] osbatch.core.job.SimpleStepHandler : Step already complete or not restartable, so no action to execute: StepExecution: id=2, version=53, name=chunkStep, status=COMPLETED, exitStatus=COMPLETED, readCount=1000, filterCount=982, writeCount=18 readSkipCount=0, writeSkipCount=0, processSkipCount=0, commitCount=51, rollbackCount=0, exitDescription=

Ini terjadi karena hanya JobInstance unik yang dapat dibuat dan dijalankan dan Spring Batch tidak memiliki cara untuk membedakan antara JobInstance pertama dan kedua.

Ada dua cara untuk menghindari masalah ini saat Anda menjadwalkan pekerjaan batch.

Salah satunya adalah memastikan untuk memperkenalkan satu atau lebih parameter unik (misalnya, waktu mulai aktual dalam nanodetik) untuk setiap pekerjaan:

 @Scheduled(fixedRate = 5000) public void run() throws Exception { jobLauncher.run( customerReportJob(), new JobParametersBuilder().addLong("uniqueness", System.nanoTime()).toJobParameters() ); }

Atau, Anda dapat meluncurkan tugas berikutnya dalam urutan JobInstance yang ditentukan oleh JobParametersIncrementer yang dilampirkan ke tugas yang ditentukan dengan SimpleJobOperator.startNextInstance() :

 @Autowired private JobOperator operator; @Autowired private JobExplorer jobs; @Scheduled(fixedRate = 5000) public void run() throws Exception { List<JobInstance> lastInstances = jobs.getJobInstances(JOB_NAME, 0, 1); if (lastInstances.isEmpty()) { jobLauncher.run(customerReportJob(), new JobParameters()); } else { operator.startNextInstance(JOB_NAME); } }

Pengujian Unit Batch Musim Semi

Biasanya, untuk menjalankan pengujian unit dalam aplikasi Boot Musim Semi, kerangka kerja harus memuat ApplicationContext yang sesuai. Dua anotasi digunakan untuk tujuan ini:

 @RunWith(SpringRunner.class) @ContextConfiguration(classes = {...})

Ada kelas utilitas org.springframework.batch.test.JobLauncherTestUtils untuk menguji pekerjaan batch. Ini menyediakan metode untuk meluncurkan seluruh pekerjaan serta memungkinkan pengujian ujung ke ujung dari setiap langkah tanpa harus menjalankan setiap langkah dalam pekerjaan. Itu harus dinyatakan sebagai kacang Spring:

 @Configuration public class BatchTestConfiguration { @Bean public JobLauncherTestUtils jobLauncherTestUtils() { return new JobLauncherTestUtils(); } }

Tes khas untuk pekerjaan dan langkah terlihat sebagai berikut (dan juga dapat menggunakan kerangka kerja tiruan apa pun):

 @RunWith(SpringRunner.class) @ContextConfiguration(classes = {BatchApplication.class, BatchTestConfiguration.class}) public class CustomerReportJobConfigTest { @Autowired private JobLauncherTestUtils testUtils; @Autowired private CustomerReportJobConfig config; @Test public void testEntireJob() throws Exception { final JobExecution result = testUtils.getJobLauncher().run(config.customerReportJob(), testUtils.getUniqueJobParameters()); Assert.assertNotNull(result); Assert.assertEquals(BatchStatus.COMPLETED, result.getStatus()); } @Test public void testSpecificStep() { Assert.assertEquals(BatchStatus.COMPLETED, testUtils.launchStep("taskletStep").getStatus()); } }

Spring Batch memperkenalkan cakupan tambahan untuk konteks langkah dan pekerjaan. Objek dalam cakupan ini menggunakan wadah Spring sebagai pabrik objek, jadi hanya ada satu instance dari setiap kacang tersebut per langkah atau tugas eksekusi. Selain itu, dukungan disediakan untuk pengikatan referensi yang terlambat yang dapat diakses dari StepContext atau JobContext . Komponen yang dikonfigurasi saat runtime menjadi langkah-atau lingkup pekerjaan sulit untuk diuji sebagai komponen mandiri kecuali Anda memiliki cara untuk mengatur konteks seolah-olah mereka berada dalam langkah atau eksekusi pekerjaan. Itulah tujuan komponen JobScopeTestUtils dan org.springframework.batch.test.StepScopeTestUtils di Spring Batch, serta org.springframework.batch.test.StepScopeTestExecutionListener dan JobScopeTestExecutionListener .

TestExecutionListeners dideklarasikan pada tingkat kelas, dan tugasnya adalah membuat konteks eksekusi langkah untuk setiap metode pengujian. Sebagai contoh:

 @RunWith(SpringRunner.class) @TestExecutionListeners({DependencyInjectionTestExecutionListener.class, StepScopeTestExecutionListener.class}) @ContextConfiguration(classes = {BatchApplication.class, BatchTestConfiguration.class}) public class BirthdayFilterProcessorTest { @Autowired private BirthdayFilterProcessor processor; public StepExecution getStepExecution() { return MetaDataInstanceFactory.createStepExecution(); } @Test public void filter() throws Exception { final Customer customer = new Customer(); customer.setId(1); customer.setName("name"); customer.setBirthday(new GregorianCalendar()); Assert.assertNotNull(processor.process(customer)); } }

Ada dua TestExecutionListener s. Salah satunya adalah dari kerangka kerja Tes Musim Semi reguler dan menangani injeksi ketergantungan dari konteks aplikasi yang dikonfigurasi. Yang lainnya adalah Spring Batch StepScopeTestExecutionListener yang menyiapkan konteks lingkup langkah untuk injeksi ketergantungan ke dalam pengujian unit. StepContext dibuat selama metode pengujian dan tersedia untuk semua dependensi yang disuntikkan. Perilaku default hanya untuk membuat StepExecution dengan properti tetap. Atau, StepContext dapat disediakan oleh test case sebagai metode pabrik yang mengembalikan tipe yang benar.

Pendekatan lain didasarkan pada kelas utilitas StepScopeTestUtils . Kelas ini digunakan untuk membuat dan memanipulasi StepScope dalam pengujian unit dengan cara yang lebih fleksibel tanpa menggunakan injeksi ketergantungan. Misalnya, membaca ID pelanggan yang difilter oleh prosesor di atas dapat dilakukan sebagai berikut:

 @Test public void filterId() throws Exception { final Customer customer = new Customer(); customer.setId(1); customer.setName("name"); customer.setBirthday(new GregorianCalendar()); final int id = StepScopeTestUtils.doInStepScope( getStepExecution(), () -> processor.process(customer).getId() ); Assert.assertEquals(1, id); }

Siap untuk Batch Musim Semi Tingkat Lanjut?

Artikel ini memperkenalkan beberapa dasar desain dan pengembangan aplikasi Spring Batch. Namun, ada banyak topik dan kemampuan tingkat lanjut—seperti penskalaan, pemrosesan paralel, pendengar, dan lainnya—yang tidak dibahas dalam artikel ini. Semoga artikel ini memberikan landasan yang berguna untuk memulai.

Informasi tentang topik yang lebih maju ini kemudian dapat ditemukan di dokumentasi Spring Back resmi untuk Spring Batch.