Android'de Reaktif Modelleme ile Eşzamanlılık Nasıl Basitleştirilir

Yayınlanan: 2022-03-11

Eşzamanlılık ve eşzamansızlık, mobil programlamanın doğasında vardır.

Android'de programlamanın genel olarak içerdiği şey olan zorunlu-tarzı programlama yoluyla eşzamanlılıkla uğraşmak birçok sorunun nedeni olabilir. Reaktif Programlamayı RxJava ile kullanarak, daha temiz ve daha az hataya açık bir çözüm sağlayarak olası eşzamanlılık sorunlarını önleyebilirsiniz.

Eşzamanlı, asenkron görevleri basitleştirmenin yanı sıra, RxJava, biz istenen sonucu elde edene kadar bir Gözlenebilir'den gelen emisyonları dönüştüren, birleştiren ve toplayan işlevsel stil operasyonları gerçekleştirme yeteneği de sağlar.

RxJava'nın reaktif paradigmasını ve işlevsel stil işlemlerini birleştirerek, Android'in reaktif olmayan dünyasında bile çok çeşitli eşzamanlılık yapılarını reaktif bir şekilde modelleyebiliriz. Bu yazıda, tam olarak bunu nasıl yapabileceğinizi öğreneceksiniz. Ayrıca, RxJava'yı aşamalı olarak mevcut bir projeye nasıl adapte edeceğinizi öğreneceksiniz.

RxJava'da yeniyseniz, burada RxJava'nın bazı temellerinden bahseden yazıyı okumanızı tavsiye ederim.

Reaktif Olmayanları Reaktif Dünyaya Köprülemek

RxJava'yı kitaplıklardan biri olarak projenize eklemenin zorluklarından biri, kodunuz hakkında akıl yürütme şeklinizi temelden değiştirmesidir.

RxJava, verileri çekilmek yerine itilmiş olarak düşünmenizi gerektirir. Konseptin kendisi basit olsa da, bir çekme paradigmasına dayanan tam bir kod tabanını değiştirmek biraz göz korkutucu olabilir. Tutarlılık her zaman ideal olsa da, bu geçişi tüm kod tabanınız boyunca tek seferde yapma ayrıcalığına her zaman sahip olmayabilirsiniz, bu nedenle daha fazla artımlı bir yaklaşım gerekebilir.

Aşağıdaki kodu göz önünde bulundurun:

 /** * @return a list of users with blogs */ public List<User> getUsersWithBlogs() { final List<User> allUsers = UserCache.getAllUsers(); final List<User> usersWithBlogs = new ArrayList<>(); for (User user : allUsers) { if (user.blog != null && !user.blog.isEmpty()) { usersWithBlogs.add(user); } } Collections.sort(usersWithBlogs, (user1, user2) -> user1.name.compareTo(user2.name)); return usersWithBlogs; }

Bu işlev önbellekten User nesnelerinin bir listesini alır, her birini kullanıcının bir blogu olup olmamasına göre filtreler, bunları kullanıcı adına göre sıralar ve son olarak bunları arayana geri döndürür. Bu snippet'e baktığımızda, bu işlemlerin birçoğunun RxJava operatörlerinden yararlanabileceğini fark ediyoruz; örneğin, filter() ve sorted() .

Bu parçacığı yeniden yazmak bize şunu verir:

 /** * @return a list of users with blogs */ public Observable<User> getUsersWithBlogs() { return Observable.fromIterable(UserCache.getAllUsers()) .filter(user -> user.blog != null && !user.blog.isEmpty()) .sorted((user1, user2) -> user1.name.compareTo(user2.name)); }

İşlevin ilk satırı, UserCache.getAllUsers() tarafından döndürülen List<User> UserCache.getAllUsers() aracılığıyla bir Observable<User> fromIterable() . Bu, kodumuzu reaktif hale getirmenin ilk adımıdır. Artık bir Observable üzerinde çalıştığımıza göre, bu, RxJava araç setinde herhangi bir Observable operatörü gerçekleştirmemizi sağlar – bu durumda filter() ve sorted sorted() .

Bu değişiklikle ilgili dikkat edilmesi gereken birkaç nokta daha var.

İlk olarak, yöntem imzası artık aynı değil. Bu yöntem çağrısı yalnızca birkaç yerde kullanılıyorsa ve değişiklikleri yığının diğer alanlarına yaymak kolaysa, bu çok önemli olmayabilir; ancak, bu yönteme dayanan istemcileri bozarsa, bu sorunludur ve yöntem imzası geri alınmalıdır.

İkincisi, RxJava tembellik düşünülerek tasarlanmıştır. Yani, Observable abonesi olmadığında uzun işlemler yapılmamalıdır. Bu değişiklikle, UserCache.getAllUsers() herhangi bir abone olmadan önce çağrıldığından bu varsayım artık doğru değildir.

Reaktif Dünyadan Ayrılmak

Değişikliğimizden kaynaklanan ilk sorunu ele almak için, bir Observable için kullanılabilen engelleme operatörlerinden herhangi birini, örneğin blockingFirst() blockingNext() i kullanabiliriz. Esasen, bu operatörlerin her ikisi de aşağı akışta bir öğe yayılana kadar engelleyecektir: blockingFirst() yayılan ilk öğeyi döndürür ve bitirir, buna karşın blockingNext() , temel veriler üzerinde her biri için bir döngü gerçekleştirmenize izin veren bir Iterable (Yinelenebilir) döndürür ( döngü boyunca her yineleme engellenir).

Yine de, bir engelleme işlemi kullanmanın farkında olunması gereken bir yan etkisi, istisnaların bir gözlemcinin onError() yöntemine iletilmek yerine çağıran iş parçacığına atılmasıdır.

Yöntem imzasını tekrar List<User> olarak değiştirmek için bir engelleme operatörü kullanarak, snippet'imiz şimdi şöyle görünür:

 /** * @return a list of users with blogs */ public List<User> getUsersWithBlogs() { return Observable.fromIterable(UserCache.getAllUsers()) .filter(user -> user.blog != null && !user.blog.isEmpty()) .sorted((user1, user2) -> user1.name.compareTo(user2.name)) .toList() .blockingGet(); }

Bir engelleme operatörünü (yani blockingGet() ) çağırmadan önce, akışın bir Observable<User> 'dan bir Single<List<User>> 'a ( Single , özel bir türdür) değiştirilmesi için ilk olarak toplama operatörünü toList() zincirlememiz gerekir. onSuccess() içinde yalnızca tek bir değer yayan veya onError() yoluyla bir hata yayan Observable değeri).

Daha sonra, Single öğesini açan ve bir List<User> döndüren blockingGet() engelleme operatörünü çağırabiliriz.

RxJava bunu desteklese de, deyimsel reaktif programlama olmadığı için mümkün olduğunca bundan kaçınılmalıdır. Yine de kesinlikle gerekli olduğunda, operatörleri engellemek, reaktif dünyadan çıkmanın güzel bir ilk yoludur.

Tembel Yaklaşım

Daha önce de belirtildiği gibi, RxJava tembellik düşünülerek tasarlanmıştır. Diğer bir deyişle, uzun süreli işlemler mümkün olduğu kadar geciktirilmelidir (yani, bir Observable üzerinde bir abone çağrılana kadar). Çözümümüzü tembelleştirmek için defer() operatörünü kullanıyoruz.

defer() , abone olan her yeni gözlemci için bir Observable oluşturan bir ObservableSource fabrikasını alır. Bizim durumumuzda, bir gözlemci her abone olduğunda Observable.fromIterable(UserCache.getAllUser()) döndürmek istiyoruz.

 /** * @return a list of users with blogs */ public Observable<User> getUsersWithBlogs() { return Observable.defer(() -> Observable.fromIterable(UserCache.getAllUsers())) .filter(user -> user.blog != null && !user.blog.isEmpty()) .sorted((user1, user2) -> user1.name.compareTo(user2.name)); }

Artık uzun süren işlem bir defer() içine sarıldığına göre, bunun hangi iş parçacığında çalıştırılacağı konusunda tam kontrole sahibiz, sadece uygun Scheduler subscribeOn() içinde belirterek. Bu değişiklikle birlikte kodumuz tamamen reaktiftir ve abonelik yalnızca verilere ihtiyaç duyulduğu anda gerçekleşmelidir.

 /** * @return a list of users with blogs */ public Observable<User> getUsersWithBlogs() { return Observable.defer(() -> Observable.fromIterable(UserCache.getAllUsers())) .filter(user -> user.blog != null && !user.blog.isEmpty()) .sorted((user1, user2) -> user1.name.compareTo(user2.name)) .subscribeOn(Schedulers.io()); }

Hesaplamayı ertelemek için oldukça kullanışlı bir başka operatör de fromCallable() yöntemidir. Lambda işlevinde bir Observable döndürülmesini bekleyen ve sırayla döndürülen Observable "düzleştiren" defer()'in aksine, defer() fromCallable() çağırır ve aşağı akış değerini döndürür.

 /** * @return a list of users with blogs */ public Observable<User> getUsersWithBlogs() { final Observable<List<User>> usersObservable = Observable.fromCallable(() -> UserCache.getAllUsers()); final Observable<User> userObservable = usersObservable.flatMap(users -> Observable.fromIterable(users)); return userObservable.filter(user -> user.blog != null && !user.blog.isEmpty()) .sorted((user1, user2) -> user1.name.compareTo(user2.name)); }

Bir listede fromCallable() kullanımı artık bir Observable<List<User>> döndürür, bu listeyi flatMap() kullanarak düzleştirmemiz gerekir.

reaktif-her şey

Önceki örneklerden, herhangi bir nesneyi bir Observable sarabileceğimizi ve engelleme işlemlerini ve defer( defer() / fromCallable() kullanarak reaktif olmayan ve reaktif durumlar arasında atlayabileceğimizi gördük. Bu yapıları kullanarak, bir Android uygulamasının alanlarını reaktif olacak şekilde dönüştürmeye başlayabiliriz.

Uzun İşlemler

Ağ aramaları (örnekler için önceki gönderiye bakın), disk okuma ve yazma işlemleri, vb. gibi gerçekleştirmesi biraz zaman alan bir işleminiz olduğunda, RxJava'yı kullanmayı ilk başta düşünmek için iyi bir yer. Aşağıdaki örnek, basit bir işlevi göstermektedir. dosya sistemine metin yazacak:

 /** * Writes {@code text} to the file system. * * @param context a Context * @param filename the name of the file * @param text the text to write * @return true if the text was successfully written, otherwise, false */ public boolean writeTextToFile(Context context, String filename, String text) { FileOutputStream outputStream; try { outputStream = context.openFileOutput(filename, Context.MODE_PRIVATE); outputStream.write(text.getBytes()); outputStream.close(); return true; } catch (Exception e) { e.printStackTrace(); return false; } }

Bu fonksiyonu çağırırken bu işlem bloke olduğu için ayrı bir thread üzerinde yapıldığından emin olmamız gerekiyor. Arayan kişiye böyle bir kısıtlama uygulamak, geliştirici için hataların olasılığını artıran ve potansiyel olarak geliştirmeyi yavaşlatabilecek işleri karmaşık hale getirir.

İşleve bir yorum eklemek, elbette arayan tarafından hataların önlenmesine yardımcı olacaktır, ancak bu hala kurşun geçirmez olmaktan uzaktır.

Ancak RxJava'yı kullanarak bunu kolayca bir Observable sarabilir ve üzerinde çalışması gereken Scheduler belirtebiliriz. Bu şekilde, arayan kişinin işlevi ayrı bir iş parçacığında çağırmakla hiç ilgilenmesine gerek kalmaz; fonksiyon bununla kendisi ilgilenecektir.

 /** * Writes {@code text} to the filesystem. * * @param context a Context * @param filename the name of the file * @param text the text to write * @return An Observable emitting a boolean indicating whether or not the text was successfully written. */ public Observable<Boolean> writeTextToFile(Context context, String filename, String text) { return Observable.fromCallable(() -> { FileOutputStream outputStream; outputStream = context.openFileOutput(filename, Context.MODE_PRIVATE); outputStream.write(text.getBytes()); outputStream.close(); return true; }).subscribeOn(Schedulers.io()); }

fromCallable() kullanılarak, metnin dosyaya yazılması abonelik zamanına kadar ertelenir.

İstisnalar RxJava'da birinci sınıf nesneler olduğundan, değişikliğimizin bir diğer faydası da artık işlemi bir try/catch bloğuna sarmamıza gerek olmamasıdır. İstisna, yutulmak yerine basitçe aşağı doğru yayılacaktır. Bu, arayanın uygun gördüğü bir istisnayı ele almasına izin verir (örneğin, hangi istisnanın atıldığına bağlı olarak kullanıcıya bir hata gösterin, vb.).

Gerçekleştirebileceğimiz bir diğer optimizasyon, Observable yerine Completable döndürmektir. Bir Completable , esasen, bir hesaplamanın onComplete() aracılığıyla başarılı olup olmadığını veya onError() () aracılığıyla başarısız olup olmadığını gösteren, Single benzer şekilde özel bir Observable türüdür. Bir Observable akışta tek bir doğru döndürmek aptalca göründüğünden, bir Completable döndürmek bu durumda daha mantıklı görünüyor.

 /** * Writes {@code text} to the filesystem. * * @param context a context * @param filename the name of the file * @param text the text to write * @return A Completable */ public Completable writeTextToFile(Context context, String filename, String text) { return Completable.fromAction(() -> { FileOutputStream outputStream; outputStream = context.openFileOutput(filename, Context.MODE_PRIVATE); outputStream.write(text.getBytes()); outputStream.close(); }).subscribeOn(Schedulers.io()); }

İşlemi tamamlamak için, dönüş değeri artık bizi ilgilendirmediği için bir Completable fromAction() işlemini kullanırız. Gerekirse, Observable gibi bir Completable da fromCallable() ve defer( defer() işlevlerini destekler.

Geri Aramaları Değiştirme

Şimdiye kadar, incelediğimiz tüm örnekler ya bir değer yayar (yani, Single olarak modellenebilir) veya bize bir işlemin başarılı mı yoksa başarısız mı olduğunu söyler (yani, Completable olarak modellenebilir).

Bununla birlikte, uygulamamızda sürekli güncellemeler veya olaylar (konum güncellemeleri, tıklama olaylarını görüntüleme, sensör olayları vb.) alan alanları nasıl dönüştürebiliriz?

Bunu yapmanın iki yoluna bakacağız: create() ve Subjects .

create() , bir gözlemcinin onNext() açıkça çağırmamıza izin verir | onComplete() | veri kaynağımızdan güncellemeler aldığımız için onError() yöntemi. create() işlevini kullanmak için, bir gözlemci her abone olduğunda bir ObservableOnSubscribe alan bir ObservableEmitter . Alınan emitörü kullanarak, güncellemeleri almaya başlamak için gerekli tüm kurulum çağrılarını gerçekleştirebilir ve ardından uygun Emitter olayını başlatabiliriz.

Konum güncellemeleri durumunda, bu yerde güncellemeleri almak için kayıt olabilir ve alınan konum güncellemelerini yayınlayabiliriz.

 public class LocationManager { /** * Call to receive device location updates. * @return An Observable emitting location updates */ public Observable<Location> observeLocation() { return Observable.create(emitter -> { // Make sure that the following conditions apply and if not, call the emitter's onError() method // (1) googleApiClient is connected // (2) location permission is granted final LocationRequest locationRequest = new LocationRequest(); locationRequest.setInterval(1000); locationRequest.setPriority(LocationRequest.PRIORITY_HIGH_ACCURACY); LocationServices.FusedLocationApi.requestLocationUpdates(googleApiClient, locationRequest, new LocationListener() { @Override public void onLocationChanged(Location location) { if (!emitter.isDisposed()) { emitter.onNext(location); } } }); }); } }

create() çağrısının içindeki işlev, konum güncellemelerini ister ve aygıtın konumu değiştiğinde çağrılan bir geri çağrıya geçer. Burada görebileceğimiz gibi, esasen geri arama tarzı arayüzü değiştiriyoruz ve bunun yerine oluşturulan Gözlemlenebilir akışta alınan konumu yayıyoruz (eğitim amacıyla, eğer araştırmak istiyorsanız, bir konum isteği oluştururken bazı ayrıntıları atladım. Ayrıntıların derinliklerine buradan okuyabilirsiniz).

create() ile ilgili dikkat edilmesi gereken diğer bir şey de, subscribe() çağrıldığında yeni bir emitör sağlanmasıdır. Başka bir deyişle, create() soğuk bir Observable döndürür. Bu, yukarıdaki fonksiyonda potansiyel olarak birden çok kez konum güncellemesi talep edeceğimiz anlamına gelir, bu bizim istediğimiz şey değildir.

Bu soruna geçici bir çözüm bulmak için, Subjects yardımıyla sıcak bir Observable döndürmek için işlevi değiştirmek istiyoruz.

Konu Girin

Bir Subject , bir Observable genişletir ve aynı zamanda Observer uygular. Bu, aynı olayı aynı anda birden fazla aboneye göndermek veya yayınlamak istediğimizde özellikle yararlıdır. Uygulama açısından, Subject Subject tutarken, Müşterilere Observable olarak göstermek isteriz.

 public class LocationManager { private Subject<Location> locationSubject = PublishSubject.create(); /** * Invoke this method when this LocationManager should start listening to location updates. */ public void connect() { final LocationRequest locationRequest = new LocationRequest(); locationRequest.setInterval(1000); locationRequest.setPriority(LocationRequest.PRIORITY_HIGH_ACCURACY); LocationServices.FusedLocationApi.requestLocationUpdates(googleApiClient, locationRequest, new LocationListener() { @Override public void onLocationChanged(Location location) { locationSubject.onNext(location); } }); } /** * Call to receive device location updates. * @return An Observable emitting location updates */ public Observable<Location> observeLocation() { return locationSubject; } }

Bu yeni uygulamada, abonelik zamanından başlayarak olayları geldikçe yayan PublishSubject alt türü kullanılır. Buna göre, konum güncellemelerinin halihazırda yayınlandığı bir noktada bir abonelik gerçekleştirilirse, gözlemci geçmiş emisyonları almaz, yalnızca sonraki emisyonları alır. Bu davranış istenmiyorsa, RxJava araç setinde kullanılabilecek birkaç diğer Subject alt türü vardır.

Ayrıca, konum güncellemelerini alma isteğini başlatan ayrı bir connect() işlevi de oluşturduk. observeLocation() hala connect() çağrısını yapabilir, ancak netlik/basitlik için onu işlevden çıkardık.

Özet

Bir dizi mekanizmaya ve tekniğe baktık:

  • aboneliğe kadar bir hesaplamanın yürütülmesini geciktirmek için defer() ve türevleri
  • create() ile oluşturulan soğuk Observables
  • Subjects kullanan sıcak Observables
  • Reaktif dünyadan ayrılmak istediğimizde bloklamaX işlemleri

Umarız bu makalede verilen örnekler, uygulamanızdaki reaktif hale dönüştürülebilecek farklı alanlarla ilgili bazı fikirlere ilham vermiştir. Çok şey ele aldık ve herhangi bir sorunuz, öneriniz varsa veya net olmayan bir şey varsa, aşağıya yorum bırakmaktan çekinmeyin!

RxJava hakkında daha fazla bilgi edinmek istiyorsanız, Android örneklerini kullanarak sorunlara tepkisel yoldan nasıl bakılacağını açıklayan derinlemesine bir kitap üzerinde çalışıyorum. Bununla ilgili güncellemeleri almak istiyorsanız, lütfen buradan abone olun.