Tutoriel Spring Batch: Le traitement par lots simplifié avec Spring

Publié: 2022-03-11

Le traitement par lots, caractérisé par une exécution en arrière-plan axée sur le volume, non interactive et souvent longue, est largement utilisé dans pratiquement tous les secteurs et est appliqué à un large éventail de tâches. Le traitement par lots peut être gourmand en données ou en calcul, s'exécuter séquentiellement ou en parallèle, et peut être lancé via divers modèles d'invocation, y compris ad hoc, planifié et à la demande.

Ce tutoriel Spring Batch explique le modèle de programmation et le langage de domaine des applications batch en général et, en particulier, montre quelques approches utiles pour la conception et le développement d'applications batch utilisant la version actuelle de Spring Batch 3.0.7 .

Qu'est-ce que le Spring Batch ?

Spring Batch est un cadre léger et complet conçu pour faciliter le développement d'applications par lots robustes. Il fournit également des services et des fonctionnalités techniques plus avancés qui prennent en charge des travaux par lots extrêmement volumineux et hautes performances grâce à ses techniques d'optimisation et de partitionnement. Spring Batch s'appuie sur l'approche de développement basée sur POJO de Spring Framework, familière à tous les développeurs Spring expérimentés.

À titre d'exemple, cet article considère le code source d'un exemple de projet qui charge un fichier client au format XML, filtre les clients selon divers attributs et génère les entrées filtrées dans un fichier texte. Le code source de notre exemple Spring Batch (qui utilise les annotations Lombok) est disponible ici sur GitHub et nécessite Java SE 8 et Maven.

Qu'est-ce que le traitement par lots ? Concepts clés et terminologie

Il est important que tout développeur batch soit familiarisé et à l'aise avec les principaux concepts du traitement batch. Le diagramme ci-dessous est une version simplifiée de l'architecture de référence par lots qui a fait ses preuves au cours de décennies d'implémentations sur de nombreuses plates-formes différentes. Il présente les concepts et termes clés relatifs au traitement par lots, tels qu'utilisés par Spring Batch.

Tutoriel Spring Batch : Concepts clés et terminologie

Comme le montre notre exemple de traitement par lots, un traitement par lots est généralement encapsulé par un Job composé de plusieurs Step . Chaque Step a généralement un seul ItemReader , ItemProcessor et ItemWriter . Un Job est exécuté par un JobLauncher , et les métadonnées sur les jobs configurés et exécutés sont stockées dans un JobRepository .

Chaque Job peut être associé à plusieurs JobInstance s, chacune étant définie de manière unique par ses JobParameters particuliers qui sont utilisés pour démarrer un travail par lots. Chaque exécution d'une JobInstance est appelée JobExecution . Chaque JobExecution suit généralement ce qui s'est passé pendant une exécution, comme les statuts actuels et de sortie, les heures de début et de fin, etc.

Une Step est une phase indépendante et spécifique d'un Job batch , de sorte que chaque Job est composé d'une ou plusieurs Step s. Semblable à un Job , une Step possède une StepExecution individuelle qui représente une seule tentative d' exécution d' une Step . StepExecution stocke les informations sur les statuts actuels et de sortie, les heures de début et de fin, etc., ainsi que les références à ses instances Step et JobExecution correspondantes.

Un ExecutionContext est un ensemble de paires clé-valeur contenant des informations limitées à StepExecution ou JobExecution . Spring Batch conserve le ExecutionContext , ce qui aide dans les cas où vous souhaitez redémarrer une exécution par lots (par exemple, lorsqu'une erreur fatale s'est produite, etc.). Tout ce qui est nécessaire est de mettre tout objet à partager entre les étapes dans le contexte et le cadre se chargera du reste. Après le redémarrage, les valeurs du ExecutionContext précédent sont restaurées à partir de la base de données et appliquées.

JobRepository est le mécanisme de Spring Batch qui rend possible toute cette persistance. Il fournit des opérations CRUD pour JobLauncher , Job et Step . Une fois qu'un Job est lancé, une JobExecution est obtenue à partir du référentiel et, au cours de l'exécution, les instances StepExecution et JobExecution sont conservées dans le référentiel.

Premiers pas avec Spring Batch Framework

L'un des avantages de Spring Batch est que les dépendances de projet sont minimes, ce qui facilite la mise en service rapide. Les quelques dépendances qui existent sont clairement spécifiées et expliquées dans le pom.xml du projet, accessible ici.

Le démarrage réel de l'application se produit dans une classe ressemblant à ceci :

 @EnableBatchProcessing @SpringBootApplication public class BatchApplication { public static void main(String[] args) { prepareTestData(1000); SpringApplication.run(BatchApplication.class, args); } }

L'annotation @EnableBatchProcessing active les fonctionnalités Spring Batch et fournit une configuration de base pour la configuration des tâches par lots.

L'annotation @SpringBootApplication provient du projet Spring Boot qui fournit des applications Spring autonomes, prêtes pour la production. Il spécifie une classe de configuration qui déclare un ou plusieurs beans Spring et déclenche également la configuration automatique et l'analyse des composants de Spring.

Notre exemple de projet n'a qu'un seul travail qui est configuré par CustomerReportJobConfig avec un JobBuilderFactory et un StepBuilderFactory . La configuration minimale du travail peut être définie dans CustomerReportJobConfig comme suit :

 @Configuration public class CustomerReportJobConfig { @Autowired private JobBuilderFactory jobBuilders; @Autowired private StepBuilderFactory stepBuilders; @Bean public Job customerReportJob() { return jobBuilders.get("customerReportJob") .start(taskletStep()) .next(chunkStep()) .build(); } @Bean public Step taskletStep() { return stepBuilders.get("taskletStep") .tasklet(tasklet()) .build(); } @Bean public Tasklet tasklet() { return (contribution, chunkContext) -> { return RepeatStatus.FINISHED; }; } }

Il existe deux approches principales pour construire une marche.

Une approche, comme illustré dans l'exemple ci-dessus, est basée sur les tasklets . Une Tasklet prend en charge une interface simple qui n'a qu'une seule méthode, execute() , qui est appelée à plusieurs reprises jusqu'à ce qu'elle renvoie RepeatStatus.FINISHED ou lève une exception pour signaler un échec. Chaque appel au Tasklet est enveloppé dans une transaction.

Une autre approche, le traitement orienté bloc , fait référence à la lecture séquentielle des données et à la création de « morceaux » qui seront écrits dans une limite de transaction. Chaque élément individuel est lu à partir d'un ItemReader , transmis à un ItemProcessor et agrégé. Une fois que le nombre d'éléments lus est égal à l'intervalle de validation, le bloc entier est écrit via ItemWriter , puis la transaction est validée. Une étape orientée chunk peut être configurée comme suit :

 @Bean public Job customerReportJob() { return jobBuilders.get("customerReportJob") .start(taskletStep()) .next(chunkStep()) .build(); } @Bean public Step chunkStep() { return stepBuilders.get("chunkStep") .<Customer, Customer>chunk(20) .reader(reader()) .processor(processor()) .writer(writer()) .build(); }

La méthode chunk() construit une étape qui traite les éléments en morceaux avec la taille fournie, chaque morceau étant ensuite transmis au lecteur, au processeur et à l'écrivain spécifiés. Ces méthodes sont décrites plus en détail dans les sections suivantes de cet article.

Lecteur personnalisé

Pour notre exemple d'application Spring Batch, afin de lire une liste de clients à partir d'un fichier XML, nous devons fournir une implémentation de l'interface org.springframework.batch.item.ItemReader :

 public interface ItemReader<T> { T read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException; }

Un ItemReader fournit les données et devrait être avec état. Il est généralement appelé plusieurs fois pour chaque lot, chaque appel à read() renvoyant la valeur suivante et renvoyant finalement null lorsque toutes les données d'entrée ont été épuisées.

Spring Batch fournit des implémentations prêtes à l'emploi de ItemReader , qui peuvent être utilisées à diverses fins telles que la lecture de collections, de fichiers, l'intégration de JMS et JDBC ainsi que plusieurs sources, etc.

Dans notre exemple d'application, la classe CustomerItemReader délègue les appels read() réels à une instance initialisée paresseusement de la classe IteratorItemReader :

 public class CustomerItemReader implements ItemReader<Customer> { private final String filename; private ItemReader<Customer> delegate; public CustomerItemReader(final String filename) { this.filename = filename; } @Override public Customer read() throws Exception { if (delegate == null) { delegate = new IteratorItemReader<>(customers()); } return delegate.read(); } private List<Customer> customers() throws FileNotFoundException { try (XMLDecoder decoder = new XMLDecoder(new FileInputStream(filename))) { return (List<Customer>) decoder.readObject(); } } }

Un bean Spring pour cette implémentation est créé avec les annotations @Component et @StepScope , indiquant à Spring que cette classe est un composant Spring à portée d'étape et qu'elle sera créée une fois par étape d'exécution comme suit :

 @StepScope @Bean public ItemReader<Customer> reader() { return new CustomerItemReader(XML_FILE); }

Processeurs personnalisés

ItemProcessors transforment les éléments d'entrée et introduisent une logique métier dans un scénario de traitement orienté élément. Ils doivent fournir une implémentation de l'interface org.springframework.batch.item.ItemProcessor :

 public interface ItemProcessor<I, O> { O process(I item) throws Exception; }

La méthode process() accepte une instance de la classe I et peut ou non renvoyer une instance du même type. Le retour de null indique que l'élément ne doit pas continuer à être traité. Comme d'habitude, Spring fournit quelques processeurs standard, tels que CompositeItemProcessor qui transmet l'élément à travers une séquence d' ItemProcessor injectés et un ValidatingItemProcessor qui valide l'entrée.

Dans le cas de notre exemple d'application, les processeurs sont utilisés pour filtrer les clients selon les exigences suivantes :

  • Un client doit être né au cours du mois en cours (par exemple, pour signaler les spéciaux d'anniversaire, etc.)
  • Un client doit avoir moins de cinq transactions terminées (par exemple, pour identifier de nouveaux clients)

L'exigence "mois en cours" est implémentée via un ItemProcessor personnalisé :

 public class BirthdayFilterProcessor implements ItemProcessor<Customer, Customer> { @Override public Customer process(final Customer item) throws Exception { if (new GregorianCalendar().get(Calendar.MONTH) == item.getBirthday().get(Calendar.MONTH)) { return item; } return null; } }

L'exigence "nombre limité de transactions" est implémentée en tant que ValidatingItemProcessor :

 public class TransactionValidatingProcessor extends ValidatingItemProcessor<Customer> { public TransactionValidatingProcessor(final int limit) { super( item -> { if (item.getTransactions() >= limit) { throw new ValidationException("Customer has less than " + limit + " transactions"); } } ); setFilter(true); } }

Cette paire de processeurs est ensuite encapsulée dans un CompositeItemProcessor qui implémente le modèle délégué :

 @StepScope @Bean public ItemProcessor<Customer, Customer> processor() { final CompositeItemProcessor<Customer, Customer> processor = new CompositeItemProcessor<>(); processor.setDelegates(Arrays.asList(new BirthdayFilterProcessor(), new TransactionValidatingProcessor(5))); return processor; }

Écrivains personnalisés

Pour sortir les données, Spring Batch fournit l'interface org.springframework.batch.item.ItemWriter pour sérialiser les objets si nécessaire :

 public interface ItemWriter<T> { void write(List<? extends T> items) throws Exception; }

La méthode write() est chargée de s'assurer que tous les tampons internes sont vidés. Si une transaction est active, il sera également généralement nécessaire de supprimer la sortie lors d'une annulation ultérieure. La ressource à laquelle le rédacteur envoie des données devrait normalement être capable de gérer cela elle-même. Il existe des implémentations standard telles que CompositeItemWriter , JdbcBatchItemWriter , JmsItemWriter , JpaItemWriter , SimpleMailMessageItemWriter et autres.

Dans notre exemple d'application, la liste des clients filtrés s'écrit comme suit :

 public class CustomerItemWriter implements ItemWriter<Customer>, Closeable { private final PrintWriter writer; public CustomerItemWriter() { OutputStream out; try { out = new FileOutputStream("output.txt"); } catch (FileNotFoundException e) { out = System.out; } this.writer = new PrintWriter(out); } @Override public void write(final List<? extends Customer> items) throws Exception { for (Customer item : items) { writer.println(item.toString()); } } @PreDestroy @Override public void close() throws IOException { writer.close(); } }

Planification des travaux par lots de printemps

Par défaut, Spring Batch exécute tous les travaux qu'il peut trouver (c'est-à-dire qui sont configurés comme dans CustomerReportJobConfig ) au démarrage. Pour modifier ce comportement, désactivez l'exécution du travail au démarrage en ajoutant la propriété suivante à application.properties :

 spring.batch.job.enabled=false

La planification réelle est ensuite réalisée en ajoutant l'annotation @EnableScheduling à une classe de configuration et l'annotation @Scheduled à la méthode qui exécute le travail lui-même. La planification peut être configurée avec des délais, des taux ou des expressions cron :

 // run every 5000 msec (ie, every 5 secs) @Scheduled(fixedRate = 5000) public void run() throws Exception { JobExecution execution = jobLauncher.run( customerReportJob(), new JobParametersBuilder().toJobParameters() ); }

Il y a cependant un problème avec l'exemple ci-dessus. Lors de l'exécution, le travail ne réussira que la première fois. Lors de son deuxième lancement (c'est-à-dire après cinq secondes), il générera les messages suivants dans les journaux (notez que dans les versions précédentes de Spring Batch, une JobInstanceAlreadyCompleteException aurait été levée):

 INFO 36988 --- [pool-2-thread-1] osbclsupport.SimpleJobLauncher : Job: [SimpleJob: [name=customerReportJob]] launched with the following parameters: [{}] INFO 36988 --- [pool-2-thread-1] osbatch.core.job.SimpleStepHandler : Step already complete or not restartable, so no action to execute: StepExecution: id=1, version=3, name=taskletStep, status=COMPLETED, exitStatus=COMPLETED, readCount=0, filterCount=0, writeCount=0 readSkipCount=0, writeSkipCount=0, processSkipCount=0, commitCount=1, rollbackCount=0, exitDescription= INFO 36988 --- [pool-2-thread-1] osbatch.core.job.SimpleStepHandler : Step already complete or not restartable, so no action to execute: StepExecution: id=2, version=53, name=chunkStep, status=COMPLETED, exitStatus=COMPLETED, readCount=1000, filterCount=982, writeCount=18 readSkipCount=0, writeSkipCount=0, processSkipCount=0, commitCount=51, rollbackCount=0, exitDescription=

Cela se produit car seuls des JobInstance uniques peuvent être créés et exécutés et Spring Batch n'a aucun moyen de faire la distinction entre le premier et le deuxième JobInstance .

Il existe deux façons d'éviter ce problème lorsque vous planifiez un traitement par lots.

L'une consiste à s'assurer d'introduire un ou plusieurs paramètres uniques (par exemple, l'heure de début réelle en nanosecondes) pour chaque tâche :

 @Scheduled(fixedRate = 5000) public void run() throws Exception { jobLauncher.run( customerReportJob(), new JobParametersBuilder().addLong("uniqueness", System.nanoTime()).toJobParameters() ); }

Alternativement, vous pouvez lancer le travail suivant dans une séquence de JobInstance s déterminée par le JobParametersIncrementer attaché au travail spécifié avec SimpleJobOperator.startNextInstance() :

 @Autowired private JobOperator operator; @Autowired private JobExplorer jobs; @Scheduled(fixedRate = 5000) public void run() throws Exception { List<JobInstance> lastInstances = jobs.getJobInstances(JOB_NAME, 0, 1); if (lastInstances.isEmpty()) { jobLauncher.run(customerReportJob(), new JobParameters()); } else { operator.startNextInstance(JOB_NAME); } }

Tests unitaires par lots de printemps

Habituellement, pour exécuter des tests unitaires dans une application Spring Boot, le framework doit charger un ApplicationContext correspondant. Deux annotations sont utilisées à cet effet :

 @RunWith(SpringRunner.class) @ContextConfiguration(classes = {...})

Il existe une classe utilitaire org.springframework.batch.test.JobLauncherTestUtils pour tester les travaux par lots. Il fournit des méthodes pour lancer un travail entier et permet de tester de bout en bout des étapes individuelles sans avoir à exécuter chaque étape du travail. Il doit être déclaré comme haricot Spring :

 @Configuration public class BatchTestConfiguration { @Bean public JobLauncherTestUtils jobLauncherTestUtils() { return new JobLauncherTestUtils(); } }

Un test typique pour un travail et une étape se présente comme suit (et peut également utiliser n'importe quel framework factice) :

 @RunWith(SpringRunner.class) @ContextConfiguration(classes = {BatchApplication.class, BatchTestConfiguration.class}) public class CustomerReportJobConfigTest { @Autowired private JobLauncherTestUtils testUtils; @Autowired private CustomerReportJobConfig config; @Test public void testEntireJob() throws Exception { final JobExecution result = testUtils.getJobLauncher().run(config.customerReportJob(), testUtils.getUniqueJobParameters()); Assert.assertNotNull(result); Assert.assertEquals(BatchStatus.COMPLETED, result.getStatus()); } @Test public void testSpecificStep() { Assert.assertEquals(BatchStatus.COMPLETED, testUtils.launchStep("taskletStep").getStatus()); } }

Spring Batch introduit des étendues supplémentaires pour les contextes d'étape et de travail. Les objets de ces portées utilisent le conteneur Spring comme fabrique d'objets, il n'y a donc qu'une seule instance de chacun de ces bean par étape d'exécution ou tâche. De plus, une prise en charge est fournie pour la liaison tardive des références accessibles à partir de StepContext ou JobContext . Les composants qui sont configurés au moment de l'exécution pour être à l'échelle d'une étape ou d'une tâche sont difficiles à tester en tant que composants autonomes, sauf si vous avez un moyen de définir le contexte comme s'ils étaient dans une étape ou une exécution de tâche. C'est l'objectif des composants org.springframework.batch.test.StepScopeTestExecutionListener et org.springframework.batch.test.StepScopeTestUtils dans Spring Batch, ainsi que JobScopeTestExecutionListener et JobScopeTestUtils .

Les TestExecutionListeners sont déclarés au niveau de la classe et leur travail consiste à créer un contexte d'exécution d'étape pour chaque méthode de test. Par exemple:

 @RunWith(SpringRunner.class) @TestExecutionListeners({DependencyInjectionTestExecutionListener.class, StepScopeTestExecutionListener.class}) @ContextConfiguration(classes = {BatchApplication.class, BatchTestConfiguration.class}) public class BirthdayFilterProcessorTest { @Autowired private BirthdayFilterProcessor processor; public StepExecution getStepExecution() { return MetaDataInstanceFactory.createStepExecution(); } @Test public void filter() throws Exception { final Customer customer = new Customer(); customer.setId(1); customer.setName("name"); customer.setBirthday(new GregorianCalendar()); Assert.assertNotNull(processor.process(customer)); } }

Il existe deux TestExecutionListener s. L'un provient du framework Spring Test standard et gère l'injection de dépendances à partir du contexte d'application configuré. L'autre est le Spring Batch StepScopeTestExecutionListener qui configure le contexte de portée d'étape pour l'injection de dépendances dans les tests unitaires. Un StepContext est créé pour la durée d'une méthode de test et mis à la disposition de toutes les dépendances qui sont injectées. Le comportement par défaut consiste simplement à créer une StepExecution avec des propriétés fixes. Alternativement, le StepContext peut être fourni par le scénario de test en tant que méthode d'usine renvoyant le type correct.

Une autre approche est basée sur la classe utilitaire StepScopeTestUtils . Cette classe est utilisée pour créer et manipuler StepScope dans les tests unitaires de manière plus flexible sans utiliser l'injection de dépendances. Par exemple, la lecture de l'identifiant du client filtré par le processeur ci-dessus pourrait se faire comme suit :

 @Test public void filterId() throws Exception { final Customer customer = new Customer(); customer.setId(1); customer.setName("name"); customer.setBirthday(new GregorianCalendar()); final int id = StepScopeTestUtils.doInStepScope( getStepExecution(), () -> processor.process(customer).getId() ); Assert.assertEquals(1, id); }

Prêt pour le lot de printemps avancé ?

Cet article présente certaines des bases de la conception et du développement d'applications Spring Batch. Cependant, il existe de nombreux sujets et fonctionnalités plus avancés, tels que la mise à l'échelle, le traitement parallèle, les écouteurs, etc., qui ne sont pas abordés dans cet article. Espérons que cet article fournit une base utile pour commencer.

Des informations sur ces sujets plus avancés peuvent ensuite être trouvées dans la documentation officielle de Spring Back pour Spring Batch.