Jak uprościć współbieżność dzięki modelowaniu reaktywnemu w systemie Android

Opublikowany: 2022-03-11

Współbieżność i asynchroniczność są nieodłączną częścią programowania mobilnego.

Radzenie sobie ze współbieżnością poprzez programowanie w stylu imperatywnym, czyli to, na czym zwykle polega programowanie w systemie Android, może być przyczyną wielu problemów. Korzystając z programowania reaktywnego z RxJava, możesz uniknąć potencjalnych problemów ze współbieżnością, zapewniając czystsze i mniej podatne na błędy rozwiązanie.

Oprócz uproszczenia współbieżnych, asynchronicznych zadań, RxJava zapewnia również możliwość wykonywania operacji w stylu funkcjonalnym, które przekształcają, łączą i agregują emisje z Observable, dopóki nie osiągniemy pożądanego rezultatu.

Łącząc reaktywny paradygmat RxJava i operacje stylu funkcjonalnego, możemy modelować szeroką gamę konstrukcji współbieżności w sposób reaktywny, nawet w niereaktywnym świecie Androida. W tym artykule dowiesz się, jak możesz to zrobić. Dowiesz się również, jak stopniowo wprowadzać RxJava do istniejącego projektu.

Jeśli jesteś nowy w RxJava, polecam przeczytanie postu tutaj, który mówi o niektórych podstawach RxJava.

Łączenie niereagującego z reaktywnym światem

Jednym z wyzwań związanych z dodaniem RxJava jako jednej z bibliotek do projektu jest to, że zasadniczo zmienia sposób, w jaki rozumujesz swój kod.

RxJava wymaga, abyś myślał o danych jako wymuszanych, a nie pobieranych. Chociaż sama koncepcja jest prosta, zmiana pełnej bazy kodu opartej na paradygmacie ściągania może być nieco zniechęcająca. Chociaż spójność jest zawsze idealna, nie zawsze możesz mieć przywilej wykonywania tego przejścia w całej bazie kodu jednocześnie, więc może być potrzebne bardziej podejście przyrostowe.

Rozważ następujący kod:

 /** * @return a list of users with blogs */ public List<User> getUsersWithBlogs() { final List<User> allUsers = UserCache.getAllUsers(); final List<User> usersWithBlogs = new ArrayList<>(); for (User user : allUsers) { if (user.blog != null && !user.blog.isEmpty()) { usersWithBlogs.add(user); } } Collections.sort(usersWithBlogs, (user1, user2) -> user1.name.compareTo(user2.name)); return usersWithBlogs; }

Ta funkcja pobiera listę obiektów User z pamięci podręcznej, filtruje każdy z nich na podstawie tego, czy użytkownik ma bloga, sortuje je według nazwy użytkownika, a na koniec zwraca je rozmówcy. Patrząc na ten fragment, zauważamy, że wiele z tych operacji może wykorzystywać operatory RxJava; np. filter() i sorted() .

Przepisanie tego fragmentu kodu daje nam:

 /** * @return a list of users with blogs */ public Observable<User> getUsersWithBlogs() { return Observable.fromIterable(UserCache.getAllUsers()) .filter(user -> user.blog != null && !user.blog.isEmpty()) .sorted((user1, user2) -> user1.name.compareTo(user2.name)); }

Pierwszy wiersz funkcji konwertuje List<User> zwrócony przez UserCache.getAllUsers() na Observable<User> poprzez fromIterable() . To pierwszy krok do zaktywizowania naszego kodu. Teraz, gdy operujemy na Observable , umożliwia nam to wykonanie dowolnego operatora Observable w zestawie narzędzi RxJava – w tym przypadku filter() i sorted() .

W związku z tą zmianą należy zwrócić uwagę na kilka innych punktów.

Po pierwsze, sygnatura metody nie jest już taka sama. To może nie być wielka sprawa, jeśli to wywołanie metody jest używane tylko w kilku miejscach i łatwo jest propagować zmiany do innych obszarów stosu; jeśli jednak zepsuje klientów korzystających z tej metody, jest to problematyczne i sygnatura metody powinna zostać przywrócona.

Po drugie, RxJava została zaprojektowana z myślą o lenistwie. Oznacza to, że nie należy wykonywać długich operacji, gdy nie ma subskrybentów Observable . Dzięki tej modyfikacji założenie to nie jest już prawdziwe, ponieważ UserCache.getAllUsers() jest wywoływana jeszcze przed pojawieniem się jakichkolwiek subskrybentów.

Opuszczanie reaktywnego świata

Aby rozwiązać pierwszy problem z naszej zmiany, możemy użyć dowolnego z operatorów blokowania dostępnych dla Observable , takich jak blockingFirst() i blockingNext() . Zasadniczo oba te operatory będą blokować, dopóki element nie zostanie wyemitowany w dół strumienia: blockingFirst() zwróci pierwszy wyemitowany element i zakończenie, podczas gdy blockingNext() zwróci Iterable , który umożliwia wykonanie pętli for-each na danych źródłowych ( każda iteracja w pętli zostanie zablokowana).

Skutkiem ubocznym użycia operacji blokowania, o której należy pamiętać, jest to, że wyjątki są rzucane w wątku wywołującym, a nie przekazywane do metody onError() obserwatora.

Używając operatora blokującego do zmiany sygnatury metody z powrotem na List<User> , nasz fragment kodu wyglądałby teraz tak:

 /** * @return a list of users with blogs */ public List<User> getUsersWithBlogs() { return Observable.fromIterable(UserCache.getAllUsers()) .filter(user -> user.blog != null && !user.blog.isEmpty()) .sorted((user1, user2) -> user1.name.compareTo(user2.name)) .toList() .blockingGet(); }

Przed wywołaniem operatora blokującego (tj. blockingGet() ) musimy najpierw połączyć operator agregacji toList() tak, aby strumień został zmodyfikowany z Observable<User> do Single<List<User>> ( Single jest specjalnym typem of Observable , który emituje tylko pojedynczą wartość w onSuccess() lub błąd przez onError() ).

Następnie możemy wywołać operator blockingGet() , który rozpakowuje Single i zwraca List<User> .

Chociaż RxJava to obsługuje, w miarę możliwości należy tego unikać, ponieważ nie jest to idiomatyczne programowanie reaktywne. Jednak gdy jest to absolutnie konieczne, operatory blokujące są miłym sposobem na wyjście z reaktywnego świata.

Leniwe podejście

Jak wspomniano wcześniej, RxJava została zaprojektowana z myślą o lenistwie. Oznacza to, że długotrwałe operacje powinny być opóźnione tak długo, jak to możliwe (tj. do momentu wywołania subskrypcji na Observable ). Aby nasze rozwiązanie było leniwe, używamy operatora defer() .

defer() przyjmuje fabrykę ObservableSource , która tworzy Observable dla każdego nowego subskrybującego obserwatora. W naszym przypadku chcemy zwracać Observable.fromIterable(UserCache.getAllUser()) za każdym razem, gdy subskrybuje obserwator.

 /** * @return a list of users with blogs */ public Observable<User> getUsersWithBlogs() { return Observable.defer(() -> Observable.fromIterable(UserCache.getAllUsers())) .filter(user -> user.blog != null && !user.blog.isEmpty()) .sorted((user1, user2) -> user1.name.compareTo(user2.name)); }

Teraz, gdy długo działająca operacja jest opakowana w defer() , mamy pełną kontrolę nad tym, w jakim wątku ma ona działać, po prostu określając odpowiedni Scheduler w subscribeOn() . Dzięki tej zmianie nasz kod jest w pełni reaktywny, a subskrypcja powinna nastąpić tylko w momencie, gdy potrzebne są dane.

 /** * @return a list of users with blogs */ public Observable<User> getUsersWithBlogs() { return Observable.defer(() -> Observable.fromIterable(UserCache.getAllUsers())) .filter(user -> user.blog != null && !user.blog.isEmpty()) .sorted((user1, user2) -> user1.name.compareTo(user2.name)) .subscribeOn(Schedulers.io()); }

Innym całkiem użytecznym operatorem do odraczania obliczeń jest metoda fromCallable() . W przeciwieństwie do defer() , która oczekuje, że Observable zostanie zwrócone w funkcji lambda i z kolei „spłaszczy” zwrócony Observable , fromCallable() wywoła lambda i zwróci wartość w dół.

 /** * @return a list of users with blogs */ public Observable<User> getUsersWithBlogs() { final Observable<List<User>> usersObservable = Observable.fromCallable(() -> UserCache.getAllUsers()); final Observable<User> userObservable = usersObservable.flatMap(users -> Observable.fromIterable(users)); return userObservable.filter(user -> user.blog != null && !user.blog.isEmpty()) .sorted((user1, user2) -> user1.name.compareTo(user2.name)); }

Pojedyncze użycie fromCallable() na liście zwróciłoby teraz Observable<List<User>> , musimy spłaszczyć tę listę za pomocą flatMap() .

Reaktywne-wszystko

Z poprzednich przykładów widzieliśmy, że możemy zawinąć dowolny obiekt w Observable i przeskakiwać między stanami niereaktywnymi i reaktywnymi za pomocą operacji blokujących i defer( defer() / fromCallable() . Korzystając z tych konstrukcji, możemy zacząć konwertować obszary aplikacji na Androida na reaktywne.

Długie operacje

Dobrym miejscem do początkowego zastanowienia się nad użyciem RxJava jest sytuacja, gdy masz proces, którego wykonanie zajmuje trochę czasu, takie jak połączenia sieciowe (sprawdź poprzedni post, aby zapoznać się z przykładami), odczyty i zapisy na dysku itp. Poniższy przykład ilustruje prostą funkcję, która zapisze tekst do systemu plików:

 /** * Writes {@code text} to the file system. * * @param context a Context * @param filename the name of the file * @param text the text to write * @return true if the text was successfully written, otherwise, false */ public boolean writeTextToFile(Context context, String filename, String text) { FileOutputStream outputStream; try { outputStream = context.openFileOutput(filename, Context.MODE_PRIVATE); outputStream.write(text.getBytes()); outputStream.close(); return true; } catch (Exception e) { e.printStackTrace(); return false; } }

Wywołując tę ​​funkcję, musimy upewnić się, że jest ona wykonywana w osobnym wątku, ponieważ ta operacja blokuje. Nałożenie takiego ograniczenia na dzwoniącego komplikuje rzeczy dla dewelopera, co zwiększa prawdopodobieństwo wystąpienia błędów i może potencjalnie spowolnić rozwój.

Dodanie komentarza do funkcji oczywiście pomoże uniknąć błędów wywołującego, ale to wciąż jest dalekie od kuloodporności.

Używając RxJava, możemy jednak łatwo zapakować to w Observable i określić Scheduler , na którym powinien działać. W ten sposób osoba wywołująca nie musi w ogóle martwić się wywołaniem funkcji w osobnym wątku; funkcja sama się tym zajmie.

 /** * Writes {@code text} to the filesystem. * * @param context a Context * @param filename the name of the file * @param text the text to write * @return An Observable emitting a boolean indicating whether or not the text was successfully written. */ public Observable<Boolean> writeTextToFile(Context context, String filename, String text) { return Observable.fromCallable(() -> { FileOutputStream outputStream; outputStream = context.openFileOutput(filename, Context.MODE_PRIVATE); outputStream.write(text.getBytes()); outputStream.close(); return true; }).subscribeOn(Schedulers.io()); }

Używając fromCallable() , zapis tekstu do pliku jest odraczany do czasu subskrypcji.

Ponieważ wyjątki są obiektami pierwszej klasy w RxJava, kolejną zaletą naszej zmiany jest to, że nie musimy już owijać operacji w blok try/catch. Wyjątek będzie po prostu propagowany z prądem, a nie połykany. Dzięki temu wywołujący może obsłużyć wyjątek według własnego uznania (np. pokazać użytkownikowi błąd w zależności od tego, jaki wyjątek został zgłoszony itp.).

Inną optymalizacją, jaką możemy wykonać, jest zwrócenie Completable zamiast Observable . Completable jest zasadniczo specjalnym typem Observable — podobnym do Single — który po prostu wskazuje, czy obliczenie powiodło się za pomocą onComplete() , czy nie powiodło się za pomocą onError() . Zwracanie wartości Completable wydaje się w tym przypadku bardziej sensowne, ponieważ wydaje się głupie zwracanie pojedynczej prawdy w strumieniu Observable .

 /** * Writes {@code text} to the filesystem. * * @param context a context * @param filename the name of the file * @param text the text to write * @return A Completable */ public Completable writeTextToFile(Context context, String filename, String text) { return Completable.fromAction(() -> { FileOutputStream outputStream; outputStream = context.openFileOutput(filename, Context.MODE_PRIVATE); outputStream.write(text.getBytes()); outputStream.close(); }).subscribeOn(Schedulers.io()); }

Aby zakończyć operację, używamy operacji fromAction() elementu Completable , ponieważ wartość zwracana nie jest już dla nas interesująca. W razie potrzeby, podobnie jak Observable , Completable obsługuje również funkcje fromCallable() i defer( defer() .

Zastępowanie wywołań zwrotnych

Do tej pory wszystkie przykłady, którym się przyglądaliśmy, emitują albo jedną wartość (tj. mogą być modelowane jako Single ), albo mówią nam, czy operacja się powiodła, czy nie (tj. może być modelowana jako Completable ).

Jak jednak możemy przekonwertować obszary w naszej aplikacji, które otrzymują ciągłe aktualizacje lub zdarzenia (takie jak aktualizacje lokalizacji, wyświetlanie zdarzeń kliknięć, zdarzenia czujników itd.)?

Przyjrzymy się dwóm sposobom na zrobienie tego, używając metody create() i używania Subjects .

create() pozwala nam jawnie wywołać funkcję onNext() obserwatora | onComplete() | onError() , gdy otrzymujemy aktualizacje z naszego źródła danych. Aby użyć funkcji create() , przekazujemy ObservableOnSubscribe , który otrzymuje ObservableEmitter za każdym razem, gdy subskrybuje obserwator. Korzystając z otrzymanego emitera, możemy następnie wykonać wszystkie niezbędne wywołania konfiguracyjne, aby rozpocząć odbieranie aktualizacji, a następnie wywołać odpowiednie zdarzenie Emitter .

W przypadku aktualizacji lokalizacji możemy się zarejestrować, aby otrzymywać aktualizacje w tym miejscu i emitować aktualizacje lokalizacji w miarę odebrania.

 public class LocationManager { /** * Call to receive device location updates. * @return An Observable emitting location updates */ public Observable<Location> observeLocation() { return Observable.create(emitter -> { // Make sure that the following conditions apply and if not, call the emitter's onError() method // (1) googleApiClient is connected // (2) location permission is granted final LocationRequest locationRequest = new LocationRequest(); locationRequest.setInterval(1000); locationRequest.setPriority(LocationRequest.PRIORITY_HIGH_ACCURACY); LocationServices.FusedLocationApi.requestLocationUpdates(googleApiClient, locationRequest, new LocationListener() { @Override public void onLocationChanged(Location location) { if (!emitter.isDisposed()) { emitter.onNext(location); } } }); }); } }

Funkcja wewnątrz create() żąda aktualizacji lokalizacji i przekazuje wywołanie zwrotne, które jest wywoływane, gdy zmienia się lokalizacja urządzenia. Jak widać tutaj, zasadniczo zastępujemy interfejs w stylu wywołania zwrotnego i zamiast tego emitujemy otrzymaną lokalizację w utworzonym strumieniu Observable (ze względów edukacyjnych pominąłem niektóre szczegóły przy konstruowaniu żądania lokalizacji, jeśli chcesz się zagłębić więcej szczegółów można przeczytać tutaj).

Inną rzeczą, na którą należy zwrócić uwagę przy create() , jest to, że za każdym razem, gdy wywoływana jest subscribe() , dostarczany jest nowy emiter. Innymi słowy, create() zwraca zimny Observable . Oznacza to, że w powyższej funkcji potencjalnie wielokrotnie żądalibyśmy aktualizacji lokalizacji, co nie jest tym, czego chcemy.

Aby obejść ten problem, chcemy zmienić funkcję tak, aby zwracała gorący Observable za pomocą Subjects .

Wprowadź tematy

Subject rozszerza Observable i jednocześnie implementuje Observer . Jest to szczególnie przydatne, gdy chcemy jednocześnie wyemitować lub przesłać to samo zdarzenie do wielu subskrybentów. Jeśli chodzi o implementację, chcielibyśmy uwidocznić Subject jako Observable dla klientów, zachowując go jako Subject dla dostawcy.

 public class LocationManager { private Subject<Location> locationSubject = PublishSubject.create(); /** * Invoke this method when this LocationManager should start listening to location updates. */ public void connect() { final LocationRequest locationRequest = new LocationRequest(); locationRequest.setInterval(1000); locationRequest.setPriority(LocationRequest.PRIORITY_HIGH_ACCURACY); LocationServices.FusedLocationApi.requestLocationUpdates(googleApiClient, locationRequest, new LocationListener() { @Override public void onLocationChanged(Location location) { locationSubject.onNext(location); } }); } /** * Call to receive device location updates. * @return An Observable emitting location updates */ public Observable<Location> observeLocation() { return locationSubject; } }

W tej nowej implementacji używany jest podtyp PublishSubject , który emituje zdarzenia, gdy nadchodzą, począwszy od czasu subskrypcji. W związku z tym, jeśli subskrypcja jest wykonywana w momencie, gdy aktualizacje lokalizacji zostały już wyemitowane, obserwator nie otrzyma wcześniejszych emisji, a jedynie kolejne. Jeśli to zachowanie nie jest pożądane, istnieje kilka innych podtypów Subject w zestawie narzędzi RxJava, których można użyć.

Dodatkowo stworzyliśmy również osobną funkcję connect() , która uruchamia żądanie otrzymywania aktualizacji lokalizacji. observeLocation() nadal może wykonać wywołanie connect() , ale zrefaktorowaliśmy je z funkcji dla przejrzystości/prostoty.

Streszczenie

Przyjrzeliśmy się kilku mechanizmom i technikom:

  • defer() i jej warianty opóźniające wykonanie obliczeń do czasu subskrypcji
  • zimne Observables generowane przez create()
  • gorące Observables używające Subjects
  • blokowanie operacji X, gdy chcemy opuścić świat reaktywny

Mamy nadzieję, że przykłady podane w tym artykule zainspirowały kilka pomysłów dotyczących różnych obszarów w Twojej aplikacji, które można przekształcić w reaktywne. Wiele omówiliśmy i jeśli masz jakieś pytania, sugestie lub jeśli coś jest niejasne, możesz zostawić komentarz poniżej!

Jeśli chcesz dowiedzieć się więcej o RxJava, pracuję nad dogłębną książką, która wyjaśnia, jak przeglądać problemy w reaktywny sposób na przykładach Androida. Jeśli chcesz otrzymywać aktualizacje na ten temat, zasubskrybuj tutaj.