Cómo simplificar la concurrencia con modelado reactivo en Android
Publicado: 2022-03-11La concurrencia y la asincronía son inherentes a la programación móvil.
Lidiar con la concurrencia a través de la programación de estilo imperativo, que es lo que generalmente implica la programación en Android, puede ser la causa de muchos problemas. Al usar la programación reactiva con RxJava, puede evitar posibles problemas de concurrencia al proporcionar una solución más limpia y menos propensa a errores.
Además de simplificar las tareas asincrónicas concurrentes, RxJava también brinda la capacidad de realizar operaciones de estilo funcional que transforman, combinan y agregan emisiones de un Observable hasta que logramos el resultado deseado.
Al combinar el paradigma reactivo y las operaciones de estilo funcional de RxJava, podemos modelar una amplia gama de construcciones de simultaneidad de manera reactiva, incluso en el mundo no reactivo de Android. En este artículo, aprenderá cómo puede hacer exactamente eso. También aprenderá cómo adoptar RxJava en un proyecto existente de forma incremental.
Si es nuevo en RxJava, le recomiendo leer la publicación aquí que habla sobre algunos de los fundamentos de RxJava.
Conectando lo no reactivo con el mundo reactivo
Uno de los desafíos de agregar RxJava como una de las bibliotecas de su proyecto es que cambia fundamentalmente la forma en que razona sobre su código.
RxJava requiere que piense en los datos como si fueran empujados en lugar de ser extraídos. Si bien el concepto en sí es simple, cambiar una base de código completa que se basa en un paradigma de extracción puede ser un poco desalentador. Si bien la consistencia siempre es ideal, es posible que no siempre tenga el privilegio de realizar esta transición en toda la base de código de una vez, por lo que es posible que se necesite un enfoque más incremental.
Considere el siguiente código:
/** * @return a list of users with blogs */ public List<User> getUsersWithBlogs() { final List<User> allUsers = UserCache.getAllUsers(); final List<User> usersWithBlogs = new ArrayList<>(); for (User user : allUsers) { if (user.blog != null && !user.blog.isEmpty()) { usersWithBlogs.add(user); } } Collections.sort(usersWithBlogs, (user1, user2) -> user1.name.compareTo(user2.name)); return usersWithBlogs; }
Esta función obtiene una lista de objetos de User
del caché, filtra cada uno según si el usuario tiene o no un blog, los ordena por el nombre del usuario y finalmente los devuelve a la persona que llama. Mirando este fragmento, notamos que muchas de estas operaciones pueden aprovechar los operadores RxJava; por ejemplo, filter()
y sorted()
.
Reescribir este fragmento nos da:
/** * @return a list of users with blogs */ public Observable<User> getUsersWithBlogs() { return Observable.fromIterable(UserCache.getAllUsers()) .filter(user -> user.blog != null && !user.blog.isEmpty()) .sorted((user1, user2) -> user1.name.compareTo(user2.name)); }
La primera línea de la función convierte el List<User>
devuelto por UserCache.getAllUsers()
en un Observable<User>
a través fromIterable()
. Este es el primer paso para hacer que nuestro código sea reactivo. Ahora que estamos operando en un Observable
, esto nos permite realizar cualquier operador Observable
en el kit de herramientas RxJava: filter()
y sorted()
en este caso.
Hay algunos otros puntos a tener en cuenta sobre este cambio.
Primero, la firma del método ya no es la misma. Esto puede no ser un gran problema si esta llamada de método solo se usa en algunos lugares y es fácil propagar los cambios a otras áreas de la pila; sin embargo, si interrumpe a los clientes que confían en este método, eso es problemático y la firma del método debe revertirse.
En segundo lugar, RxJava está diseñado pensando en la pereza. Es decir, no se deben realizar operaciones largas cuando no hay suscriptores al Observable
. Con esta modificación, esa suposición ya no es cierta ya que UserCache.getAllUsers()
se invoca incluso antes de que haya suscriptores.
Dejando el mundo reactivo
Para abordar el primer problema de nuestro cambio, podemos utilizar cualquiera de los operadores de bloqueo disponibles para un Observable
, como blockingFirst()
y blockingNext()
. Esencialmente, ambos operadores bloquearán hasta que se emita un elemento en sentido descendente: blockingFirst()
devolverá el primer elemento emitido y finalizará, mientras que blockingNext()
devolverá un Iterable
que le permite realizar un bucle for-each en los datos subyacentes ( cada iteración a través del bucle se bloqueará).
Sin embargo, un efecto secundario de usar una operación de bloqueo que es importante tener en cuenta es que las excepciones se lanzan en el subproceso de llamada en lugar de pasar al método onError()
de un observador.
Usando un operador de bloqueo para cambiar la firma del método de nuevo a List<User>
, nuestro fragmento ahora se vería así:
/** * @return a list of users with blogs */ public List<User> getUsersWithBlogs() { return Observable.fromIterable(UserCache.getAllUsers()) .filter(user -> user.blog != null && !user.blog.isEmpty()) .sorted((user1, user2) -> user1.name.compareTo(user2.name)) .toList() .blockingGet(); }
Antes de llamar a un operador de bloqueo (es decir blockingGet()
), primero debemos encadenar el operador agregado toList()
para que la secuencia se modifique de un Observable<User>
a un Single<List<User>>
(un Single
es un tipo especial de Observable
que solo emite un único valor en onSuccess()
, o un error a través onError()
).
Luego, podemos llamar al operador de blockingGet()
que desenvuelve el Single
y devuelve un List<User>
.
Aunque RxJava admite esto, debe evitarse en la medida de lo posible, ya que no se trata de una programación reactiva idiomática. Sin embargo, cuando es absolutamente necesario, los operadores de bloqueo son una buena forma inicial de salir del mundo reactivo.
El enfoque perezoso
Como se mencionó anteriormente, RxJava fue diseñado pensando en la pereza. Es decir, las operaciones de larga duración deben retrasarse tanto como sea posible (es decir, hasta que se invoque una suscripción en un Observable
). Para hacer que nuestra solución sea perezosa, usamos el operador defer()
.
defer()
toma una fábrica de ObservableSource
que crea un Observable
para cada nuevo observador que se suscribe. En nuestro caso, queremos devolver Observable.fromIterable(UserCache.getAllUser())
cada vez que se suscriba un observador.
/** * @return a list of users with blogs */ public Observable<User> getUsersWithBlogs() { return Observable.defer(() -> Observable.fromIterable(UserCache.getAllUsers())) .filter(user -> user.blog != null && !user.blog.isEmpty()) .sorted((user1, user2) -> user1.name.compareTo(user2.name)); }
Ahora que la operación de larga duración está envuelta en un defer()
, tenemos control total sobre en qué subproceso debe ejecutarse simplemente especificando el Scheduler
apropiado en subscribeOn()
. Con este cambio, nuestro código es completamente reactivo y la suscripción solo debe ocurrir en el momento en que se necesitan los datos.
/** * @return a list of users with blogs */ public Observable<User> getUsersWithBlogs() { return Observable.defer(() -> Observable.fromIterable(UserCache.getAllUsers())) .filter(user -> user.blog != null && !user.blog.isEmpty()) .sorted((user1, user2) -> user1.name.compareTo(user2.name)) .subscribeOn(Schedulers.io()); }
Otro operador bastante útil para diferir el cálculo es el método fromCallable()
. A diferencia de defer()
, que espera que se devuelva un Observable
en la función lambda y, a su vez, "aplana" el Observable
devuelto, fromCallable()
invocará la lambda y devolverá el valor aguas abajo.
/** * @return a list of users with blogs */ public Observable<User> getUsersWithBlogs() { final Observable<List<User>> usersObservable = Observable.fromCallable(() -> UserCache.getAllUsers()); final Observable<User> userObservable = usersObservable.flatMap(users -> Observable.fromIterable(users)); return userObservable.filter(user -> user.blog != null && !user.blog.isEmpty()) .sorted((user1, user2) -> user1.name.compareTo(user2.name)); }
El uso único de fromCallable()
en una lista ahora devolvería un Observable<List<User>>
, necesitamos aplanar esta lista usando flatMap()
.
Reactivo-todo
De los ejemplos anteriores, hemos visto que podemos envolver cualquier objeto en un Observable
y saltar entre estados no reactivos y reactivos usando operaciones de bloqueo y defer()
/ fromCallable()
. Usando estas construcciones, podemos comenzar a convertir áreas de una aplicación de Android para que sean reactivas.
Operaciones Largas
Un buen lugar para pensar inicialmente en usar RxJava es cada vez que tenga un proceso que tarde un tiempo en realizarse, como llamadas de red (consulte la publicación anterior para ver ejemplos), lecturas y escrituras de disco, etc. El siguiente ejemplo ilustra una función simple que escribirá texto en el sistema de archivos:
/** * Writes {@code text} to the file system. * * @param context a Context * @param filename the name of the file * @param text the text to write * @return true if the text was successfully written, otherwise, false */ public boolean writeTextToFile(Context context, String filename, String text) { FileOutputStream outputStream; try { outputStream = context.openFileOutput(filename, Context.MODE_PRIVATE); outputStream.write(text.getBytes()); outputStream.close(); return true; } catch (Exception e) { e.printStackTrace(); return false; } }
Al llamar a esta función, debemos asegurarnos de que se realice en un subproceso separado, ya que esta operación está bloqueando. Imponer tal restricción a la persona que llama complica las cosas para el desarrollador, lo que aumenta la probabilidad de errores y puede ralentizar potencialmente el desarrollo.

Agregar un comentario a la función, por supuesto, ayudará a evitar errores por parte de la persona que llama, pero eso aún está lejos de ser infalible.
Sin embargo, al usar RxJava, podemos envolver esto fácilmente en un Observable
y especificar el Scheduler
en el que debe ejecutarse. De esta manera, la persona que llama no necesita preocuparse en absoluto por invocar la función en un hilo separado; la función se encargará de esto por sí misma.
/** * Writes {@code text} to the filesystem. * * @param context a Context * @param filename the name of the file * @param text the text to write * @return An Observable emitting a boolean indicating whether or not the text was successfully written. */ public Observable<Boolean> writeTextToFile(Context context, String filename, String text) { return Observable.fromCallable(() -> { FileOutputStream outputStream; outputStream = context.openFileOutput(filename, Context.MODE_PRIVATE); outputStream.write(text.getBytes()); outputStream.close(); return true; }).subscribeOn(Schedulers.io()); }
Con fromCallable()
, la escritura del texto en el archivo se pospone hasta el momento de la suscripción.
Dado que las excepciones son objetos de primera clase en RxJava, otro beneficio de nuestro cambio es que ya no necesitamos envolver la operación en un bloque try/catch. La excepción simplemente se propagará aguas abajo en lugar de ser absorbida. Esto permite que la persona que llama maneje la excepción como lo crea conveniente (por ejemplo, mostrar un error al usuario dependiendo de qué excepción se haya lanzado, etc.).
Otra optimización que podemos realizar es devolver un Completable
en lugar de un Observable
. Un Completable
es esencialmente un tipo especial de Observable
, similar a un Single
, que simplemente indica si un cálculo tuvo éxito, a través de onComplete()
, o falló, a través de onError()
. Devolver un Completable
parece tener más sentido en este caso, ya que parece una tontería devolver un solo verdadero en un flujo Observable
.
/** * Writes {@code text} to the filesystem. * * @param context a context * @param filename the name of the file * @param text the text to write * @return A Completable */ public Completable writeTextToFile(Context context, String filename, String text) { return Completable.fromAction(() -> { FileOutputStream outputStream; outputStream = context.openFileOutput(filename, Context.MODE_PRIVATE); outputStream.write(text.getBytes()); outputStream.close(); }).subscribeOn(Schedulers.io()); }
Para completar la operación, usamos la operación fromAction()
de un Completable
ya que el valor devuelto ya no nos interesa. Si es necesario, como un Observable
, un Completable
también admite las fromCallable()
y defer()
.
Sustitución de devoluciones de llamada
Hasta ahora, todos los ejemplos que hemos visto emiten un valor (es decir, se pueden modelar como Single
) o nos dicen si una operación tuvo éxito o falló (es decir, se puede modelar como Completable
).
Sin embargo, ¿cómo podemos convertir áreas en nuestra aplicación que reciben actualizaciones o eventos continuos (como actualizaciones de ubicación, eventos de clics de visualización, eventos de sensores, etc.)?
Veremos dos formas de hacer esto, usando create()
y usando Subjects
.
create()
nos permite invocar explícitamente onNext()
| de un observador. onComplete()
| onError()
cuando recibimos actualizaciones de nuestra fuente de datos. Para usar create()
, pasamos un ObservableOnSubscribe
que recibe un ObservableEmitter
cada vez que un observador se suscribe. Usando el emisor recibido, podemos realizar todas las llamadas de configuración necesarias para comenzar a recibir actualizaciones y luego invocar el evento Emitter
apropiado.
En el caso de actualizaciones de ubicación, podemos registrarnos para recibir actualizaciones en este lugar y emitir actualizaciones de ubicación tal como se reciben.
public class LocationManager { /** * Call to receive device location updates. * @return An Observable emitting location updates */ public Observable<Location> observeLocation() { return Observable.create(emitter -> { // Make sure that the following conditions apply and if not, call the emitter's onError() method // (1) googleApiClient is connected // (2) location permission is granted final LocationRequest locationRequest = new LocationRequest(); locationRequest.setInterval(1000); locationRequest.setPriority(LocationRequest.PRIORITY_HIGH_ACCURACY); LocationServices.FusedLocationApi.requestLocationUpdates(googleApiClient, locationRequest, new LocationListener() { @Override public void onLocationChanged(Location location) { if (!emitter.isDisposed()) { emitter.onNext(location); } } }); }); } }
La función dentro de la llamada create()
solicita actualizaciones de ubicación y pasa una devolución de llamada que se invoca cuando cambia la ubicación del dispositivo. Como podemos ver aquí, esencialmente reemplazamos la interfaz de estilo de devolución de llamada y, en su lugar, emitimos la ubicación recibida en el flujo Observable creado (por motivos educativos, omití algunos de los detalles al construir una solicitud de ubicación, si desea profundizar más profundo en los detalles se puede leer aquí).
Otra cosa a tener en cuenta sobre create()
es que, cada vez que se llama a subscribe()
, se proporciona un nuevo emisor. En otras palabras, create()
devuelve un Observable
frío. Esto significa que, en la función anterior, potencialmente estaríamos solicitando actualizaciones de ubicación varias veces, lo cual no es lo que queremos.
Para evitar esto, queremos cambiar la función para devolver un Observable
con la ayuda de Subjects
.
Introducir materias
Un Subject
extiende un Observable
e implementa un Observer
al mismo tiempo. Esto es particularmente útil cuando queremos emitir o enviar el mismo evento a varios suscriptores al mismo tiempo. En cuanto a la implementación, nos gustaría exponer el Subject
como un Observable
a los clientes, mientras lo mantenemos como un Subject
para el proveedor.
public class LocationManager { private Subject<Location> locationSubject = PublishSubject.create(); /** * Invoke this method when this LocationManager should start listening to location updates. */ public void connect() { final LocationRequest locationRequest = new LocationRequest(); locationRequest.setInterval(1000); locationRequest.setPriority(LocationRequest.PRIORITY_HIGH_ACCURACY); LocationServices.FusedLocationApi.requestLocationUpdates(googleApiClient, locationRequest, new LocationListener() { @Override public void onLocationChanged(Location location) { locationSubject.onNext(location); } }); } /** * Call to receive device location updates. * @return An Observable emitting location updates */ public Observable<Location> observeLocation() { return locationSubject; } }
En esta nueva implementación se utiliza el subtipo PublishSubject
que emite eventos a medida que llegan a partir del momento de la suscripción. Por lo tanto, si se realiza una suscripción en un punto en el que ya se han emitido actualizaciones de ubicación, el observador no recibirá las emisiones pasadas, sino las posteriores. Si no se desea este comportamiento, hay un par de otros subtipos de Subject
en el kit de herramientas RxJava que se pueden usar.
Además, también creamos una función connect()
separada que inicia la solicitud para recibir actualizaciones de ubicación. observeLocation()
todavía puede hacer la llamada connect()
, pero la refactorizamos fuera de la función para mayor claridad/simplicidad.
Resumen
Hemos analizado una serie de mecanismos y técnicas:
-
defer()
y sus variantes para retrasar la ejecución de un cálculo hasta la suscripción -
Observables
fríos generados a través decreate()
-
Observables
calientes usandoSubjects
- operaciones blockingX cuando queremos salir del mundo reactivo
Con suerte, los ejemplos proporcionados en este artículo inspiraron algunas ideas con respecto a las diferentes áreas de su aplicación que se pueden convertir para que sean reactivas. Hemos cubierto mucho y si tiene alguna pregunta, sugerencia o si algo no está claro, ¡no dude en dejar un comentario a continuación!
Si está interesado en obtener más información sobre RxJava, estoy trabajando en un libro detallado que explica cómo ver los problemas de forma reactiva usando ejemplos de Android. Si desea recibir actualizaciones al respecto, suscríbase aquí.