Como simplificar a simultaneidade com modelagem reativa no Android

Publicados: 2022-03-11

A simultaneidade e a assincronicidade são inerentes à programação móvel.

Lidar com a simultaneidade por meio de programação no estilo imperativo, que é o que a programação no Android geralmente envolve, pode ser a causa de muitos problemas. Usando a programação reativa com RxJava, você pode evitar possíveis problemas de simultaneidade fornecendo uma solução mais limpa e menos propensa a erros.

Além de simplificar tarefas simultâneas e assíncronas, o RxJava também oferece a capacidade de executar operações de estilo funcional que transformam, combinam e agregam emissões de um observável até atingirmos o resultado desejado.

Ao combinar o paradigma reativo do RxJava e as operações de estilo funcional, podemos modelar uma ampla variedade de construções de simultaneidade de maneira reativa, mesmo no mundo não reativo do Android. Neste artigo, você aprenderá como fazer exatamente isso. Você também aprenderá como adotar o RxJava em um projeto existente de forma incremental.

Se você é novo no RxJava, recomendo a leitura do post aqui que fala sobre alguns dos fundamentos do RxJava.

Ligando o não-reativo ao mundo reativo

Um dos desafios de adicionar RxJava como uma das bibliotecas ao seu projeto é que ele muda fundamentalmente a maneira como você raciocina sobre seu código.

O RxJava exige que você pense nos dados como sendo empurrados em vez de puxados. Embora o conceito em si seja simples, alterar uma base de código completa baseada em um paradigma pull pode ser um pouco assustador. Embora a consistência seja sempre ideal, você nem sempre tem o privilégio de fazer essa transição em toda a sua base de código de uma só vez, portanto, pode ser necessária uma abordagem mais incremental.

Considere o seguinte código:

 /** * @return a list of users with blogs */ public List<User> getUsersWithBlogs() { final List<User> allUsers = UserCache.getAllUsers(); final List<User> usersWithBlogs = new ArrayList<>(); for (User user : allUsers) { if (user.blog != null && !user.blog.isEmpty()) { usersWithBlogs.add(user); } } Collections.sort(usersWithBlogs, (user1, user2) -> user1.name.compareTo(user2.name)); return usersWithBlogs; }

Esta função obtém uma lista de objetos User do cache, filtra cada um com base se o usuário tem ou não um blog, classifica-os pelo nome do usuário e, finalmente, os retorna ao chamador. Observando este trecho, notamos que muitas dessas operações podem tirar proveito dos operadores RxJava; por exemplo, filter() e sorted() .

Reescrever esse trecho nos dá:

 /** * @return a list of users with blogs */ public Observable<User> getUsersWithBlogs() { return Observable.fromIterable(UserCache.getAllUsers()) .filter(user -> user.blog != null && !user.blog.isEmpty()) .sorted((user1, user2) -> user1.name.compareTo(user2.name)); }

A primeira linha da função converte o List<User> retornado por UserCache.getAllUsers() para um Observable<User> via fromIterable() . Este é o primeiro passo para tornar nosso código reativo. Agora que estamos operando em um Observable , isso nos permite executar qualquer operador Observable no kit de ferramentas RxJava – filter() e sorted() neste caso.

Há alguns outros pontos a serem observados sobre essa mudança.

Primeiro, a assinatura do método não é mais a mesma. Isso pode não ser um grande problema se essa chamada de método for usada apenas em alguns lugares e for fácil propagar as alterações para outras áreas da pilha; no entanto, se ele interromper os clientes que dependem desse método, isso será problemático e a assinatura do método deverá ser revertida.

Em segundo lugar, o RxJava foi projetado com a preguiça em mente. Ou seja, nenhuma operação longa deve ser executada quando não houver assinantes no Observable . Com essa modificação, essa suposição não é mais verdadeira, pois UserCache.getAllUsers() é invocado antes mesmo de haver assinantes.

Deixando o mundo reativo

Para resolver o primeiro problema de nossa alteração, podemos usar qualquer um dos operadores de bloqueio disponíveis para um Observable , como blockingFirst() e blockingNext() . Essencialmente, ambos os operadores bloquearão até que um item seja emitido a jusante: blockingFirst() retornará o primeiro elemento emitido e terminará, enquanto blockingNext() retornará um Iterable que permite executar um loop for-each nos dados subjacentes ( cada iteração através do loop irá bloquear).

Um efeito colateral de usar uma operação de bloqueio que é importante estar ciente, porém, é que as exceções são lançadas no thread de chamada em vez de serem passadas para o método onError() de um observador.

Usando um operador de bloqueio para alterar a assinatura do método de volta para um List<User> , nosso snippet agora ficaria assim:

 /** * @return a list of users with blogs */ public List<User> getUsersWithBlogs() { return Observable.fromIterable(UserCache.getAllUsers()) .filter(user -> user.blog != null && !user.blog.isEmpty()) .sorted((user1, user2) -> user1.name.compareTo(user2.name)) .toList() .blockingGet(); }

Antes de chamar um operador de bloqueio (ou seja, blockingGet() ), primeiro precisamos encadear o operador agregado toList() para que o fluxo seja modificado de um Observable<User> para um Single<List<User>> (um Single é um tipo especial de Observable que emite apenas um único valor em onSuccess() , ou um erro via onError() ).

Depois, podemos chamar o operador de blockingGet() que desempacota o Single e retorna um List<User> .

Embora o RxJava suporte isso, tanto quanto possível, isso deve ser evitado, pois não é uma programação reativa idiomática. Quando absolutamente necessário, os operadores de bloqueio são uma boa maneira inicial de sair do mundo reativo.

A abordagem preguiçosa

Como mencionado anteriormente, o RxJava foi projetado com a preguiça em mente. Ou seja, as operações de longa duração devem ser atrasadas o máximo possível (ou seja, até que uma assinatura seja invocada em um Observable ). Para tornar nossa solução preguiçosa, usamos o operador defer() .

defer() recebe uma fábrica ObservableSource que cria um Observable para cada novo observador que se inscreve. Em nosso caso, queremos retornar Observable.fromIterable(UserCache.getAllUser()) sempre que um observador se inscrever.

 /** * @return a list of users with blogs */ public Observable<User> getUsersWithBlogs() { return Observable.defer(() -> Observable.fromIterable(UserCache.getAllUsers())) .filter(user -> user.blog != null && !user.blog.isEmpty()) .sorted((user1, user2) -> user1.name.compareTo(user2.name)); }

Agora que a operação de longa duração está envolvida em um defer() , temos controle total sobre em qual thread isso deve ser executado simplesmente especificando o Scheduler apropriado em subscribeOn() . Com essa mudança, nosso código fica totalmente reativo e a assinatura só deve ocorrer no momento em que os dados forem necessários.

 /** * @return a list of users with blogs */ public Observable<User> getUsersWithBlogs() { return Observable.defer(() -> Observable.fromIterable(UserCache.getAllUsers())) .filter(user -> user.blog != null && !user.blog.isEmpty()) .sorted((user1, user2) -> user1.name.compareTo(user2.name)) .subscribeOn(Schedulers.io()); }

Outro operador bastante útil para adiar a computação é o método fromCallable() . Ao contrário de defer() , que espera que um Observable seja retornado na função lambda e, por sua vez, “achata” o Observable retornado, fromCallable() invocará o lambda e retornará o valor downstream.

 /** * @return a list of users with blogs */ public Observable<User> getUsersWithBlogs() { final Observable<List<User>> usersObservable = Observable.fromCallable(() -> UserCache.getAllUsers()); final Observable<User> userObservable = usersObservable.flatMap(users -> Observable.fromIterable(users)); return userObservable.filter(user -> user.blog != null && !user.blog.isEmpty()) .sorted((user1, user2) -> user1.name.compareTo(user2.name)); }

Single usando fromCallable() em uma lista agora retornaria um Observable<List<User>> , precisamos nivelar essa lista usando flatMap() .

Reativo-tudo

Nos exemplos anteriores, vimos que podemos envolver qualquer objeto em um Observable e pular entre estados não reativos e reativos usando operações de bloqueio e defer( defer() / fromCallable() . Usando essas construções, podemos começar a converter áreas de um aplicativo Android para serem reativas.

Operações longas

Um bom lugar para pensar inicialmente em usar o RxJava é sempre que você tiver um processo que demora um pouco para ser executado, como chamadas de rede (confira o post anterior para exemplos), leituras e gravações de disco, etc. O exemplo a seguir ilustra uma função simples que escreverá texto no sistema de arquivos:

 /** * Writes {@code text} to the file system. * * @param context a Context * @param filename the name of the file * @param text the text to write * @return true if the text was successfully written, otherwise, false */ public boolean writeTextToFile(Context context, String filename, String text) { FileOutputStream outputStream; try { outputStream = context.openFileOutput(filename, Context.MODE_PRIVATE); outputStream.write(text.getBytes()); outputStream.close(); return true; } catch (Exception e) { e.printStackTrace(); return false; } }

Ao chamar esta função, precisamos ter certeza de que ela é feita em uma thread separada, pois esta operação está bloqueando. A imposição de tal restrição ao chamador complica as coisas para o desenvolvedor, o que aumenta a probabilidade de erros e pode potencialmente retardar o desenvolvimento.

Adicionar um comentário à função ajudará a evitar erros do chamador, mas isso ainda está longe de ser à prova de balas.

Usando RxJava, no entanto, podemos facilmente envolver isso em um Observable e especificar o Scheduler no qual ele deve ser executado. Dessa forma, o chamador não precisa se preocupar em invocar a função em um thread separado; a função cuidará disso sozinha.

 /** * Writes {@code text} to the filesystem. * * @param context a Context * @param filename the name of the file * @param text the text to write * @return An Observable emitting a boolean indicating whether or not the text was successfully written. */ public Observable<Boolean> writeTextToFile(Context context, String filename, String text) { return Observable.fromCallable(() -> { FileOutputStream outputStream; outputStream = context.openFileOutput(filename, Context.MODE_PRIVATE); outputStream.write(text.getBytes()); outputStream.close(); return true; }).subscribeOn(Schedulers.io()); }

Usando fromCallable() , a gravação do texto no arquivo é adiada até o momento da assinatura.

Como as exceções são objetos de primeira classe em RxJava, outro benefício de nossa mudança é que não precisamos mais envolver a operação em um bloco try/catch. A exceção será simplesmente propagada a jusante em vez de ser engolida. Isso permite que o chamador lide com a exceção da maneira que achar melhor (por exemplo, mostrar um erro para o usuário dependendo de qual exceção foi lançada, etc.).

Uma outra otimização que podemos realizar é retornar um Completable em vez de um Observable . Um Completable é essencialmente um tipo especial de Observable - semelhante a um Single - que simplesmente indica se uma computação foi bem-sucedida, via onComplete() , ou falhou, via onError() . Retornar um Completable parece fazer mais sentido nesse caso, pois parece bobo retornar um único true em um fluxo Observable .

 /** * Writes {@code text} to the filesystem. * * @param context a context * @param filename the name of the file * @param text the text to write * @return A Completable */ public Completable writeTextToFile(Context context, String filename, String text) { return Completable.fromAction(() -> { FileOutputStream outputStream; outputStream = context.openFileOutput(filename, Context.MODE_PRIVATE); outputStream.write(text.getBytes()); outputStream.close(); }).subscribeOn(Schedulers.io()); }

Para concluir a operação, usamos a operação fromAction() de um Completable , pois o valor de retorno não nos interessa mais. Se necessário, como um Observable , um Completable também suporta as fromCallable() e defer( defer() .

Substituindo retornos de chamada

Até agora, todos os exemplos que examinamos emitem um valor (ou seja, podem ser modelados como Single ), ou nos informam se uma operação foi bem-sucedida ou falhou (ou seja, pode ser modelada como Completable ).

No entanto, como podemos converter áreas em nosso aplicativo que recebem atualizações ou eventos contínuos (como atualizações de localização, eventos de cliques de visualização, eventos de sensores e assim por diante)?

Veremos duas maneiras de fazer isso, usando create() e usando Subjects .

create() nos permite invocar explicitamente o onNext() de um observador | onComplete() | onError() à medida que recebemos atualizações de nossa fonte de dados. Para usar create() , passamos um ObservableOnSubscribe que recebe um ObservableEmitter sempre que um observador se inscreve. Usando o emissor recebido, podemos executar todas as chamadas de configuração necessárias para começar a receber atualizações e, em seguida, invocar o evento Emitter apropriado.

No caso de atualizações de localização, podemos nos registrar para receber atualizações neste local e emitir atualizações de localização conforme recebidas.

 public class LocationManager { /** * Call to receive device location updates. * @return An Observable emitting location updates */ public Observable<Location> observeLocation() { return Observable.create(emitter -> { // Make sure that the following conditions apply and if not, call the emitter's onError() method // (1) googleApiClient is connected // (2) location permission is granted final LocationRequest locationRequest = new LocationRequest(); locationRequest.setInterval(1000); locationRequest.setPriority(LocationRequest.PRIORITY_HIGH_ACCURACY); LocationServices.FusedLocationApi.requestLocationUpdates(googleApiClient, locationRequest, new LocationListener() { @Override public void onLocationChanged(Location location) { if (!emitter.isDisposed()) { emitter.onNext(location); } } }); }); } }

A função dentro da chamada create() solicita atualizações de localização e passa um retorno de chamada que é invocado quando a localização do dispositivo muda. Como podemos ver aqui, basicamente substituímos a interface estilo callback e, em vez disso, emitimos a localização recebida no fluxo Observable criado (para fins educacionais, pulei alguns detalhes ao construir uma solicitação de localização, se você quiser se aprofundar mais aprofundado nos detalhes você pode ler aqui).

Uma outra coisa a ser observada sobre create() é que, sempre que subscribe() é chamado, um novo emissor é fornecido. Em outras palavras, create() retorna um Observable frio. Isso significa que, na função acima, estaríamos solicitando atualizações de localização várias vezes, o que não é o que queremos.

Para contornar isso, queremos alterar a função para retornar um Hot Observable com a ajuda de Subjects .

Insira assuntos

Um Subject estende um Observable e implementa um Observer ao mesmo tempo. Isso é particularmente útil sempre que queremos emitir ou transmitir o mesmo evento para vários assinantes ao mesmo tempo. Em termos de implementação, gostaríamos de expor o Subject como um Observable para os clientes, mantendo-o como um Subject para o provedor.

 public class LocationManager { private Subject<Location> locationSubject = PublishSubject.create(); /** * Invoke this method when this LocationManager should start listening to location updates. */ public void connect() { final LocationRequest locationRequest = new LocationRequest(); locationRequest.setInterval(1000); locationRequest.setPriority(LocationRequest.PRIORITY_HIGH_ACCURACY); LocationServices.FusedLocationApi.requestLocationUpdates(googleApiClient, locationRequest, new LocationListener() { @Override public void onLocationChanged(Location location) { locationSubject.onNext(location); } }); } /** * Call to receive device location updates. * @return An Observable emitting location updates */ public Observable<Location> observeLocation() { return locationSubject; } }

Nesta nova implementação, é utilizado o subtipo PublishSubject que emite eventos à medida que chegam a partir do momento da assinatura. Assim, se uma assinatura for realizada em um ponto em que as atualizações de localização já tenham sido emitidas, as emissões passadas não serão recebidas pelo observador, apenas as subsequentes. Se esse comportamento não for desejado, há alguns outros subtipos de Subject no kit de ferramentas RxJava que podem ser usados.

Além disso, também criamos uma função connect() separada que inicia a solicitação para receber atualizações de localização. O observeLocation() ainda pode fazer a chamada connect() , mas nós o refatoramos fora da função para maior clareza/simplicidade.

Resumo

Analisamos vários mecanismos e técnicas:

  • defer() e suas variantes para atrasar a execução de um cálculo até a assinatura
  • Cold Observables gerados através de create()
  • Observables ​​quentes usando Subjects
  • bloqueando as operações X quando queremos sair do mundo reativo

Felizmente, os exemplos fornecidos neste artigo inspiraram algumas ideias sobre diferentes áreas do seu aplicativo que podem ser convertidas para serem reativas. Cobrimos muito e se você tiver alguma dúvida, sugestão ou se algo não estiver claro, sinta-se à vontade para deixar um comentário abaixo!

Se você estiver interessado em aprender mais sobre RxJava, estou trabalhando em um livro detalhado que explica como visualizar problemas de maneira reativa usando exemplos do Android. Se você deseja receber atualizações sobre ele, inscreva-se aqui.