Cara Menyederhanakan Konkurensi dengan Pemodelan Reaktif di Android

Diterbitkan: 2022-03-11

Konkurensi dan asinkronisitas melekat pada pemrograman seluler.

Berurusan dengan konkurensi melalui pemrograman gaya imperatif, yang umumnya melibatkan pemrograman di Android, dapat menjadi penyebab banyak masalah. Menggunakan Pemrograman Reaktif dengan RxJava, Anda dapat menghindari potensi masalah konkurensi dengan menyediakan solusi yang lebih bersih dan tidak rawan kesalahan.

Selain menyederhanakan tugas bersamaan dan asinkron, RxJava juga menyediakan kemampuan untuk melakukan operasi gaya fungsional yang mengubah, menggabungkan, dan mengumpulkan emisi dari Observable hingga kami mencapai hasil yang diinginkan.

Dengan menggabungkan paradigma reaktif RxJava dan operasi gaya fungsional, kita dapat memodelkan berbagai konstruksi konkurensi dengan cara reaktif, bahkan di dunia non-reaktif Android. Pada artikel ini, Anda akan belajar bagaimana Anda dapat melakukan hal itu. Anda juga akan belajar bagaimana mengadopsi RxJava ke dalam proyek yang sudah ada secara bertahap.

Jika Anda baru mengenal RxJava, saya sarankan membaca posting di sini yang berbicara tentang beberapa dasar-dasar RxJava.

Menjembatani Non-Reaktif ke Dunia Reaktif

Salah satu tantangan menambahkan RxJava sebagai salah satu pustaka ke proyek Anda adalah bahwa hal itu secara mendasar mengubah cara Anda bernalar tentang kode Anda.

RxJava mengharuskan Anda untuk menganggap data sebagai yang didorong daripada ditarik. Meskipun konsepnya sendiri sederhana, mengubah basis kode lengkap yang didasarkan pada paradigma tarik bisa menjadi sedikit menakutkan. Meskipun konsistensi selalu ideal, Anda mungkin tidak selalu memiliki hak istimewa untuk melakukan transisi ini di seluruh basis kode Anda sekaligus, jadi mungkin diperlukan lebih banyak pendekatan inkremental.

Perhatikan kode berikut:

 /** * @return a list of users with blogs */ public List<User> getUsersWithBlogs() { final List<User> allUsers = UserCache.getAllUsers(); final List<User> usersWithBlogs = new ArrayList<>(); for (User user : allUsers) { if (user.blog != null && !user.blog.isEmpty()) { usersWithBlogs.add(user); } } Collections.sort(usersWithBlogs, (user1, user2) -> user1.name.compareTo(user2.name)); return usersWithBlogs; }

Fungsi ini mendapatkan daftar objek User dari cache, memfilter masing-masing berdasarkan apakah pengguna memiliki blog atau tidak, mengurutkannya berdasarkan nama pengguna, dan akhirnya mengembalikannya ke pemanggil. Melihat cuplikan ini, kami melihat bahwa banyak dari operasi ini dapat memanfaatkan operator RxJava; misalnya, filter() dan sorted() .

Menulis ulang cuplikan ini kemudian memberi kita:

 /** * @return a list of users with blogs */ public Observable<User> getUsersWithBlogs() { return Observable.fromIterable(UserCache.getAllUsers()) .filter(user -> user.blog != null && !user.blog.isEmpty()) .sorted((user1, user2) -> user1.name.compareTo(user2.name)); }

Baris pertama fungsi mengonversi List<User> yang dikembalikan oleh UserCache.getAllUsers() menjadi Observable<User> melalui fromIterable() . Ini adalah langkah pertama untuk membuat kode kita reaktif. Sekarang kita beroperasi pada Observable , ini memungkinkan kita untuk melakukan operator Observable apa pun di toolkit RxJava – filter() dan sort sorted() dalam kasus ini.

Ada beberapa poin lain yang perlu diperhatikan tentang perubahan ini.

Pertama, tanda tangan metode tidak lagi sama. Ini mungkin bukan masalah besar jika pemanggilan metode ini hanya digunakan di beberapa tempat dan mudah untuk menyebarkan perubahan ke area lain dari tumpukan; namun, jika itu merusak klien yang mengandalkan metode ini, itu bermasalah dan tanda tangan metode harus dikembalikan.

Kedua, RxJava dirancang dengan mempertimbangkan kemalasan. Artinya, tidak ada operasi lama yang harus dilakukan ketika tidak ada pelanggan Observable . Dengan modifikasi ini, asumsi itu tidak lagi benar karena UserCache.getAllUsers() dipanggil bahkan sebelum ada pelanggan.

Meninggalkan Dunia Reaktif

Untuk mengatasi masalah pertama dari perubahan kami, kami dapat menggunakan salah satu operator pemblokiran yang tersedia untuk Observable seperti blockingFirst() dan blockingNext() . Pada dasarnya, kedua operator ini akan memblokir hingga item dipancarkan ke hilir: blockingFirst() akan mengembalikan elemen pertama yang dipancarkan dan selesai, sedangkan blockingNext() akan mengembalikan Iterable yang memungkinkan Anda melakukan perulangan untuk setiap loop pada data yang mendasarinya ( setiap iterasi melalui loop akan memblokir).

Namun, efek samping dari penggunaan operasi pemblokiran yang penting untuk diperhatikan adalah bahwa pengecualian dilemparkan ke utas panggilan alih-alih diteruskan ke metode onError() pengamat.

Menggunakan operator pemblokiran untuk mengubah tanda tangan metode kembali ke List<User> , cuplikan kami sekarang akan terlihat seperti ini:

 /** * @return a list of users with blogs */ public List<User> getUsersWithBlogs() { return Observable.fromIterable(UserCache.getAllUsers()) .filter(user -> user.blog != null && !user.blog.isEmpty()) .sorted((user1, user2) -> user1.name.compareTo(user2.name)) .toList() .blockingGet(); }

Sebelum memanggil operator pemblokiran (yaitu blockingGet() ) pertama-tama kita perlu merantai operator agregat toList() sehingga aliran diubah dari Observable<User> menjadi Single<List<User>> ( Single adalah tipe khusus dari Observable yang hanya memancarkan satu nilai dalam onSuccess() , atau kesalahan melalui onError() ).

Setelah itu, kita dapat memanggil operator blockingGet() yang membuka bungkusan Single dan mengembalikan List<User> .

Meskipun RxJava mendukung ini, sebisa mungkin hal ini harus dihindari karena ini bukan pemrograman reaktif idiomatik. Namun, ketika benar-benar diperlukan, operator pemblokiran adalah cara awal yang bagus untuk keluar dari dunia reaktif.

Pendekatan Malas

Seperti disebutkan sebelumnya, RxJava dirancang dengan mempertimbangkan kemalasan. Artinya, operasi yang berjalan lama harus ditunda selama mungkin (yaitu, sampai langganan dipanggil pada Observable ). Untuk membuat solusi kita menjadi lambat, kita menggunakan operator defer() .

defer() mengambil pabrik ObservableSource yang membuat Observable untuk setiap pengamat baru yang berlangganan. Dalam kasus kami, kami ingin mengembalikan Observable.fromIterable(UserCache.getAllUser()) setiap kali seorang pengamat berlangganan.

 /** * @return a list of users with blogs */ public Observable<User> getUsersWithBlogs() { return Observable.defer(() -> Observable.fromIterable(UserCache.getAllUsers())) .filter(user -> user.blog != null && !user.blog.isEmpty()) .sorted((user1, user2) -> user1.name.compareTo(user2.name)); }

Sekarang setelah operasi yang berjalan lama dibungkus dalam defer() , kami memiliki kontrol penuh tentang utas apa yang harus dijalankan hanya dengan menentukan Scheduler yang sesuai di subscribeOn() . Dengan perubahan ini, kode kami sepenuhnya reaktif dan langganan hanya boleh dilakukan pada saat data diperlukan.

 /** * @return a list of users with blogs */ public Observable<User> getUsersWithBlogs() { return Observable.defer(() -> Observable.fromIterable(UserCache.getAllUsers())) .filter(user -> user.blog != null && !user.blog.isEmpty()) .sorted((user1, user2) -> user1.name.compareTo(user2.name)) .subscribeOn(Schedulers.io()); }

Operator lain yang cukup berguna untuk menunda komputasi adalah metode fromCallable() . Tidak seperti defer() , yang mengharapkan Observable dikembalikan dalam fungsi lambda dan pada gilirannya “meratakan” Observable yang dikembalikan, fromCallable() akan memanggil lambda dan mengembalikan nilai downstream.

 /** * @return a list of users with blogs */ public Observable<User> getUsersWithBlogs() { final Observable<List<User>> usersObservable = Observable.fromCallable(() -> UserCache.getAllUsers()); final Observable<User> userObservable = usersObservable.flatMap(users -> Observable.fromIterable(users)); return userObservable.filter(user -> user.blog != null && !user.blog.isEmpty()) .sorted((user1, user2) -> user1.name.compareTo(user2.name)); }

Single menggunakan fromCallable() pada daftar sekarang akan mengembalikan Observable<List<User>> , kita perlu meratakan daftar ini menggunakan flatMap() .

Reaktif-segalanya

Dari contoh sebelumnya, kita telah melihat bahwa kita dapat membungkus objek apa pun dalam Observable dan melompat antara status non-reaktif dan reaktif menggunakan operasi pemblokiran dan defer() / fromCallable() . Dengan menggunakan konstruksi ini, kita dapat mulai mengonversi area aplikasi Android menjadi reaktif.

Operasi Panjang

Tempat yang baik untuk awalnya berpikir untuk menggunakan RxJava adalah setiap kali Anda memiliki proses yang membutuhkan beberapa saat untuk dilakukan, seperti panggilan jaringan (lihat posting sebelumnya untuk contoh), disk membaca dan menulis, dll. Contoh berikut menggambarkan fungsi sederhana yang akan menulis teks ke sistem file:

 /** * Writes {@code text} to the file system. * * @param context a Context * @param filename the name of the file * @param text the text to write * @return true if the text was successfully written, otherwise, false */ public boolean writeTextToFile(Context context, String filename, String text) { FileOutputStream outputStream; try { outputStream = context.openFileOutput(filename, Context.MODE_PRIVATE); outputStream.write(text.getBytes()); outputStream.close(); return true; } catch (Exception e) { e.printStackTrace(); return false; } }

Saat memanggil fungsi ini, kita perlu memastikan bahwa itu dilakukan pada utas terpisah karena operasi ini memblokir. Memaksakan pembatasan seperti itu pada penelepon memperumit hal-hal bagi pengembang yang meningkatkan kemungkinan bug dan berpotensi memperlambat pengembangan.

Menambahkan komentar ke fungsi tentu saja akan membantu menghindari kesalahan pemanggil, tetapi itu masih jauh dari antipeluru.

Namun, menggunakan RxJava, kita dapat dengan mudah membungkusnya menjadi Observable dan menentukan Scheduler yang harus dijalankannya. Dengan cara ini, penelepon tidak perlu khawatir sama sekali dengan menjalankan fungsi di utas terpisah; fungsi akan mengurus ini sendiri.

 /** * Writes {@code text} to the filesystem. * * @param context a Context * @param filename the name of the file * @param text the text to write * @return An Observable emitting a boolean indicating whether or not the text was successfully written. */ public Observable<Boolean> writeTextToFile(Context context, String filename, String text) { return Observable.fromCallable(() -> { FileOutputStream outputStream; outputStream = context.openFileOutput(filename, Context.MODE_PRIVATE); outputStream.write(text.getBytes()); outputStream.close(); return true; }).subscribeOn(Schedulers.io()); }

Menggunakan fromCallable() , menulis teks ke file ditangguhkan hingga waktu berlangganan.

Karena pengecualian adalah objek kelas satu di RxJava, satu manfaat lain dari perubahan kami adalah bahwa kami tidak perlu lagi membungkus operasi dalam blok coba/tangkap. Pengecualian hanya akan disebarkan ke hilir daripada ditelan. Hal ini memungkinkan penelepon untuk menangani pengecualian yang dia anggap cocok (misalnya menunjukkan kesalahan kepada pengguna tergantung pada pengecualian apa yang dilemparkan, dll.).

Satu pengoptimalan lain yang dapat kami lakukan adalah mengembalikan Completable daripada Observable . A Completable pada dasarnya adalah tipe khusus dari Observable — mirip dengan Single — yang hanya menunjukkan jika komputasi berhasil, melalui onComplete() , atau gagal, melalui onError() . Mengembalikan sebuah Completable tampaknya lebih masuk akal dalam kasus ini karena tampaknya konyol untuk mengembalikan satu true dalam aliran yang Observable .

 /** * Writes {@code text} to the filesystem. * * @param context a context * @param filename the name of the file * @param text the text to write * @return A Completable */ public Completable writeTextToFile(Context context, String filename, String text) { return Completable.fromAction(() -> { FileOutputStream outputStream; outputStream = context.openFileOutput(filename, Context.MODE_PRIVATE); outputStream.write(text.getBytes()); outputStream.close(); }).subscribeOn(Schedulers.io()); }

Untuk menyelesaikan operasi, kami menggunakan operasi fromAction() dari Completable karena nilai yang dikembalikan tidak lagi menarik bagi kami. Jika diperlukan, seperti Observable , Completable juga mendukung fungsi fromCallable() dan defer() .

Mengganti Callback

Sejauh ini, semua contoh yang telah kita lihat memancarkan satu nilai (yaitu, dapat dimodelkan sebagai Single ), atau memberi tahu kami jika suatu operasi berhasil atau gagal (yaitu, dapat dimodelkan sebagai Completable ).

Namun, bagaimana kami dapat mengonversi area di aplikasi kami yang menerima pembaruan atau peristiwa berkelanjutan (seperti pembaruan lokasi, peristiwa klik lihat, peristiwa sensor, dan sebagainya)?

Kita akan melihat dua cara untuk melakukan ini, menggunakan create() dan menggunakan Subjects .

create() memungkinkan kita untuk secara eksplisit memanggil onNext() | . pengamat onComplete() | metode onError() saat kami menerima pembaruan dari sumber data kami. Untuk menggunakan create() , kita mengirimkan ObservableOnSubscribe yang menerima ObservableEmitter setiap kali pengamat berlangganan. Dengan menggunakan emitor yang diterima, kita kemudian dapat melakukan semua panggilan pengaturan yang diperlukan untuk mulai menerima pembaruan dan kemudian memanggil event Emitter yang sesuai.

Dalam hal pembaruan lokasi, kami dapat mendaftar untuk menerima pembaruan di tempat ini dan memancarkan pembaruan lokasi seperti yang diterima.

 public class LocationManager { /** * Call to receive device location updates. * @return An Observable emitting location updates */ public Observable<Location> observeLocation() { return Observable.create(emitter -> { // Make sure that the following conditions apply and if not, call the emitter's onError() method // (1) googleApiClient is connected // (2) location permission is granted final LocationRequest locationRequest = new LocationRequest(); locationRequest.setInterval(1000); locationRequest.setPriority(LocationRequest.PRIORITY_HIGH_ACCURACY); LocationServices.FusedLocationApi.requestLocationUpdates(googleApiClient, locationRequest, new LocationListener() { @Override public void onLocationChanged(Location location) { if (!emitter.isDisposed()) { emitter.onNext(location); } } }); }); } }

Fungsi di dalam panggilan create() meminta pembaruan lokasi dan meneruskan panggilan balik yang dipanggil saat lokasi perangkat berubah. Seperti yang dapat kita lihat di sini, kami pada dasarnya mengganti antarmuka gaya panggilan balik dan sebagai gantinya memancarkan lokasi yang diterima di aliran Observable yang dibuat (demi tujuan pendidikan, saya melewatkan beberapa detail dengan membuat permintaan lokasi, jika Anda ingin menggali lebih dalam detailnya Anda bisa membacanya di sini).

Satu hal lain yang perlu diperhatikan tentang create() adalah, setiap kali subscribe() dipanggil, emitor baru disediakan. Dengan kata lain, create() mengembalikan dingin Observable . Artinya, dalam fungsi di atas, kami berpotensi meminta pembaruan lokasi beberapa kali, yang bukan kami inginkan.

Untuk mengatasinya, kami ingin mengubah fungsi untuk mengembalikan Observable panas dengan bantuan Subjects .

Masukkan Mata Pelajaran

Subject memperluas Observable dan mengimplementasikan Observer secara bersamaan. Ini sangat berguna setiap kali kami ingin memancarkan atau mentransmisikan acara yang sama ke beberapa pelanggan secara bersamaan. Dari segi implementasi, kami ingin mengekspos Subject sebagai Observable kepada klien, sambil mempertahankannya sebagai Subject untuk penyedia.

 public class LocationManager { private Subject<Location> locationSubject = PublishSubject.create(); /** * Invoke this method when this LocationManager should start listening to location updates. */ public void connect() { final LocationRequest locationRequest = new LocationRequest(); locationRequest.setInterval(1000); locationRequest.setPriority(LocationRequest.PRIORITY_HIGH_ACCURACY); LocationServices.FusedLocationApi.requestLocationUpdates(googleApiClient, locationRequest, new LocationListener() { @Override public void onLocationChanged(Location location) { locationSubject.onNext(location); } }); } /** * Call to receive device location updates. * @return An Observable emitting location updates */ public Observable<Location> observeLocation() { return locationSubject; } }

Dalam implementasi baru ini, subtipe PublishSubject digunakan yang memancarkan peristiwa saat tiba mulai dari waktu berlangganan. Oleh karena itu, jika langganan dilakukan pada titik ketika pembaruan lokasi telah dipancarkan, emisi sebelumnya tidak akan diterima oleh pengamat, hanya emisi berikutnya. Jika perilaku ini tidak diinginkan, ada beberapa subtipe Subject lain di toolkit RxJava yang dapat digunakan.

Selain itu, kami juga membuat fungsi connect() terpisah yang memulai permintaan untuk menerima pembaruan lokasi. observeLocation() masih dapat melakukan panggilan connect() , tetapi kami memfaktorkannya kembali dari fungsi untuk kejelasan/kesederhanaan.

Ringkasan

Kami telah melihat sejumlah mekanisme dan teknik:

  • defer() dan variannya untuk menunda eksekusi komputasi hingga berlangganan
  • Cold Observables dihasilkan melalui create()
  • Observables panas menggunakan Subjects
  • memblokir operasiX ketika kita ingin meninggalkan dunia reaktif

Semoga contoh yang diberikan dalam artikel ini menginspirasi beberapa ide mengenai berbagai area di aplikasi Anda yang dapat diubah menjadi reaktif. Kami telah membahas banyak hal dan jika Anda memiliki pertanyaan, saran, atau jika ada yang tidak jelas, silakan tinggalkan komentar di bawah!

Jika Anda tertarik untuk mempelajari lebih lanjut tentang RxJava, saya sedang mengerjakan buku mendalam yang menjelaskan cara melihat masalah dengan cara reaktif menggunakan contoh Android. Jika Anda ingin menerima pembaruan tentangnya, silakan berlangganan di sini.