如何在 Android 上使用响应式建模简化并发
已发表: 2022-03-11并发和异步是移动编程固有的。
通过命令式编程来处理并发,这是 Android 编程通常涉及的,可能会导致许多问题。 将反应式编程与 RxJava 结合使用,您可以通过提供更简洁且不易出错的解决方案来避免潜在的并发问题。
除了简化并发、异步任务之外,RxJava 还提供了执行函数式操作的能力,这些操作可以转换、组合和聚合来自 Observable 的发射,直到我们达到我们想要的结果。
通过结合 RxJava 的响应式范式和函数式操作,我们可以以响应式的方式对各种并发结构进行建模,即使在 Android 的非响应式世界中也是如此。 在本文中,您将了解如何做到这一点。 您还将学习如何逐步将 RxJava 应用到现有项目中。
如果您是 RxJava 新手,我建议您阅读这里的文章,其中讨论了 RxJava 的一些基础知识。
将非反应式连接到反应式世界
将 RxJava 作为库之一添加到您的项目中的挑战之一是它从根本上改变了您推理代码的方式。
RxJava 要求您将数据视为被推送而不是被拉取。 虽然这个概念本身很简单,但更改基于拉式范式的完整代码库可能有点令人生畏。 尽管一致性始终是理想的,但您可能并不总是有权一次在整个代码库中进行这种转换,因此可能需要更多的增量方法。
考虑以下代码:
/** * @return a list of users with blogs */ public List<User> getUsersWithBlogs() { final List<User> allUsers = UserCache.getAllUsers(); final List<User> usersWithBlogs = new ArrayList<>(); for (User user : allUsers) { if (user.blog != null && !user.blog.isEmpty()) { usersWithBlogs.add(user); } } Collections.sort(usersWithBlogs, (user1, user2) -> user1.name.compareTo(user2.name)); return usersWithBlogs; }
该函数从缓存中获取User
对象列表,根据用户是否拥有博客过滤每个对象,按用户名排序,最后返回给调用者。 查看这段代码,我们注意到其中许多操作都可以利用 RxJava 运算符; 例如, filter()
和sorted()
。
重写这个片段然后给我们:
/** * @return a list of users with blogs */ public Observable<User> getUsersWithBlogs() { return Observable.fromIterable(UserCache.getAllUsers()) .filter(user -> user.blog != null && !user.blog.isEmpty()) .sorted((user1, user2) -> user1.name.compareTo(user2.name)); }
函数的第一行通过fromIterable()
将UserCache.getAllUsers()
返回的List<User>
转换为Observable<User>
。 这是使我们的代码具有响应性的第一步。 现在我们正在对Observable
进行操作,这使我们能够执行 RxJava 工具包中的任何Observable
运算符——在本例中为filter()
和sorted()
。
关于此更改,还有其他几点需要注意。
首先,方法签名不再相同。 如果此方法调用仅在少数地方使用,并且很容易将更改传播到堆栈的其他区域,这可能不是什么大问题; 但是,如果它破坏了依赖此方法的客户端,那就有问题了,应该恢复方法签名。
其次,RxJava 的设计考虑到了惰性。 也就是说,当Observable
没有订阅者时,不应执行 long 操作。 通过此修改,该假设不再成立,因为UserCache.getAllUsers()
甚至在有任何订阅者之前就已被调用。
离开反应性世界
为了解决我们更改的第一个问题,我们可以使用Observable
可用的任何阻塞运算符,例如blockingFirst()
和blockingNext()
。 本质上,这两个运算符都会阻塞,直到下游发出一个项目: blockingFirst()
将返回第一个发出并完成的元素,而blockingNext()
将返回一个Iterable
,它允许您对底层数据执行 for-each 循环(通过循环的每次迭代都会阻塞)。
但是,使用阻塞操作的一个副作用是需要注意的是,异常会在调用线程上抛出,而不是传递给观察者的onError()
方法。
使用阻塞运算符将方法签名更改回List<User>
,我们的代码片段现在看起来像这样:
/** * @return a list of users with blogs */ public List<User> getUsersWithBlogs() { return Observable.fromIterable(UserCache.getAllUsers()) .filter(user -> user.blog != null && !user.blog.isEmpty()) .sorted((user1, user2) -> user1.name.compareTo(user2.name)) .toList() .blockingGet(); }
在调用阻塞运算符(即blockingGet()
)之前,我们首先需要链接聚合运算符toList()
,以便将流从Observable<User>
修改为Single<List<User>>
( Single
是一种特殊类型的Observable
仅在onSuccess()
中发出单个值,或通过onError()
发出错误)。
之后,我们可以调用阻塞操作符blockingGet()
来解开Single
并返回List<User>
。
尽管 RxJava 支持这一点,但应尽可能避免这种情况,因为这不是惯用的反应式编程。 但是,当绝对必要时,阻塞操作符是走出反应式世界的一种很好的初始方式。
懒惰的方法
如前所述,RxJava 的设计考虑到了惰性。 也就是说,应该尽可能长时间地延迟长时间运行的操作(即,直到在Observable
上调用订阅)。 为了使我们的解决方案变得懒惰,我们使用了defer()
运算符。
defer()
接受一个ObservableSource
工厂,它为每个订阅的新观察者创建一个Observable
。 在我们的例子中,我们希望在观察者订阅时返回Observable.fromIterable(UserCache.getAllUser())
。
/** * @return a list of users with blogs */ public Observable<User> getUsersWithBlogs() { return Observable.defer(() -> Observable.fromIterable(UserCache.getAllUsers())) .filter(user -> user.blog != null && !user.blog.isEmpty()) .sorted((user1, user2) -> user1.name.compareTo(user2.name)); }
现在长时间运行的操作包含在defer()
中,我们可以完全控制应该在哪个线程中运行,只需在subscribeOn()
中指定适当的Scheduler
即可。 有了这个改变,我们的代码是完全反应式的,订阅应该只在需要数据的时候发生。
/** * @return a list of users with blogs */ public Observable<User> getUsersWithBlogs() { return Observable.defer(() -> Observable.fromIterable(UserCache.getAllUsers())) .filter(user -> user.blog != null && !user.blog.isEmpty()) .sorted((user1, user2) -> user1.name.compareTo(user2.name)) .subscribeOn(Schedulers.io()); }
另一个用于延迟计算的非常有用的运算符是fromCallable()
方法。 与defer()
不同,它期望在 lambda 函数中返回Observable
并反过来“展平”返回的Observable
, fromCallable()
将调用 lambda 并在下游返回值。
/** * @return a list of users with blogs */ public Observable<User> getUsersWithBlogs() { final Observable<List<User>> usersObservable = Observable.fromCallable(() -> UserCache.getAllUsers()); final Observable<User> userObservable = usersObservable.flatMap(users -> Observable.fromIterable(users)); return userObservable.filter(user -> user.blog != null && !user.blog.isEmpty()) .sorted((user1, user2) -> user1.name.compareTo(user2.name)); }
在列表上使用fromCallable()
现在会返回一个Observable<List<User>>
,我们需要使用flatMap()
展平这个列表。
反应式一切
从前面的示例中,我们看到我们可以将任何对象包装在Observable
中,并使用阻塞操作和defer()
/ fromCallable()
在非反应状态和反应状态之间跳转。 使用这些构造,我们可以开始将 Android 应用程序的区域转换为响应式。
长期操作
最初考虑使用 RxJava 的一个好地方是当您有一个需要一段时间才能执行的进程时,例如网络调用(查看以前的帖子中的示例)、磁盘读取和写入等。下面的示例说明了一个简单的函数,将文本写入文件系统:
/** * Writes {@code text} to the file system. * * @param context a Context * @param filename the name of the file * @param text the text to write * @return true if the text was successfully written, otherwise, false */ public boolean writeTextToFile(Context context, String filename, String text) { FileOutputStream outputStream; try { outputStream = context.openFileOutput(filename, Context.MODE_PRIVATE); outputStream.write(text.getBytes()); outputStream.close(); return true; } catch (Exception e) { e.printStackTrace(); return false; } }
当调用这个函数时,我们需要确保它是在一个单独的线程上完成的,因为这个操作是阻塞的。 对调用者施加这样的限制会使开发人员的事情变得复杂,这增加了错误的可能性并可能会减慢开发速度。

向函数添加注释当然有助于避免调用者出错,但这仍远非防弹。
然而,使用 RxJava,我们可以轻松地将其包装到Observable
中并指定它应该运行的Scheduler
。 这样,调用者根本不需要关心在单独的线程中调用函数; 该功能将自行处理。
/** * Writes {@code text} to the filesystem. * * @param context a Context * @param filename the name of the file * @param text the text to write * @return An Observable emitting a boolean indicating whether or not the text was successfully written. */ public Observable<Boolean> writeTextToFile(Context context, String filename, String text) { return Observable.fromCallable(() -> { FileOutputStream outputStream; outputStream = context.openFileOutput(filename, Context.MODE_PRIVATE); outputStream.write(text.getBytes()); outputStream.close(); return true; }).subscribeOn(Schedulers.io()); }
使用fromCallable()
,将文本写入文件被推迟到订阅时间。
由于异常是 RxJava 中的一等对象,我们更改的另一个好处是我们不再需要将操作包装在 try/catch 块中。 异常将简单地向下游传播,而不是被吞没。 这允许调用者处理他/她认为合适的异常(例如,根据抛出的异常向用户显示错误等)。
我们可以执行的另一种优化是返回Completable
而不是Observable
。 Completable
本质上是一种特殊类型的Observable
类似于Single
它只是通过onComplete()
指示计算是否成功,或者通过onError()
指示计算是否失败。 在这种情况下,返回Completable
似乎更有意义,因为在Observable
流中返回单个 true 似乎很愚蠢。
/** * Writes {@code text} to the filesystem. * * @param context a context * @param filename the name of the file * @param text the text to write * @return A Completable */ public Completable writeTextToFile(Context context, String filename, String text) { return Completable.fromAction(() -> { FileOutputStream outputStream; outputStream = context.openFileOutput(filename, Context.MODE_PRIVATE); outputStream.write(text.getBytes()); outputStream.close(); }).subscribeOn(Schedulers.io()); }
为了完成操作,我们使用Completable
的fromAction()
操作,因为我们不再感兴趣返回值。 如果需要,像Observable
一样, Completable
也支持fromCallable()
和defer()
函数。
替换回调
到目前为止,我们看到的所有示例都发出一个值(即,可以建模为Single
),或者告诉我们操作是成功还是失败(即,可以建模为Completable
)。
但是,我们如何转换应用程序中接收持续更新或事件(例如位置更新、查看点击事件、传感器事件等)的区域?
我们将看看两种方法来做到这一点,使用create()
和使用Subjects
。
create()
允许我们显式调用观察者的onNext()
| onComplete()
| onError()
方法,因为我们从数据源接收更新。 为了使用create()
,我们传入一个ObservableOnSubscribe
,它在观察者订阅时接收一个ObservableEmitter
。 使用接收到的发射器,我们可以执行所有必要的设置调用以开始接收更新,然后调用适当的Emitter
器事件。
在位置更新的情况下,我们可以注册以在这个地方接收更新并在收到时发出位置更新。
public class LocationManager { /** * Call to receive device location updates. * @return An Observable emitting location updates */ public Observable<Location> observeLocation() { return Observable.create(emitter -> { // Make sure that the following conditions apply and if not, call the emitter's onError() method // (1) googleApiClient is connected // (2) location permission is granted final LocationRequest locationRequest = new LocationRequest(); locationRequest.setInterval(1000); locationRequest.setPriority(LocationRequest.PRIORITY_HIGH_ACCURACY); LocationServices.FusedLocationApi.requestLocationUpdates(googleApiClient, locationRequest, new LocationListener() { @Override public void onLocationChanged(Location location) { if (!emitter.isDisposed()) { emitter.onNext(location); } } }); }); } }
create()
调用中的函数请求位置更新并传入一个回调,该回调会在设备的位置更改时被调用。 正如我们在这里看到的,我们基本上替换了回调样式的接口,而是在创建的 Observable 流中发出接收到的位置(为了教育目的,如果你想深入研究,我跳过了构建位置请求的一些细节更深入的细节,你可以在这里阅读)。
关于create()
需要注意的另一件事是,每当调用subscribe()
时,都会提供一个新的发射器。 换句话说, create()
返回一个冷的Observable
。 这意味着,在上面的函数中,我们可能会多次请求位置更新,这不是我们想要的。
为了解决这个问题,我们想在Subjects
的帮助下改变函数以返回一个热的Observable
。
输入主题
Subject
扩展了Observable
并同时实现了Observer
。 每当我们想同时向多个订阅者发出或强制转换相同的事件时,这尤其有用。 在实现方面,我们希望将Subject
作为Observable
公开给客户,同时将其作为提供者的Subject
。
public class LocationManager { private Subject<Location> locationSubject = PublishSubject.create(); /** * Invoke this method when this LocationManager should start listening to location updates. */ public void connect() { final LocationRequest locationRequest = new LocationRequest(); locationRequest.setInterval(1000); locationRequest.setPriority(LocationRequest.PRIORITY_HIGH_ACCURACY); LocationServices.FusedLocationApi.requestLocationUpdates(googleApiClient, locationRequest, new LocationListener() { @Override public void onLocationChanged(Location location) { locationSubject.onNext(location); } }); } /** * Call to receive device location updates. * @return An Observable emitting location updates */ public Observable<Location> observeLocation() { return locationSubject; } }
在这个新的实现中,使用PublishSubject
子类型,它从订阅时间开始在事件到达时发出事件。 因此,如果在已经发出位置更新的点执行订阅,则观察者将不会收到过去的发射,只会收到后续的发射。 如果不需要这种行为,可以使用 RxJava 工具包中的其他几个Subject
子类型。
此外,我们还创建了一个单独的connect()
函数来启动接收位置更新的请求。 observeLocation()
仍然可以进行connect()
调用,但为了清晰/简单,我们将其从函数中重构出来。
概括
我们研究了许多机制和技术:
-
defer()
及其变体将计算的执行延迟到订阅 - 通过
create()
生成的冷Observables
- 使用
Subjects
的热门Observables
- 当我们想要离开响应式世界时,blockingX 操作
希望本文中提供的示例激发了一些关于您的应用程序中可以转换为反应式的不同区域的想法。 我们已经介绍了很多,如果您有任何问题、建议或任何不清楚的地方,请随时在下面发表评论!
如果您有兴趣了解有关 RxJava 的更多信息,我正在编写一本深入的书,该书解释了如何使用 Android 示例以反应式方式查看问题。 如果您想收到有关它的更新,请在此处订阅。