Come semplificare la concorrenza con la modellazione reattiva su Android

Pubblicato: 2022-03-11

La concorrenza e l'asincronicità sono inerenti alla programmazione mobile.

Gestire la concorrenza attraverso la programmazione in stile imperativo, che è ciò che generalmente comporta la programmazione su Android, può essere la causa di molti problemi. Utilizzando la programmazione reattiva con RxJava, puoi evitare potenziali problemi di concorrenza fornendo una soluzione più pulita e meno soggetta a errori.

Oltre a semplificare le attività simultanee e asincrone, RxJava offre anche la possibilità di eseguire operazioni di stile funzionale che trasformano, combinano e aggregano le emissioni di un osservabile fino a raggiungere il risultato desiderato.

Combinando il paradigma reattivo di RxJava e le operazioni di stile funzionale, possiamo modellare un'ampia gamma di costrutti di concorrenza in modo reattivo, anche nel mondo non reattivo di Android. In questo articolo imparerai come puoi fare esattamente questo. Imparerai anche come adottare RxJava in un progetto esistente in modo incrementale.

Se non conosci RxJava, ti consiglio di leggere il post qui che parla di alcuni dei fondamenti di RxJava.

Colmare i non reattivi nel mondo reattivo

Una delle sfide dell'aggiunta di RxJava come una delle librerie al tuo progetto è che cambia radicalmente il modo in cui ragioni sul tuo codice.

RxJava richiede di pensare ai dati come a un push piuttosto che a un pull. Sebbene il concetto stesso sia semplice, cambiare una base di codice completa basata su un paradigma pull può essere un po' scoraggiante. Sebbene la coerenza sia sempre l'ideale, potresti non avere sempre il privilegio di effettuare questa transizione nell'intera base di codice tutto in una volta, quindi potrebbe essere necessario un approccio più incrementale.

Considera il seguente codice:

 /** * @return a list of users with blogs */ public List<User> getUsersWithBlogs() { final List<User> allUsers = UserCache.getAllUsers(); final List<User> usersWithBlogs = new ArrayList<>(); for (User user : allUsers) { if (user.blog != null && !user.blog.isEmpty()) { usersWithBlogs.add(user); } } Collections.sort(usersWithBlogs, (user1, user2) -> user1.name.compareTo(user2.name)); return usersWithBlogs; }

Questa funzione ottiene un elenco di oggetti User dalla cache, li filtra ciascuno in base al fatto che l'utente abbia o meno un blog, li ordina in base al nome dell'utente e infine li restituisce al chiamante. Osservando questo frammento, notiamo che molte di queste operazioni possono trarre vantaggio dagli operatori RxJava; ad esempio, filter() e sorted() .

La riscrittura di questo frammento di codice ci dà quindi:

 /** * @return a list of users with blogs */ public Observable<User> getUsersWithBlogs() { return Observable.fromIterable(UserCache.getAllUsers()) .filter(user -> user.blog != null && !user.blog.isEmpty()) .sorted((user1, user2) -> user1.name.compareTo(user2.name)); }

La prima riga della funzione converte List<User> restituito da UserCache.getAllUsers() in un Observable<User> tramite fromIterable() . Questo è il primo passo per rendere reattivo il nostro codice. Ora che stiamo operando su un Observable , questo ci consente di eseguire qualsiasi operatore Observable nel toolkit RxJava – filter() e sorted() in questo caso.

Ci sono alcuni altri punti da notare su questo cambiamento.

Innanzitutto, la firma del metodo non è più la stessa. Questo potrebbe non essere un grosso problema se questa chiamata al metodo viene utilizzata solo in pochi punti ed è facile propagare le modifiche ad altre aree dello stack; tuttavia, se interrompe i client che fanno affidamento su questo metodo, ciò è problematico e la firma del metodo dovrebbe essere ripristinata.

In secondo luogo, RxJava è progettato pensando alla pigrizia. Cioè, non devono essere eseguite operazioni lunghe quando non ci sono abbonati Observable . Con questa modifica, tale presupposto non è più vero poiché UserCache.getAllUsers() viene invocato anche prima che ci siano abbonati.

Lasciare il mondo reattivo

Per risolvere il primo problema della nostra modifica, possiamo utilizzare uno qualsiasi degli operatori di blocco disponibili per un Observable come blockingFirst() e blockingNext() . In sostanza, entrambi questi operatori si bloccheranno finché non viene emesso un elemento a valle: blockingFirst() restituirà il primo elemento emesso e finirà, mentre blockingNext() restituirà un Iterable che consente di eseguire un ciclo for-ogni sui dati sottostanti ( ogni iterazione del ciclo si bloccherà).

Un effetto collaterale dell'utilizzo di un'operazione di blocco di cui è importante essere consapevoli, tuttavia, è che le eccezioni vengono generate sul thread chiamante anziché essere passate al metodo onError() di un osservatore.

Usando un operatore di blocco per riportare la firma del metodo in un List<User> , il nostro frammento di codice sarebbe ora simile a questo:

 /** * @return a list of users with blogs */ public List<User> getUsersWithBlogs() { return Observable.fromIterable(UserCache.getAllUsers()) .filter(user -> user.blog != null && !user.blog.isEmpty()) .sorted((user1, user2) -> user1.name.compareTo(user2.name)) .toList() .blockingGet(); }

Prima di chiamare un operatore di blocco (cioè blockingGet() ) dobbiamo prima concatenare l'operatore aggregato toList() in modo che il flusso venga modificato da Observable<User> a Single<List<User>> (un Single è un tipo speciale di Observable che emette un solo valore in onSuccess() o un errore tramite onError() ).

Successivamente, possiamo chiamare l'operatore di blockingGet() che scarta il Single e restituisce un List<User> .

Sebbene RxJava lo supporti, per quanto possibile questo dovrebbe essere evitato poiché non si tratta di una programmazione reattiva idiomatica. Quando assolutamente necessario, però, bloccare gli operatori è un buon modo iniziale per uscire dal mondo reattivo.

L'approccio pigro

Come accennato in precedenza, RxJava è stato progettato pensando alla pigrizia. Cioè, le operazioni di lunga durata dovrebbero essere ritardate il più a lungo possibile (cioè fino a quando non viene invocata una sottoscrizione su un Observable ). Per rendere la nostra soluzione pigra, utilizziamo l'operatore defer() .

defer() accetta una fabbrica ObservableSource che crea un Observable per ogni nuovo osservatore che si iscrive. Nel nostro caso, vogliamo restituire Observable.fromIterable(UserCache.getAllUser()) ogni volta che un osservatore si iscrive.

 /** * @return a list of users with blogs */ public Observable<User> getUsersWithBlogs() { return Observable.defer(() -> Observable.fromIterable(UserCache.getAllUsers())) .filter(user -> user.blog != null && !user.blog.isEmpty()) .sorted((user1, user2) -> user1.name.compareTo(user2.name)); }

Ora che l'operazione di lunga durata è racchiusa in un defer() , abbiamo il pieno controllo su quale thread dovrebbe essere eseguito semplicemente specificando lo Scheduler appropriato in subscribeOn() . Con questa modifica, il nostro codice è completamente reattivo e la sottoscrizione dovrebbe avvenire solo nel momento in cui i dati sono necessari.

 /** * @return a list of users with blogs */ public Observable<User> getUsersWithBlogs() { return Observable.defer(() -> Observable.fromIterable(UserCache.getAllUsers())) .filter(user -> user.blog != null && !user.blog.isEmpty()) .sorted((user1, user2) -> user1.name.compareTo(user2.name)) .subscribeOn(Schedulers.io()); }

Un altro operatore abbastanza utile per differire il calcolo è il metodo fromCallable() . A differenza di defer() , che si aspetta che un Observable venga restituito nella funzione lambda e, a sua volta, "appiattisce" l' Observable restituito, fromCallable() invocherà lambda e restituirà il valore a valle.

 /** * @return a list of users with blogs */ public Observable<User> getUsersWithBlogs() { final Observable<List<User>> usersObservable = Observable.fromCallable(() -> UserCache.getAllUsers()); final Observable<User> userObservable = usersObservable.flatMap(users -> Observable.fromIterable(users)); return userObservable.filter(user -> user.blog != null && !user.blog.isEmpty()) .sorted((user1, user2) -> user1.name.compareTo(user2.name)); }

L'uso singolo di fromCallable() su un elenco ora restituirebbe un Observable<List<User>> , è necessario appiattire questo elenco usando flatMap() .

Reattivo-tutto

Dagli esempi precedenti, abbiamo visto che possiamo avvolgere qualsiasi oggetto in un Observable e saltare tra stati non reattivi e reattivi usando operazioni di blocco e defer( defer() / fromCallable() . Usando questi costrutti, possiamo iniziare a convertire le aree di un'app Android in modo che siano reattive.

Operazioni lunghe

Un buon punto di partenza per pensare all'utilizzo di RxJava è ogni volta che si ha un processo che richiede del tempo per essere eseguito, come chiamate di rete (consultare il post precedente per esempi), letture e scritture del disco, ecc. L'esempio seguente illustra una semplice funzione che scriverà il testo nel file system:

 /** * Writes {@code text} to the file system. * * @param context a Context * @param filename the name of the file * @param text the text to write * @return true if the text was successfully written, otherwise, false */ public boolean writeTextToFile(Context context, String filename, String text) { FileOutputStream outputStream; try { outputStream = context.openFileOutput(filename, Context.MODE_PRIVATE); outputStream.write(text.getBytes()); outputStream.close(); return true; } catch (Exception e) { e.printStackTrace(); return false; } }

Quando chiamiamo questa funzione, dobbiamo assicurarci che venga eseguita su un thread separato poiché questa operazione sta bloccando. Imporre una tale restrizione al chiamante complica le cose per lo sviluppatore, aumentando la probabilità di bug e potenzialmente rallentando lo sviluppo.

L'aggiunta di un commento alla funzione aiuterà ovviamente a evitare errori da parte del chiamante, ma è ancora tutt'altro che a prova di proiettile.

Usando RxJava, tuttavia, possiamo facilmente racchiuderlo in un Observable e specificare lo Scheduler su cui dovrebbe essere eseguito. In questo modo, il chiamante non deve preoccuparsi affatto di invocare la funzione in un thread separato; la funzione se ne occuperà da sola.

 /** * Writes {@code text} to the filesystem. * * @param context a Context * @param filename the name of the file * @param text the text to write * @return An Observable emitting a boolean indicating whether or not the text was successfully written. */ public Observable<Boolean> writeTextToFile(Context context, String filename, String text) { return Observable.fromCallable(() -> { FileOutputStream outputStream; outputStream = context.openFileOutput(filename, Context.MODE_PRIVATE); outputStream.write(text.getBytes()); outputStream.close(); return true; }).subscribeOn(Schedulers.io()); }

Utilizzando fromCallable() , la scrittura del testo su file viene posticipata fino al momento della sottoscrizione.

Poiché le eccezioni sono oggetti di prima classe in RxJava, un altro vantaggio della nostra modifica è che non abbiamo più bisogno di racchiudere l'operazione in un blocco try/catch. L'eccezione verrà semplicemente propagata a valle anziché essere inghiottita. Ciò consente al chiamante di gestire l'eccezione che ritiene opportuno (ad es. mostrare un errore all'utente a seconda dell'eccezione generata, ecc.).

Un'altra ottimizzazione che possiamo eseguire è restituire un Completable piuttosto che un Observable . Un Completable è essenzialmente un tipo speciale di Observable , simile a un Single , che indica semplicemente se un calcolo è riuscito, tramite onComplete() , o non è riuscito, tramite onError() . La restituzione di un Completable sembra avere più senso in questo caso poiché sembra sciocco restituire un singolo true in un flusso Observable .

 /** * Writes {@code text} to the filesystem. * * @param context a context * @param filename the name of the file * @param text the text to write * @return A Completable */ public Completable writeTextToFile(Context context, String filename, String text) { return Completable.fromAction(() -> { FileOutputStream outputStream; outputStream = context.openFileOutput(filename, Context.MODE_PRIVATE); outputStream.write(text.getBytes()); outputStream.close(); }).subscribeOn(Schedulers.io()); }

Per completare l'operazione, utilizziamo l'operazione fromAction() di un Completable poiché il valore restituito non ci interessa più. Se necessario, come un Observable , un Completable supporta anche le fromCallable() e defer( defer() .

Sostituzione delle richiamate

Finora, tutti gli esempi che abbiamo esaminato emettono un valore (ad esempio, possono essere modellati come Single ) o ci dicono se un'operazione è riuscita o non è riuscita (ad esempio, può essere modellata come Completable ).

Tuttavia, come potremmo convertire le aree nella nostra app che ricevono aggiornamenti o eventi continui (come aggiornamenti sulla posizione, eventi di clic, eventi di sensori e così via)?

Vedremo due modi per farlo, usando create() e usando Subjects .

create() ci permette di invocare esplicitamente onNext() di un osservatore | onComplete() | onError() quando riceviamo aggiornamenti dalla nostra origine dati. Per utilizzare create() , passiamo un ObservableOnSubscribe che riceve un ObservableEmitter ogni volta che un osservatore si iscrive. Utilizzando l'emettitore ricevuto, possiamo quindi eseguire tutte le chiamate di configurazione necessarie per iniziare a ricevere gli aggiornamenti e quindi invocare l'evento Emitter appropriato.

Nel caso di aggiornamenti sulla posizione, possiamo registrarci per ricevere gli aggiornamenti in questo luogo ed emettere gli aggiornamenti sulla posizione così come ricevuti.

 public class LocationManager { /** * Call to receive device location updates. * @return An Observable emitting location updates */ public Observable<Location> observeLocation() { return Observable.create(emitter -> { // Make sure that the following conditions apply and if not, call the emitter's onError() method // (1) googleApiClient is connected // (2) location permission is granted final LocationRequest locationRequest = new LocationRequest(); locationRequest.setInterval(1000); locationRequest.setPriority(LocationRequest.PRIORITY_HIGH_ACCURACY); LocationServices.FusedLocationApi.requestLocationUpdates(googleApiClient, locationRequest, new LocationListener() { @Override public void onLocationChanged(Location location) { if (!emitter.isDisposed()) { emitter.onNext(location); } } }); }); } }

La funzione all'interno della chiamata create() richiede aggiornamenti della posizione e passa un callback che viene richiamato quando la posizione del dispositivo cambia. Come possiamo vedere qui, sostituiamo essenzialmente l'interfaccia in stile callback e invece emettiamo la posizione ricevuta nel flusso Observable creato (per motivi didattici, ho saltato alcuni dettagli con la costruzione di una richiesta di posizione, se vuoi approfondire più in profondità nei dettagli puoi leggerlo qui).

Un'altra cosa da notare su create() è che, ogni volta che viene chiamato subscribe() , viene fornito un nuovo emettitore. In altre parole, create() restituisce un Observable freddo. Ciò significa che, nella funzione sopra, potremmo potenzialmente richiedere aggiornamenti della posizione più volte, il che non è quello che vogliamo.

Per ovviare a questo, vogliamo cambiare la funzione per restituire un Observable caldo con l'aiuto di Subjects .

Inserisci Soggetti

Un Subject estende un Observable e implementa l' Observer allo stesso tempo. Ciò è particolarmente utile ogni volta che vogliamo emettere o trasmettere lo stesso evento a più abbonati contemporaneamente. Per quanto riguarda l'implementazione, vorremmo esporre il Subject come Observable ai clienti, mantenendolo come Subject per il provider.

 public class LocationManager { private Subject<Location> locationSubject = PublishSubject.create(); /** * Invoke this method when this LocationManager should start listening to location updates. */ public void connect() { final LocationRequest locationRequest = new LocationRequest(); locationRequest.setInterval(1000); locationRequest.setPriority(LocationRequest.PRIORITY_HIGH_ACCURACY); LocationServices.FusedLocationApi.requestLocationUpdates(googleApiClient, locationRequest, new LocationListener() { @Override public void onLocationChanged(Location location) { locationSubject.onNext(location); } }); } /** * Call to receive device location updates. * @return An Observable emitting location updates */ public Observable<Location> observeLocation() { return locationSubject; } }

In questa nuova implementazione viene utilizzato il sottotipo PublishSubject che emette gli eventi man mano che arrivano a partire dal momento della sottoscrizione. Di conseguenza, se un abbonamento viene eseguito in un punto in cui sono già stati emessi aggiornamenti di posizione, le emissioni passate non saranno ricevute dall'osservatore, ma solo quelle successive. Se questo comportamento non è desiderato, è possibile utilizzare un paio di altri sottotipi di Subject nel toolkit di RxJava.

Inoltre, abbiamo anche creato una funzione connect() separata che avvia la richiesta di ricevere gli aggiornamenti sulla posizione. observeLocation() può ancora eseguire la chiamata connect() , ma l'abbiamo rifattorizzato fuori dalla funzione per chiarezza/semplicità.

Sommario

Abbiamo esaminato una serie di meccanismi e tecniche:

  • defer() e le sue varianti per ritardare l'esecuzione di un calcolo fino alla sottoscrizione
  • Cold Observables generati tramite create()
  • Observables caldi usando i Subjects
  • blockingX operazioni quando vogliamo lasciare il mondo reattivo

Si spera che gli esempi forniti in questo articolo abbiano ispirato alcune idee relative a diverse aree dell'app che possono essere convertite per essere reattive. Abbiamo trattato molto e se hai domande, suggerimenti o se qualcosa non è chiaro, sentiti libero di lasciare un commento qui sotto!

Se sei interessato a saperne di più su RxJava, sto lavorando a un libro approfondito che spiega come visualizzare i problemi in modo reattivo utilizzando esempi Android. Se desideri ricevere aggiornamenti su di esso, iscriviti qui.