Androidでリアクティブモデリングを使用して同時実行を簡素化する方法
公開: 2022-03-11並行性と非同期性は、モバイルプログラミングに固有のものです。
Androidでのプログラミングに一般的に含まれる命令型プログラミングを介して並行性に対処することは、多くの問題の原因となる可能性があります。 RxJavaでリアクティブプログラミングを使用すると、よりクリーンでエラーが発生しにくいソリューションを提供することで、潜在的な同時実行の問題を回避できます。
RxJavaは、同時の非同期タスクを単純化するだけでなく、目的の結果が得られるまでObservableからの排出量を変換、結合、および集約する機能スタイルの操作を実行する機能も提供します。
RxJavaのリアクティブパラダイムと機能スタイルの操作を組み合わせることで、Androidの非リアクティブな世界でも、リアクティブな方法でさまざまな同時実行構造をモデル化できます。 この記事では、まさにそれを行う方法を学びます。 また、RxJavaを既存のプロジェクトに段階的に採用する方法についても学びます。
RxJavaを初めて使用する場合は、RxJavaの基本について説明している投稿をここで読むことをお勧めします。
非反応性を反応性の世界に橋渡しする
RxJavaをライブラリの1つとしてプロジェクトに追加する際の課題の1つは、コードについての推論方法が根本的に変わることです。
RxJavaでは、データをプルするのではなくプッシュするものとして考える必要があります。 概念自体は単純ですが、プルパラダイムに基づく完全なコードベースを変更することは少し困難な場合があります。 一貫性は常に理想的ですが、コードベース全体でこの移行を一度に行う特権が常にあるとは限らないため、より段階的なアプローチが必要になる場合があります。
次のコードを検討してください。
/** * @return a list of users with blogs */ public List<User> getUsersWithBlogs() { final List<User> allUsers = UserCache.getAllUsers(); final List<User> usersWithBlogs = new ArrayList<>(); for (User user : allUsers) { if (user.blog != null && !user.blog.isEmpty()) { usersWithBlogs.add(user); } } Collections.sort(usersWithBlogs, (user1, user2) -> user1.name.compareTo(user2.name)); return usersWithBlogs; }
この関数は、キャッシュからUser
オブジェクトのリストを取得し、ユーザーがブログを持っているかどうかに基づいて各オブジェクトをフィルタリングし、ユーザーの名前で並べ替えて、最後に呼び出し元に返します。 このスニペットを見ると、これらの操作の多くがRxJava演算子を利用できることがわかります。 例: filter()
およびsorted()
。
このスニペットを書き直すと、次のようになります。
/** * @return a list of users with blogs */ public Observable<User> getUsersWithBlogs() { return Observable.fromIterable(UserCache.getAllUsers()) .filter(user -> user.blog != null && !user.blog.isEmpty()) .sorted((user1, user2) -> user1.name.compareTo(user2.name)); }
関数の最初の行は、 UserCache.getAllUsers()
によって返されたList<User>
>をfromIterable()
を介してObservable<User>
に変換します。 これは、コードをリアクティブにするための最初のステップです。 Observable
を操作しているので、これにより、RxJavaツールキットで任意のObservable
オペレーター(この場合はfilter()
およびsorted()
を実行できます。
この変更について注意すべき点が他にもいくつかあります。
まず、メソッドのシグネチャは同じではなくなりました。 このメソッド呼び出しが少数の場所でのみ使用され、スタックの他の領域に変更を伝播するのが簡単な場合、これは大したことではないかもしれません。 ただし、このメソッドに依存しているクライアントが破損した場合、それは問題であり、メソッドのシグネチャを元に戻す必要があります。
第二に、RxJavaは怠惰を念頭に置いて設計されています。 つまり、 Observable
のサブスクライバーがいない場合は、長い操作を実行しないでください。 この変更により、サブスクライバーが存在する前でもUserCache.getAllUsers()
が呼び出されるため、この仮定は当てはまりません。
リアクティブな世界を離れる
変更による最初の問題に対処するために、 blockingFirst()
やblockingNext()
)などのObservable
で使用可能な任意のブロッキング演算子を利用できます。 基本的に、これらの演算子は両方とも、アイテムがダウンストリームに放出されるまでブロックします。blockingFirst( blockingFirst()
は放出された最初の要素を返し、終了します。一方、 blockingNext()
は、基になるデータに対してfor-eachループを実行できるIterable
を返します(ループの各反復はブロックされます)。
ただし、注意が重要なブロッキング操作を使用することの副作用は、オブザーバーのonError()
メソッドに渡されるのではなく、呼び出し元のスレッドで例外がスローされることです。
ブロッキング演算子を使用してメソッドシグネチャをList List<User>
に戻すと、スニペットは次のようになります。
/** * @return a list of users with blogs */ public List<User> getUsersWithBlogs() { return Observable.fromIterable(UserCache.getAllUsers()) .filter(user -> user.blog != null && !user.blog.isEmpty()) .sorted((user1, user2) -> user1.name.compareTo(user2.name)) .toList() .blockingGet(); }
ブロッキング演算子(つまりblockingGet()
)を呼び出す前に、まず集約演算子toList()
をチェーンして、ストリームがObservable<User>
からSingle<List<User>>
に変更されるようにする必要があります( Single
は特殊なタイプです)。 onSuccess()
で単一の値のみを出力するObservable
の、またはonError()
を介したエラー)。
その後、ブロッキング演算子blockingGet()
を呼び出して、 Single
をアンラップし、 List<User>
を返すことができます。
RxJavaはこれをサポートしていますが、これは慣用的なリアクティブプログラミングではないため、可能な限り回避する必要があります。 ただし、どうしても必要な場合は、ブロッキング演算子は、リアクティブな世界から抜け出すための優れた最初の方法です。
怠惰なアプローチ
前述のように、RxJavaは怠惰を念頭に置いて設計されました。 つまり、長時間実行される操作は、可能な限り遅らせる必要があります(つまり、サブスクライブがObservable
で呼び出されるまで)。 ソリューションを怠惰にするために、 defer()
演算子を使用します。
defer()
は、サブスクライブする新しいオブザーバーごとにObservable
を作成するObservableSource
ファクトリを取り込みます。 この例では、オブザーバーがサブスクライブするたびにObservable.fromIterable(UserCache.getAllUser())
を返します。
/** * @return a list of users with blogs */ public Observable<User> getUsersWithBlogs() { return Observable.defer(() -> Observable.fromIterable(UserCache.getAllUsers())) .filter(user -> user.blog != null && !user.blog.isEmpty()) .sorted((user1, user2) -> user1.name.compareTo(user2.name)); }
長時間実行される操作がdefer()
でラップされたので、 subscribeOn()
で適切なScheduler
を指定するだけで、これを実行するスレッドを完全に制御できます。 この変更により、コードは完全にリアクティブになり、サブスクリプションはデータが必要になったときにのみ発生するはずです。
/** * @return a list of users with blogs */ public Observable<User> getUsersWithBlogs() { return Observable.defer(() -> Observable.fromIterable(UserCache.getAllUsers())) .filter(user -> user.blog != null && !user.blog.isEmpty()) .sorted((user1, user2) -> user1.name.compareTo(user2.name)) .subscribeOn(Schedulers.io()); }
計算を延期するためのもう1つの非常に便利な演算子は、 fromCallable()
メソッドです。 Observable
がラムダ関数で返されることを期待し、次に返されたObservable
を「フラット化」 defer()
とは異なり、 fromCallable()
はラムダを呼び出して値をダウンストリームに返します。
/** * @return a list of users with blogs */ public Observable<User> getUsersWithBlogs() { final Observable<List<User>> usersObservable = Observable.fromCallable(() -> UserCache.getAllUsers()); final Observable<User> userObservable = usersObservable.flatMap(users -> Observable.fromIterable(users)); return userObservable.filter(user -> user.blog != null && !user.blog.isEmpty()) .sorted((user1, user2) -> user1.name.compareTo(user2.name)); }
リストでfromCallable()
を使用すると、 Observable<List<User>>
が返されるようになりましたflatMap()
を使用してこのリストをフラット化する必要があります。
反応性-すべて
前の例から、任意のオブジェクトをObservable
でラップし、ブロッキング操作とdefer()
/ fromCallable()
を使用して非リアクティブ状態とリアクティブ状態の間をジャンプできることを確認しました。 これらの構成を使用して、Androidアプリの領域をリアクティブに変換し始めることができます。
長時間の操作
RxJavaの使用を最初に検討するのに適した場所は、ネットワーク呼び出し(例については前の投稿を確認)、ディスクの読み取りと書き込みなど、実行に時間がかかるプロセスがある場合です。次の例は、次のような単純な関数を示しています。ファイルシステムにテキストを書き込みます。
/** * Writes {@code text} to the file system. * * @param context a Context * @param filename the name of the file * @param text the text to write * @return true if the text was successfully written, otherwise, false */ public boolean writeTextToFile(Context context, String filename, String text) { FileOutputStream outputStream; try { outputStream = context.openFileOutput(filename, Context.MODE_PRIVATE); outputStream.write(text.getBytes()); outputStream.close(); return true; } catch (Exception e) { e.printStackTrace(); return false; } }
この関数を呼び出すときは、この操作がブロックされているため、別のスレッドで実行されていることを確認する必要があります。 呼び出し元にこのような制限を課すと、開発者にとって事態が複雑になり、バグの可能性が高まり、開発が遅くなる可能性があります。

もちろん、関数にコメントを追加すると、呼び出し元によるエラーを回避するのに役立ちますが、それでも防弾にはほど遠いです。
ただし、RxJavaを使用すると、これをObservable
に簡単にラップして、実行するScheduler
を指定できます。 このように、呼び出し元は別のスレッドで関数を呼び出すことをまったく気にする必要はありません。 関数はこれ自体を処理します。
/** * Writes {@code text} to the filesystem. * * @param context a Context * @param filename the name of the file * @param text the text to write * @return An Observable emitting a boolean indicating whether or not the text was successfully written. */ public Observable<Boolean> writeTextToFile(Context context, String filename, String text) { return Observable.fromCallable(() -> { FileOutputStream outputStream; outputStream = context.openFileOutput(filename, Context.MODE_PRIVATE); outputStream.write(text.getBytes()); outputStream.close(); return true; }).subscribeOn(Schedulers.io()); }
fromCallable()
を使用して、ファイルへのテキストの書き込みはサブスクリプション時まで延期されます。
例外はRxJavaのファーストクラスのオブジェクトであるため、この変更のもう1つの利点は、操作をtry/catchブロックでラップする必要がなくなったことです。 例外は、飲み込まれるのではなく、単にダウンストリームに伝播されます。 これにより、呼び出し元は適切と思われる例外を処理できます(たとえば、スローされた例外に応じてユーザーにエラーを表示するなど)。
実行できるもう1つの最適化は、 Observable
ではなくCompletable
を返すことです。 Completable
は、本質的には特別なタイプのObservable
であり、 Single
と同様に、 onComplete()
を介して計算が成功したか、 onError()
を介して失敗したかを示すだけです。 この場合、 Observable
ストリームで単一のtrueを返すのはばかげているように見えるため、 Completable
を返す方が理にかなっているようです。
/** * Writes {@code text} to the filesystem. * * @param context a context * @param filename the name of the file * @param text the text to write * @return A Completable */ public Completable writeTextToFile(Context context, String filename, String text) { return Completable.fromAction(() -> { FileOutputStream outputStream; outputStream = context.openFileOutput(filename, Context.MODE_PRIVATE); outputStream.write(text.getBytes()); outputStream.close(); }).subscribeOn(Schedulers.io()); }
戻り値は重要ではなくなったため、操作を完了するには、CompletableのCompletable
fromAction()
操作を使用します。 必要に応じて、 Observable
のように、 Completable
はfromCallable()
およびdefer()
関数もサポートします。
コールバックの置き換え
これまで見てきたすべての例は、1つの値(つまり、 Single
としてモデル化できる)、または操作が成功したか失敗したか(つまり、 Completable
としてモデル化できる)のいずれかを示しています。
ただし、継続的な更新またはイベント(位置の更新、クリックイベントの表示、センサーイベントなど)を受信するアプリ内の領域をどのように変換できますか?
create()
とSubjects
を使用して、これを行う2つの方法を見ていきます。
create()
を使用すると、オブザーバーのonNext()
を明示的に呼び出すことができます。 onComplete()
| データソースから更新を受信するときのonError()
メソッド。 create()
を使用するには、オブザーバーがサブスクライブするたびにObservableOnSubscribe
を受け取るObservableEmitter
を渡します。 次に、受信したエミッターを使用して、必要なすべてのセットアップ呼び出しを実行して更新の受信を開始し、適切なEmitter
イベントを呼び出すことができます。
場所の更新の場合、この場所で更新を受信するように登録し、受信した場所の更新を送信できます。
public class LocationManager { /** * Call to receive device location updates. * @return An Observable emitting location updates */ public Observable<Location> observeLocation() { return Observable.create(emitter -> { // Make sure that the following conditions apply and if not, call the emitter's onError() method // (1) googleApiClient is connected // (2) location permission is granted final LocationRequest locationRequest = new LocationRequest(); locationRequest.setInterval(1000); locationRequest.setPriority(LocationRequest.PRIORITY_HIGH_ACCURACY); LocationServices.FusedLocationApi.requestLocationUpdates(googleApiClient, locationRequest, new LocationListener() { @Override public void onLocationChanged(Location location) { if (!emitter.isDisposed()) { emitter.onNext(location); } } }); }); } }
create()
呼び出し内の関数は、場所の更新を要求し、デバイスの場所が変更されたときに呼び出されるコールバックを渡します。 ここでわかるように、基本的にコールバックスタイルのインターフェースを置き換え、代わりに作成されたObservableストリームで受信した場所を出力します(教育目的で、詳細を掘り下げたい場合は、場所リクエストの作成で詳細の一部をスキップしました詳細については、こちらで読むことができます)。
create()
についてもう1つ注意すべき点は、 subscribe()
が呼び出されるたびに、新しいエミッターが提供されることです。 つまり、 create()
はコールドObservable
を返します。 これは、上記の関数では、位置の更新を複数回要求する可能性があることを意味しますが、これは私たちが望んでいることではありません。
これを回避するために、 Subjects
の助けを借りてホットObservable
を返すように関数を変更したいと思います。
件名を入力してください
Subject
はObservable
を拡張し、同時にObserver
を実装します。 これは、同じイベントを同時に複数のサブスクライバーに送信またはキャストする場合に特に便利です。 実装に関しては、 Subject
をプロバイダーのサブジェクトとして保持しながら、 Subject
をObservable
としてクライアントに公開する必要があります。
public class LocationManager { private Subject<Location> locationSubject = PublishSubject.create(); /** * Invoke this method when this LocationManager should start listening to location updates. */ public void connect() { final LocationRequest locationRequest = new LocationRequest(); locationRequest.setInterval(1000); locationRequest.setPriority(LocationRequest.PRIORITY_HIGH_ACCURACY); LocationServices.FusedLocationApi.requestLocationUpdates(googleApiClient, locationRequest, new LocationListener() { @Override public void onLocationChanged(Location location) { locationSubject.onNext(location); } }); } /** * Call to receive device location updates. * @return An Observable emitting location updates */ public Observable<Location> observeLocation() { return locationSubject; } }
この新しい実装では、サブタイプPublishSubject
が使用され、サブスクリプションの時点から到着したイベントを発行します。 したがって、位置の更新がすでに発行されている時点でサブスクリプションが実行された場合、過去の発行はオブザーバーによって受信されず、後続の発行のみが受信されます。 この動作が望ましくない場合は、RxJavaツールキットに使用できる他のSubject
サブタイプがいくつかあります。
さらに、場所の更新を受信するリクエストを開始する別のconnect()
関数も作成しました。 observeLocation()
は引き続きconnect()
呼び出しを実行できますが、わかりやすくするために関数からリファクタリングしました。
概要
いくつかのメカニズムと手法を見てきました。
-
defer()
とそのバリアントは、サブスクリプションまで計算の実行を遅らせます create()
を介して生成されたコールドObservables
-
Subjects
を使用したホットObservables
- リアクティブな世界を離れたいときのblockingX操作
うまくいけば、この記事で提供されている例が、リアクティブに変換できるアプリのさまざまな領域に関するいくつかのアイデアに影響を与えました。 私たちはたくさんのことをカバーしました、そしてあなたが質問、提案があるか、何かがはっきりしないならば、下にコメントを残してください!
RxJavaについて詳しく知りたい場合は、Androidの例を使用して問題をリアクティブな方法で表示する方法を説明する詳細な本に取り組んでいます。 最新情報を受け取りたい場合は、こちらから購読してください。