Как упростить параллелизм с помощью реактивного моделирования на Android
Опубликовано: 2022-03-11Параллелизм и асинхронность присущи мобильному программированию.
Работа с параллелизмом с помощью императивного программирования, который обычно включает в себя программирование на Android, может быть причиной многих проблем. Используя реактивное программирование с RxJava, вы можете избежать потенциальных проблем параллелизма, предоставив более чистое и менее подверженное ошибкам решение.
Помимо упрощения параллельных асинхронных задач, RxJava также предоставляет возможность выполнять операции в функциональном стиле, которые преобразуют, комбинируют и агрегируют выбросы из Observable до тех пор, пока мы не достигнем желаемого результата.
Комбинируя реактивную парадигму RxJava и операции функционального стиля, мы можем реактивным образом моделировать широкий спектр конструкций параллелизма даже в нереактивном мире Android. В этой статье вы узнаете, как именно это можно сделать. Вы также узнаете, как постепенно внедрить RxJava в существующий проект.
Если вы новичок в RxJava, я рекомендую прочитать этот пост, в котором рассказывается о некоторых основах RxJava.
Мост нереактивного в реактивный мир
Одна из проблем добавления RxJava в качестве одной из библиотек в ваш проект заключается в том, что это коренным образом меняет то, как вы рассуждаете о своем коде.
RxJava требует, чтобы вы думали о том, что данные передаются, а не извлекаются. Хотя сама концепция проста, изменение всей кодовой базы, основанной на парадигме вытягивания, может быть немного сложной задачей. Хотя согласованность всегда идеальна, у вас не всегда может быть привилегия сделать этот переход сразу во всей базе кода, поэтому может потребоваться более поэтапный подход.
Рассмотрим следующий код:
/** * @return a list of users with blogs */ public List<User> getUsersWithBlogs() { final List<User> allUsers = UserCache.getAllUsers(); final List<User> usersWithBlogs = new ArrayList<>(); for (User user : allUsers) { if (user.blog != null && !user.blog.isEmpty()) { usersWithBlogs.add(user); } } Collections.sort(usersWithBlogs, (user1, user2) -> user1.name.compareTo(user2.name)); return usersWithBlogs; }
Эта функция получает список объектов User
из кэша, фильтрует каждый из них в зависимости от того, есть ли у пользователя блог, сортирует их по имени пользователя и, наконец, возвращает их вызывающему объекту. Глядя на этот фрагмент, мы замечаем, что многие из этих операций могут использовать операторы RxJava; например, filter()
и sorted()
.
Переписав этот фрагмент, мы получаем:
/** * @return a list of users with blogs */ public Observable<User> getUsersWithBlogs() { return Observable.fromIterable(UserCache.getAllUsers()) .filter(user -> user.blog != null && !user.blog.isEmpty()) .sorted((user1, user2) -> user1.name.compareTo(user2.name)); }
Первая строка функции преобразует List<User>
, возвращенный UserCache.getAllUsers()
, в Observable<User>
через fromIterable()
. Это первый шаг к тому, чтобы сделать наш код реактивным. Теперь, когда мы работаем с Observable
, это позволяет нам выполнять любой оператор Observable
из инструментария RxJava — в данном случае filter()
и sorted sorted()
.
Есть еще несколько моментов, на которые следует обратить внимание в связи с этим изменением.
Во-первых, сигнатура метода уже не та. Это может не иметь большого значения, если этот вызов метода используется только в нескольких местах и легко распространить изменения на другие области стека; однако, если клиенты, полагающиеся на этот метод, ломаются, это проблематично, и сигнатуру метода следует отменить.
Во-вторых, RxJava разработан с учетом лени. То есть никаких долгих операций выполнять, когда на Observable
нет подписчиков. С этой модификацией это предположение больше не соответствует действительности, поскольку UserCache.getAllUsers()
вызывается еще до того, как появятся какие-либо подписчики.
Выход из реактивного мира
Чтобы решить первую проблему нашего изменения, мы можем использовать любой из операторов блокировки, доступных для Observable
, таких как blockingFirst()
и blockingNext()
. По сути, оба этих оператора будут блокироваться до тех пор, пока элемент не будет передан ниже по течению: blockingFirst()
вернет первый выпущенный и завершенный элемент, тогда как blockingNext()
вернет Iterable
, который позволяет вам выполнять цикл for-each для базовых данных ( каждая итерация цикла будет блокироваться).
Побочным эффектом использования операции блокировки, о котором важно знать, является то, что исключения генерируются в вызывающем потоке, а не передаются методу onError()
наблюдателя.
Используя блокирующий оператор, чтобы изменить сигнатуру метода обратно на List<User>
, наш фрагмент теперь будет выглядеть так:
/** * @return a list of users with blogs */ public List<User> getUsersWithBlogs() { return Observable.fromIterable(UserCache.getAllUsers()) .filter(user -> user.blog != null && !user.blog.isEmpty()) .sorted((user1, user2) -> user1.name.compareTo(user2.name)) .toList() .blockingGet(); }
Перед вызовом оператора блокировки (например, blockingGet()
) нам сначала нужно связать агрегатный оператор toList()
, чтобы поток изменился с Observable<User>
на Single<List<User>>
( Single
— это специальный тип Observable
, который выдает только одно значение в onSuccess()
или ошибку через onError()
).
После этого мы можем вызвать блокирующий оператор blockingGet()
, который разворачивает Single
и возвращает List<User>
.
Хотя RxJava поддерживает это, этого следует избегать, насколько это возможно, поскольку это не идиоматическое реактивное программирование. Однако, когда это абсолютно необходимо, блокирующие операторы — хороший начальный способ выйти из реактивного мира.
Ленивый подход
Как упоминалось ранее, RxJava был разработан с учетом лени. То есть длительные операции должны быть отложены как можно дольше (т. е. до тех пор, пока подписка не будет вызвана для Observable
). Чтобы сделать наше решение ленивым, мы используем оператор defer()
.
defer()
принимает фабрику ObservableSource
, которая создает Observable
для каждого нового подписавшегося наблюдателя. В нашем случае мы хотим возвращать Observable.fromIterable(UserCache.getAllUser())
всякий раз, когда наблюдатель подписывается.
/** * @return a list of users with blogs */ public Observable<User> getUsersWithBlogs() { return Observable.defer(() -> Observable.fromIterable(UserCache.getAllUsers())) .filter(user -> user.blog != null && !user.blog.isEmpty()) .sorted((user1, user2) -> user1.name.compareTo(user2.name)); }
Теперь, когда длительная операция заключена в defer()
, у нас есть полный контроль над тем, в каком потоке она должна выполняться, просто указав соответствующий Scheduler
в subscribeOn()
. С этим изменением наш код полностью реактивен, и подписка должна происходить только в тот момент, когда нужны данные.
/** * @return a list of users with blogs */ public Observable<User> getUsersWithBlogs() { return Observable.defer(() -> Observable.fromIterable(UserCache.getAllUsers())) .filter(user -> user.blog != null && !user.blog.isEmpty()) .sorted((user1, user2) -> user1.name.compareTo(user2.name)) .subscribeOn(Schedulers.io()); }
Другим весьма полезным оператором для отсрочки вычислений является метод fromCallable()
. В отличие от defer()
, который ожидает, что Observable
будет возвращен в лямбда-функции и, в свою очередь, «выравнивает» возвращенный Observable
, fromCallable()
вызовет лямбда-выражение и вернет значение ниже по течению.
/** * @return a list of users with blogs */ public Observable<User> getUsersWithBlogs() { final Observable<List<User>> usersObservable = Observable.fromCallable(() -> UserCache.getAllUsers()); final Observable<User> userObservable = usersObservable.flatMap(users -> Observable.fromIterable(users)); return userObservable.filter(user -> user.blog != null && !user.blog.isEmpty()) .sorted((user1, user2) -> user1.name.compareTo(user2.name)); }
Однократное использование fromCallable()
в списке теперь будет возвращать Observable<List<User>>
, нам нужно сгладить этот список с помощью flatMap()
.
Реактивно-все
Из предыдущих примеров мы видели, что мы можем обернуть любой объект в Observable
и переключаться между нереактивным и реактивным состояниями, используя блокирующие операции и defer( defer()
/ fromCallable()
. Используя эти конструкции, мы можем начать преобразовывать области приложения Android в реактивные.
Долгие операции
Хорошим местом для первоначального рассмотрения использования RxJava является всякий раз, когда у вас есть процесс, который требует времени для выполнения, например, сетевые вызовы (см. предыдущий пост для примеров), чтение и запись на диск и т. д. Следующий пример иллюстрирует простую функцию, которая запишет текст в файловую систему:
/** * Writes {@code text} to the file system. * * @param context a Context * @param filename the name of the file * @param text the text to write * @return true if the text was successfully written, otherwise, false */ public boolean writeTextToFile(Context context, String filename, String text) { FileOutputStream outputStream; try { outputStream = context.openFileOutput(filename, Context.MODE_PRIVATE); outputStream.write(text.getBytes()); outputStream.close(); return true; } catch (Exception e) { e.printStackTrace(); return false; } }
При вызове этой функции нам нужно убедиться, что она выполняется в отдельном потоке, поскольку эта операция блокируется. Наложение такого ограничения на вызывающую программу усложняет задачу разработчику, что увеличивает вероятность ошибок и потенциально может замедлить разработку.

Добавление комментария к функции, конечно, поможет избежать ошибок вызывающей стороны, но это все еще далеко от надежной защиты.
Однако, используя RxJava, мы можем легко обернуть это в Observable
и указать Scheduler
, на котором он должен работать. Таким образом, вызывающей стороне вообще не нужно беспокоиться о вызове функции в отдельном потоке; функция позаботится об этом сама.
/** * Writes {@code text} to the filesystem. * * @param context a Context * @param filename the name of the file * @param text the text to write * @return An Observable emitting a boolean indicating whether or not the text was successfully written. */ public Observable<Boolean> writeTextToFile(Context context, String filename, String text) { return Observable.fromCallable(() -> { FileOutputStream outputStream; outputStream = context.openFileOutput(filename, Context.MODE_PRIVATE); outputStream.write(text.getBytes()); outputStream.close(); return true; }).subscribeOn(Schedulers.io()); }
Используя fromCallable()
, запись текста в файл откладывается до времени подписки.
Поскольку исключения являются первоклассными объектами в RxJava, еще одно преимущество нашего изменения заключается в том, что нам больше не нужно заключать операцию в блок try/catch. Исключение будет просто распространяться вниз по течению, а не поглощаться. Это позволяет вызывающему объекту обрабатывать исключение, которое он/она считает нужным (например, показывать пользователю ошибку в зависимости от того, какое исключение было сгенерировано и т. д.).
Еще одна оптимизация, которую мы можем выполнить, — вернуть Completable
, а не Observable
. Completable
— это, по сути, особый тип Observable
— аналогичный Single
— который просто указывает, успешно ли выполнено вычисление, с помощью onComplete()
, или не удалось, с помощью onError()
. Возврат Completable
в этом случае имеет больше смысла, поскольку глупо возвращать одно значение true в потоке Observable
.
/** * Writes {@code text} to the filesystem. * * @param context a context * @param filename the name of the file * @param text the text to write * @return A Completable */ public Completable writeTextToFile(Context context, String filename, String text) { return Completable.fromAction(() -> { FileOutputStream outputStream; outputStream = context.openFileOutput(filename, Context.MODE_PRIVATE); outputStream.write(text.getBytes()); outputStream.close(); }).subscribeOn(Schedulers.io()); }
Для завершения операции мы используем fromAction()
Completable
, так как возвращаемое значение нас больше не интересует. При необходимости, как и Observable
, Completable
также поддерживает функции fromCallable()
и defer( defer()
.
Замена обратных вызовов
До сих пор все примеры, которые мы рассматривали, выдавали либо одно значение (т. е. могли быть смоделированы как Single
), либо сообщали нам, была ли операция успешной или неудачной (т. е. могли быть смоделированы как Completable
).
Однако как мы можем преобразовать области в нашем приложении, которые получают постоянные обновления или события (такие как обновления местоположения, события просмотра кликов, события датчиков и т. д.)?
Мы рассмотрим два способа сделать это: с помощью create()
и с помощью Subjects
.
create()
позволяет нам явно вызывать метод наблюдателя onNext()
| onComplete()
| onError()
, когда мы получаем обновления из нашего источника данных. Чтобы использовать create()
, мы передаем ObservableOnSubscribe
, который получает ObservableEmitter
всякий раз, когда наблюдатель подписывается. Используя полученный эмиттер, мы можем затем выполнить все необходимые вызовы настройки, чтобы начать получать обновления, а затем вызвать соответствующее событие Emitter
.
В случае обновлений местоположения мы можем зарегистрироваться для получения обновлений в этом месте и отправлять обновления местоположения по мере их получения.
public class LocationManager { /** * Call to receive device location updates. * @return An Observable emitting location updates */ public Observable<Location> observeLocation() { return Observable.create(emitter -> { // Make sure that the following conditions apply and if not, call the emitter's onError() method // (1) googleApiClient is connected // (2) location permission is granted final LocationRequest locationRequest = new LocationRequest(); locationRequest.setInterval(1000); locationRequest.setPriority(LocationRequest.PRIORITY_HIGH_ACCURACY); LocationServices.FusedLocationApi.requestLocationUpdates(googleApiClient, locationRequest, new LocationListener() { @Override public void onLocationChanged(Location location) { if (!emitter.isDisposed()) { emitter.onNext(location); } } }); }); } }
Функция внутри вызова create()
запрашивает обновления местоположения и передает обратный вызов, который вызывается при изменении местоположения устройства. Как мы видим здесь, мы по существу заменяем интерфейс в стиле обратного вызова и вместо этого выдаем полученное местоположение в созданный поток Observable (ради образовательных целей я пропустил некоторые детали с построением запроса местоположения, если вы хотите вникать более подробно вы можете прочитать здесь).
Еще одна вещь, которую следует отметить в отношении create()
, заключается в том, что всякий раз, когда вызывается subscribe()
, предоставляется новый эмиттер. Другими словами, create()
возвращает холодный Observable
. Это означает, что в приведенной выше функции мы потенциально будем запрашивать обновления местоположения несколько раз, а это не то, что нам нужно.
Чтобы обойти это, мы хотим изменить функцию, чтобы она возвращала горячий Observable
с помощью Subjects
.
Введите темы
Subject
расширяет Observable
и одновременно реализует Observer
. Это особенно полезно, когда мы хотим отправить или передать одно и то же событие нескольким подписчикам одновременно. С точки зрения реализации мы хотели бы предоставить Subject
как Observable
для клиентов, сохраняя при этом его как Subject
для провайдера.
public class LocationManager { private Subject<Location> locationSubject = PublishSubject.create(); /** * Invoke this method when this LocationManager should start listening to location updates. */ public void connect() { final LocationRequest locationRequest = new LocationRequest(); locationRequest.setInterval(1000); locationRequest.setPriority(LocationRequest.PRIORITY_HIGH_ACCURACY); LocationServices.FusedLocationApi.requestLocationUpdates(googleApiClient, locationRequest, new LocationListener() { @Override public void onLocationChanged(Location location) { locationSubject.onNext(location); } }); } /** * Call to receive device location updates. * @return An Observable emitting location updates */ public Observable<Location> observeLocation() { return locationSubject; } }
В этой новой реализации используется подтип PublishSubject
, который генерирует события по мере их поступления, начиная с момента подписки. Соответственно, если подписка выполняется в момент, когда уже были отправлены обновления местоположения, прошлые выбросы не будут получены наблюдателем, а только последующие. Если такое поведение нежелательно, в наборе инструментов RxJava есть несколько других подтипов Subject
, которые можно использовать.
Кроме того, мы также создали отдельную функцию connect()
, которая запускает запрос на получение обновлений местоположения. observeLocation()
по-прежнему может выполнять вызов connect()
, но мы убрали его из функции для ясности/простоты.
Резюме
Мы рассмотрели ряд механизмов и методов:
-
defer()
и его варианты для задержки выполнения вычислений до подписки. - холодные
Observables
, сгенерированные с помощьюcreate()
- горячие
Observables
с использованиемSubjects
- блокировка операций X, когда мы хотим покинуть реактивный мир
Надеемся, что приведенные в этой статье примеры вдохновили вас на некоторые идеи относительно различных областей вашего приложения, которые можно преобразовать в реактивные. Мы рассмотрели многое, и если у вас есть какие-либо вопросы, предложения или если что-то непонятно, не стесняйтесь оставлять комментарии ниже!
Если вам интересно узнать больше о RxJava, я работаю над подробной книгой, в которой объясняется, как рассматривать проблемы реактивным способом на примерах Android. Если вы хотите получать обновления о нем, пожалуйста, подпишитесь здесь.