Spring Batch Eğitimi: Yay ile Toplu İşleme Kolaylaştı
Yayınlanan: 2022-03-11Toplu işleme (yığın odaklı, etkileşimli olmayan ve sıklıkla uzun süreli arka planda yürütme ile tanımlanan) hemen hemen her sektörde yaygın olarak kullanılır ve çok çeşitli görevlere uygulanır. Toplu işleme, veri veya hesaplama açısından yoğun olabilir, sıralı veya paralel olarak yürütülebilir ve geçici, planlanmış ve isteğe bağlı dahil olmak üzere çeşitli çağırma modelleri aracılığıyla başlatılabilir.
Bu Spring Batch öğreticisi, genel olarak toplu uygulamaların programlama modelini ve etki alanı dilini açıklar ve özellikle, mevcut Spring Batch 3.0.7 sürümünü kullanan toplu uygulamaların tasarımı ve geliştirilmesine yönelik bazı yararlı yaklaşımları gösterir.
Bahar Grubu nedir?
Spring Batch, sağlam toplu uygulamaların geliştirilmesini kolaylaştırmak için tasarlanmış hafif, kapsamlı bir çerçevedir. Ayrıca, optimizasyon ve bölümleme teknikleri ile son derece yüksek hacimli ve yüksek performanslı toplu işleri destekleyen daha gelişmiş teknik hizmetler ve özellikler sağlar. Spring Batch, tüm deneyimli Spring geliştiricilerinin aşina olduğu, Spring Framework'ün POJO tabanlı geliştirme yaklaşımı üzerine kuruludur.
Örnek olarak, bu makale, XML biçimli bir müşteri dosyası yükleyen, müşterileri çeşitli özniteliklere göre filtreleyen ve filtrelenmiş girdileri bir metin dosyasına çıkaran örnek bir projeden kaynak kodunu ele almaktadır. Spring Batch örneğimizin (Lombok ek açıklamalarını kullanan) kaynak kodu burada GitHub'da mevcuttur ve Java SE 8 ve Maven gerektirir.
Toplu İşleme Nedir? Anahtar Kavramlar ve Terminoloji
Herhangi bir toplu iş geliştiricisinin toplu işlemenin ana kavramlarına aşina ve rahat olması önemlidir. Aşağıdaki şema, birçok farklı platformda onlarca yıllık uygulamalarla kanıtlanmış toplu referans mimarisinin basitleştirilmiş bir versiyonudur. Spring Batch tarafından kullanıldığı şekliyle toplu işlemeyle ilgili temel kavramları ve terimleri tanıtır.
Toplu işleme örneğimizde gösterildiği gibi, bir toplu işlem tipik olarak birden çok Step
oluşan bir Job
tarafından kapsüllenir. Her Step
genellikle tek bir ItemReader
, ItemProcessor
ve ItemWriter
. Bir Job
, bir JobLauncher
tarafından yürütülür ve yapılandırılan ve yürütülen işlerle ilgili meta veriler bir JobRepository
depolanır.
Her Job
, her biri bir toplu işi başlatmak için kullanılan belirli JobParameters
tarafından benzersiz olarak tanımlanan birden çok JobInstance
s ile ilişkilendirilebilir. Bir JobExecution
her çalıştırması, JobInstance
olarak adlandırılır. Her JobExecution
tipik olarak mevcut ve çıkış durumları, başlangıç ve bitiş saatleri vb. gibi bir çalıştırma sırasında olanları izler.
Step
, bir toplu Job
bağımsız, belirli bir aşamasıdır, öyle ki her Job
bir veya daha fazla Step
oluşur. Bir Job
benzer şekilde, bir Step
Step
için tek bir girişimi temsil eden ayrı bir StepExecution
vardır. StepExecution
, geçerli ve çıkış durumları, başlangıç ve bitiş zamanları vb. ile ilgili bilgileri ve ilgili Step
ve JobExecution
örneklerine referansları saklar.
ExecutionContext
, StepExecution
veya JobExecution
kapsamına giren bilgileri içeren bir anahtar/değer çiftleri kümesidir. Spring Batch, bir toplu çalıştırmayı yeniden başlatmak istediğiniz durumlarda (örneğin, önemli bir hata oluştuğunda, vb.) yardımcı olan ExecutionContext
öğesini sürdürür. Gerekli olan tek şey, adımlar arasında paylaşılacak herhangi bir nesneyi bağlama koymaktır ve gerisini çerçeve halledecektir. Yeniden başlattıktan sonra, önceki ExecutionContext
değerler veritabanından geri yüklenir ve uygulanır.
JobRepository
, Spring Batch'teki tüm bu kalıcılığı mümkün kılan mekanizmadır. JobLauncher
, Job
ve Step
örneklemeleri için CRUD işlemleri sağlar. Bir Job
başlatıldığında, depodan bir JobExecution
alınır ve yürütme sırasında StepExecution
ve JobExecution
örnekleri depoda kalıcı olur.
Spring Batch Framework'e Başlarken
Spring Batch'in avantajlarından biri, proje bağımlılıklarının minimum düzeyde olması ve bu da hızlı bir şekilde çalışmaya başlamayı kolaylaştırmasıdır. Var olan birkaç bağımlılık, projenin buradan erişilebilen pom.xml
açıkça belirtilmiş ve açıklanmıştır.
Uygulamanın asıl başlatılması, aşağıdakine benzer bir sınıfta gerçekleşir:
@EnableBatchProcessing @SpringBootApplication public class BatchApplication { public static void main(String[] args) { prepareTestData(1000); SpringApplication.run(BatchApplication.class, args); } }
@EnableBatchProcessing
ek açıklaması, Spring Batch özelliklerini etkinleştirir ve toplu işleri ayarlamak için temel bir yapılandırma sağlar.
@SpringBootApplication
ek açıklaması, bağımsız, üretime hazır, Spring tabanlı uygulamalar sağlayan Spring Boot projesinden gelir. Bir veya daha fazla Spring bean bildiren ve ayrıca otomatik konfigürasyonu ve Spring'in bileşen taramasını tetikleyen bir konfigürasyon sınıfını belirtir.
Örnek projemiz, CustomerReportJobConfig
tarafından enjekte edilmiş bir JobBuilderFactory
ve StepBuilderFactory
ile yapılandırılan yalnızca bir işe sahiptir. Minimum iş yapılandırması, CustomerReportJobConfig
şu şekilde tanımlanabilir:
@Configuration public class CustomerReportJobConfig { @Autowired private JobBuilderFactory jobBuilders; @Autowired private StepBuilderFactory stepBuilders; @Bean public Job customerReportJob() { return jobBuilders.get("customerReportJob") .start(taskletStep()) .next(chunkStep()) .build(); } @Bean public Step taskletStep() { return stepBuilders.get("taskletStep") .tasklet(tasklet()) .build(); } @Bean public Tasklet tasklet() { return (contribution, chunkContext) -> { return RepeatStatus.FINISHED; }; } }
Bir adım oluşturmak için iki ana yaklaşım vardır.
Yukarıdaki örnekte gösterildiği gibi bir yaklaşım, görev uygulamasına dayalıdır . Bir Tasklet
, RepeatStatus.FINISHED
döndürene veya bir hata sinyali vermek için bir istisna oluşturana kadar tekrar tekrar çağrılan execute()
adlı tek bir yöntemi olan basit bir arabirimi destekler. Tasklet
yapılan her çağrı bir işleme sarılır.
Diğer bir yaklaşım, yığın yönelimli işleme , verileri sırayla okumak ve bir işlem sınırı içinde yazılacak "parçalar" oluşturmak anlamına gelir. Her bir öğe bir ItemReader'dan okunur, bir ItemReader
ItemProcessor
ve toplanır. Okunan öğe sayısı kesinleştirme aralığına eşit olduğunda, tüm yığın ItemWriter
aracılığıyla yazılır ve ardından işlem tamamlanır. Yığın odaklı bir adım aşağıdaki gibi yapılandırılabilir:
@Bean public Job customerReportJob() { return jobBuilders.get("customerReportJob") .start(taskletStep()) .next(chunkStep()) .build(); } @Bean public Step chunkStep() { return stepBuilders.get("chunkStep") .<Customer, Customer>chunk(20) .reader(reader()) .processor(processor()) .writer(writer()) .build(); }
chunk()
yöntemi, her bir parçanın belirtilen okuyucuya, işlemciye ve yazıcıya iletildiği, sağlanan boyutta parçalar halinde öğeleri işleyen bir adım oluşturur. Bu yöntemler, bu makalenin sonraki bölümlerinde daha ayrıntılı olarak tartışılmaktadır.
Özel Okuyucu
Spring Batch örnek uygulamamız için, bir XML dosyasından müşteri listesini okumak için, org.springframework.batch.item.ItemReader
arabiriminin bir uygulamasını sağlamamız gerekiyor:
public interface ItemReader<T> { T read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException; }
Bir ItemReader
, verileri sağlar ve durum bilgisi olması beklenir. Genellikle her toplu iş için birden çok kez çağrılır, her read()
çağrısı bir sonraki değeri döndürür ve son olarak tüm girdi verileri tükendiğinde null
değerini döndürür.
Spring Batch, koleksiyonları, dosyaları okumak, JMS ve JDBC'yi ve aynı zamanda birden çok kaynağı entegre etmek gibi çeşitli amaçlar için kullanılabilen bazı ItemReader
uygulamalarını sağlar.
Örnek uygulamamızda, CustomerItemReader
sınıfı, gerçek read()
çağrılarını IteratorItemReader
sınıfının tembelce başlatılan bir örneğine devreder:
public class CustomerItemReader implements ItemReader<Customer> { private final String filename; private ItemReader<Customer> delegate; public CustomerItemReader(final String filename) { this.filename = filename; } @Override public Customer read() throws Exception { if (delegate == null) { delegate = new IteratorItemReader<>(customers()); } return delegate.read(); } private List<Customer> customers() throws FileNotFoundException { try (XMLDecoder decoder = new XMLDecoder(new FileInputStream(filename))) { return (List<Customer>) decoder.readObject(); } } }
Bu uygulama için bir Spring bean, @Component
ve @StepScope
ek açıklamalarıyla oluşturulur ve Spring'in bu sınıfın adım kapsamlı bir Spring bileşeni olduğunu ve aşağıdaki gibi adım yürütme başına bir kez oluşturulacağını bilmesini sağlar:
@StepScope @Bean public ItemReader<Customer> reader() { return new CustomerItemReader(XML_FILE); }
Özel İşlemciler
ItemProcessors
, girdi öğelerini dönüştürür ve iş mantığını öğeye dayalı bir işleme senaryosunda sunar. org.springframework.batch.item.ItemProcessor
arabiriminin bir uygulamasını sağlamaları gerekir:
public interface ItemProcessor<I, O> { O process(I item) throws Exception; }
process()
yöntemi, I
sınıfının bir örneğini kabul eder ve aynı türden bir örnek döndürebilir veya döndürmeyebilir. null
döndürmek, öğenin işlenmeye devam etmemesi gerektiğini gösterir. Her zamanki gibi Spring, öğeyi enjekte edilen bir ItemProcessor
s dizisi ve girişi doğrulayan bir ValidatingItemProcessor
içinden geçiren CompositeItemProcessor
gibi birkaç standart işlemci sağlar.
Örnek uygulamamız söz konusu olduğunda, müşterileri aşağıdaki gereksinimlere göre filtrelemek için işlemciler kullanılır:
- Bir müşteri, içinde bulunulan ayda doğmuş olmalıdır (örneğin, doğum günü spesiyallerini işaretlemek vb.)
- Bir müşterinin beşten az tamamlanmış işlemi olmalıdır (örneğin, daha yeni müşterileri belirlemek için)
"Geçerli ay" gereksinimi, özel bir ItemProcessor
aracılığıyla uygulanır:

public class BirthdayFilterProcessor implements ItemProcessor<Customer, Customer> { @Override public Customer process(final Customer item) throws Exception { if (new GregorianCalendar().get(Calendar.MONTH) == item.getBirthday().get(Calendar.MONTH)) { return item; } return null; } }
"Sınırlı sayıda işlem" gereksinimi ValidatingItemProcessor
olarak uygulanır:
public class TransactionValidatingProcessor extends ValidatingItemProcessor<Customer> { public TransactionValidatingProcessor(final int limit) { super( item -> { if (item.getTransactions() >= limit) { throw new ValidationException("Customer has less than " + limit + " transactions"); } } ); setFilter(true); } }
Bu işlemci çifti daha sonra temsilci modelini uygulayan bir CompositeItemProcessor
içinde kapsüllenir:
@StepScope @Bean public ItemProcessor<Customer, Customer> processor() { final CompositeItemProcessor<Customer, Customer> processor = new CompositeItemProcessor<>(); processor.setDelegates(Arrays.asList(new BirthdayFilterProcessor(), new TransactionValidatingProcessor(5))); return processor; }
Özel Yazarlar
Verilerin çıktısını almak için Spring Batch, nesneleri gerektiği gibi seri hale getirmek için org.springframework.batch.item.ItemWriter
arabirimini sağlar:
public interface ItemWriter<T> { void write(List<? extends T> items) throws Exception; }
write()
yöntemi, tüm dahili arabelleklerin temizlendiğinden emin olmaktan sorumludur. Bir işlem etkinse, genellikle bir sonraki geri alma işleminde çıktıyı atmak da gerekli olacaktır. Yazarın veri gönderdiği kaynak, normalde bunu kendisi halledebilmelidir. CompositeItemWriter
, JdbcBatchItemWriter
, JmsItemWriter
, JpaItemWriter
, SimpleMailMessageItemWriter
ve diğerleri gibi standart uygulamalar vardır.
Örnek uygulamamızda filtrelenen müşterilerin listesi şu şekilde yazılmıştır:
public class CustomerItemWriter implements ItemWriter<Customer>, Closeable { private final PrintWriter writer; public CustomerItemWriter() { OutputStream out; try { out = new FileOutputStream("output.txt"); } catch (FileNotFoundException e) { out = System.out; } this.writer = new PrintWriter(out); } @Override public void write(final List<? extends Customer> items) throws Exception { for (Customer item : items) { writer.println(item.toString()); } } @PreDestroy @Override public void close() throws IOException { writer.close(); } }
Bahar Toplu İşlerini Zamanlama
Spring Batch, varsayılan olarak, bulabildiği tüm işleri (yani, CustomerReportJobConfig
'da yapılandırılanlar) başlangıçta yürütür. Bu davranışı değiştirmek için, application.properties
öğesine aşağıdaki özelliği ekleyerek başlangıçta iş yürütmeyi devre dışı bırakın:
spring.batch.job.enabled=false
Gerçek zamanlama, daha sonra bir yapılandırma sınıfına @Scheduled
notu ve işin kendisini yürüten yönteme @EnableScheduling
notu eklenerek elde edilir. Zamanlama, gecikme, oranlar veya cron ifadeleri ile yapılandırılabilir:
// run every 5000 msec (ie, every 5 secs) @Scheduled(fixedRate = 5000) public void run() throws Exception { JobExecution execution = jobLauncher.run( customerReportJob(), new JobParametersBuilder().toJobParameters() ); }
Ancak yukarıdaki örnekte bir sorun var. Çalışma zamanında, iş yalnızca ilk seferde başarılı olur. İkinci kez başlatıldığında (yani beş saniye sonra), günlüklerde aşağıdaki mesajları üretecektir (Spring Batch'in önceki sürümlerinde bir JobInstanceAlreadyCompleteException
oluşturulacağını unutmayın):
INFO 36988 --- [pool-2-thread-1] osbclsupport.SimpleJobLauncher : Job: [SimpleJob: [name=customerReportJob]] launched with the following parameters: [{}] INFO 36988 --- [pool-2-thread-1] osbatch.core.job.SimpleStepHandler : Step already complete or not restartable, so no action to execute: StepExecution: id=1, version=3, name=taskletStep, status=COMPLETED, exitStatus=COMPLETED, readCount=0, filterCount=0, writeCount=0 readSkipCount=0, writeSkipCount=0, processSkipCount=0, commitCount=1, rollbackCount=0, exitDescription= INFO 36988 --- [pool-2-thread-1] osbatch.core.job.SimpleStepHandler : Step already complete or not restartable, so no action to execute: StepExecution: id=2, version=53, name=chunkStep, status=COMPLETED, exitStatus=COMPLETED, readCount=1000, filterCount=982, writeCount=18 readSkipCount=0, writeSkipCount=0, processSkipCount=0, commitCount=51, rollbackCount=0, exitDescription=
Bunun nedeni yalnızca benzersiz JobInstance
ların oluşturulabilmesi ve yürütülebilmesi ve Spring Batch'in birinci ve ikinci JobInstance
arasında ayrım yapmasının hiçbir yolu olmamasıdır.
Bir toplu iş planladığınızda bu sorunu önlemenin iki yolu vardır.
Bunlardan biri, her işe bir veya daha fazla benzersiz parametre (örneğin, nanosaniye cinsinden gerçek başlangıç zamanı) tanıttığınızdan emin olmaktır:
@Scheduled(fixedRate = 5000) public void run() throws Exception { jobLauncher.run( customerReportJob(), new JobParametersBuilder().addLong("uniqueness", System.nanoTime()).toJobParameters() ); }
Alternatif olarak, SimpleJobOperator.startNextInstance()
ile belirtilen işe eklenen JobParametersIncrementer
tarafından belirlenen bir JobInstance
dizisinde sonraki işi başlatabilirsiniz:
@Autowired private JobOperator operator; @Autowired private JobExplorer jobs; @Scheduled(fixedRate = 5000) public void run() throws Exception { List<JobInstance> lastInstances = jobs.getJobInstances(JOB_NAME, 0, 1); if (lastInstances.isEmpty()) { jobLauncher.run(customerReportJob(), new JobParameters()); } else { operator.startNextInstance(JOB_NAME); } }
Yaylı Toplu Birim Testi
Genellikle, bir Spring Boot uygulamasında birim testleri çalıştırmak için çerçevenin karşılık gelen bir ApplicationContext
yüklemesi gerekir. Bu amaçla iki ek açıklama kullanılır:
@RunWith(SpringRunner.class) @ContextConfiguration(classes = {...})
Toplu işleri test etmek için bir org.springframework.batch.test.JobLauncherTestUtils
yardımcı programı sınıfı vardır. Tüm bir işi başlatmak için yöntemler sağlamanın yanı sıra işteki her adımı çalıştırmak zorunda kalmadan bireysel adımların uçtan uca test edilmesine olanak tanır. Bahar fasulyesi olarak bildirilmelidir:
@Configuration public class BatchTestConfiguration { @Bean public JobLauncherTestUtils jobLauncherTestUtils() { return new JobLauncherTestUtils(); } }
Bir iş ve bir adım için tipik bir test aşağıdaki gibidir (ve herhangi bir alaycı çerçeveyi de kullanabilir):
@RunWith(SpringRunner.class) @ContextConfiguration(classes = {BatchApplication.class, BatchTestConfiguration.class}) public class CustomerReportJobConfigTest { @Autowired private JobLauncherTestUtils testUtils; @Autowired private CustomerReportJobConfig config; @Test public void testEntireJob() throws Exception { final JobExecution result = testUtils.getJobLauncher().run(config.customerReportJob(), testUtils.getUniqueJobParameters()); Assert.assertNotNull(result); Assert.assertEquals(BatchStatus.COMPLETED, result.getStatus()); } @Test public void testSpecificStep() { Assert.assertEquals(BatchStatus.COMPLETED, testUtils.launchStep("taskletStep").getStatus()); } }
Spring Batch, adım ve iş bağlamları için ek kapsamlar sunar. Bu kapsamlardaki nesneler, Spring kapsayıcısını bir nesne fabrikası olarak kullanır, bu nedenle yürütme adımı veya iş başına bu tür her bir çekirdeğin yalnızca bir örneği vardır. Ek olarak, StepContext
veya JobContext
erişilebilen referansların geç bağlanması için destek sağlanır. Çalışma zamanında adım veya iş kapsamında olacak şekilde yapılandırılan bileşenlerin, bağlamı bir adım veya iş yürütme işlemindeymiş gibi ayarlama yönteminiz yoksa, bağımsız bileşenler olarak test edilmesi zordur. Spring Batch'teki org.springframework.batch.test.StepScopeTestExecutionListener
ve org.springframework.batch.test.StepScopeTestUtils
bileşenlerinin yanı sıra JobScopeTestExecutionListener
ve JobScopeTestUtils
.
TestExecutionListeners
, sınıf düzeyinde bildirilir ve görevi, her test yöntemi için bir adım yürütme bağlamı oluşturmaktır. Örneğin:
@RunWith(SpringRunner.class) @TestExecutionListeners({DependencyInjectionTestExecutionListener.class, StepScopeTestExecutionListener.class}) @ContextConfiguration(classes = {BatchApplication.class, BatchTestConfiguration.class}) public class BirthdayFilterProcessorTest { @Autowired private BirthdayFilterProcessor processor; public StepExecution getStepExecution() { return MetaDataInstanceFactory.createStepExecution(); } @Test public void filter() throws Exception { final Customer customer = new Customer(); customer.setId(1); customer.setName("name"); customer.setBirthday(new GregorianCalendar()); Assert.assertNotNull(processor.process(customer)); } }
İki TestExecutionListener
s vardır. Biri normal Spring Test çerçevesindendir ve yapılandırılmış uygulama bağlamından bağımlılık enjeksiyonunu işler. Diğeri, birim testlerine bağımlılık enjeksiyonu için adım kapsamı bağlamı ayarlayan Spring Batch StepScopeTestExecutionListener
. Bir test yöntemi süresince bir StepContext
oluşturulur ve enjekte edilen tüm bağımlılıklar için kullanılabilir hale getirilir. Varsayılan davranış, yalnızca sabit özelliklere sahip bir StepExecution
oluşturmaktır. Alternatif olarak, StepContext
, test durumu tarafından doğru türü döndüren bir fabrika yöntemi olarak sağlanabilir.
Başka bir yaklaşım, StepScopeTestUtils
yardımcı program sınıfını temel alır. Bu sınıf, bağımlılık enjeksiyonu kullanmadan birim testlerinde StepScope
daha esnek bir şekilde oluşturmak ve değiştirmek için kullanılır. Örneğin, yukarıdaki işlemci tarafından filtrelenen müşterinin ID'sinin okunması şu şekilde yapılabilir:
@Test public void filterId() throws Exception { final Customer customer = new Customer(); customer.setId(1); customer.setName("name"); customer.setBirthday(new GregorianCalendar()); final int id = StepScopeTestUtils.doInStepScope( getStepExecution(), () -> processor.process(customer).getId() ); Assert.assertEquals(1, id); }
Gelişmiş Yay Grubu için hazır mısınız?
Bu makale, Spring Batch uygulamalarının tasarım ve geliştirmesinin bazı temellerini tanıtmaktadır. Ancak, ölçekleme, paralel işleme, dinleyiciler ve daha fazlası gibi bu makalede ele alınmayan daha birçok gelişmiş konu ve yetenek vardır. Umarım, bu makale başlamak için yararlı bir temel sağlar.
Bu daha gelişmiş konularla ilgili bilgiler daha sonra Spring Batch için resmi Spring Back belgelerinde bulunabilir.