دروس الدفعة الربيعية: أصبحت المعالجة الدفعية سهلة مع الربيع
نشرت: 2022-03-11تُستخدم معالجة الدُفعات - المتمثلة في التنفيذ الجماعي ، وغير التفاعلي ، وطويل الأمد في كثير من الأحيان ، على نطاق واسع عبر كل صناعة تقريبًا ويتم تطبيقها على مجموعة متنوعة من المهام. قد تكون معالجة الدُفعات عبارة عن بيانات أو عمليات حسابية مكثفة ، ويتم تنفيذها بالتسلسل أو بالتوازي ، ويمكن أن تبدأ من خلال نماذج استدعاء مختلفة ، بما في ذلك مخصصة ، ومجدولة ، وعند الطلب.
يشرح هذا البرنامج التعليمي Spring Batch نموذج البرمجة ولغة المجال للتطبيقات الدفعية بشكل عام ، ويوضح بشكل خاص بعض الأساليب المفيدة لتصميم وتطوير التطبيقات المجمعة باستخدام إصدار Spring Batch 3.0.7 الحالي.
ما هي دفعة الربيع؟
Spring Batch عبارة عن إطار عمل خفيف الوزن وشامل مصمم لتسهيل تطوير تطبيقات دفعات قوية. كما أنه يوفر المزيد من الخدمات والميزات التقنية المتقدمة التي تدعم وظائف الدُفعات عالية الأداء وعالية الأداء من خلال تقنيات التحسين والتقسيم. تعتمد Spring Batch على نهج التطوير المستند إلى POJO الخاص بـ Spring Framework ، المألوف لجميع مطوري Spring ذوي الخبرة.
على سبيل المثال ، تأخذ هذه المقالة في الاعتبار التعليمات البرمجية المصدر من مشروع نموذج يقوم بتحميل ملف عميل بتنسيق XML ، ويقوم بتصفية العملاء حسب السمات المختلفة ، وإخراج الإدخالات التي تمت تصفيتها إلى ملف نصي. يتوفر رمز المصدر لمثال Spring Batch الخاص بنا (والذي يستخدم شروح Lombok التوضيحية) هنا على GitHub ويتطلب Java SE 8 و Maven.
ما هي المعالجة المجمعة؟ المفاهيم والمصطلحات الأساسية
من المهم لأي مطور دفعات أن يكون مألوفًا ومريحًا للمفاهيم الرئيسية لمعالجة الدُفعات. الرسم البياني أدناه هو نسخة مبسطة من هندسة مرجع الدُفعات التي تم إثباتها من خلال عقود من التنفيذ على العديد من الأنظمة الأساسية المختلفة. يقدم المفاهيم والمصطلحات الأساسية ذات الصلة بمعالجة الدُفعات ، كما يستخدمها Spring Batch.
كما هو موضح في مثال المعالجة الدفعية ، يتم تغليف عملية الدُفعات عادةً Job تتكون من Step متعددة. تحتوي كل Step عادةً على ItemReader و ItemProcessor و ItemWriter . يتم تنفيذ Job بواسطة JobLauncher ، ويتم تخزين البيانات الوصفية حول الوظائف التي تم تكوينها وتنفيذها في JobRepository .
قد ترتبط كل Job JobInstance متعددة ، يتم تعريف كل منها بشكل فريد من خلال JobParameters الخاصة بها والتي يتم استخدامها لبدء وظيفة مجمعة. يشار إلى كل عملية تشغيل لـ JobInstance باسم JobExecution . يتتبع كل JobExecution عادةً ما حدث أثناء التشغيل ، مثل الحالة الحالية وحالة الخروج ، وأوقات البدء والانتهاء ، وما إلى ذلك.
Step هي مرحلة مستقلة ومحددة لوظيفة Job ، بحيث تتكون كل Job من Step واحدة أو أكثر. على غرار Job ، تحتوي Step على تنفيذ فردي StepExecution يمثل محاولة واحدة لتنفيذ Step . يخزن StepExecution المعلومات حول الحالة الحالية وحالة الخروج ، وأوقات البدء والانتهاء ، وما إلى ذلك ، بالإضافة إلى المراجع لحالات Step و JobExecution .
ExecutionContext عبارة عن مجموعة من أزواج القيمة الرئيسية التي تحتوي على معلومات يتم تحديد نطاقها إما لـ StepExecution أو JobExecution . يستمر Spring Batch في ExecutionContext ، مما يساعد في الحالات التي تريد فيها إعادة تشغيل تشغيل دفعة (على سبيل المثال ، عند حدوث خطأ فادح ، وما إلى ذلك). كل ما هو مطلوب هو وضع أي كائن ليتم مشاركته بين الخطوات في السياق وسيتولى إطار العمل الباقي. بعد إعادة التشغيل ، تتم استعادة القيم من ExecutionContext السابقة من قاعدة البيانات وتطبيقها.
JobRepository هي الآلية في Spring Batch التي تجعل كل هذا الثبات ممكنًا. يوفر عمليات CRUD لـ JobLauncher و Job و Step Instantiations. بمجرد إطلاق Job ، يتم الحصول على تنفيذ JobExecution من المستودع ، وخلال فترة التنفيذ ، يتم استمرار حالات التنفيذ StepExecution والتنفيذ JobExecution في المستودع.
الشروع في العمل مع إطار العمل الربيعي
تتمثل إحدى مزايا Spring Batch في أن تبعيات المشروع ضئيلة ، مما يجعل من السهل البدء والتشغيل بسرعة. تم تحديد التبعيات القليلة الموجودة وتوضيحها بوضوح في pom.xml الخاص بالمشروع ، والذي يمكن الوصول إليه هنا.
يحدث بدء التشغيل الفعلي للتطبيق في فصل دراسي يشبه ما يلي:
@EnableBatchProcessing @SpringBootApplication public class BatchApplication { public static void main(String[] args) { prepareTestData(1000); SpringApplication.run(BatchApplication.class, args); } } يعمل التعليق التوضيحي @EnableBatchProcessing تمكين ميزات Spring Batch ويوفر تكوينًا أساسيًا لإعداد وظائف المجموعة.
يأتي التعليق التوضيحي @SpringBootApplication من مشروع Spring Boot الذي يوفر تطبيقات مستقلة وجاهزة للإنتاج ومستندة إلى Spring. تحدد فئة التكوين التي تعلن عن حبة أو أكثر من حبوب الربيع ، كما تقوم بتشغيل التكوين التلقائي ومسح مكونات Spring.
يحتوي مشروعنا النموذجي على وظيفة واحدة فقط تم تكوينها بواسطة CustomerReportJobConfig باستخدام JobBuilderFactory و StepBuilderFactory . يمكن تحديد الحد الأدنى لتكوين الوظيفة في CustomerReportJobConfig على النحو التالي:
@Configuration public class CustomerReportJobConfig { @Autowired private JobBuilderFactory jobBuilders; @Autowired private StepBuilderFactory stepBuilders; @Bean public Job customerReportJob() { return jobBuilders.get("customerReportJob") .start(taskletStep()) .next(chunkStep()) .build(); } @Bean public Step taskletStep() { return stepBuilders.get("taskletStep") .tasklet(tasklet()) .build(); } @Bean public Tasklet tasklet() { return (contribution, chunkContext) -> { return RepeatStatus.FINISHED; }; } }هناك طريقتان رئيسيتان لبناء الخطوة.
نهج واحد ، كما هو موضح في المثال أعلاه ، يعتمد على المهام الصغيرة . يدعم Tasklet واجهة بسيطة لها طريقة واحدة فقط ، وهي execute() ، والتي يتم استدعاؤها بشكل متكرر حتى تقوم إما بإرجاع RepeatStatus.FINISHED أو طرح استثناء للإشارة إلى فشل. يتم تغليف كل استدعاء إلى Tasklet في معاملة.
هناك طريقة أخرى ، المعالجة المقطوعة ، تشير إلى قراءة البيانات بالتسلسل وإنشاء "أجزاء" سيتم كتابتها داخل حدود المعاملة. تتم قراءة كل عنصر فردي من ItemReader ، ويتم تسليمه إلى ItemProcessor ، ويتم تجميعه. بمجرد أن يساوي عدد العناصر المقروءة فاصل الالتزام ، تتم كتابة الجزء بأكمله عبر ItemWriter ، ثم يتم تنفيذ المعاملة. يمكن تكوين الخطوة الموجهة للقطعة على النحو التالي:
@Bean public Job customerReportJob() { return jobBuilders.get("customerReportJob") .start(taskletStep()) .next(chunkStep()) .build(); } @Bean public Step chunkStep() { return stepBuilders.get("chunkStep") .<Customer, Customer>chunk(20) .reader(reader()) .processor(processor()) .writer(writer()) .build(); } تبني طريقة chunk() خطوة لمعالجة العناصر في أجزاء بالحجم المتاح ، مع تمرير كل جزء إلى القارئ والمعالج والكاتب المحدد. تمت مناقشة هذه الأساليب بمزيد من التفصيل في الأقسام التالية من هذه المقالة.
قارئ مخصص
بالنسبة لتطبيق نموذج Spring Batch الخاص بنا ، من أجل قراءة قائمة العملاء من ملف XML ، نحتاج إلى توفير تنفيذ للواجهة org.springframework.batch.item.ItemReader :
public interface ItemReader<T> { T read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException; } يوفر ItemReader البيانات ومن المتوقع أن يكون ذا الحالة. عادة ما يتم استدعاؤه عدة مرات لكل دفعة ، مع كل استدعاء read() يعيد القيمة التالية ويعيد القيمة null في النهاية عندما يتم استنفاد جميع بيانات الإدخال.
يوفر Spring Batch بعض تطبيقات Out-of-the-Box لـ ItemReader ، والتي يمكن استخدامها لمجموعة متنوعة من الأغراض مثل قراءة المجموعات والملفات ودمج JMS و JDBC بالإضافة إلى مصادر متعددة ، وما إلى ذلك.
في نموذج التطبيق الخاص بنا ، تقوم فئة CustomerItemReader بتفويض مكالمات read() الفعلية إلى مثيل مهيأ بشكل كسول لفئة IteratorItemReader :
public class CustomerItemReader implements ItemReader<Customer> { private final String filename; private ItemReader<Customer> delegate; public CustomerItemReader(final String filename) { this.filename = filename; } @Override public Customer read() throws Exception { if (delegate == null) { delegate = new IteratorItemReader<>(customers()); } return delegate.read(); } private List<Customer> customers() throws FileNotFoundException { try (XMLDecoder decoder = new XMLDecoder(new FileInputStream(filename))) { return (List<Customer>) decoder.readObject(); } } } يتم إنشاء برنامج فصل الربيع لهذا التنفيذ باستخدام التعليقات التوضيحية @StepScope @Component مما يتيح لـ Spring معرفة أن هذه الفئة عبارة عن مكون Spring متدرج النطاق وسيتم إنشاؤه مرة واحدة لكل خطوة تنفيذ على النحو التالي:
@StepScope @Bean public ItemReader<Customer> reader() { return new CustomerItemReader(XML_FILE); }معالجات مخصصة
ItemProcessors بتحويل عناصر الإدخال وتقديم منطق الأعمال في سيناريو معالجة يعتمد على العنصر. يجب أن توفر تطبيقًا لواجهة org.springframework.batch.item.ItemProcessor :
public interface ItemProcessor<I, O> { O process(I item) throws Exception; } تقبل process() مثيلًا واحدًا من الفئة I وقد تُرجع أو لا تُرجع مثيلًا من نفس النوع. تشير إعادة القيمة null إلى أنه لا يجب متابعة معالجة العنصر. كالعادة ، يوفر Spring عددًا قليلاً من المعالجات القياسية ، مثل CompositeItemProcessor الذي يمرر العنصر عبر سلسلة من ItemProcessor المحقونة ومعالج ValidatingItemProcessor الذي يتحقق من صحة الإدخال.
في حالة تطبيق العينة الخاص بنا ، يتم استخدام المعالجات لتصفية العملاء حسب المتطلبات التالية:
- يجب أن يولد العميل في الشهر الحالي (على سبيل المثال ، للإشارة إلى عروض أعياد الميلاد الخاصة ، وما إلى ذلك)
- يجب أن يكون لدى العميل أقل من خمس معاملات مكتملة (على سبيل المثال ، لتحديد العملاء الجدد)
يتم تنفيذ متطلب "الشهر الحالي" عبر ItemProcessor المخصص:
public class BirthdayFilterProcessor implements ItemProcessor<Customer, Customer> { @Override public Customer process(final Customer item) throws Exception { if (new GregorianCalendar().get(Calendar.MONTH) == item.getBirthday().get(Calendar.MONTH)) { return item; } return null; } } يتم تنفيذ مطلب "عدد محدود من المعاملات" باعتباره ValidatingItemProcessor :

public class TransactionValidatingProcessor extends ValidatingItemProcessor<Customer> { public TransactionValidatingProcessor(final int limit) { super( item -> { if (item.getTransactions() >= limit) { throw new ValidationException("Customer has less than " + limit + " transactions"); } } ); setFilter(true); } } يتم بعد ذلك تغليف هذا الزوج من المعالجات داخل معالج CompositeItemProcessor الذي ينفذ نمط المفوض:
@StepScope @Bean public ItemProcessor<Customer, Customer> processor() { final CompositeItemProcessor<Customer, Customer> processor = new CompositeItemProcessor<>(); processor.setDelegates(Arrays.asList(new BirthdayFilterProcessor(), new TransactionValidatingProcessor(5))); return processor; }كتاب مخصص
لإخراج البيانات ، يوفر Spring Batch الواجهة org.springframework.batch.item.ItemWriter لتسلسل الكائنات حسب الضرورة:
public interface ItemWriter<T> { void write(List<? extends T> items) throws Exception; } طريقة write() مسؤولة عن التأكد من مسح أي مخازن مؤقتة داخلية. إذا كانت المعاملة نشطة ، فعادة ما يكون من الضروري أيضًا تجاهل الإخراج عند التراجع التالي. يجب أن يكون المورد الذي يرسل إليه الكاتب البيانات قادرًا على التعامل مع هذا الأمر بنفسه. هناك تطبيقات قياسية مثل CompositeItemWriter و JdbcBatchItemWriter و JmsItemWriter و SimpleMailMessageItemWriter و JpaItemWriter وغيرها.
في نموذج التطبيق الخاص بنا ، يتم كتابة قائمة العملاء الذين تمت تصفيتهم على النحو التالي:
public class CustomerItemWriter implements ItemWriter<Customer>, Closeable { private final PrintWriter writer; public CustomerItemWriter() { OutputStream out; try { out = new FileOutputStream("output.txt"); } catch (FileNotFoundException e) { out = System.out; } this.writer = new PrintWriter(out); } @Override public void write(final List<? extends Customer> items) throws Exception { for (Customer item : items) { writer.println(item.toString()); } } @PreDestroy @Override public void close() throws IOException { writer.close(); } }جدولة وظائف دفعة الربيع
بشكل افتراضي ، ينفذ Spring Batch جميع الوظائف التي يمكنه العثور عليها (على سبيل المثال ، التي تم تكوينها كما في CustomerReportJobConfig ) عند بدء التشغيل. لتغيير هذا السلوك ، قم بتعطيل تنفيذ المهمة عند بدء التشغيل عن طريق إضافة الخاصية التالية إلى application.properties :
spring.batch.job.enabled=false ثم يتم تحقيق الجدولة الفعلية عن طريق إضافة التعليق التوضيحي @EnableScheduling إلى فئة التكوين والتعليق التوضيحي @Scheduled إلى الطريقة التي تنفذ الوظيفة نفسها. يمكن تكوين الجدولة باستخدام التأخير أو المعدلات أو تعبيرات cron:
// run every 5000 msec (ie, every 5 secs) @Scheduled(fixedRate = 5000) public void run() throws Exception { JobExecution execution = jobLauncher.run( customerReportJob(), new JobParametersBuilder().toJobParameters() ); } هناك مشكلة في المثال أعلاه بالرغم من ذلك. في وقت التشغيل ، ستنجح الوظيفة في المرة الأولى فقط. عندما يتم تشغيله في المرة الثانية (أي بعد خمس ثوانٍ) ، فإنه سينشئ الرسائل التالية في السجلات (لاحظ أنه في الإصدارات السابقة من Spring Batch ، كان من الممكن طرح JobInstanceAlreadyCompleteException ):
INFO 36988 --- [pool-2-thread-1] osbclsupport.SimpleJobLauncher : Job: [SimpleJob: [name=customerReportJob]] launched with the following parameters: [{}] INFO 36988 --- [pool-2-thread-1] osbatch.core.job.SimpleStepHandler : Step already complete or not restartable, so no action to execute: StepExecution: id=1, version=3, name=taskletStep, status=COMPLETED, exitStatus=COMPLETED, readCount=0, filterCount=0, writeCount=0 readSkipCount=0, writeSkipCount=0, processSkipCount=0, commitCount=1, rollbackCount=0, exitDescription= INFO 36988 --- [pool-2-thread-1] osbatch.core.job.SimpleStepHandler : Step already complete or not restartable, so no action to execute: StepExecution: id=2, version=53, name=chunkStep, status=COMPLETED, exitStatus=COMPLETED, readCount=1000, filterCount=982, writeCount=18 readSkipCount=0, writeSkipCount=0, processSkipCount=0, commitCount=51, rollbackCount=0, exitDescription= يحدث هذا لأنه قد يتم إنشاء وتنفيذ JobInstance فريدة فقط ولا توجد طريقة Spring Batch للتمييز بين JobInstance الأول والثاني.
هناك طريقتان لتجنب هذه المشكلة عند جدولة وظيفة مجمعة.
أحدهما هو التأكد من تقديم واحد أو أكثر من المعلمات الفريدة (على سبيل المثال ، وقت البدء الفعلي بالنانو ثانية) لكل وظيفة:
@Scheduled(fixedRate = 5000) public void run() throws Exception { jobLauncher.run( customerReportJob(), new JobParametersBuilder().addLong("uniqueness", System.nanoTime()).toJobParameters() ); } بدلاً من ذلك ، يمكنك بدء المهمة التالية في تسلسل JobInstance s الذي تحدده JobParametersIncrementer المرتبطة بالوظيفة المحددة باستخدام SimpleJobOperator.startNextInstance() :
@Autowired private JobOperator operator; @Autowired private JobExplorer jobs; @Scheduled(fixedRate = 5000) public void run() throws Exception { List<JobInstance> lastInstances = jobs.getJobInstances(JOB_NAME, 0, 1); if (lastInstances.isEmpty()) { jobLauncher.run(customerReportJob(), new JobParameters()); } else { operator.startNextInstance(JOB_NAME); } }اختبار وحدة دفعة الربيع
عادةً ، لتشغيل اختبارات الوحدة في تطبيق Spring Boot ، يجب أن يقوم إطار العمل بتحميل ApplicationContext المقابل. تم استخدام تعليقين توضيحيين لهذا الغرض:
@RunWith(SpringRunner.class) @ContextConfiguration(classes = {...}) توجد فئة الأداة المساعدة org.springframework.batch.test.JobLauncherTestUtils لاختبار الوظائف المجمعة. يوفر طرقًا لبدء مهمة كاملة بالإضافة إلى السماح بإجراء اختبار شامل للخطوات الفردية دون الحاجة إلى تشغيل كل خطوة في الوظيفة. يجب إعلانها على أنها فاصوليا ربيعية:
@Configuration public class BatchTestConfiguration { @Bean public JobLauncherTestUtils jobLauncherTestUtils() { return new JobLauncherTestUtils(); } }يبدو الاختبار النموذجي لوظيفة وخطوة على النحو التالي (ويمكن استخدام أي أطر عمل للسخرية أيضًا):
@RunWith(SpringRunner.class) @ContextConfiguration(classes = {BatchApplication.class, BatchTestConfiguration.class}) public class CustomerReportJobConfigTest { @Autowired private JobLauncherTestUtils testUtils; @Autowired private CustomerReportJobConfig config; @Test public void testEntireJob() throws Exception { final JobExecution result = testUtils.getJobLauncher().run(config.customerReportJob(), testUtils.getUniqueJobParameters()); Assert.assertNotNull(result); Assert.assertEquals(BatchStatus.COMPLETED, result.getStatus()); } @Test public void testSpecificStep() { Assert.assertEquals(BatchStatus.COMPLETED, testUtils.launchStep("taskletStep").getStatus()); } } تقدم Spring Batch نطاقات إضافية لسياقات الخطوة والوظيفة. تستخدم الكائنات الموجودة في هذه النطاقات حاوية Spring كمصنع كائن ، لذلك لا يوجد سوى مثيل واحد لكل وحدة في كل خطوة تنفيذ أو مهمة. بالإضافة إلى ذلك ، يتم توفير الدعم للربط المتأخر للمراجع التي يمكن الوصول إليها من StepContext أو JobContext . المكونات التي تم تكوينها في وقت التشغيل لتكون ذات نطاق تدريجي أو مهمة يصعب اختبارها كمكونات قائمة بذاتها ما لم يكن لديك طريقة لتعيين السياق كما لو كانت في خطوة أو تنفيذ مهمة. هذا هو الهدف من مكونات org.springframework.batch.test.StepScopeTestExecutionListener و org.springframework.batch.test.StepScopeTestUtils في Spring Batch ، بالإضافة إلى JobScopeTestExecutionListener و JobScopeTestUtils .
يتم الإعلان عن TestExecutionListeners على مستوى الفصل الدراسي ، وتتمثل مهمتها في إنشاء سياق تنفيذ خطوة لكل طريقة اختبار. علي سبيل المثال:
@RunWith(SpringRunner.class) @TestExecutionListeners({DependencyInjectionTestExecutionListener.class, StepScopeTestExecutionListener.class}) @ContextConfiguration(classes = {BatchApplication.class, BatchTestConfiguration.class}) public class BirthdayFilterProcessorTest { @Autowired private BirthdayFilterProcessor processor; public StepExecution getStepExecution() { return MetaDataInstanceFactory.createStepExecution(); } @Test public void filter() throws Exception { final Customer customer = new Customer(); customer.setId(1); customer.setName("name"); customer.setBirthday(new GregorianCalendar()); Assert.assertNotNull(processor.process(customer)); } } هناك نوعان من TestExecutionListener s. أحدهما من إطار عمل اختبار الربيع العادي ويتعامل مع إدخال التبعية من سياق التطبيق الذي تم تكوينه. والآخر هو Spring Batch StepScopeTestExecutionListener الذي يُنشئ سياق نطاق الخطوة لحقن التبعية في اختبارات الوحدة. يتم إنشاء StepContext لمدة أسلوب الاختبار وإتاحته لأي تبعيات يتم إدخالها. السلوك الافتراضي هو فقط إنشاء StepExecution مع الخصائص الثابتة. بدلاً من ذلك ، يمكن توفير StepContext بواسطة حالة الاختبار كطريقة مصنع تُرجع النوع الصحيح.
يعتمد أسلوب آخر على فئة الأداة المساعدة StepScopeTestUtils . تُستخدم هذه الفئة لإنشاء StepScope ومعالجتها في اختبارات الوحدة بطريقة أكثر مرونة دون استخدام حقن التبعية. على سبيل المثال ، يمكن قراءة معرف العميل الذي تمت تصفيته بواسطة المعالج أعلاه على النحو التالي:
@Test public void filterId() throws Exception { final Customer customer = new Customer(); customer.setId(1); customer.setName("name"); customer.setBirthday(new GregorianCalendar()); final int id = StepScopeTestUtils.doInStepScope( getStepExecution(), () -> processor.process(customer).getId() ); Assert.assertEquals(1, id); }هل أنت مستعد لدُفعة الربيع المتقدمة؟
تقدم هذه المقالة بعض أساسيات تصميم وتطوير تطبيقات Spring Batch. ومع ذلك ، هناك العديد من الموضوعات والقدرات الأكثر تقدمًا - مثل القياس والمعالجة المتوازية والمستمعين وغير ذلك - التي لم يتم تناولها في هذه المقالة. نأمل أن توفر هذه المقالة أساسًا مفيدًا للبدء.
يمكن بعد ذلك العثور على معلومات حول هذه الموضوعات الأكثر تقدمًا في وثائق Spring Back الرسمية لـ Spring Batch.
