So vereinfachen Sie die Parallelität mit reaktiver Modellierung auf Android
Veröffentlicht: 2022-03-11Gleichzeitigkeit und Asynchronität sind der mobilen Programmierung inhärent.
Der Umgang mit Nebenläufigkeit durch Programmierung im Imperativstil, was die Programmierung auf Android im Allgemeinen beinhaltet, kann die Ursache vieler Probleme sein. Durch die Verwendung von Reactive Programming mit RxJava können Sie potenzielle Parallelitätsprobleme vermeiden, indem Sie eine sauberere und weniger fehleranfällige Lösung bereitstellen.
Neben der Vereinfachung gleichzeitiger, asynchroner Aufgaben bietet RxJava auch die Möglichkeit, funktionale Operationen durchzuführen, die Emissionen von einem Observable transformieren, kombinieren und aggregieren, bis wir unser gewünschtes Ergebnis erzielen.
Durch die Kombination des reaktiven Paradigmas von RxJava und funktionaler Stiloperationen können wir eine breite Palette von Parallelitätskonstrukten auf reaktive Weise modellieren, sogar in der nicht reaktiven Welt von Android. In diesem Artikel erfahren Sie, wie Sie genau das tun können. Sie lernen auch, wie Sie RxJava schrittweise in ein bestehendes Projekt übernehmen.
Wenn Sie neu bei RxJava sind, empfehle ich Ihnen, den Beitrag hier zu lesen, der über einige der Grundlagen von RxJava spricht.
Nicht-Reaktive in die reaktive Welt überbrücken
Eine der Herausforderungen beim Hinzufügen von RxJava als eine der Bibliotheken zu Ihrem Projekt besteht darin, dass es die Art und Weise, wie Sie über Ihren Code nachdenken, grundlegend ändert.
RxJava erfordert, dass Sie sich vorstellen, dass Daten gepusht und nicht gezogen werden. Während das Konzept selbst einfach ist, kann das Ändern einer vollständigen Codebasis, die auf einem Pull-Paradigma basiert, etwas entmutigend sein. Obwohl Konsistenz immer ideal ist, haben Sie möglicherweise nicht immer das Privileg, diesen Übergang in Ihrer gesamten Codebasis auf einmal vorzunehmen, sodass möglicherweise ein eher inkrementeller Ansatz erforderlich ist.
Betrachten Sie den folgenden Code:
/** * @return a list of users with blogs */ public List<User> getUsersWithBlogs() { final List<User> allUsers = UserCache.getAllUsers(); final List<User> usersWithBlogs = new ArrayList<>(); for (User user : allUsers) { if (user.blog != null && !user.blog.isEmpty()) { usersWithBlogs.add(user); } } Collections.sort(usersWithBlogs, (user1, user2) -> user1.name.compareTo(user2.name)); return usersWithBlogs; }
Diese Funktion ruft eine Liste von User
aus dem Cache ab, filtert sie danach, ob der Benutzer einen Blog hat oder nicht, sortiert sie nach dem Namen des Benutzers und gibt sie schließlich an den Aufrufer zurück. Wenn wir uns dieses Snippet ansehen, stellen wir fest, dass viele dieser Operationen RxJava-Operatoren nutzen können; zB filter()
und sorted()
.
Das Umschreiben dieses Ausschnitts ergibt dann:
/** * @return a list of users with blogs */ public Observable<User> getUsersWithBlogs() { return Observable.fromIterable(UserCache.getAllUsers()) .filter(user -> user.blog != null && !user.blog.isEmpty()) .sorted((user1, user2) -> user1.name.compareTo(user2.name)); }
Die erste Zeile der Funktion konvertiert die von UserCache.getAllUsers() zurückgegebene List<User>
über fromIterable()
UserCache.getAllUsers()
in eine Observable<User>
. Dies ist der erste Schritt, um unseren Code reaktiv zu machen. Da wir nun mit einem Observable
arbeiten, können wir jeden Observable
-Operator im RxJava-Toolkit ausführen – in diesem Fall filter()
und sorted()
.
Zu dieser Änderung sind noch einige weitere Punkte zu beachten.
Erstens ist die Methodensignatur nicht mehr dieselbe. Dies ist möglicherweise keine große Sache, wenn dieser Methodenaufruf nur an wenigen Stellen verwendet wird und es einfach ist, die Änderungen auf andere Bereiche des Stapels zu übertragen. Wenn jedoch Clients, die sich auf diese Methode verlassen, unterbrochen werden, ist dies problematisch, und die Methodensignatur sollte zurückgesetzt werden.
Zweitens ist RxJava auf Faulheit ausgelegt. Das heißt, es sollten keine langen Operationen durchgeführt werden, wenn es keine Abonnenten des Observable
gibt. Mit dieser Änderung trifft diese Annahme nicht mehr zu, da UserCache.getAllUsers()
aufgerufen wird, noch bevor Abonnenten vorhanden sind.
Verlassen der reaktiven Welt
Um das erste Problem unserer Änderung zu beheben, können wir jeden der Blockierungsoperatoren verwenden, die für ein Observable
verfügbar sind, wie etwa blockingFirst()
und blockingNext()
. Im Wesentlichen blockieren diese beiden Operatoren, bis ein Element nachgelagert ausgegeben wird: blockingFirst()
gibt das erste ausgegebene Element zurück und endet, während blockingNext()
ein Iterable
, mit dem Sie eine For-Each-Schleife für die zugrunde liegenden Daten ausführen können ( jede Iteration durch die Schleife blockiert).
Ein Nebeneffekt der Verwendung einer blockierenden Operation, den Sie beachten sollten, besteht jedoch darin, dass Ausnahmen im aufrufenden Thread ausgelöst werden, anstatt an die onError()
-Methode eines Beobachters übergeben zu werden.
Wenn Sie einen blockierenden Operator verwenden, um die Methodensignatur wieder in eine List<User>
zu ändern, würde unser Snippet jetzt so aussehen:
/** * @return a list of users with blogs */ public List<User> getUsersWithBlogs() { return Observable.fromIterable(UserCache.getAllUsers()) .filter(user -> user.blog != null && !user.blog.isEmpty()) .sorted((user1, user2) -> user1.name.compareTo(user2.name)) .toList() .blockingGet(); }
Vor dem Aufrufen eines blockierenden Operators (d. h. blockingGet()
) müssen wir zuerst den Aggregatoperator mit toList()
, sodass der Stream von einem Observable<User>
in einen Single<List<User>>
geändert wird (ein Single
ist ein spezieller Typ of Observable
, das nur einen einzigen Wert in onSuccess()
oder einen Fehler über onError()
).
Danach können wir den Blockierungsoperator blockingGet()
aufrufen, der das Single
auspackt und eine List<User>
zurückgibt.
Obwohl RxJava dies unterstützt, sollte dies so weit wie möglich vermieden werden, da dies keine idiomatische reaktive Programmierung ist. Wenn es jedoch absolut notwendig ist, sind blockierende Operatoren eine nette erste Möglichkeit, aus der reaktiven Welt auszusteigen.
Der faule Ansatz
Wie bereits erwähnt, wurde RxJava im Hinblick auf Faulheit entwickelt. Das heißt, lang andauernde Operationen sollten so lange wie möglich verzögert werden (dh bis ein Abonnement für ein Observable
aufgerufen wird). Um unsere Lösung faul zu machen, verwenden wir den Operator defer()
.
defer()
übernimmt eine ObservableSource
-Factory, die ein Observable
für jeden neuen Beobachter erstellt, der sich anmeldet. In unserem Fall möchten wir Observable.fromIterable(UserCache.getAllUser())
, wenn ein Beobachter sich anmeldet.
/** * @return a list of users with blogs */ public Observable<User> getUsersWithBlogs() { return Observable.defer(() -> Observable.fromIterable(UserCache.getAllUsers())) .filter(user -> user.blog != null && !user.blog.isEmpty()) .sorted((user1, user2) -> user1.name.compareTo(user2.name)); }
Jetzt, da die lange laufende Operation in eine defer()
ist, haben wir die volle Kontrolle darüber, in welchem Thread diese ausgeführt werden soll, indem wir einfach den entsprechenden Scheduler
subscribeOn()
. Mit dieser Änderung ist unser Code vollständig reaktiv und das Abonnement sollte nur in dem Moment erfolgen, in dem die Daten benötigt werden.
/** * @return a list of users with blogs */ public Observable<User> getUsersWithBlogs() { return Observable.defer(() -> Observable.fromIterable(UserCache.getAllUsers())) .filter(user -> user.blog != null && !user.blog.isEmpty()) .sorted((user1, user2) -> user1.name.compareTo(user2.name)) .subscribeOn(Schedulers.io()); }
Ein weiterer sehr nützlicher Operator zum Verzögern von Berechnungen ist die Methode fromCallable()
. Im Gegensatz zu defer()
, das erwartet, dass ein Observable
in der Lambda-Funktion zurückgegeben wird, und das zurückgegebene Observable
wiederum „abflacht“, fromCallable()
das Lambda auf und gibt den Wert nachgelagert zurück.
/** * @return a list of users with blogs */ public Observable<User> getUsersWithBlogs() { final Observable<List<User>> usersObservable = Observable.fromCallable(() -> UserCache.getAllUsers()); final Observable<User> userObservable = usersObservable.flatMap(users -> Observable.fromIterable(users)); return userObservable.filter(user -> user.blog != null && !user.blog.isEmpty()) .sorted((user1, user2) -> user1.name.compareTo(user2.name)); }
Die einmalige Verwendung von fromCallable()
auf einer Liste würde jetzt ein Observable<List<User>>
zurückgeben, wir müssen diese Liste mit flatMap()
glätten.
Reaktiv-alles
Aus den vorherigen Beispielen haben wir gesehen, dass wir jedes Objekt in ein Observable
einhüllen und mithilfe von Blockierungsoperationen und defer()
/ fromCallable()
zwischen nicht reaktiven und reaktiven Zuständen springen können. Mit diesen Konstrukten können wir damit beginnen, Bereiche einer Android-App reaktiv zu machen.
Lange Operationen
Ein guter Ort, um zunächst an die Verwendung von RxJava zu denken, ist immer dann, wenn Sie einen Prozess haben, dessen Ausführung eine Weile dauert, wie z schreibt Text in das Dateisystem:
/** * Writes {@code text} to the file system. * * @param context a Context * @param filename the name of the file * @param text the text to write * @return true if the text was successfully written, otherwise, false */ public boolean writeTextToFile(Context context, String filename, String text) { FileOutputStream outputStream; try { outputStream = context.openFileOutput(filename, Context.MODE_PRIVATE); outputStream.write(text.getBytes()); outputStream.close(); return true; } catch (Exception e) { e.printStackTrace(); return false; } }
Beim Aufruf dieser Funktion müssen wir sicherstellen, dass dies in einem separaten Thread erfolgt, da diese Operation blockiert. Das Auferlegen einer solchen Einschränkung für den Aufrufer verkompliziert die Dinge für den Entwickler, was die Wahrscheinlichkeit von Fehlern erhöht und die Entwicklung potenziell verlangsamen kann.

Das Hinzufügen eines Kommentars zur Funktion hilft natürlich, Fehler des Aufrufers zu vermeiden, aber das ist noch lange nicht kugelsicher.
Mit RxJava können wir dies jedoch einfach in ein Observable
verpacken und den Scheduler
angeben, auf dem es ausgeführt werden soll. Auf diese Weise muss sich der Aufrufer überhaupt nicht darum kümmern, die Funktion in einem separaten Thread aufzurufen; die Funktion kümmert sich selbst darum.
/** * Writes {@code text} to the filesystem. * * @param context a Context * @param filename the name of the file * @param text the text to write * @return An Observable emitting a boolean indicating whether or not the text was successfully written. */ public Observable<Boolean> writeTextToFile(Context context, String filename, String text) { return Observable.fromCallable(() -> { FileOutputStream outputStream; outputStream = context.openFileOutput(filename, Context.MODE_PRIVATE); outputStream.write(text.getBytes()); outputStream.close(); return true; }).subscribeOn(Schedulers.io()); }
Bei Verwendung fromCallable()
wird das Schreiben des Textes in die Datei bis zum Abonnementzeitpunkt verschoben.
Da Ausnahmen erstklassige Objekte in RxJava sind, besteht ein weiterer Vorteil unserer Änderung darin, dass wir die Operation nicht mehr in einen Try/Catch-Block einschließen müssen. Die Ausnahme wird einfach stromabwärts weitergegeben und nicht geschluckt. Dadurch kann der Aufrufer die Ausnahme nach Belieben behandeln (z. B. dem Benutzer einen Fehler anzeigen, je nachdem, welche Ausnahme ausgelöst wurde usw.).
Eine weitere Optimierung, die wir durchführen können, besteht darin, ein Completable
anstelle eines Observable
zurückzugeben. Ein Completable
ist im Wesentlichen eine spezielle Art von Observable
– ähnlich einem Single
– das einfach anzeigt, ob eine Berechnung über onComplete()
erfolgreich war oder über onError()
fehlgeschlagen ist. Das Zurückgeben eines Completable
scheint in diesem Fall sinnvoller zu sein, da es albern erscheint, ein einzelnes True in einem Observable
-Stream zurückzugeben.
/** * Writes {@code text} to the filesystem. * * @param context a context * @param filename the name of the file * @param text the text to write * @return A Completable */ public Completable writeTextToFile(Context context, String filename, String text) { return Completable.fromAction(() -> { FileOutputStream outputStream; outputStream = context.openFileOutput(filename, Context.MODE_PRIVATE); outputStream.write(text.getBytes()); outputStream.close(); }).subscribeOn(Schedulers.io()); }
Um die Operation abzuschließen, verwenden wir die Operation fromAction()
eines Completable
, da uns der Rückgabewert nicht mehr interessiert. Bei Bedarf unterstützt ein Completable
wie ein Observable
auch die Funktionen fromCallable()
und defer defer()
.
Rückrufe ersetzen
Bisher haben alle Beispiele, die wir uns angesehen haben, entweder einen Wert ausgegeben (dh als Single
modelliert) oder uns mitgeteilt, ob eine Operation erfolgreich war oder fehlgeschlagen ist (dh als Completable
modelliert werden können).
Wie können wir jedoch Bereiche in unserer App umwandeln, die kontinuierliche Updates oder Ereignisse erhalten (z. B. Standortaktualisierungen, Klickereignisse anzeigen, Sensorereignisse usw.)?
Wir werden uns zwei Möglichkeiten ansehen, um dies zu tun, mit create()
und mit Subjects
.
create()
ermöglicht es uns, onNext()
| eines Beobachters explizit aufzurufen onComplete()
| onError()
Methode, wenn wir Updates von unserer Datenquelle erhalten. Um create()
zu verwenden, übergeben wir ein ObservableOnSubscribe
, das einen ObservableEmitter
empfängt, wenn ein Beobachter sich anmeldet. Unter Verwendung des empfangenen Emitters können wir dann alle notwendigen Einrichtungsaufrufe ausführen, um mit dem Empfangen von Aktualisierungen zu beginnen, und dann das entsprechende Emitter
-Ereignis aufrufen.
Im Falle von Standortaktualisierungen können wir uns registrieren, um Aktualisierungen an diesem Ort zu erhalten, und Standortaktualisierungen wie empfangen ausgeben.
public class LocationManager { /** * Call to receive device location updates. * @return An Observable emitting location updates */ public Observable<Location> observeLocation() { return Observable.create(emitter -> { // Make sure that the following conditions apply and if not, call the emitter's onError() method // (1) googleApiClient is connected // (2) location permission is granted final LocationRequest locationRequest = new LocationRequest(); locationRequest.setInterval(1000); locationRequest.setPriority(LocationRequest.PRIORITY_HIGH_ACCURACY); LocationServices.FusedLocationApi.requestLocationUpdates(googleApiClient, locationRequest, new LocationListener() { @Override public void onLocationChanged(Location location) { if (!emitter.isDisposed()) { emitter.onNext(location); } } }); }); } }
Die Funktion innerhalb des create()
Aufrufs fordert Standortaktualisierungen an und übergibt einen Rückruf, der aufgerufen wird, wenn sich der Standort des Geräts ändert. Wie wir hier sehen können, ersetzen wir im Wesentlichen die Schnittstelle im Callback-Stil und geben stattdessen den empfangenen Standort im erstellten Observable-Stream aus (aus pädagogischen Gründen habe ich einige der Details übersprungen, indem ich eine Standortanfrage erstellt habe, wenn Sie sich vertiefen möchten Genauere Details können Sie hier nachlesen).
Eine weitere Anmerkung zu create()
ist, dass bei jedem Aufruf subscribe()
ein neuer Emitter bereitgestellt wird. Mit anderen Worten, create()
gibt ein kaltes Observable
zurück. Dies bedeutet, dass wir in der obigen Funktion möglicherweise mehrmals Standortaktualisierungen anfordern würden, was nicht das ist, was wir wollen.
Um dies zu umgehen, möchten wir die Funktion ändern, um mit Hilfe von Subjects
ein heißes Observable
zurückzugeben.
Betreff eingeben
Ein Subject
erweitert ein Observable
und implementiert gleichzeitig Observer
. Dies ist besonders nützlich, wenn wir dasselbe Ereignis gleichzeitig an mehrere Abonnenten senden oder übertragen möchten. In Bezug auf die Implementierung möchten wir das Subject
als Observable
für Clients verfügbar machen, während es als Subject
für den Anbieter beibehalten wird.
public class LocationManager { private Subject<Location> locationSubject = PublishSubject.create(); /** * Invoke this method when this LocationManager should start listening to location updates. */ public void connect() { final LocationRequest locationRequest = new LocationRequest(); locationRequest.setInterval(1000); locationRequest.setPriority(LocationRequest.PRIORITY_HIGH_ACCURACY); LocationServices.FusedLocationApi.requestLocationUpdates(googleApiClient, locationRequest, new LocationListener() { @Override public void onLocationChanged(Location location) { locationSubject.onNext(location); } }); } /** * Call to receive device location updates. * @return An Observable emitting location updates */ public Observable<Location> observeLocation() { return locationSubject; } }
In dieser neuen Implementierung wird der Untertyp PublishSubject
verwendet, der Ereignisse ausgibt, sobald sie ab dem Zeitpunkt des Abonnements eintreffen. Dementsprechend werden, wenn ein Abonnement zu einem Zeitpunkt durchgeführt wird, an dem Standortaktualisierungen bereits gesendet wurden, vergangene Emissionen nicht von dem Beobachter empfangen, sondern nur nachfolgende. Wenn dieses Verhalten nicht erwünscht ist, gibt es im Subject
-Toolkit einige andere Untertypen von Betreffs, die verwendet werden können.
Darüber hinaus haben wir auch eine separate connect()
Funktion erstellt, die die Anfrage zum Erhalt von Standortaktualisierungen startet. observeLocation()
kann immer noch den connect()
-Aufruf ausführen, aber wir haben ihn aus Gründen der Klarheit/Einfachheit aus der Funktion heraus umgestaltet.
Zusammenfassung
Wir haben uns eine Reihe von Mechanismen und Techniken angesehen:
-
defer()
und seine Varianten, um die Ausführung einer Berechnung bis zum Abonnement zu verzögern - Cold
Observables
generiert durchcreate()
- Hot
Observables
mitSubjects
- Blockieren von X-Operationen, wenn wir die reaktive Welt verlassen wollen
Hoffentlich haben die Beispiele in diesem Artikel einige Ideen zu verschiedenen Bereichen in Ihrer App inspiriert, die reaktiv umgewandelt werden können. Wir haben viel behandelt und wenn Sie Fragen, Anregungen oder Unklarheiten haben, können Sie unten einen Kommentar hinterlassen!
Wenn Sie daran interessiert sind, mehr über RxJava zu erfahren, arbeite ich an einem ausführlichen Buch, das anhand von Android-Beispielen erklärt, wie Sie Probleme reaktiv betrachten können. Wenn Sie Updates dazu erhalten möchten, abonnieren Sie bitte hier.