Android에서 반응형 모델링으로 동시성을 단순화하는 방법

게시 됨: 2022-03-11

동시성과 비동기성은 모바일 프로그래밍에 내재되어 있습니다.

Android 프로그래밍이 일반적으로 수반하는 명령형 프로그래밍을 통해 동시성을 처리하는 것은 많은 문제의 원인이 될 수 있습니다. RxJava와 함께 반응 프로그래밍을 사용하면 더 깨끗하고 오류가 발생하기 쉬운 솔루션을 제공하여 잠재적인 동시성 문제를 피할 수 있습니다.

동시 비동기 작업을 단순화하는 것 외에도 RxJava는 원하는 결과를 얻을 때까지 Observable에서 방출을 변환, 결합 및 집계하는 기능적 스타일 작업을 수행하는 기능도 제공합니다.

RxJava의 반응형 패러다임과 기능적 스타일 작업을 결합하여 Android의 비반응형 세계에서도 반응 방식으로 광범위한 동시성 구성을 모델링할 수 있습니다. 이 기사에서는 정확히 그렇게 할 수 있는 방법을 배우게 될 것입니다. 또한 RxJava를 기존 프로젝트에 점진적으로 도입하는 방법도 배우게 됩니다.

RxJava가 처음이라면 여기에서 RxJava의 기본 사항에 대해 설명하는 게시물을 읽는 것이 좋습니다.

비반응형을 반응형 세계로 연결하기

RxJava를 프로젝트에 라이브러리 중 하나로 추가할 때의 문제 중 하나는 코드에 대해 추론하는 방식을 근본적으로 변경한다는 것입니다.

RxJava에서는 데이터를 끌어오기가 아니라 밀어넣는 것으로 생각해야 합니다. 개념 자체는 간단하지만 풀 패러다임을 기반으로 하는 전체 코드베이스를 변경하는 것은 다소 어려울 수 있습니다. 일관성이 항상 이상적이지만 전체 코드 기반에서 한 번에 이러한 전환을 수행할 수 있는 권한이 항상 있는 것은 아니므로 더 많은 점진적 접근 방식이 필요할 수 있습니다.

다음 코드를 고려하십시오.

 /** * @return a list of users with blogs */ public List<User> getUsersWithBlogs() { final List<User> allUsers = UserCache.getAllUsers(); final List<User> usersWithBlogs = new ArrayList<>(); for (User user : allUsers) { if (user.blog != null && !user.blog.isEmpty()) { usersWithBlogs.add(user); } } Collections.sort(usersWithBlogs, (user1, user2) -> user1.name.compareTo(user2.name)); return usersWithBlogs; }

이 함수는 캐시에서 User 개체 목록을 가져오고 사용자가 블로그를 가지고 있는지 여부에 따라 각 개체를 필터링하고 사용자 이름별로 정렬한 다음 마지막으로 호출자에게 반환합니다. 이 스니펫을 보면 이러한 작업 중 많은 부분이 RxJava 연산자를 활용할 수 있음을 알 수 있습니다. 예를 들어 filter()sorted() .

이 스니펫을 다시 작성하면 다음이 제공됩니다.

 /** * @return a list of users with blogs */ public Observable<User> getUsersWithBlogs() { return Observable.fromIterable(UserCache.getAllUsers()) .filter(user -> user.blog != null && !user.blog.isEmpty()) .sorted((user1, user2) -> user1.name.compareTo(user2.name)); }

함수의 첫 번째 줄은 UserCache.getAllUsers() 가 반환한 List<User> User> 를 fromIterable() 을 통해 Observable<User> 로 변환합니다. 이것은 코드를 반응형으로 만드는 첫 번째 단계입니다. 이제 Observable 에서 작업하고 있으므로 RxJava 툴킷에서 모든 Observable 연산자(이 경우 filter()sorted() 를 수행할 수 있습니다.

이 변경 사항에 대해 주의해야 할 몇 가지 다른 사항이 있습니다.

첫째, 메서드 서명이 더 이상 동일하지 않습니다. 이 메서드 호출이 몇 곳에서만 사용되고 스택의 다른 영역으로 변경 사항을 전파하기 쉬운 경우 이것은 큰 문제가 아닐 수 있습니다. 그러나 이 메서드에 의존하는 클라이언트가 중단되면 문제가 되며 메서드 서명을 되돌려야 합니다.

둘째, RxJava는 게으름을 염두에 두고 설계되었습니다. 즉, Observable 에 대한 구독자가 없을 때 긴 작업을 수행해서는 안 됩니다. 이 수정으로 인해 가입자가 있기 전에도 UserCache.getAllUsers() 가 호출되기 때문에 이러한 가정은 더 이상 사실이 아닙니다.

리액티브 월드에서 나가기

변경 사항의 첫 번째 문제를 해결하기 위해 blockingFirst()blockingNext() ) 와 같이 Observable 에서 사용할 수 있는 차단 연산자를 사용할 수 있습니다. 기본적으로 이 두 연산자는 항목이 다운스트림으로 방출될 때까지 차단됩니다. blockingFirst() 는 방출된 첫 번째 요소를 반환하고 Iterable 하는 반면, blockingNext() 는 기본 데이터( 루프를 통한 각 반복은 차단됩니다).

그러나 주의해야 할 중요한 차단 작업을 사용할 때의 부작용은 관찰자의 onError() 메서드에 전달되지 않고 호출 스레드에서 예외가 throw된다는 것입니다.

차단 연산자를 사용하여 메서드 서명을 다시 List<User> 로 변경하면 코드 조각이 다음과 같이 보일 것입니다.

 /** * @return a list of users with blogs */ public List<User> getUsersWithBlogs() { return Observable.fromIterable(UserCache.getAllUsers()) .filter(user -> user.blog != null && !user.blog.isEmpty()) .sorted((user1, user2) -> user1.name.compareTo(user2.name)) .toList() .blockingGet(); }

차단 연산자(예: blockingGet() )를 호출하기 전에 스트림이 Observable<User> 에서 Single<List<User>> 로 수정되도록 집계 연산자 toList() 를 먼저 연결해야 합니다( Single 은 특수 유형입니다. onSuccess() 에서 단일 값만 내보내거나 onError() 를 통해 오류를 내보내는 Observable 중 하나입니다.

그 다음에는 Single 을 풀고 List<User> 를 반환하는 차단 연산자 blockingGet() 을 호출할 수 있습니다.

RxJava가 이것을 지원하지만 이것은 관용적 반응 프로그래밍이 아니기 때문에 가능한 한 피해야 합니다. 그러나 절대적으로 필요할 때 차단 연산자는 반응 세계에서 벗어나는 좋은 초기 방법입니다.

게으른 접근

앞서 언급했듯이 RxJava는 게으름을 염두에 두고 설계되었습니다. 즉, 장기 실행 작업은 가능한 한 오래 지연되어야 합니다(즉, Observable 에서 구독이 호출될 때까지). 우리의 솔루션을 게으르게 만들기 위해 defer() 연산자를 사용합니다.

defer() 는 구독하는 각각의 새로운 관찰자에 대해 Observable 을 생성하는 ObservableSource 팩토리를 받습니다. 우리의 경우 관찰자가 구독할 때마다 Observable.fromIterable(UserCache.getAllUser()) 를 반환하려고 합니다.

 /** * @return a list of users with blogs */ public Observable<User> getUsersWithBlogs() { return Observable.defer(() -> Observable.fromIterable(UserCache.getAllUsers())) .filter(user -> user.blog != null && !user.blog.isEmpty()) .sorted((user1, user2) -> user1.name.compareTo(user2.name)); }

이제 장기 실행 작업이 defer() 에 래핑되었으므로 subscribeOn() 에서 적절한 Scheduler 를 지정하는 것만으로 이 작업이 실행되어야 하는 스레드에 대해 완전히 제어할 수 있습니다. 이 변경으로 코드는 완전히 반응적이며 구독은 데이터가 필요한 순간에만 발생해야 합니다.

 /** * @return a list of users with blogs */ public Observable<User> getUsersWithBlogs() { return Observable.defer(() -> Observable.fromIterable(UserCache.getAllUsers())) .filter(user -> user.blog != null && !user.blog.isEmpty()) .sorted((user1, user2) -> user1.name.compareTo(user2.name)) .subscribeOn(Schedulers.io()); }

계산을 연기하는 또 다른 매우 유용한 연산자는 fromCallable() 메서드입니다. 람다 함수에서 Observable 이 반환될 것으로 예상하고 반환된 Observable 을 "평평화"하는 defer() 와 달리 fromCallable() 은 람다를 호출하고 다운스트림 값을 반환합니다.

 /** * @return a list of users with blogs */ public Observable<User> getUsersWithBlogs() { final Observable<List<User>> usersObservable = Observable.fromCallable(() -> UserCache.getAllUsers()); final Observable<User> userObservable = usersObservable.flatMap(users -> Observable.fromIterable(users)); return userObservable.filter(user -> user.blog != null && !user.blog.isEmpty()) .sorted((user1, user2) -> user1.name.compareTo(user2.name)); }

단일 목록에서 fromCallable() 을 사용하면 이제 Observable<List<User>> 를 반환 flatMap() 을 사용하여 이 목록을 평면화해야 합니다.

반응성 - 모든 것

이전 예제에서 Observable 의 모든 객체를 래핑하고 블로킹 작업과 defer() / fromCallable() 을 사용하여 비반응 상태와 반응 상태 사이를 이동할 수 있음을 보았습니다. 이러한 구성을 사용하여 Android 앱의 영역을 반응형으로 변환할 수 있습니다.

긴 작업

처음에 RxJava 사용에 대해 생각하기에 좋은 위치는 네트워크 호출(예제를 보려면 이전 게시물을 확인하십시오), 디스크 읽기 및 쓰기 등과 같이 수행하는 데 시간이 걸리는 프로세스가 있을 때입니다. 다음 예제는 다음을 수행하는 간단한 기능을 보여줍니다. 파일 시스템에 텍스트를 씁니다.

 /** * Writes {@code text} to the file system. * * @param context a Context * @param filename the name of the file * @param text the text to write * @return true if the text was successfully written, otherwise, false */ public boolean writeTextToFile(Context context, String filename, String text) { FileOutputStream outputStream; try { outputStream = context.openFileOutput(filename, Context.MODE_PRIVATE); outputStream.write(text.getBytes()); outputStream.close(); return true; } catch (Exception e) { e.printStackTrace(); return false; } }

이 함수를 호출할 때 이 작업이 차단되므로 별도의 스레드에서 수행되는지 확인해야 합니다. 호출자에게 이러한 제한을 적용하면 개발자에게 문제가 복잡해지고 버그 가능성이 증가하고 잠재적으로 개발 속도가 느려질 수 있습니다.

물론 함수에 주석을 추가하면 호출자의 오류를 방지하는 데 도움이 되지만 여전히 방탄과는 거리가 멉니다.

그러나 RxJava를 사용하면 이것을 Observable 로 쉽게 래핑하고 실행할 Scheduler 를 지정할 수 있습니다. 이렇게 하면 호출자가 별도의 스레드에서 함수를 호출하는 데 전혀 신경 쓸 필요가 없습니다. 이 기능은 자체적으로 처리합니다.

 /** * Writes {@code text} to the filesystem. * * @param context a Context * @param filename the name of the file * @param text the text to write * @return An Observable emitting a boolean indicating whether or not the text was successfully written. */ public Observable<Boolean> writeTextToFile(Context context, String filename, String text) { return Observable.fromCallable(() -> { FileOutputStream outputStream; outputStream = context.openFileOutput(filename, Context.MODE_PRIVATE); outputStream.write(text.getBytes()); outputStream.close(); return true; }).subscribeOn(Schedulers.io()); }

fromCallable() 을 사용하여 파일에 텍스트를 쓰는 것은 구독 시간까지 연기됩니다.

예외는 RxJava에서 일급 객체이기 때문에 변경의 또 다른 이점은 더 이상 try/catch 블록에서 작업을 래핑할 필요가 없다는 것입니다. 예외는 삼키지 않고 다운스트림으로 전파됩니다. 이를 통해 호출자는 자신이 적합하다고 생각하는 예외를 처리할 수 있습니다(예: 발생한 예외에 따라 사용자에게 오류 표시 등).

우리가 수행할 수 있는 또 다른 최적화는 Observable 이 아닌 Completable 을 반환하는 것입니다. Completable 은 본질적으로 Single 과 유사한 Observable 의 특별한 유형으로, 단순히 onComplete() 를 통해 계산이 성공했는지 또는 onError() 를 통해 실패했는지를 나타냅니다. Observable 스트림에서 하나의 true를 반환하는 것이 어리석은 것처럼 보이기 때문에 이 경우 Completable 을 반환하는 것이 더 합리적입니다.

 /** * Writes {@code text} to the filesystem. * * @param context a context * @param filename the name of the file * @param text the text to write * @return A Completable */ public Completable writeTextToFile(Context context, String filename, String text) { return Completable.fromAction(() -> { FileOutputStream outputStream; outputStream = context.openFileOutput(filename, Context.MODE_PRIVATE); outputStream.write(text.getBytes()); outputStream.close(); }).subscribeOn(Schedulers.io()); }

작업을 완료하기 위해 반환 값이 더 이상 관심이 없기 때문에 Completable의 Completable fromAction() 작업을 사용합니다. Observable 과 같이 필요한 경우 CompletablefromCallable()defer() 함수도 지원합니다.

콜백 대체

지금까지 살펴본 모든 예제는 하나의 값을 내보내거나(즉, Single 로 모델링할 수 있음) 작업의 성공 또는 실패 여부를 알려줍니다(즉, Completable 로 모델링할 수 있음).

그러나 지속적인 업데이트나 이벤트(예: 위치 업데이트, 보기 클릭 이벤트, 센서 이벤트 등)를 수신하는 앱의 영역을 어떻게 변환할 수 있습니까?

create() 를 사용하고 Subjects 를 사용하여 이를 수행하는 두 가지 방법을 살펴보겠습니다.

create() 를 사용하면 관찰자의 onNext() | onComplete() | 데이터 소스에서 업데이트를 수신할 때 onError() 메서드를 사용합니다. create() 를 사용하려면 관찰자가 구독할 때마다 ObservableOnSubscribe ObservableEmitter 전달합니다. 수신된 이미터를 사용하여 업데이트 수신을 시작한 다음 적절한 Emitter 이벤트를 호출하는 데 필요한 모든 설정 호출을 수행할 수 있습니다.

위치 업데이트의 경우 이 위치에서 업데이트를 수신하도록 등록하고 수신된 위치 업데이트를 내보낼 수 있습니다.

 public class LocationManager { /** * Call to receive device location updates. * @return An Observable emitting location updates */ public Observable<Location> observeLocation() { return Observable.create(emitter -> { // Make sure that the following conditions apply and if not, call the emitter's onError() method // (1) googleApiClient is connected // (2) location permission is granted final LocationRequest locationRequest = new LocationRequest(); locationRequest.setInterval(1000); locationRequest.setPriority(LocationRequest.PRIORITY_HIGH_ACCURACY); LocationServices.FusedLocationApi.requestLocationUpdates(googleApiClient, locationRequest, new LocationListener() { @Override public void onLocationChanged(Location location) { if (!emitter.isDisposed()) { emitter.onNext(location); } } }); }); } }

create() 호출 내부의 함수는 위치 업데이트를 요청하고 장치의 위치가 변경될 때 호출되는 콜백을 전달합니다. 여기에서 볼 수 있듯이 기본적으로 콜백 스타일 인터페이스를 교체하고 대신 생성된 Observable 스트림에서 수신된 위치를 내보냅니다. 자세한 내용은 여기에서 읽을 수 있습니다).

create() 에 대해 주의해야 할 또 다른 사항은 subscribe() 가 호출될 때마다 새 이미터가 제공된다는 것입니다. 즉, create() 는 콜드 Observable 을 반환합니다. 이것은 위의 함수에서 우리가 원하는 것이 아닌 위치 업데이트를 여러 번 요청할 수 있음을 의미합니다.

이 문제를 해결하기 위해 Subjects 의 도움으로 뜨거운 Observable 을 반환하도록 함수를 변경하려고 합니다.

주제 입력

SubjectObservable 을 확장함과 동시에 Observer 를 구현합니다. 이는 동일한 이벤트를 동시에 여러 구독자에게 내보내거나 캐스트하려는 경우에 특히 유용합니다. 구현 측면에서 우리는 SubjectObservable 로 클라이언트에 노출하고 공급자에 대한 Subject 로 유지하기를 원할 것입니다.

 public class LocationManager { private Subject<Location> locationSubject = PublishSubject.create(); /** * Invoke this method when this LocationManager should start listening to location updates. */ public void connect() { final LocationRequest locationRequest = new LocationRequest(); locationRequest.setInterval(1000); locationRequest.setPriority(LocationRequest.PRIORITY_HIGH_ACCURACY); LocationServices.FusedLocationApi.requestLocationUpdates(googleApiClient, locationRequest, new LocationListener() { @Override public void onLocationChanged(Location location) { locationSubject.onNext(location); } }); } /** * Call to receive device location updates. * @return An Observable emitting location updates */ public Observable<Location> observeLocation() { return locationSubject; } }

이 새로운 구현에서는 서브타입 PublishSubject 가 사용되어 구독 시점부터 이벤트가 도착할 때 이벤트를 방출합니다. 따라서 위치 업데이트가 이미 방출된 시점에서 구독이 수행되면 관찰자는 과거 방출을 수신하지 않고 후속 방출만 수신합니다. 이 동작이 바람직하지 않은 경우 사용할 수 있는 RxJava 툴킷에 몇 가지 다른 Subject 하위 유형이 있습니다.

또한 위치 업데이트 수신 요청을 시작하는 별도의 connect() 함수도 만들었습니다. observeLocation() 은 여전히 connect() 호출을 수행할 수 있지만 명확성/단순성을 위해 함수에서 이를 리팩토링했습니다.

요약

우리는 다음과 같은 여러 메커니즘과 기술을 살펴보았습니다.

  • defer() 및 구독까지 계산 실행을 지연하는 변형
  • create() 를 통해 생성된 콜드 Observables
  • Subjects 를 사용하는 뜨거운 Observables
  • 반응 세계를 떠나고 싶을 때 blockingX 작업

이 기사에 제공된 예제가 반응형으로 변환할 수 있는 앱의 다양한 영역에 대한 아이디어에 영감을 주었길 바랍니다. 우리는 많은 것을 다루었고 질문, 제안이 있거나 명확하지 않은 것이 있으면 아래에 의견을 남겨주세요!

RxJava에 대해 더 알고 싶다면 Android 예제를 사용하여 반응적인 방식으로 문제를 보는 방법을 설명하는 심층적인 책을 작성 중입니다. 이에 대한 업데이트를 수신하려면 여기에서 구독하십시오.