Spring Batch Eğitimi: Yay ile Toplu İşleme Kolaylaştı

Yayınlanan: 2022-03-11

Toplu işleme (yığın odaklı, etkileşimli olmayan ve sıklıkla uzun süreli arka planda yürütme ile tanımlanan) hemen hemen her sektörde yaygın olarak kullanılır ve çok çeşitli görevlere uygulanır. Toplu işleme, veri veya hesaplama açısından yoğun olabilir, sıralı veya paralel olarak yürütülebilir ve geçici, planlanmış ve isteğe bağlı dahil olmak üzere çeşitli çağırma modelleri aracılığıyla başlatılabilir.

Bu Spring Batch öğreticisi, genel olarak toplu uygulamaların programlama modelini ve etki alanı dilini açıklar ve özellikle, mevcut Spring Batch 3.0.7 sürümünü kullanan toplu uygulamaların tasarımı ve geliştirilmesine yönelik bazı yararlı yaklaşımları gösterir.

Bahar Grubu nedir?

Spring Batch, sağlam toplu uygulamaların geliştirilmesini kolaylaştırmak için tasarlanmış hafif, kapsamlı bir çerçevedir. Ayrıca, optimizasyon ve bölümleme teknikleri ile son derece yüksek hacimli ve yüksek performanslı toplu işleri destekleyen daha gelişmiş teknik hizmetler ve özellikler sağlar. Spring Batch, tüm deneyimli Spring geliştiricilerinin aşina olduğu, Spring Framework'ün POJO tabanlı geliştirme yaklaşımı üzerine kuruludur.

Örnek olarak, bu makale, XML biçimli bir müşteri dosyası yükleyen, müşterileri çeşitli özniteliklere göre filtreleyen ve filtrelenmiş girdileri bir metin dosyasına çıkaran örnek bir projeden kaynak kodunu ele almaktadır. Spring Batch örneğimizin (Lombok ek açıklamalarını kullanan) kaynak kodu burada GitHub'da mevcuttur ve Java SE 8 ve Maven gerektirir.

Toplu İşleme Nedir? Anahtar Kavramlar ve Terminoloji

Herhangi bir toplu iş geliştiricisinin toplu işlemenin ana kavramlarına aşina ve rahat olması önemlidir. Aşağıdaki şema, birçok farklı platformda onlarca yıllık uygulamalarla kanıtlanmış toplu referans mimarisinin basitleştirilmiş bir versiyonudur. Spring Batch tarafından kullanıldığı şekliyle toplu işlemeyle ilgili temel kavramları ve terimleri tanıtır.

Spring Batch Eğitimi: Temel Kavramlar ve Terminoloji

Toplu işleme örneğimizde gösterildiği gibi, bir toplu işlem tipik olarak birden çok Step oluşan bir Job tarafından kapsüllenir. Her Step genellikle tek bir ItemReader , ItemProcessor ve ItemWriter . Bir Job , bir JobLauncher tarafından yürütülür ve yapılandırılan ve yürütülen işlerle ilgili meta veriler bir JobRepository depolanır.

Her Job , her biri bir toplu işi başlatmak için kullanılan belirli JobParameters tarafından benzersiz olarak tanımlanan birden çok JobInstance s ile ilişkilendirilebilir. Bir JobExecution her çalıştırması, JobInstance olarak adlandırılır. Her JobExecution tipik olarak mevcut ve çıkış durumları, başlangıç ​​ve bitiş saatleri vb. gibi bir çalıştırma sırasında olanları izler.

Step , bir toplu Job bağımsız, belirli bir aşamasıdır, öyle ki her Job bir veya daha fazla Step oluşur. Bir Job benzer şekilde, bir Step Step için tek bir girişimi temsil eden ayrı bir StepExecution vardır. StepExecution , geçerli ve çıkış durumları, başlangıç ​​ve bitiş zamanları vb. ile ilgili bilgileri ve ilgili Step ve JobExecution örneklerine referansları saklar.

ExecutionContext , StepExecution veya JobExecution kapsamına giren bilgileri içeren bir anahtar/değer çiftleri kümesidir. Spring Batch, bir toplu çalıştırmayı yeniden başlatmak istediğiniz durumlarda (örneğin, önemli bir hata oluştuğunda, vb.) yardımcı olan ExecutionContext öğesini sürdürür. Gerekli olan tek şey, adımlar arasında paylaşılacak herhangi bir nesneyi bağlama koymaktır ve gerisini çerçeve halledecektir. Yeniden başlattıktan sonra, önceki ExecutionContext değerler veritabanından geri yüklenir ve uygulanır.

JobRepository , Spring Batch'teki tüm bu kalıcılığı mümkün kılan mekanizmadır. JobLauncher , Job ve Step örneklemeleri için CRUD işlemleri sağlar. Bir Job başlatıldığında, depodan bir JobExecution alınır ve yürütme sırasında StepExecution ve JobExecution örnekleri depoda kalıcı olur.

Spring Batch Framework'e Başlarken

Spring Batch'in avantajlarından biri, proje bağımlılıklarının minimum düzeyde olması ve bu da hızlı bir şekilde çalışmaya başlamayı kolaylaştırmasıdır. Var olan birkaç bağımlılık, projenin buradan erişilebilen pom.xml açıkça belirtilmiş ve açıklanmıştır.

Uygulamanın asıl başlatılması, aşağıdakine benzer bir sınıfta gerçekleşir:

 @EnableBatchProcessing @SpringBootApplication public class BatchApplication { public static void main(String[] args) { prepareTestData(1000); SpringApplication.run(BatchApplication.class, args); } }

@EnableBatchProcessing ek açıklaması, Spring Batch özelliklerini etkinleştirir ve toplu işleri ayarlamak için temel bir yapılandırma sağlar.

@SpringBootApplication ek açıklaması, bağımsız, üretime hazır, Spring tabanlı uygulamalar sağlayan Spring Boot projesinden gelir. Bir veya daha fazla Spring bean bildiren ve ayrıca otomatik konfigürasyonu ve Spring'in bileşen taramasını tetikleyen bir konfigürasyon sınıfını belirtir.

Örnek projemiz, CustomerReportJobConfig tarafından enjekte edilmiş bir JobBuilderFactory ve StepBuilderFactory ile yapılandırılan yalnızca bir işe sahiptir. Minimum iş yapılandırması, CustomerReportJobConfig şu şekilde tanımlanabilir:

 @Configuration public class CustomerReportJobConfig { @Autowired private JobBuilderFactory jobBuilders; @Autowired private StepBuilderFactory stepBuilders; @Bean public Job customerReportJob() { return jobBuilders.get("customerReportJob") .start(taskletStep()) .next(chunkStep()) .build(); } @Bean public Step taskletStep() { return stepBuilders.get("taskletStep") .tasklet(tasklet()) .build(); } @Bean public Tasklet tasklet() { return (contribution, chunkContext) -> { return RepeatStatus.FINISHED; }; } }

Bir adım oluşturmak için iki ana yaklaşım vardır.

Yukarıdaki örnekte gösterildiği gibi bir yaklaşım, görev uygulamasına dayalıdır . Bir Tasklet , RepeatStatus.FINISHED döndürene veya bir hata sinyali vermek için bir istisna oluşturana kadar tekrar tekrar çağrılan execute() adlı tek bir yöntemi olan basit bir arabirimi destekler. Tasklet yapılan her çağrı bir işleme sarılır.

Diğer bir yaklaşım, yığın yönelimli işleme , verileri sırayla okumak ve bir işlem sınırı içinde yazılacak "parçalar" oluşturmak anlamına gelir. Her bir öğe bir ItemReader'dan okunur, bir ItemReader ItemProcessor ve toplanır. Okunan öğe sayısı kesinleştirme aralığına eşit olduğunda, tüm yığın ItemWriter aracılığıyla yazılır ve ardından işlem tamamlanır. Yığın odaklı bir adım aşağıdaki gibi yapılandırılabilir:

 @Bean public Job customerReportJob() { return jobBuilders.get("customerReportJob") .start(taskletStep()) .next(chunkStep()) .build(); } @Bean public Step chunkStep() { return stepBuilders.get("chunkStep") .<Customer, Customer>chunk(20) .reader(reader()) .processor(processor()) .writer(writer()) .build(); }

chunk() yöntemi, her bir parçanın belirtilen okuyucuya, işlemciye ve yazıcıya iletildiği, sağlanan boyutta parçalar halinde öğeleri işleyen bir adım oluşturur. Bu yöntemler, bu makalenin sonraki bölümlerinde daha ayrıntılı olarak tartışılmaktadır.

Özel Okuyucu

Spring Batch örnek uygulamamız için, bir XML dosyasından müşteri listesini okumak için, org.springframework.batch.item.ItemReader arabiriminin bir uygulamasını sağlamamız gerekiyor:

 public interface ItemReader<T> { T read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException; }

Bir ItemReader , verileri sağlar ve durum bilgisi olması beklenir. Genellikle her toplu iş için birden çok kez çağrılır, her read() çağrısı bir sonraki değeri döndürür ve son olarak tüm girdi verileri tükendiğinde null değerini döndürür.

Spring Batch, koleksiyonları, dosyaları okumak, JMS ve JDBC'yi ve aynı zamanda birden çok kaynağı entegre etmek gibi çeşitli amaçlar için kullanılabilen bazı ItemReader uygulamalarını sağlar.

Örnek uygulamamızda, CustomerItemReader sınıfı, gerçek read() çağrılarını IteratorItemReader sınıfının tembelce başlatılan bir örneğine devreder:

 public class CustomerItemReader implements ItemReader<Customer> { private final String filename; private ItemReader<Customer> delegate; public CustomerItemReader(final String filename) { this.filename = filename; } @Override public Customer read() throws Exception { if (delegate == null) { delegate = new IteratorItemReader<>(customers()); } return delegate.read(); } private List<Customer> customers() throws FileNotFoundException { try (XMLDecoder decoder = new XMLDecoder(new FileInputStream(filename))) { return (List<Customer>) decoder.readObject(); } } }

Bu uygulama için bir Spring bean, @Component ve @StepScope ek açıklamalarıyla oluşturulur ve Spring'in bu sınıfın adım kapsamlı bir Spring bileşeni olduğunu ve aşağıdaki gibi adım yürütme başına bir kez oluşturulacağını bilmesini sağlar:

 @StepScope @Bean public ItemReader<Customer> reader() { return new CustomerItemReader(XML_FILE); }

Özel İşlemciler

ItemProcessors , girdi öğelerini dönüştürür ve iş mantığını öğeye dayalı bir işleme senaryosunda sunar. org.springframework.batch.item.ItemProcessor arabiriminin bir uygulamasını sağlamaları gerekir:

 public interface ItemProcessor<I, O> { O process(I item) throws Exception; }

process() yöntemi, I sınıfının bir örneğini kabul eder ve aynı türden bir örnek döndürebilir veya döndürmeyebilir. null döndürmek, öğenin işlenmeye devam etmemesi gerektiğini gösterir. Her zamanki gibi Spring, öğeyi enjekte edilen bir ItemProcessor s dizisi ve girişi doğrulayan bir ValidatingItemProcessor içinden geçiren CompositeItemProcessor gibi birkaç standart işlemci sağlar.

Örnek uygulamamız söz konusu olduğunda, müşterileri aşağıdaki gereksinimlere göre filtrelemek için işlemciler kullanılır:

  • Bir müşteri, içinde bulunulan ayda doğmuş olmalıdır (örneğin, doğum günü spesiyallerini işaretlemek vb.)
  • Bir müşterinin beşten az tamamlanmış işlemi olmalıdır (örneğin, daha yeni müşterileri belirlemek için)

"Geçerli ay" gereksinimi, özel bir ItemProcessor aracılığıyla uygulanır:

 public class BirthdayFilterProcessor implements ItemProcessor<Customer, Customer> { @Override public Customer process(final Customer item) throws Exception { if (new GregorianCalendar().get(Calendar.MONTH) == item.getBirthday().get(Calendar.MONTH)) { return item; } return null; } }

"Sınırlı sayıda işlem" gereksinimi ValidatingItemProcessor olarak uygulanır:

 public class TransactionValidatingProcessor extends ValidatingItemProcessor<Customer> { public TransactionValidatingProcessor(final int limit) { super( item -> { if (item.getTransactions() >= limit) { throw new ValidationException("Customer has less than " + limit + " transactions"); } } ); setFilter(true); } }

Bu işlemci çifti daha sonra temsilci modelini uygulayan bir CompositeItemProcessor içinde kapsüllenir:

 @StepScope @Bean public ItemProcessor<Customer, Customer> processor() { final CompositeItemProcessor<Customer, Customer> processor = new CompositeItemProcessor<>(); processor.setDelegates(Arrays.asList(new BirthdayFilterProcessor(), new TransactionValidatingProcessor(5))); return processor; }

Özel Yazarlar

Verilerin çıktısını almak için Spring Batch, nesneleri gerektiği gibi seri hale getirmek için org.springframework.batch.item.ItemWriter arabirimini sağlar:

 public interface ItemWriter<T> { void write(List<? extends T> items) throws Exception; }

write() yöntemi, tüm dahili arabelleklerin temizlendiğinden emin olmaktan sorumludur. Bir işlem etkinse, genellikle bir sonraki geri alma işleminde çıktıyı atmak da gerekli olacaktır. Yazarın veri gönderdiği kaynak, normalde bunu kendisi halledebilmelidir. CompositeItemWriter , JdbcBatchItemWriter , JmsItemWriter , JpaItemWriter , SimpleMailMessageItemWriter ve diğerleri gibi standart uygulamalar vardır.

Örnek uygulamamızda filtrelenen müşterilerin listesi şu şekilde yazılmıştır:

 public class CustomerItemWriter implements ItemWriter<Customer>, Closeable { private final PrintWriter writer; public CustomerItemWriter() { OutputStream out; try { out = new FileOutputStream("output.txt"); } catch (FileNotFoundException e) { out = System.out; } this.writer = new PrintWriter(out); } @Override public void write(final List<? extends Customer> items) throws Exception { for (Customer item : items) { writer.println(item.toString()); } } @PreDestroy @Override public void close() throws IOException { writer.close(); } }

Bahar Toplu İşlerini Zamanlama

Spring Batch, varsayılan olarak, bulabildiği tüm işleri (yani, CustomerReportJobConfig 'da yapılandırılanlar) başlangıçta yürütür. Bu davranışı değiştirmek için, application.properties öğesine aşağıdaki özelliği ekleyerek başlangıçta iş yürütmeyi devre dışı bırakın:

 spring.batch.job.enabled=false

Gerçek zamanlama, daha sonra bir yapılandırma sınıfına @Scheduled notu ve işin kendisini yürüten yönteme @EnableScheduling notu eklenerek elde edilir. Zamanlama, gecikme, oranlar veya cron ifadeleri ile yapılandırılabilir:

 // run every 5000 msec (ie, every 5 secs) @Scheduled(fixedRate = 5000) public void run() throws Exception { JobExecution execution = jobLauncher.run( customerReportJob(), new JobParametersBuilder().toJobParameters() ); }

Ancak yukarıdaki örnekte bir sorun var. Çalışma zamanında, iş yalnızca ilk seferde başarılı olur. İkinci kez başlatıldığında (yani beş saniye sonra), günlüklerde aşağıdaki mesajları üretecektir (Spring Batch'in önceki sürümlerinde bir JobInstanceAlreadyCompleteException oluşturulacağını unutmayın):

 INFO 36988 --- [pool-2-thread-1] osbclsupport.SimpleJobLauncher : Job: [SimpleJob: [name=customerReportJob]] launched with the following parameters: [{}] INFO 36988 --- [pool-2-thread-1] osbatch.core.job.SimpleStepHandler : Step already complete or not restartable, so no action to execute: StepExecution: id=1, version=3, name=taskletStep, status=COMPLETED, exitStatus=COMPLETED, readCount=0, filterCount=0, writeCount=0 readSkipCount=0, writeSkipCount=0, processSkipCount=0, commitCount=1, rollbackCount=0, exitDescription= INFO 36988 --- [pool-2-thread-1] osbatch.core.job.SimpleStepHandler : Step already complete or not restartable, so no action to execute: StepExecution: id=2, version=53, name=chunkStep, status=COMPLETED, exitStatus=COMPLETED, readCount=1000, filterCount=982, writeCount=18 readSkipCount=0, writeSkipCount=0, processSkipCount=0, commitCount=51, rollbackCount=0, exitDescription=

Bunun nedeni yalnızca benzersiz JobInstance ların oluşturulabilmesi ve yürütülebilmesi ve Spring Batch'in birinci ve ikinci JobInstance arasında ayrım yapmasının hiçbir yolu olmamasıdır.

Bir toplu iş planladığınızda bu sorunu önlemenin iki yolu vardır.

Bunlardan biri, her işe bir veya daha fazla benzersiz parametre (örneğin, nanosaniye cinsinden gerçek başlangıç ​​zamanı) tanıttığınızdan emin olmaktır:

 @Scheduled(fixedRate = 5000) public void run() throws Exception { jobLauncher.run( customerReportJob(), new JobParametersBuilder().addLong("uniqueness", System.nanoTime()).toJobParameters() ); }

Alternatif olarak, SimpleJobOperator.startNextInstance() ile belirtilen işe eklenen JobParametersIncrementer tarafından belirlenen bir JobInstance dizisinde sonraki işi başlatabilirsiniz:

 @Autowired private JobOperator operator; @Autowired private JobExplorer jobs; @Scheduled(fixedRate = 5000) public void run() throws Exception { List<JobInstance> lastInstances = jobs.getJobInstances(JOB_NAME, 0, 1); if (lastInstances.isEmpty()) { jobLauncher.run(customerReportJob(), new JobParameters()); } else { operator.startNextInstance(JOB_NAME); } }

Yaylı Toplu Birim Testi

Genellikle, bir Spring Boot uygulamasında birim testleri çalıştırmak için çerçevenin karşılık gelen bir ApplicationContext yüklemesi gerekir. Bu amaçla iki ek açıklama kullanılır:

 @RunWith(SpringRunner.class) @ContextConfiguration(classes = {...})

Toplu işleri test etmek için bir org.springframework.batch.test.JobLauncherTestUtils yardımcı programı sınıfı vardır. Tüm bir işi başlatmak için yöntemler sağlamanın yanı sıra işteki her adımı çalıştırmak zorunda kalmadan bireysel adımların uçtan uca test edilmesine olanak tanır. Bahar fasulyesi olarak bildirilmelidir:

 @Configuration public class BatchTestConfiguration { @Bean public JobLauncherTestUtils jobLauncherTestUtils() { return new JobLauncherTestUtils(); } }

Bir iş ve bir adım için tipik bir test aşağıdaki gibidir (ve herhangi bir alaycı çerçeveyi de kullanabilir):

 @RunWith(SpringRunner.class) @ContextConfiguration(classes = {BatchApplication.class, BatchTestConfiguration.class}) public class CustomerReportJobConfigTest { @Autowired private JobLauncherTestUtils testUtils; @Autowired private CustomerReportJobConfig config; @Test public void testEntireJob() throws Exception { final JobExecution result = testUtils.getJobLauncher().run(config.customerReportJob(), testUtils.getUniqueJobParameters()); Assert.assertNotNull(result); Assert.assertEquals(BatchStatus.COMPLETED, result.getStatus()); } @Test public void testSpecificStep() { Assert.assertEquals(BatchStatus.COMPLETED, testUtils.launchStep("taskletStep").getStatus()); } }

Spring Batch, adım ve iş bağlamları için ek kapsamlar sunar. Bu kapsamlardaki nesneler, Spring kapsayıcısını bir nesne fabrikası olarak kullanır, bu nedenle yürütme adımı veya iş başına bu tür her bir çekirdeğin yalnızca bir örneği vardır. Ek olarak, StepContext veya JobContext erişilebilen referansların geç bağlanması için destek sağlanır. Çalışma zamanında adım veya iş kapsamında olacak şekilde yapılandırılan bileşenlerin, bağlamı bir adım veya iş yürütme işlemindeymiş gibi ayarlama yönteminiz yoksa, bağımsız bileşenler olarak test edilmesi zordur. Spring Batch'teki org.springframework.batch.test.StepScopeTestExecutionListener ve org.springframework.batch.test.StepScopeTestUtils bileşenlerinin yanı sıra JobScopeTestExecutionListener ve JobScopeTestUtils .

TestExecutionListeners , sınıf düzeyinde bildirilir ve görevi, her test yöntemi için bir adım yürütme bağlamı oluşturmaktır. Örneğin:

 @RunWith(SpringRunner.class) @TestExecutionListeners({DependencyInjectionTestExecutionListener.class, StepScopeTestExecutionListener.class}) @ContextConfiguration(classes = {BatchApplication.class, BatchTestConfiguration.class}) public class BirthdayFilterProcessorTest { @Autowired private BirthdayFilterProcessor processor; public StepExecution getStepExecution() { return MetaDataInstanceFactory.createStepExecution(); } @Test public void filter() throws Exception { final Customer customer = new Customer(); customer.setId(1); customer.setName("name"); customer.setBirthday(new GregorianCalendar()); Assert.assertNotNull(processor.process(customer)); } }

İki TestExecutionListener s vardır. Biri normal Spring Test çerçevesindendir ve yapılandırılmış uygulama bağlamından bağımlılık enjeksiyonunu işler. Diğeri, birim testlerine bağımlılık enjeksiyonu için adım kapsamı bağlamı ayarlayan Spring Batch StepScopeTestExecutionListener . Bir test yöntemi süresince bir StepContext oluşturulur ve enjekte edilen tüm bağımlılıklar için kullanılabilir hale getirilir. Varsayılan davranış, yalnızca sabit özelliklere sahip bir StepExecution oluşturmaktır. Alternatif olarak, StepContext , test durumu tarafından doğru türü döndüren bir fabrika yöntemi olarak sağlanabilir.

Başka bir yaklaşım, StepScopeTestUtils yardımcı program sınıfını temel alır. Bu sınıf, bağımlılık enjeksiyonu kullanmadan birim testlerinde StepScope daha esnek bir şekilde oluşturmak ve değiştirmek için kullanılır. Örneğin, yukarıdaki işlemci tarafından filtrelenen müşterinin ID'sinin okunması şu şekilde yapılabilir:

 @Test public void filterId() throws Exception { final Customer customer = new Customer(); customer.setId(1); customer.setName("name"); customer.setBirthday(new GregorianCalendar()); final int id = StepScopeTestUtils.doInStepScope( getStepExecution(), () -> processor.process(customer).getId() ); Assert.assertEquals(1, id); }

Gelişmiş Yay Grubu için hazır mısınız?

Bu makale, Spring Batch uygulamalarının tasarım ve geliştirmesinin bazı temellerini tanıtmaktadır. Ancak, ölçekleme, paralel işleme, dinleyiciler ve daha fazlası gibi bu makalede ele alınmayan daha birçok gelişmiş konu ve yetenek vardır. Umarım, bu makale başlamak için yararlı bir temel sağlar.

Bu daha gelişmiş konularla ilgili bilgiler daha sonra Spring Batch için resmi Spring Back belgelerinde bulunabilir.