Comment simplifier la concurrence avec la modélisation réactive sur Android
Publié: 2022-03-11La simultanéité et l'asynchronicité sont inhérentes à la programmation mobile.
La gestion de la concurrence par le biais d'une programmation de style impératif, ce que la programmation sur Android implique généralement, peut être la cause de nombreux problèmes. En utilisant la programmation réactive avec RxJava, vous pouvez éviter les problèmes potentiels de concurrence en fournissant une solution plus propre et moins sujette aux erreurs.
En plus de simplifier les tâches simultanées et asynchrones, RxJava offre également la possibilité d'effectuer des opérations de style fonctionnel qui transforment, combinent et agrégent les émissions d'un Observable jusqu'à ce que nous obtenions le résultat souhaité.
En combinant le paradigme réactif de RxJava et les opérations de style fonctionnel, nous pouvons modéliser un large éventail de constructions de concurrence de manière réactive, même dans le monde non réactif d'Android. Dans cet article, vous apprendrez comment vous pouvez faire exactement cela. Vous apprendrez également à adopter progressivement RxJava dans un projet existant.
Si vous êtes nouveau sur RxJava, je vous recommande de lire le post ici qui parle de certains des principes fondamentaux de RxJava.
Faire le pont entre le non réactif et le monde réactif
L'un des défis de l'ajout de RxJava comme l'une des bibliothèques de votre projet est que cela change fondamentalement la façon dont vous raisonnez sur votre code.
RxJava vous oblige à penser que les données sont poussées plutôt que tirées. Bien que le concept lui-même soit simple, modifier une base de code complète basée sur un paradigme pull peut être un peu intimidant. Bien que la cohérence soit toujours idéale, vous n'aurez peut-être pas toujours le privilège d'effectuer cette transition dans l'ensemble de votre base de code en une seule fois, donc une approche plus incrémentielle peut être nécessaire.
Considérez le code suivant :
/** * @return a list of users with blogs */ public List<User> getUsersWithBlogs() { final List<User> allUsers = UserCache.getAllUsers(); final List<User> usersWithBlogs = new ArrayList<>(); for (User user : allUsers) { if (user.blog != null && !user.blog.isEmpty()) { usersWithBlogs.add(user); } } Collections.sort(usersWithBlogs, (user1, user2) -> user1.name.compareTo(user2.name)); return usersWithBlogs; }
Cette fonction obtient une liste d'objets User
du cache, filtre chacun d'eux en fonction du fait que l'utilisateur possède ou non un blog, les trie par nom d'utilisateur et les renvoie finalement à l'appelant. En regardant cet extrait, nous remarquons que bon nombre de ces opérations peuvent tirer parti des opérateurs RxJava ; par exemple, filter()
et sorted()
.
La réécriture de cet extrait nous donne alors :
/** * @return a list of users with blogs */ public Observable<User> getUsersWithBlogs() { return Observable.fromIterable(UserCache.getAllUsers()) .filter(user -> user.blog != null && !user.blog.isEmpty()) .sorted((user1, user2) -> user1.name.compareTo(user2.name)); }
La première ligne de la fonction convertit le List<User>
renvoyé par UserCache.getAllUsers()
en un Observable<User>
via fromIterable()
. C'est la première étape pour rendre notre code réactif. Maintenant que nous opérons sur un Observable
, cela nous permet d'effectuer n'importe quel opérateur Observable
dans la boîte à outils RxJava - filter()
et sorted()
dans ce cas.
Il y a quelques autres points à noter à propos de ce changement.
Premièrement, la signature de la méthode n'est plus la même. Ce n'est peut-être pas un gros problème si cet appel de méthode n'est utilisé qu'à quelques endroits et qu'il est facile de propager les modifications à d'autres zones de la pile ; cependant, si cela casse les clients qui s'appuient sur cette méthode, cela pose problème et la signature de la méthode doit être annulée.
Deuxièmement, RxJava est conçu avec la paresse à l'esprit. Autrement dit, aucune opération longue ne doit être effectuée lorsqu'il n'y a pas d'abonnés à Observable
. Avec cette modification, cette hypothèse n'est plus vraie puisque UserCache.getAllUsers()
est invoqué avant même qu'il n'y ait des abonnés.
Quitter le monde réactif
Pour résoudre le premier problème de notre modification, nous pouvons utiliser l'un des opérateurs de blocage disponibles pour un Observable
, tels que blockingFirst()
et blockingNext()
. Essentiellement, ces deux opérateurs bloqueront jusqu'à ce qu'un élément soit émis en aval : blockingFirst()
renverra le premier élément émis et terminera, tandis que blockingNext()
renverra un Iterable
qui vous permet d'effectuer une boucle for-each sur les données sous-jacentes ( chaque itération dans la boucle bloquera).
Un effet secondaire de l'utilisation d'une opération de blocage qu'il est important de connaître, cependant, est que les exceptions sont levées sur le thread appelant plutôt que d'être transmises à la méthode onError()
d'un observateur.
En utilisant un opérateur bloquant pour changer la signature de la méthode en List<User>
, notre extrait ressemblerait maintenant à ceci :
/** * @return a list of users with blogs */ public List<User> getUsersWithBlogs() { return Observable.fromIterable(UserCache.getAllUsers()) .filter(user -> user.blog != null && !user.blog.isEmpty()) .sorted((user1, user2) -> user1.name.compareTo(user2.name)) .toList() .blockingGet(); }
Avant d'appeler un opérateur de blocage (c'est-à-dire blockingGet()
), nous devons d'abord enchaîner l'opérateur d'agrégat toList()
afin que le flux soit modifié d'un Observable<User>
à un Single<List<User>>
(un Single
est un type spécial d' Observable
qui n'émet qu'une seule valeur dans onSuccess()
, ou une erreur via onError()
).
Ensuite, nous pouvons appeler l'opérateur de blockingGet()
qui déballe le Single
et renvoie un List<User>
.
Bien que RxJava le supporte, cela doit être évité autant que possible car il ne s'agit pas d'une programmation réactive idiomatique. Cependant, lorsqu'ils sont absolument nécessaires, les opérateurs de blocage sont une bonne première façon de sortir du monde réactif.
L'approche paresseuse
Comme mentionné précédemment, RxJava a été conçu avec la paresse à l'esprit. Autrement dit, les opérations de longue durée doivent être retardées aussi longtemps que possible (c'est-à-dire jusqu'à ce qu'un abonnement soit appelé sur un Observable
). Pour rendre notre solution paresseuse, nous utilisons l'opérateur defer()
.
defer()
prend dans une usine ObservableSource
qui crée un Observable
pour chaque nouvel observateur qui s'abonne. Dans notre cas, nous voulons retourner Observable.fromIterable(UserCache.getAllUser())
chaque fois qu'un observateur s'abonne.
/** * @return a list of users with blogs */ public Observable<User> getUsersWithBlogs() { return Observable.defer(() -> Observable.fromIterable(UserCache.getAllUsers())) .filter(user -> user.blog != null && !user.blog.isEmpty()) .sorted((user1, user2) -> user1.name.compareTo(user2.name)); }
Maintenant que l'opération de longue durée est enveloppée dans un defer()
, nous avons un contrôle total sur le thread dans lequel elle doit s'exécuter simplement en spécifiant le Scheduler
approprié dans subscribeOn()
. Avec ce changement, notre code est entièrement réactif et l'abonnement ne devrait avoir lieu qu'au moment où les données sont nécessaires.
/** * @return a list of users with blogs */ public Observable<User> getUsersWithBlogs() { return Observable.defer(() -> Observable.fromIterable(UserCache.getAllUsers())) .filter(user -> user.blog != null && !user.blog.isEmpty()) .sorted((user1, user2) -> user1.name.compareTo(user2.name)) .subscribeOn(Schedulers.io()); }
Un autre opérateur très utile pour différer le calcul est la méthode fromCallable()
. Contrairement à defer()
, qui s'attend à ce qu'un Observable
soit renvoyé dans la fonction lambda et à son tour « aplatit » l' Observable
renvoyé, fromCallable()
invoquera le lambda et renverra la valeur en aval.
/** * @return a list of users with blogs */ public Observable<User> getUsersWithBlogs() { final Observable<List<User>> usersObservable = Observable.fromCallable(() -> UserCache.getAllUsers()); final Observable<User> userObservable = usersObservable.flatMap(users -> Observable.fromIterable(users)); return userObservable.filter(user -> user.blog != null && !user.blog.isEmpty()) .sorted((user1, user2) -> user1.name.compareTo(user2.name)); }
L'utilisation unique de fromCallable()
sur une liste renverrait désormais un Observable<List<User>>
, nous devons aplatir cette liste en utilisant flatMap()
.
Tout réactif
Dans les exemples précédents, nous avons vu que nous pouvons envelopper n'importe quel objet dans un Observable
et passer d'un état non réactif à un état réactif en utilisant des opérations de blocage et defer( defer()
/ fromCallable()
. En utilisant ces constructions, nous pouvons commencer à convertir des zones d'une application Android pour qu'elles soient réactives.
Longues opérations
Un bon endroit pour penser initialement à utiliser RxJava est chaque fois que vous avez un processus qui prend un certain temps à exécuter, comme les appels réseau (consultez le post précédent pour des exemples), les lectures et écritures sur disque, etc. L'exemple suivant illustre une fonction simple qui écrira du texte dans le système de fichiers :

/** * Writes {@code text} to the file system. * * @param context a Context * @param filename the name of the file * @param text the text to write * @return true if the text was successfully written, otherwise, false */ public boolean writeTextToFile(Context context, String filename, String text) { FileOutputStream outputStream; try { outputStream = context.openFileOutput(filename, Context.MODE_PRIVATE); outputStream.write(text.getBytes()); outputStream.close(); return true; } catch (Exception e) { e.printStackTrace(); return false; } }
Lors de l'appel de cette fonction, nous devons nous assurer que cela se fait sur un thread séparé car cette opération est bloquante. Imposer une telle restriction à l'appelant complique les choses pour le développeur, ce qui augmente la probabilité de bogues et peut potentiellement ralentir le développement.
L'ajout d'un commentaire à la fonction aidera bien sûr à éviter les erreurs de l'appelant, mais c'est encore loin d'être à l'épreuve des balles.
En utilisant RxJava, cependant, nous pouvons facilement encapsuler cela dans un Observable
et spécifier le Scheduler
lequel il doit s'exécuter. De cette façon, l'appelant n'a pas du tout à se préoccuper d'invoquer la fonction dans un thread séparé ; la fonction s'en charge elle-même.
/** * Writes {@code text} to the filesystem. * * @param context a Context * @param filename the name of the file * @param text the text to write * @return An Observable emitting a boolean indicating whether or not the text was successfully written. */ public Observable<Boolean> writeTextToFile(Context context, String filename, String text) { return Observable.fromCallable(() -> { FileOutputStream outputStream; outputStream = context.openFileOutput(filename, Context.MODE_PRIVATE); outputStream.write(text.getBytes()); outputStream.close(); return true; }).subscribeOn(Schedulers.io()); }
En utilisant fromCallable()
, l'écriture du texte dans le fichier est différée jusqu'au moment de l'abonnement.
Étant donné que les exceptions sont des objets de première classe dans RxJava, un autre avantage de notre changement est que nous n'avons plus besoin d'envelopper l'opération dans un bloc try/catch. L'exception sera simplement propagée en aval plutôt que d'être avalée. Cela permet à l'appelant de gérer l'exception comme bon lui semble (par exemple, montrer une erreur à l'utilisateur en fonction de l'exception levée, etc.).
Une autre optimisation que nous pouvons effectuer consiste à renvoyer un Completable
plutôt qu'un Observable
. Un Completable
est essentiellement un type spécial d' Observable
- similaire à un Single
- qui indique simplement si un calcul a réussi, via onComplete()
, ou échoué, via onError()
. Retourner un Completable
semble avoir plus de sens dans ce cas car il semble idiot de retourner un seul vrai dans un flux Observable
.
/** * Writes {@code text} to the filesystem. * * @param context a context * @param filename the name of the file * @param text the text to write * @return A Completable */ public Completable writeTextToFile(Context context, String filename, String text) { return Completable.fromAction(() -> { FileOutputStream outputStream; outputStream = context.openFileOutput(filename, Context.MODE_PRIVATE); outputStream.write(text.getBytes()); outputStream.close(); }).subscribeOn(Schedulers.io()); }
Pour terminer l'opération, on utilise l'opération fromAction()
d'un Completable
puisque la valeur de retour ne nous intéresse plus. Si nécessaire, comme un Observable
, un Completable
prend également en charge les fromCallable()
et defer( defer()
.
Remplacement des rappels
Jusqu'à présent, tous les exemples que nous avons examinés émettent soit une valeur (c'est-à-dire qu'ils peuvent être modélisés comme un Single
), soit nous indiquent si une opération a réussi ou échoué (c'est-à-dire qu'ils peuvent être modélisés comme un Completable
).
Cependant, comment pouvons-nous convertir les zones de notre application qui reçoivent des mises à jour ou des événements continus (tels que des mises à jour de localisation, afficher des événements de clic, des événements de capteur, etc.) ?
Nous examinerons deux façons de procéder, en utilisant create()
et en utilisant Subjects
.
create()
nous permet d'appeler explicitement onNext()
| d'un observateur onComplete()
| onError()
lorsque nous recevons des mises à jour de notre source de données. Pour utiliser create()
, nous transmettons un ObservableOnSubscribe
qui reçoit un ObservableEmitter
chaque fois qu'un observateur s'abonne. À l'aide de l'émetteur reçu, nous pouvons ensuite effectuer tous les appels de configuration nécessaires pour commencer à recevoir des mises à jour, puis invoquer l'événement Emitter
approprié.
Dans le cas de mises à jour de localisation, nous pouvons nous inscrire pour recevoir des mises à jour à cet endroit et émettre des mises à jour de localisation telles que reçues.
public class LocationManager { /** * Call to receive device location updates. * @return An Observable emitting location updates */ public Observable<Location> observeLocation() { return Observable.create(emitter -> { // Make sure that the following conditions apply and if not, call the emitter's onError() method // (1) googleApiClient is connected // (2) location permission is granted final LocationRequest locationRequest = new LocationRequest(); locationRequest.setInterval(1000); locationRequest.setPriority(LocationRequest.PRIORITY_HIGH_ACCURACY); LocationServices.FusedLocationApi.requestLocationUpdates(googleApiClient, locationRequest, new LocationListener() { @Override public void onLocationChanged(Location location) { if (!emitter.isDisposed()) { emitter.onNext(location); } } }); }); } }
La fonction à l'intérieur de l'appel create()
demande des mises à jour d'emplacement et transmet un rappel qui est invoqué lorsque l'emplacement de l'appareil change. Comme nous pouvons le voir ici, nous remplaçons essentiellement l'interface de style rappel et émettons à la place l'emplacement reçu dans le flux Observable créé (à des fins éducatives, j'ai ignoré certains détails lors de la construction d'une demande d'emplacement, si vous voulez approfondir plus profondément dans les détails, vous pouvez le lire ici).
Une autre chose à noter à propos de create()
est que, chaque fois que subscribe()
est appelé, un nouvel émetteur est fourni. En d'autres termes, create()
renvoie un Observable
froid. Cela signifie que, dans la fonction ci-dessus, nous demanderions potentiellement des mises à jour de localisation plusieurs fois, ce qui n'est pas ce que nous voulons.
Pour contourner ce problème, nous souhaitons modifier la fonction pour renvoyer un Observable
chaud à l'aide de Subjects
.
Entrez les sujets
Un Subject
étend un Observable
et implémente un Observer
en même temps. Ceci est particulièrement utile chaque fois que nous voulons émettre ou diffuser le même événement à plusieurs abonnés en même temps. Du point de vue de la mise en œuvre, nous voudrions exposer le Subject
en tant Observable
aux clients, tout en le conservant en tant que Subject
pour le fournisseur.
public class LocationManager { private Subject<Location> locationSubject = PublishSubject.create(); /** * Invoke this method when this LocationManager should start listening to location updates. */ public void connect() { final LocationRequest locationRequest = new LocationRequest(); locationRequest.setInterval(1000); locationRequest.setPriority(LocationRequest.PRIORITY_HIGH_ACCURACY); LocationServices.FusedLocationApi.requestLocationUpdates(googleApiClient, locationRequest, new LocationListener() { @Override public void onLocationChanged(Location location) { locationSubject.onNext(location); } }); } /** * Call to receive device location updates. * @return An Observable emitting location updates */ public Observable<Location> observeLocation() { return locationSubject; } }
Dans cette nouvelle implémentation, le sous-type PublishSubject
est utilisé qui émet des événements au fur et à mesure qu'ils arrivent à partir du moment de la souscription. En conséquence, si un abonnement est effectué à un moment où des mises à jour de localisation ont déjà été émises, les émissions passées ne seront pas reçues par l'observateur, seulement les suivantes. Si ce comportement n'est pas souhaité, il existe quelques autres sous-types de Subject
dans la boîte à outils RxJava qui peuvent être utilisés.
De plus, nous avons également créé une fonction connect()
distincte qui lance la demande de réception des mises à jour de localisation. L' observeLocation()
peut toujours effectuer l'appel connect()
, mais nous l'avons refactorisé hors de la fonction pour plus de clarté/simplicité.
Sommaire
Nous avons examiné un certain nombre de mécanismes et de techniques :
-
defer()
et ses variantes pour retarder l'exécution d'un calcul jusqu'à la souscription -
Observables
froids générés viacreate()
-
Observables
chauds utilisant desSubjects
- blocage des opérations X quand on veut quitter le monde réactif
J'espère que les exemples fournis dans cet article ont inspiré quelques idées concernant différentes zones de votre application qui peuvent être converties pour être réactives. Nous avons couvert beaucoup de choses et si vous avez des questions, des suggestions ou si quelque chose n'est pas clair, n'hésitez pas à laisser un commentaire ci-dessous !
Si vous souhaitez en savoir plus sur RxJava, je travaille sur un livre approfondi qui explique comment visualiser les problèmes de manière réactive à l'aide d'exemples Android. Si vous souhaitez recevoir des mises à jour à ce sujet, veuillez vous inscrire ici.