如何在 Android 上使用響應式建模簡化並發

已發表: 2022-03-11

並發和異步是移動編程固有的。

通過命令式編程來處理並發,這是 Android 編程通常涉及的,可能會導致許多問題。 將反應式編程與 RxJava 結合使用,您可以通過提供更簡潔且不易出錯的解決方案來避免潛在的並發問題。

除了簡化並發、異步任務之外,RxJava 還提供了執行函數式操作的能力,這些操作可以轉換、組合和聚合來自 Observable 的發射,直到我們達到我們想要的結果。

通過結合 RxJava 的響應式範式和函數式操作,我們可以以響應式的方式對各種並發結構進行建模,即使在 Android 的非響應式世界中也是如此。 在本文中,您將了解如何做到這一點。 您還將學習如何逐步將 RxJava 應用到現有項目中。

如果您是 RxJava 新手,我建議您閱讀這裡的文章,其中討論了 RxJava 的一些基礎知識。

將非反應式連接到反應式世界

將 RxJava 作為庫之一添加到您的項目中的挑戰之一是它從根本上改變了您推理代碼的方式。

RxJava 要求您將數據視為被推送而不是被拉取。 雖然這個概念本身很簡單,但更改基於拉式範式的完整代碼庫可能有點令人生畏。 儘管一致性始終是理想的,但您可能並不總是有權一次在整個代碼庫中進行這種轉換,因此可能需要更多的增量方法。

考慮以下代碼:

 /** * @return a list of users with blogs */ public List<User> getUsersWithBlogs() { final List<User> allUsers = UserCache.getAllUsers(); final List<User> usersWithBlogs = new ArrayList<>(); for (User user : allUsers) { if (user.blog != null && !user.blog.isEmpty()) { usersWithBlogs.add(user); } } Collections.sort(usersWithBlogs, (user1, user2) -> user1.name.compareTo(user2.name)); return usersWithBlogs; }

該函數從緩存中獲取User對象列表,根據用戶是否擁有博客過濾每個對象,按用戶名排序,最後返回給調用者。 查看這段代碼,我們注意到其中許多操作都可以利用 RxJava 運算符; 例如, filter()sorted()

重寫這個片段然後給我們:

 /** * @return a list of users with blogs */ public Observable<User> getUsersWithBlogs() { return Observable.fromIterable(UserCache.getAllUsers()) .filter(user -> user.blog != null && !user.blog.isEmpty()) .sorted((user1, user2) -> user1.name.compareTo(user2.name)); }

函數的第一行通過fromIterable()UserCache.getAllUsers()返回的List<User>轉換為Observable<User> 。 這是使我們的代碼具有響應性的第一步。 現在我們正在對Observable進行操作,這使我們能夠執行 RxJava 工具包中的任何Observable運算符——在本例中為filter()sorted()

關於此更改,還有其他幾點需要注意。

首先,方法簽名不再相同。 如果此方法調用僅在少數地方使用,並且很容易將更改傳播到堆棧的其他區域,這可能不是什麼大問題; 但是,如果它破壞了依賴此方法的客戶端,那就有問題了,應該恢復方法簽名。

其次,RxJava 的設計考慮到了惰性。 也就是說,當Observable沒有訂閱者時,不應執行 long 操作。 通過此修改,該假設不再成立,因為UserCache.getAllUsers()甚至在有任何訂閱者之前就已被調用。

離開反應性世界

為了解決我們更改的第一個問題,我們可以使用Observable可用的任何阻塞運算符,例如blockingFirst()blockingNext() 。 本質上,這兩個運算符都會阻塞,直到下游發出一個項目: blockingFirst()將返回第一個發出並完成的元素,而blockingNext()將返回一個Iterable ,它允許您對底層數據執行 for-each 循環(通過循環的每次迭代都會阻塞)。

但是,使用阻塞操作的一個副作用是需要注意的是,異常會在調用線程上拋出,而不是傳遞給觀察者的onError()方法。

使用阻塞運算符將方法簽名更改回List<User> ,我們的代碼片段現在看起來像這樣:

 /** * @return a list of users with blogs */ public List<User> getUsersWithBlogs() { return Observable.fromIterable(UserCache.getAllUsers()) .filter(user -> user.blog != null && !user.blog.isEmpty()) .sorted((user1, user2) -> user1.name.compareTo(user2.name)) .toList() .blockingGet(); }

在調用阻塞運算符(即blockingGet() )之前,我們首先需要鏈接聚合運算符toList() ,以便將流從Observable<User>修改為Single<List<User>>Single是一種特殊類型的Observable僅在onSuccess()中發出單個值,或通過onError()發出錯誤)。

之後,我們可以調用阻塞操作符blockingGet()來解開Single並返回List<User>

儘管 RxJava 支持這一點,但應盡可能避免這種情況,因為這不是慣用的反應式編程。 但是,當絕對必要時,阻塞操作符是走出反應式世界的一種很好的初始方式。

懶惰的方法

如前所述,RxJava 的設計考慮到了惰性。 也就是說,應該盡可能長時間地延遲長時間運行的操作(即,直到在Observable上調用訂閱)。 為了使我們的解決方案變得懶惰,我們使用了defer()運算符。

defer()接受一個ObservableSource工廠,它為每個訂閱的新觀察者創建一個Observable 。 在我們的例子中,我們希望在觀察者訂閱時返回Observable.fromIterable(UserCache.getAllUser())

 /** * @return a list of users with blogs */ public Observable<User> getUsersWithBlogs() { return Observable.defer(() -> Observable.fromIterable(UserCache.getAllUsers())) .filter(user -> user.blog != null && !user.blog.isEmpty()) .sorted((user1, user2) -> user1.name.compareTo(user2.name)); }

現在長時間運行的操作包含在defer()中,我們可以完全控制應該在哪個線程中運行,只需在subscribeOn()中指定適當的Scheduler即可。 有了這個改變,我們的代碼是完全反應式的,訂閱應該只在需要數據的時候發生。

 /** * @return a list of users with blogs */ public Observable<User> getUsersWithBlogs() { return Observable.defer(() -> Observable.fromIterable(UserCache.getAllUsers())) .filter(user -> user.blog != null && !user.blog.isEmpty()) .sorted((user1, user2) -> user1.name.compareTo(user2.name)) .subscribeOn(Schedulers.io()); }

另一個用於延遲計算的非常有用的運算符是fromCallable()方法。 與defer()不同,它期望在 lambda 函數中返回Observable並反過來“展平”返回的ObservablefromCallable()將調用 lambda 並在下游返回值。

 /** * @return a list of users with blogs */ public Observable<User> getUsersWithBlogs() { final Observable<List<User>> usersObservable = Observable.fromCallable(() -> UserCache.getAllUsers()); final Observable<User> userObservable = usersObservable.flatMap(users -> Observable.fromIterable(users)); return userObservable.filter(user -> user.blog != null && !user.blog.isEmpty()) .sorted((user1, user2) -> user1.name.compareTo(user2.name)); }

在列表上使用fromCallable()現在會返回一個Observable<List<User>> ,我們需要使用flatMap()展平這個列表。

反應式一切

從前面的示例中,我們看到我們可以將任何對象包裝在Observable中,並使用阻塞操作和defer() / fromCallable()在非反應狀態和反應狀態之間跳轉。 使用這些構造,我們可以開始將 Android 應用程序的區域轉換為響應式。

長期操作

最初考慮使用 RxJava 的一個好地方是當您有一個需要一段時間才能執行的進程時,例如網絡調用(查看以前的帖子以獲取示例)、磁盤讀取和寫入等。下面的示例說明了一個簡單的函數,將文本寫入文件系統:

 /** * Writes {@code text} to the file system. * * @param context a Context * @param filename the name of the file * @param text the text to write * @return true if the text was successfully written, otherwise, false */ public boolean writeTextToFile(Context context, String filename, String text) { FileOutputStream outputStream; try { outputStream = context.openFileOutput(filename, Context.MODE_PRIVATE); outputStream.write(text.getBytes()); outputStream.close(); return true; } catch (Exception e) { e.printStackTrace(); return false; } }

當調用這個函數時,我們需要確保它是在一個單獨的線程上完成的,因為這個操作是阻塞的。 對調用者施加這樣的限制會使開發人員的事情變得複雜,這增加了錯誤的可能性並可能會減慢開發速度。

向函數添加註釋當然有助於避免調用者出錯,但這仍遠非防彈。

然而,使用 RxJava,我們可以輕鬆地將其包裝到Observable中並指定它應該運行的Scheduler 。 這樣,調用者根本不需要關心在單獨的線程中調用函數; 該功能將自行處理。

 /** * Writes {@code text} to the filesystem. * * @param context a Context * @param filename the name of the file * @param text the text to write * @return An Observable emitting a boolean indicating whether or not the text was successfully written. */ public Observable<Boolean> writeTextToFile(Context context, String filename, String text) { return Observable.fromCallable(() -> { FileOutputStream outputStream; outputStream = context.openFileOutput(filename, Context.MODE_PRIVATE); outputStream.write(text.getBytes()); outputStream.close(); return true; }).subscribeOn(Schedulers.io()); }

使用fromCallable() ,將文本寫入文件被推遲到訂閱時間。

由於異常是 RxJava 中的一等對象,我們更改的另一個好處是我們不再需要將操作包裝在 try/catch 塊中。 異常將簡單地向下游傳播,而不是被吞沒。 這允許調用者處理他/她認為合適的異常(例如,根據拋出的異常向用戶顯示錯誤等)。

我們可以執行的另一種優化是返回Completable而不是ObservableCompletable本質上是一種特殊類型的Observable類似於Single它只是通過onComplete()指示計算是否成功,或者通過onError()指示計算是否失敗。 在這種情況下,返回Completable似乎更有意義,因為在Observable流中返回單個 true 似乎很愚蠢。

 /** * Writes {@code text} to the filesystem. * * @param context a context * @param filename the name of the file * @param text the text to write * @return A Completable */ public Completable writeTextToFile(Context context, String filename, String text) { return Completable.fromAction(() -> { FileOutputStream outputStream; outputStream = context.openFileOutput(filename, Context.MODE_PRIVATE); outputStream.write(text.getBytes()); outputStream.close(); }).subscribeOn(Schedulers.io()); }

為了完成操作,我們使用CompletablefromAction()操作,因為我們不再感興趣返回值。 如果需要,像Observable一樣, Completable也支持fromCallable()defer()函數。

替換回調

到目前為止,我們看到的所有示例都發出一個值(即,可以建模為Single ),或者告訴我們操作是成功還是失敗(即,可以建模為Completable )。

但是,我們如何轉換應用程序中接收持續更新或事件(例如位置更新、查看點擊事件、傳感器事件等)的區域?

我們將看看兩種方法來做到這一點,使用create()和使用Subjects

create()允許我們顯式調用觀察者的onNext() | onComplete() | onError()方法,因為我們從數據源接收更新。 為了使用create() ,我們傳入一個ObservableOnSubscribe ,它在觀察者訂閱時接收一個ObservableEmitter 。 使用接收到的發射器,我們可以執行所有必要的設置調用以開始接收更新,然後調用適當的Emitter器事件。

在位置更新的情況下,我們可以註冊以在這個地方接收更新並在收到時發出位置更新。

 public class LocationManager { /** * Call to receive device location updates. * @return An Observable emitting location updates */ public Observable<Location> observeLocation() { return Observable.create(emitter -> { // Make sure that the following conditions apply and if not, call the emitter's onError() method // (1) googleApiClient is connected // (2) location permission is granted final LocationRequest locationRequest = new LocationRequest(); locationRequest.setInterval(1000); locationRequest.setPriority(LocationRequest.PRIORITY_HIGH_ACCURACY); LocationServices.FusedLocationApi.requestLocationUpdates(googleApiClient, locationRequest, new LocationListener() { @Override public void onLocationChanged(Location location) { if (!emitter.isDisposed()) { emitter.onNext(location); } } }); }); } }

create()調用中的函數請求位置更新並傳入一個回調,該回調會在設備的位置更改時被調用。 正如我們在這裡看到的,我們基本上替換了回調樣式的接口,而是在創建的 Observable 流中發出接收到的位置(為了教育目的,如果你想深入研究,我跳過了構建位置請求的一些細節更深入的細節,你可以在這裡閱讀)。

關於create()需要注意的另一件事是,每當調用subscribe()時,都會提供一個新的發射器。 換句話說, create()返回一個冷的Observable 。 這意味著,在上面的函數中,我們可能會多次請求位置更新,這不是我們想要的。

為了解決這個問題,我們想在Subjects的幫助下改變函數以返回一個熱的Observable

輸入主題

Subject擴展了Observable並同時實現了Observer 。 每當我們想同時向多個訂閱者發出或強制轉換相同的事件時,這尤其有用。 在實現方面,我們希望將Subject作為Observable公開給客戶,同時將其作為提供者的Subject

 public class LocationManager { private Subject<Location> locationSubject = PublishSubject.create(); /** * Invoke this method when this LocationManager should start listening to location updates. */ public void connect() { final LocationRequest locationRequest = new LocationRequest(); locationRequest.setInterval(1000); locationRequest.setPriority(LocationRequest.PRIORITY_HIGH_ACCURACY); LocationServices.FusedLocationApi.requestLocationUpdates(googleApiClient, locationRequest, new LocationListener() { @Override public void onLocationChanged(Location location) { locationSubject.onNext(location); } }); } /** * Call to receive device location updates. * @return An Observable emitting location updates */ public Observable<Location> observeLocation() { return locationSubject; } }

在這個新的實現中,使用PublishSubject子類型,它從訂閱時間開始在事件到達時發出事件。 因此,如果在已經發出位置更新的點執行訂閱,則觀察者將不會收到過去的發射,只會收到後續的發射。 如果不需要這種行為,可以使用 RxJava 工具包中的其他幾個Subject子類型。

此外,我們還創建了一個單獨的connect()函數來啟動接收位置更新的請求。 observeLocation()仍然可以進行connect()調用,但為了清晰/簡單,我們將其從函數中重構出來。

概括

我們研究了許多機制和技術:

  • defer()及其變體將計算的執行延遲到訂閱
  • 通過create()生成的冷Observables
  • 使用Subjects的熱門Observables
  • 當我們想要離開響應式世界時,blockingX 操作

希望本文中提供的示例激發了一些關於您的應用程序中可以轉換為反應式的不同區域的想法。 我們已經介紹了很多,如果您有任何問題、建議或任何不清楚的地方,請隨時在下面發表評論!

如果您有興趣了解有關 RxJava 的更多信息,我正在編寫一本深入的書,該書解釋瞭如何使用 Android 示例以反應式方式查看問題。 如果您想收到有關它的更新,請在此處訂閱。