认识 RxJava:Android 缺少的响应式编程库
已发表: 2022-03-11如果您是一名 Android 开发人员,您可能听说过 RxJava。 它是在 Android 开发中启用响应式编程的讨论最多的库之一。 它被吹捧为简化移动编程中固有的并发/异步任务的首选框架。
但是……什么是 RxJava,它是如何“简化”事情的?
虽然网上已经有很多资源可以解释 RxJava 是什么,但在本文中,我的目标是向您介绍 RxJava 的基本介绍,特别是它如何适合 Android 开发。 我还将提供一些具体示例和建议,说明如何将其集成到新项目或现有项目中。
为什么要考虑 RxJava?
RxJava 的核心是简化开发,因为它提高了线程的抽象级别。 也就是说,作为开发人员,您不必过多担心如何执行应该在不同线程上发生的操作的细节。 这特别有吸引力,因为线程很难正确实现,如果没有正确实现,可能会导致一些最难调试和修复的错误。
当然,这并不意味着 RxJava 在线程方面是万无一失的,了解幕后发生的事情仍然很重要。 但是,RxJava 绝对可以让您的生活更轻松。
让我们看一个例子。
网络调用 - RxJava 与 AsyncTask
假设我们想通过网络获取数据并因此更新 UI。 一种方法是(1)在我们的Activity
/ Fragment
中创建一个内部AsyncTask
子类,(2)在后台执行网络操作,以及(3)获取该操作的结果并在主线程中更新 UI .
public class NetworkRequestTask extends AsyncTask<Void, Void, User> { private final int userId; public NetworkRequestTask(int userId) { this.userId = userId; } @Override protected User doInBackground(Void... params) { return networkService.getUser(userId); } @Override protected void onPostExecute(User user) { nameTextView.setText(user.getName()); // ...set other views } } private void onButtonClicked(Button button) { new NetworkRequestTask(123).execute() }
尽管这看起来无害,但这种方法存在一些问题和局限性。 也就是说,由于NetworkRequestTask
是一个内部类,因此很容易创建内存/上下文泄漏,因此它包含对外部类的隐式引用。 另外,如果我们想在网络调用之后链接另一个长操作怎么办? 我们必须嵌套两个AsyncTask
,这会显着降低可读性。
相比之下,执行网络调用的 RxJava 方法可能看起来像这样:
private Subscription subscription; private void onButtonClicked(Button button) { subscription = networkService.getObservableUser(123) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Action1<User>() { @Override public void call(User user) { nameTextView.setText(user.getName()); // ... set other views } }); } @Override protected void onDestroy() { if (subscription != null && !subscription.isUnsubscribed()) { subscription.unsubscribe(); } super.onDestroy(); }
使用这种方法,我们通过保留对返回的Subscription
对象的引用来解决问题(由正在运行的线程持有对外部上下文的引用引起的潜在内存泄漏)。 然后这个Subscription
对象与Activity
/ Fragment
对象的#onDestroy()
方法绑定,以保证在需要销毁Activity
/ Fragment
时不执行Action1#call
操作。
另外,请注意#getObservableUser(...)
的返回类型(即Observable<User>
)与对它的进一步调用链接在一起。 通过这个流畅的 API,我们能够解决使用AsyncTask
的第二个问题,即它允许进一步的网络调用/长操作链。 很整洁吧?
让我们深入了解一些 RxJava 概念。
Observable、Observer 和 Operator - RxJava 核心的 3 个 O
在 RxJava 世界中,一切都可以建模为流。 流随着时间的推移发射项目,并且每个发射都可以被消费/观察。
如果你仔细想想,流并不是一个新概念:点击事件可以是流,位置更新可以是流,推送通知可以是流,等等。
流抽象是通过 3 个我喜欢称之为“3 O”的核心结构实现的; 即: O bservable、 O bserver 和O算子。 Observable发出项目(流); 观察者消耗这些物品。 来自 Observable 对象的发射可以通过链接Operator调用来进一步修改、转换和操作。
可观察的
Observable 是 RxJava 中的流抽象。 它与迭代器相似,给定一个序列,它以有序的方式迭代并生成这些项目。 然后,消费者可以通过相同的接口消费这些项目,而不管底层顺序如何。
假设我们想按顺序发出数字 1、2、3。 为此,我们可以使用Observable<T>#create(OnSubscribe<T>)
方法。
Observable<Integer> observable = Observable.create(new Observable.OnSubscribe<Integer>() { @Override public void call(Subscriber<? super Integer> subscriber) { subscriber.onNext(1); subscriber.onNext(2); subscriber.onNext(3); subscriber.onCompleted(); } });
调用subscriber.onNext(Integer)
会在流中发出一个项目,当流完成发出后,就会调用subscriber.onCompleted()
。
这种创建 Observable 的方法相当冗长。 出于这个原因,有一些方便的方法可以创建 Observable 实例,几乎在所有情况下都应该首选这些方法。
创建 Observable 的最简单方法是使用Observable#just(...)
。 正如方法名称所暗示的,它只是发出作为方法参数传递给它的项目。
Observable.just(1, 2, 3); // 1, 2, 3 will be emitted, respectively
观察者
Observable 流的下一个组件是订阅它的观察者(或多个观察者)。 每当流中发生“有趣”的事情时,都会通知观察者。 通过以下事件通知观察者:
-
Observer#onNext(T)
- 从流中发出项目时调用 Observable#onError(Throwable)
- 在流中发生错误时调用Observable#onCompleted()
- 当流完成发射项目时调用。
要订阅流,只需调用Observable<T>#subscribe(...)
并传入一个 Observer 实例。
Observable<Integer> observable = Observable.just(1, 2, 3); observable.subscribe(new Observer<Integer>() { @Override public void onCompleted() { Log.d("Test", "In onCompleted()"); } @Override public void onError(Throwable e) { Log.d("Test", "In onError()"); } @Override public void onNext(Integer integer) { Log.d("Test", "In onNext():" + integer); } });
上面的代码将在 Logcat 中发出以下内容:
In onNext(): 1 In onNext(): 2 In onNext(): 3 In onNext(): 4 In onCompleted()
在某些情况下,我们可能不再对 Observable 的发射感兴趣。 这在 Android 中尤其重要,例如,当需要在内存中回收Activity
/ Fragment
时。
要停止观察项目,我们只需要在返回的 Subscription 对象上调用Subscription#unsubscribe()
即可。
Subscription subscription = someInfiniteObservable.subscribe(new Observer<Integer>() { @Override public void onCompleted() { // ... } @Override public void onError(Throwable e) { // ... } @Override public void onNext(Integer integer) { // ... } }); // Call unsubscribe when appropriate subscription.unsubscribe();
如上面的代码片段所示,在订阅 Observable 时,我们持有对返回的 Subscription 对象的引用,然后在必要时调用subscription#unsubscribe()
。 在 Android 中,最好在Activity#onDestroy()
或Fragment#onDestroy()
中调用。
操作员
在通知订阅的 Observer 对象之前,可以通过 Operator 转换、修改和过滤 Observable 发出的项目。 在函数式编程中发现的一些最常见的操作(例如 map、filter、reduce 等)也可以应用于 Observable 流。 我们以地图为例:
Observable.just(1, 2, 3, 4, 5).map(new Func1<Integer, Integer>() { @Override public Integer call(Integer integer) { return integer * 3; } }).subscribe(new Observer<Integer>() { @Override public void onCompleted() { // ... } @Override public void onError(Throwable e) { // ... } @Override public void onNext(Integer integer) { // ... } });
上面的代码片段将从 Observable 中获取每个发射,并将每个发射乘以 3,分别生成流 3、6、9、12、15。 应用运算符通常会返回另一个 Observable 作为结果,这很方便,因为这允许我们链接多个操作以获得所需的结果。
鉴于上面的流,假设我们只想接收偶数。 这可以通过链接过滤器操作来实现。
Observable.just(1, 2, 3, 4, 5).map(new Func1<Integer, Integer>() { @Override public Integer call(Integer integer) { return integer * 3; } }).filter(new Func1<Integer, Boolean>() { @Override public Boolean call(Integer integer) { return integer % 2 == 0; } }).subscribe(new Observer<Integer>() { @Override public void onCompleted() { // ... } @Override public void onError(Throwable e) { // ... } @Override public void onNext(Integer integer) { // ... } });
RxJava 工具集中有许多操作符可以修改 Observable 流; 如果您能想到一种修改流的方法,那么很有可能,它有一个 Operator。 与大多数技术文档不同,阅读 RxJava/ReactiveX 文档非常简单且中肯。 文档中的每个运算符都附带了有关运算符如何影响流的可视化。 这些可视化被称为“大理石图”。
下面是一个名为 Flip 的假设运算符如何通过弹珠图建模:
使用 RxJava 进行多线程
通过指定操作符应该在其中发生的调度程序来控制 Observable 链中操作发生的线程。 从本质上讲,您可以将调度程序视为一个线程池,当指定时,操作员将使用并在其上运行。 默认情况下,如果没有提供这样的调度器,则 Observable 链将在调用Observable#subscribe(...)
的同一线程上运行。 否则,可以通过Observable#subscribeOn(Scheduler)
和/或Observable#observeOn(Scheduler)
指定调度程序,其中调度的操作将发生在调度程序选择的线程上。
这两种方法的主要区别在于Observable#subscribeOn(Scheduler)
指示源 Observable 它应该在哪个调度程序上运行。 该链将继续在Observable#subscribeOn(Scheduler)
中指定的调度程序的线程上运行,直到使用不同的调度程序调用Observable#observeOn(Scheduler)
。 当进行这样的调用时,从那里开始的所有观察者(即链下的后续操作)将在从observeOn
调度程序获取的线程中接收通知。
这是一个大理石图,演示了这些方法如何影响操作的运行位置:
在 Android 的上下文中,如果 UI 操作因长时间操作而需要发生,我们希望该操作发生在 UI 线程上。 为此,我们可以使用AndroidScheduler#mainThread()
,这是 RxAndroid 库中提供的调度程序之一。
Android 上的 RxJava
现在我们已经掌握了一些基础知识,您可能想知道 — 将 RxJava 集成到 Android 应用程序中的最佳方式是什么? 正如您可能想象的那样,RxJava 有很多用例,但在这个例子中,让我们看一个特定的案例:使用 Observable 对象作为网络堆栈的一部分。
在这个例子中,我们将看看 Retrofit,一个由 Square 开源的 HTTP 客户端,它内置了与 RxJava 的绑定,可以与 GitHub 的 API 交互。 具体来说,我们将创建一个简单的应用程序,为给定 GitHub 用户名的用户显示所有已加星标的存储库。 如果您想继续前进,可以在此处获得源代码。
创建一个新的 Android 项目
- 首先创建一个新的 Android 项目并将其命名为GitHubRxJava 。
- 在Target Android Devices屏幕中,保持选中Phone and Tablet并将最低 SDK 级别设置为 17。随意将其设置为更低/更高的 API 级别,但对于本示例,API 级别 17 就足够了。
- 在下一个提示中选择Empty Activity 。
- 在最后一步中,保持 Activity Name 为MainActivity并生成一个布局文件activity_main 。
项目设置
在app/build.gradle
中包含 RxJava、RxAndroid 和 Retrofit 库。 请注意,包含 RxAndroid 隐式也包含 RxJava。 然而,最好总是明确地包含这两个库,因为 RxAndroid 并不总是包含最新版本的 RxJava。 明确包含最新版本的 RxJava 保证使用最新版本。
dependencies { compile 'com.squareup.retrofit2:adapter-rxjava:2.1.0' compile 'com.squareup.retrofit2:converter-gson:2.1.0' compile 'com.squareup.retrofit2:retrofit:2.1.0' compile 'io.reactivex:rxandroid:1.2.0' compile 'io.reactivex:rxjava:1.1.8' // ...other dependencies }
创建数据对象
创建GitHubRepo
数据对象类。 此类封装了 GitHub 中的存储库(网络响应包含更多数据,但我们只对其中的一个子集感兴趣)。

public class GitHubRepo { public final int id; public final String name; public final String htmlUrl; public final String description; public final String language; public final int stargazersCount; public GitHubRepo(int id, String name, String htmlUrl, String description, String language, int stargazersCount) { this.id = id; this.name = name; this.htmlUrl = htmlUrl; this.description = description; this.language = language; this.stargazersCount = stargazersCount; } }
设置改造
- 创建
GitHubService
接口。 我们将把这个接口传递给 Retrofit 并且 Retrofit 将创建一个GitHubService
的实现。
public interface GitHubService { @GET("users/{user}/starred") Observable<List<GitHubRepo>> getStarredRepositories(@Path("user") String userName); }
创建
GitHubClient
类。 这将是我们将与之交互以从 UI 级别进行网络调用的对象。在通过 Retrofit 构建
GitHubService
的实现时,我们需要传入一个RxJavaCallAdapterFactory
作为调用适配器,以便网络调用可以返回 Observable 对象(任何返回Call
以外的结果的网络调用都需要传递调用适配器)。我们还需要传入一个
GsonConverterFactory
以便我们可以使用 Gson 作为将 JSON 对象编组为 Java 对象的一种方式。
public class GitHubClient { private static final String GITHUB_BASE_URL = "https://api.github.com/"; private static GitHubClient instance; private GitHubService gitHubService; private GitHubClient() { final Gson gson = new GsonBuilder().setFieldNamingPolicy(FieldNamingPolicy.LOWER_CASE_WITH_UNDERSCORES).create(); final Retrofit retrofit = new Retrofit.Builder().baseUrl(GITHUB_BASE_URL) .addCallAdapterFactory(RxJavaCallAdapterFactory.create()) .addConverterFactory(GsonConverterFactory.create(gson)) .build(); gitHubService = retrofit.create(GitHubService.class); } public static GitHubClient getInstance() { if (instance == null) { instance = new GitHubClient(); } return instance; } public Observable<List<GitHubRepo>> getStarredRepos(@NonNull String userName) { return gitHubService.getStarredRepositories(userName); } }
设置布局
接下来,创建一个简单的 UI,在给定输入 GitHub 用户名的情况下显示检索到的存储库。 创建activity_home.xml
- 我们活动的布局 - 如下所示:
<?xml version="1.0" encoding="utf-8"?> <LinearLayout xmlns:andro android:layout_width="match_parent" android:layout_height="match_parent" android:orientation="vertical"> <ListView android: android:layout_width="match_parent" android:layout_height="0dp" android:layout_weight="1"/> <LinearLayout android:layout_width="match_parent" android:layout_height="wrap_content" android:orientation="horizontal"> <EditText android: android:layout_width="0dp" android:layout_height="wrap_content" android:layout_weight="1" android:hint="@string/username"/> <Button android: android:layout_width="wrap_content" android:layout_height="wrap_content" android:text="@string/search"/> </LinearLayout> </LinearLayout>
创建item_github_repo.xml
- GitHub 存储库对象的ListView
项目布局 - 使用如下内容:
<?xml version="1.0" encoding="utf-8"?> <RelativeLayout xmlns:andro xmlns:tools="http://schemas.android.com/tools" android:orientation="vertical" android:layout_width="match_parent" android:layout_height="match_parent" android:padding="6dp"> <TextView android: android:layout_width="match_parent" android:layout_height="wrap_content" android:textSize="24sp" android:text tools:text="Cropper"/> <TextView android: android:layout_width="match_parent" android:layout_height="wrap_content" android:lines="2" android:ellipsize="end" android:textSize="16sp" android:layout_below="@+id/text_repo_name" tools:text="Android widget for cropping and rotating an image."/> <TextView android: android:layout_width="wrap_content" android:layout_height="wrap_content" android:layout_below="@+id/text_repo_description" android:layout_alignParentLeft="true" android:textColor="?attr/colorPrimary" android:textSize="14sp" android:text tools:text="Language: Java"/> <TextView android: android:layout_width="wrap_content" android:layout_height="wrap_content" android:layout_below="@+id/text_repo_description" android:layout_alignParentRight="true" android:textColor="?attr/colorAccent" android:textSize="14sp" android:text tools:text="Stars: 1953"/> </RelativeLayout>
把所有东西粘在一起
创建一个ListAdapter
,负责将GitHubRepo
对象绑定到ListView
项中。 如果没有提供回收的View
,该过程本质上涉及将item_github_repo.xml
膨胀到View
中; 否则,将重复使用回收的View
以防止过度膨胀过多的View
对象。
public class GitHubRepoAdapter extends BaseAdapter { private List<GitHubRepo> gitHubRepos = new ArrayList<>(); @Override public int getCount() { return gitHubRepos.size(); } @Override public GitHubRepo getItem(int position) { if (position < 0 || position >= gitHubRepos.size()) { return null; } else { return gitHubRepos.get(position); } } @Override public long getItemId(int position) { return position; } @Override public View getView(int position, View convertView, ViewGroup parent) { final View view = (convertView != null ? convertView : createView(parent)); final GitHubRepoViewHolder viewHolder = (GitHubRepoViewHolder) view.getTag(); viewHolder.setGitHubRepo(getItem(position)); return view; } public void setGitHubRepos(@Nullable List<GitHubRepo> repos) { if (repos == null) { return; } gitHubRepos.clear(); gitHubRepos.addAll(repos); notifyDataSetChanged(); } private View createView(ViewGroup parent) { final LayoutInflater inflater = LayoutInflater.from(parent.getContext()); final View view = inflater.inflate(R.layout.item_github_repo, parent, false); final GitHubRepoViewHolder viewHolder = new GitHubRepoViewHolder(view); view.setTag(viewHolder); return view; } private static class GitHubRepoViewHolder { private TextView textRepoName; private TextView textRepoDescription; private TextView textLanguage; private TextView textStars; public GitHubRepoViewHolder(View view) { textRepoName = (TextView) view.findViewById(R.id.text_repo_name); textRepoDescription = (TextView) view.findViewById(R.id.text_repo_description); textLanguage = (TextView) view.findViewById(R.id.text_language); textStars = (TextView) view.findViewById(R.id.text_stars); } public void setGitHubRepo(GitHubRepo gitHubRepo) { textRepoName.setText(gitHubRepo.name); textRepoDescription.setText(gitHubRepo.description); textLanguage.setText("Language: " + gitHubRepo.language); textStars.setText("Stars: " + gitHubRepo.stargazersCount); } } }
在MainActivity
中将所有内容粘合在一起。 这本质上是我们第一次启动应用程序时显示的Activity
。 在这里,我们要求用户输入他们的 GitHub 用户名,最后,显示该用户名的所有已加星标的存储库。
public class MainActivity extends AppCompatActivity { private static final String TAG = MainActivity.class.getSimpleName(); private GitHubRepoAdapter adapter = new GitHubRepoAdapter(); private Subscription subscription; @Override protected void onCreate(Bundle savedInstanceState) { super.onCreate(savedInstanceState); setContentView(R.layout.activity_main); final ListView listView = (ListView) findViewById(R.id.list_view_repos); listView.setAdapter(adapter); final EditText editTextUsername = (EditText) findViewById(R.id.edit_text_username); final Button buttonSearch = (Button) findViewById(R.id.button_search); buttonSearch.setOnClickListener(new View.OnClickListener() { @Override public void onClick(View v) { final String username = editTextUsername.getText().toString(); if (!TextUtils.isEmpty(username)) { getStarredRepos(username); } } }); } @Override protected void onDestroy() { if (subscription != null && !subscription.isUnsubscribed()) { subscription.unsubscribe(); } super.onDestroy(); } private void getStarredRepos(String username) { subscription = GitHubClient.getInstance() .getStarredRepos(username) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Observer<List<GitHubRepo>>() { @Override public void onCompleted() { Log.d(TAG, "In onCompleted()"); } @Override public void onError(Throwable e) { e.printStackTrace(); Log.d(TAG, "In onError()"); } @Override public void onNext(List<GitHubRepo> gitHubRepos) { Log.d(TAG, "In onNext()"); adapter.setGitHubRepos(gitHubRepos); } }); } }
运行应用程序
运行应用程序应该会显示一个带有输入框的屏幕,用于输入 GitHub 用户名。 然后搜索应显示所有已加星标的存储库的列表。
结论
我希望这是对 RxJava 的有用介绍和对其基本功能的概述。 RxJava 中有大量强大的概念,我强烈建议您通过更深入地研究记录良好的 RxJava wiki 来探索它们。
随时在下面的评论框中留下任何问题或评论。 你也可以在 Twitter 上关注我@arriolachris,在那里我发布了很多关于 RxJava 和 Android 的所有东西。
如果你想要一个关于 RxJava 的综合学习资源,你可以查看我在 Leanpub 上和 Angus Huang 写的电子书。