Cum să simplificați concurența cu modelarea reactivă pe Android
Publicat: 2022-03-11Concurența și asincronismul sunt inerente programării mobile.
Confruntarea cu concurența prin programarea în stil imperativ, care este ceea ce implică în general programarea pe Android, poate fi cauza multor probleme. Folosind programarea reactivă cu RxJava, puteți evita potențialele probleme de concurență, oferind o soluție mai curată și mai puțin predispusă la erori.
Pe lângă simplificarea sarcinilor concurente, asincrone, RxJava oferă, de asemenea, capacitatea de a efectua operațiuni de stil funcțional care transformă, combină și agrega emisiile de la un Observable până când atingem rezultatul dorit.
Combinând paradigma reactivă RxJava și operațiunile de stil funcțional, putem modela o gamă largă de constructe concurente într-un mod reactiv, chiar și în lumea nereactivă a Android. În acest articol, veți afla cum puteți face exact asta. Veți învăța, de asemenea, cum să adoptați treptat RxJava într-un proiect existent.
Dacă sunteți nou în RxJava, vă recomand să citiți postarea de aici, care vorbește despre unele dintre elementele fundamentale ale RxJava.
Conectarea non-reactivilor în lumea reactivă
Una dintre provocările adăugării RxJava ca una dintre bibliotecile la proiectul tău este că schimbă fundamental modul în care raționezi codul.
RxJava vă cere să vă gândiți la date ca fiind împinse, mai degrabă decât extrase. În timp ce conceptul în sine este simplu, schimbarea unei baze de cod complete care se bazează pe o paradigmă pull poate fi puțin descurajantă. Deși consecvența este întotdeauna ideală, s-ar putea să nu aveți întotdeauna privilegiul de a face această tranziție în întreaga bază de cod dintr-o dată, așa că ar putea fi necesară o abordare incrementală.
Luați în considerare următorul cod:
/** * @return a list of users with blogs */ public List<User> getUsersWithBlogs() { final List<User> allUsers = UserCache.getAllUsers(); final List<User> usersWithBlogs = new ArrayList<>(); for (User user : allUsers) { if (user.blog != null && !user.blog.isEmpty()) { usersWithBlogs.add(user); } } Collections.sort(usersWithBlogs, (user1, user2) -> user1.name.compareTo(user2.name)); return usersWithBlogs; }
Această funcție primește o listă de obiecte User
din cache, le filtrează pe fiecare în funcție de dacă utilizatorul are sau nu un blog, le sortează după numele utilizatorului și, în final, le returnează apelantului. Privind acest fragment, observăm că multe dintre aceste operațiuni pot profita de operatorii RxJava; de exemplu, filter()
și sorted()
.
Rescrierea acestui fragment ne dă apoi:
/** * @return a list of users with blogs */ public Observable<User> getUsersWithBlogs() { return Observable.fromIterable(UserCache.getAllUsers()) .filter(user -> user.blog != null && !user.blog.isEmpty()) .sorted((user1, user2) -> user1.name.compareTo(user2.name)); }
Prima linie a funcției convertește List<User>
returnat de UserCache.getAllUsers()
într-un Observable<User>
prin fromIterable()
. Acesta este primul pas pentru a face codul nostru reactiv. Acum că operam pe un Observable
, acest lucru ne permite să efectuăm orice operator Observable
din setul de instrumente RxJava – filter()
și sorted()
în acest caz.
Există și alte câteva puncte de remarcat despre această schimbare.
În primul rând, semnătura metodei nu mai este aceeași. Acest lucru poate să nu fie o afacere uriașă dacă acest apel de metodă este folosit doar în câteva locuri și este ușor să propagați modificările în alte zone ale stivei; cu toate acestea, dacă întrerupe clienții care se bazează pe această metodă, acest lucru este problematic și semnătura metodei ar trebui să fie inversată.
În al doilea rând, RxJava este proiectat având în vedere lenea. Adică, nu trebuie efectuate operațiuni lungi când nu există abonați la Observable
. Cu această modificare, acea ipoteză nu mai este adevărată, deoarece UserCache.getAllUsers()
este invocat chiar înainte de a exista abonați.
Părăsind lumea reactivă
Pentru a rezolva prima problemă din modificarea noastră, putem folosi oricare dintre operatorii de blocare disponibili pentru un Observable
, cum ar fi blockingFirst()
și blockingNext()
. În esență, ambii acești operatori se vor bloca până când un articol este emis în aval: blockingFirst()
va returna primul element emis și va termina, în timp ce blockingNext()
va returna un Iterable
care vă permite să efectuați o buclă for-each pe datele de bază ( fiecare iterație prin buclă se va bloca).
Un efect secundar al utilizării unei operații de blocare de care este important să fii conștient, totuși, este că excepțiile sunt aruncate pe firul de execuție apelant, mai degrabă decât să fie transmise metodei onError()
a unui observator.
Folosind un operator de blocare pentru a schimba semnătura metodei înapoi la List<User>
, fragmentul nostru ar arăta acum astfel:
/** * @return a list of users with blogs */ public List<User> getUsersWithBlogs() { return Observable.fromIterable(UserCache.getAllUsers()) .filter(user -> user.blog != null && !user.blog.isEmpty()) .sorted((user1, user2) -> user1.name.compareTo(user2.name)) .toList() .blockingGet(); }
Înainte de a apela un operator de blocare (adică blockingGet()
) trebuie mai întâi să înlănțuim operatorul agregat toList()
astfel încât fluxul să fie modificat de la un Observable<User>
la un Single<List<User>>
(un Single
este un tip special de Observable
care emite doar o singură valoare în onSuccess()
sau o eroare prin onError()
).
Ulterior, putem apela operatorul de blockingGet()
care dezactivează Single
-ul și returnează un List<User>
.
Deși RxJava acceptă acest lucru, pe cât posibil acest lucru ar trebui evitat deoarece aceasta nu este o programare reactivă idiomatică. Când este absolut necesar însă, blocarea operatorilor este o modalitate inițială bună de a ieși din lumea reactivă.
Abordarea leneșă
După cum am menționat mai devreme, RxJava a fost proiectat având în vedere lenea. Adică, operațiunile de lungă durată ar trebui amânate cât mai mult posibil (adică până când este invocat un abonament pe un Observable
). Pentru a face soluția noastră leneșă, folosim operatorul defer()
.
defer()
preia o fabrică ObservableSource
care creează un Observable
pentru fiecare observator nou care se abonează. În cazul nostru, dorim să returnăm Observable.fromIterable(UserCache.getAllUser())
ori de câte ori un observator se abonează.
/** * @return a list of users with blogs */ public Observable<User> getUsersWithBlogs() { return Observable.defer(() -> Observable.fromIterable(UserCache.getAllUsers())) .filter(user -> user.blog != null && !user.blog.isEmpty()) .sorted((user1, user2) -> user1.name.compareTo(user2.name)); }
Acum că operațiunea de lungă durată este încapsulată într-un defer()
, avem control deplin asupra firului de execuție în care ar trebui să ruleze, pur și simplu specificând Scheduler
-ul corespunzător în subscribeOn()
. Odată cu această modificare, codul nostru este complet reactiv și abonamentul ar trebui să aibă loc numai în momentul în care datele sunt necesare.
/** * @return a list of users with blogs */ public Observable<User> getUsersWithBlogs() { return Observable.defer(() -> Observable.fromIterable(UserCache.getAllUsers())) .filter(user -> user.blog != null && !user.blog.isEmpty()) .sorted((user1, user2) -> user1.name.compareTo(user2.name)) .subscribeOn(Schedulers.io()); }
Un alt operator destul de util pentru amânarea calculului este metoda fromCallable()
. Spre deosebire de defer()
, care se așteaptă ca un Observable
să fie returnat în funcția lambda și, la rândul său, „aplatizează” Observable
returnat, fromCallable()
va invoca lambda și va returna valoarea în aval.
/** * @return a list of users with blogs */ public Observable<User> getUsersWithBlogs() { final Observable<List<User>> usersObservable = Observable.fromCallable(() -> UserCache.getAllUsers()); final Observable<User> userObservable = usersObservable.flatMap(users -> Observable.fromIterable(users)); return userObservable.filter(user -> user.blog != null && !user.blog.isEmpty()) .sorted((user1, user2) -> user1.name.compareTo(user2.name)); }
Folosirea fromCallable()
pe o listă ar returna acum un Observable<List<User>>
, trebuie să aplatăm această listă folosind flatMap()
.
Reactiv-totul
Din exemplele anterioare, am văzut că putem înfășura orice obiect într-un Observable
și să sărim între stările nereactive și reactive folosind operații de blocare și defer( defer()
/ fromCallable()
. Folosind aceste construcții, putem începe să convertim zonele unei aplicații Android pentru a fi reactive.
Operațiuni lungi
Un loc bun pentru a vă gândi inițial la utilizarea RxJava este ori de câte ori aveți un proces care durează ceva timp, cum ar fi apeluri în rețea (consultați postarea anterioară pentru exemple), citirea și scrierea pe disc etc. Următorul exemplu ilustrează o funcție simplă care va scrie text în sistemul de fișiere:

/** * Writes {@code text} to the file system. * * @param context a Context * @param filename the name of the file * @param text the text to write * @return true if the text was successfully written, otherwise, false */ public boolean writeTextToFile(Context context, String filename, String text) { FileOutputStream outputStream; try { outputStream = context.openFileOutput(filename, Context.MODE_PRIVATE); outputStream.write(text.getBytes()); outputStream.close(); return true; } catch (Exception e) { e.printStackTrace(); return false; } }
Când apelăm această funcție, trebuie să ne asigurăm că se face pe un fir separat, deoarece această operație se blochează. Impunerea unei astfel de restricții asupra apelantului complică lucrurile pentru dezvoltator, ceea ce crește probabilitatea de erori și poate încetini dezvoltarea.
Adăugarea unui comentariu la funcție va ajuta, desigur, la evitarea erorilor din partea apelantului, dar aceasta este încă departe de a fi antiglonț.
Cu toate acestea, folosind RxJava, putem include cu ușurință acest lucru într-un Observable
și să specificăm Scheduler
-ul pe care ar trebui să ruleze. În acest fel, apelantul nu trebuie să fie deloc preocupat de invocarea funcției într-un fir separat; funcția se va ocupa de aceasta însăși.
/** * Writes {@code text} to the filesystem. * * @param context a Context * @param filename the name of the file * @param text the text to write * @return An Observable emitting a boolean indicating whether or not the text was successfully written. */ public Observable<Boolean> writeTextToFile(Context context, String filename, String text) { return Observable.fromCallable(() -> { FileOutputStream outputStream; outputStream = context.openFileOutput(filename, Context.MODE_PRIVATE); outputStream.write(text.getBytes()); outputStream.close(); return true; }).subscribeOn(Schedulers.io()); }
Folosind fromCallable()
, scrierea textului în fișier este amânată până la momentul abonării.
Deoarece excepțiile sunt obiecte de primă clasă în RxJava, un alt beneficiu al schimbării noastre este că nu mai trebuie să încapsulăm operația într-un bloc try/catch. Excepția va fi pur și simplu propagată în aval, în loc să fie înghițită. Acest lucru permite apelantului să gestioneze excepția pe care o consideră potrivită (de exemplu, arătați utilizatorului o eroare în funcție de excepția lansată etc.).
O altă optimizare pe care o putem realiza este să returnăm un Completable
mai degrabă decât un Observable
. Un Completable
este, în esență, un tip special de Observable
- similar cu un Single
- care indică pur și simplu dacă un calcul a reușit, prin onComplete()
, sau a eșuat, prin onError()
. Returnarea unui Completable
pare să aibă mai mult sens în acest caz, deoarece pare o prostie să returnezi un singur adevărat într-un flux Observable
.
/** * Writes {@code text} to the filesystem. * * @param context a context * @param filename the name of the file * @param text the text to write * @return A Completable */ public Completable writeTextToFile(Context context, String filename, String text) { return Completable.fromAction(() -> { FileOutputStream outputStream; outputStream = context.openFileOutput(filename, Context.MODE_PRIVATE); outputStream.write(text.getBytes()); outputStream.close(); }).subscribeOn(Schedulers.io()); }
Pentru a finaliza operația, folosim fromAction()
a unui Completable
, deoarece valoarea returnată nu ne mai interesează. Dacă este necesar, ca și un Observable
, un Completable
acceptă și fromCallable()
și defer( defer()
.
Înlocuirea apelurilor inverse
Până acum, toate exemplele pe care le-am analizat emit fie o singură valoare (adică, pot fi modelate ca Single
), fie ne spun dacă o operație a reușit sau eșuat (adică, poate fi modelată ca Completable
).
Cum am putea converti zonele din aplicația noastră, totuși, care primesc actualizări sau evenimente continue (cum ar fi actualizări de locație, evenimente de clic de vizualizare, evenimente de senzor și așa mai departe)?
Vom analiza două moduri de a face acest lucru, folosind create()
și folosind Subjects
.
create()
ne permite să invocăm în mod explicit onNext()
| onComplete()
| metoda onError()
pe măsură ce primim actualizări de la sursa noastră de date. Pentru a folosi create()
, transmitem un ObservableOnSubscribe
care primește un ObservableEmitter
ori de câte ori un observator se abonează. Folosind emițătorul primit, putem apoi efectua toate apelurile de configurare necesare pentru a începe să primim actualizări și apoi invocam evenimentul Emitter
corespunzător.
În cazul actualizărilor de locație, ne putem înregistra pentru a primi actualizări în acest loc și pentru a emite actualizări de locație așa cum le-am primit.
public class LocationManager { /** * Call to receive device location updates. * @return An Observable emitting location updates */ public Observable<Location> observeLocation() { return Observable.create(emitter -> { // Make sure that the following conditions apply and if not, call the emitter's onError() method // (1) googleApiClient is connected // (2) location permission is granted final LocationRequest locationRequest = new LocationRequest(); locationRequest.setInterval(1000); locationRequest.setPriority(LocationRequest.PRIORITY_HIGH_ACCURACY); LocationServices.FusedLocationApi.requestLocationUpdates(googleApiClient, locationRequest, new LocationListener() { @Override public void onLocationChanged(Location location) { if (!emitter.isDisposed()) { emitter.onNext(location); } } }); }); } }
Funcția din interiorul apelului create()
solicită actualizări de locație și transmite un apel invers care este invocat atunci când locația dispozitivului se schimbă. După cum putem vedea aici, înlocuim, în esență, interfața în stilul callback și emitem, în schimb, locația primită în fluxul Observable creat (de dragul scopurilor educaționale, am omis unele detalii cu construirea unei cereri de locație, dacă doriți să explorați mai profund în detalii îl puteți citi aici).
Un alt lucru de remarcat despre create()
este că, ori de câte ori subscribe()
este apelat, este furnizat un nou emitent. Cu alte cuvinte, create()
returnează un Observable
rece. Aceasta înseamnă că, în funcția de mai sus, am putea solicita actualizări de locație de mai multe ori, ceea ce nu este ceea ce ne dorim.
Pentru a rezolva acest lucru, dorim să schimbăm funcția pentru a returna un Observable
fierbinte cu ajutorul Subjects
.
Introduceți Subiecte
Un Subject
extinde un Observable
și implementează Observer
în același timp. Acest lucru este util în special atunci când dorim să emitem sau să difuzăm același eveniment către mai mulți abonați în același timp. Din punct de vedere al implementării, am dori să expunem Subject
ca un Observable
clienților, păstrându-l în același timp ca Subject
pentru furnizor.
public class LocationManager { private Subject<Location> locationSubject = PublishSubject.create(); /** * Invoke this method when this LocationManager should start listening to location updates. */ public void connect() { final LocationRequest locationRequest = new LocationRequest(); locationRequest.setInterval(1000); locationRequest.setPriority(LocationRequest.PRIORITY_HIGH_ACCURACY); LocationServices.FusedLocationApi.requestLocationUpdates(googleApiClient, locationRequest, new LocationListener() { @Override public void onLocationChanged(Location location) { locationSubject.onNext(location); } }); } /** * Call to receive device location updates. * @return An Observable emitting location updates */ public Observable<Location> observeLocation() { return locationSubject; } }
În această nouă implementare se folosește subtipul PublishSubject
care emite evenimente pe măsură ce sosesc începând din momentul abonării. În consecință, dacă un abonament este efectuat într-un moment în care actualizările de locație au fost deja emise, emisiile trecute nu vor fi primite de către observator, ci doar cele ulterioare. Dacă acest comportament nu este dorit, există alte câteva subtipuri de Subject
în setul de instrumente RxJava care pot fi utilizate.
În plus, am creat și o funcție separată connect()
care pornește solicitarea de a primi actualizări de locație. observeLocation()
încă poate efectua apelul connect()
, dar l-am refactorizat din funcție pentru claritate/simplitate.
rezumat
Am analizat o serie de mecanisme și tehnici:
-
defer()
și variantele sale pentru a întârzia execuția unui calcul până la abonare -
Observables
reci generate princreate()
-
Observables
fierbinte folosindSubjects
- operațiuni blockingX atunci când vrem să părăsim lumea reactivă
Sperăm că exemplele oferite în acest articol au inspirat câteva idei cu privire la diferite zone din aplicația dvs. care pot fi convertite în reactive. Am acoperit multe și dacă aveți întrebări, sugestii sau dacă ceva nu este clar, nu ezitați să lăsați un comentariu mai jos!
Dacă sunteți interesat să aflați mai multe despre RxJava, lucrez la o carte aprofundată care explică cum să vizualizați problemele într-un mod reactiv folosind exemple Android. Dacă doriți să primiți actualizări despre el, vă rugăm să vă abonați aici.