RxJava: dale un respiro al hilo principal

La ejecución de procesos asíncronos forma parte de nuestro día a día como desarrolladores. Tanto si queremos lanzar una petición a un servicio web, como si realizamos una operación pesada sobre base de datos, tenemos que aislarnos del hilo principal (Main Thread) si no queremos bloquear la interfaz o recibir una NetworkOnMainThreadException.

En este artículo mostraré cómo podemos utilizar RxJava para manejar todo nuestro trabajo asíncrono. Mi intención es centrarme más en cómo encajar este framework en una arquitectura Clean que en explicar todas sus peculiaridades (para esto ya existen infinidad de excelentes artículos en la red, como por ejemplo el de Vogella o el de Dan Lew, entre otros).

La estructura del ejemplo usará MVP para la capa de presentación y Clean Architecture.

Empecemos con una mini introducción

Según la documentación oficial, ReactiveX es una combinación de las mejores ideas del patrón Observer, el patrón Iterator y la programación funcional.

RxJava no es más que una implementación de dicha librería, y su funcionamiento, de forma muy simplificada, se reduce a que existen dos clases: una conocida como Observable, que se dedicará a emitir datos, y otra conocida como Subscriber (o Observer), que se dedicará a actuar sobre los elementos emitidos.

Esta es una lista con los diferentes tipos de observables:

Tipo Descripción
Observable<T> Emite de 0 a n ítems, y termina con un success o un error.
Flowable<T> Emite de 0 a n ítems, y termina con un success o un error. Soporta backpressure, lo que permite controlar la velocidad con la que una fuente emite ítems.
Single<T> Emite un único ítem o un evento de error.
Completable Nunca emite objetos. Simplemente se «completa» con un success o un error.
Maybe<T> Es una combinación de Completable y Single<T>. Puede emitir un ítem o ninguno, o un evento de error.

Para este artículo me apoyaré en un observable de tipo Single. Su estructura es idéntica al clásico callback, y creo que esto lo hace idóneo para tratar el tema con sencillez y claridad.

Añadamos ahora algo de contexto

En el artículo sobre Clean Architecture hablé de los casos de uso,

public interface UseCase<T, P> {
  void execute(Handler<T> handler, P params);
}

que son las clases que se encargan de dirigir el flujo de datos desde y hacia las entidades, y de la interfaz Handler,

public interface Handler<T> {
  void handle(T result);
  void error(Exception exception);
}

el callback que nos permitía establecer la comunicación entre capas.

Esta estructura nos ayuda a cumplir con una arquitectura basada en el desacoplamiento de capas, pero falla en algo muy importante: su flujo de trabajo es síncrono. Esto quiere decir que el caso de uso no abandona el hilo principal al ser ejecutado; por lo tanto, si para cumplir su objetivo necesitase por ejemplo insertar 10000 elementos en la base de datos, es muy probable que acabásemos por bloquear el hilo principal e incurriendo en un ANR.

¿Y esto cómo lo solucionamos?

El manejo de hilos puede convertirse en algo realmente engorroso, pero RxJava nos facilita mucho la vida en este aspecto, tanto que podríamos resumir mi propuesta en dos puntos principales:

  • El UseCase se encargará de asignar al Observable un hilo de trabajo y un hilo de emisión de datos.
  • El Presenter asociará un Observer al UseCase cuando lo ejecute, y de esta forma podrá escuchar los datos emitidos.

El UseCase maneja los hilos

Veamos nuestro UseCase:

public abstract class UseCase<Observer, Params> {

  private final UIScheduler uiScheduler;
  private final JobScheduler jobScheduler;

  UseCase(UIScheduler uiScheduler, JobScheduler jobScheduler) {
    this.uiScheduler = uiScheduler;
    this.jobScheduler = jobScheduler;
  }

  abstract Single<Observer> buildUseCaseObservable(Params params);

  public Disposable execute(DisposableSingleObserver<Observer> observer, Params params) {
    final Single<Observer> observable = this.buildUseCaseObservable(params)
    .observeOn(uiScheduler.getScheduler())
    .subscribeOn(jobScheduler.getScheduler());
    return observable.subscribeWith(observer);
  }
}

Esta clase recibe por parámetros dos interfaces, UIScheduler y JobScheduler, que representan el hilo donde serán emitidos los datos y el hilo donde se realizará el trabajo, respectivamente. Ambas interfaces pertenecen a la capa de dominio, pero sus implementaciones no, estas deben ubicarse en el lugar que les corresponde: data para el hilo de trabajo y platform para el hilo de escucha (Android Main Thread).

@Singleton
public class JobThread implements JobScheduler {

  @Inject
  JobThread() {}

  @Override
  public Scheduler getScheduler() {
    return Schedulers.io();
  }
}

Schedulers.io() es uno de los schedulers proporcionados por RxJava. Funciona de forma similar a un ThreadPoolExecutor, aunque maneja un numero ilimitado de hilos. Este Scheduler nos servirá en la mayoría de situaciones, pero es muy recomendable que investiguemos todo lo que RxJava nos proporciona. Por ejemplo, si quisiésemos limitar el número de hilos activos a 5, podríamos definir nuestro Scheduler de la siguiente manera: Scheduler.from(Executors.newFixedThreadPool(5)).

@Singleton
public class UIThread implements UIScheduler {

  @Inject
  UIThread() {}

  @Override
  public Scheduler getScheduler() {
    return AndroidSchedulers.mainThread();
  }
}

RxAndroid es una extensión de RxJava que nos proporciona un Scheduler para poder escuchar los datos emitidos en el hilo principal.

Una vez definidos los hilos, ya sólo nos quedaría ligarlos a nuestro Observable a través de los métodos observeOn() y subscribeOn().

Antes de pasar al siguiente apartado, me gustaría aclarar que la responsabilidad de crear el Observable pertenece a las subclases de UseCase:

public class GetMovies extends UseCase<List<Movie>, GetMovies.Params> {

  private MoviesRepository repository;

  public GetMovies(MoviesRepository repository,
    UIScheduler uiScheduler,
    JobScheduler jobScheduler) {
    super(uiScheduler, jobScheduler);
    this.repository = repository;
  }

  @Override
  Single<List<Movie>> buildUseCaseObservable(GetMovies.Params params) {
    return Single.create(emitter -> {
      try {
        List<Movie> movieList = repository.getMovies(params.isOnlyOnline());
        emitter.onSuccess(movieList);
      } catch (Exception exception) {
        if (!emitter.isDisposed()) {
          emitter.onError(exception);
        }
      }
    });
  }

  public static final class Params {
    private final boolean onlyOnline;

    public Params(boolean onlyOnline){
      this.onlyOnline = onlyOnline;
    }

    public boolean isOnlyOnline() {
      return onlyOnline;
    }
  }
}

El ejemplo de arriba representa a un caso de uso concreto cuyo objetivo es acceder al repositorio para obtener una lista de películas. Como es una subclase de UseCase, debe sobreescribir el método buildUseCaseObservable(), y es aquí donde crea el Observable.

Cabe destacar que el flujo de trabajo definido en la creación del Observable no se lanza hasta que se relaciona al Observer con el Observable, es decir, tras el método subscribeWith().

El Presenter observa

La clase que representa a mi Observer es de tipo DisposableSingleObserver,

public class Observer<T> extends DisposableSingleObserver<T> {

  @Override
  public void onSuccess(T t) {}

  @Override
  public void onError(Throwable e) {}
 
}

esta clase se utilizará internamente en nuestro Presenter para definir el Observer (que será enviado al UseCase como parámetro en el método execute()). Es aquí, en el Presenter, donde escuchamos los elementos emitidos.

public class MovieListPresenter {

  // code...

  private void invokeUseCase(boolean refresh) {
    UseCase useCase = useCaseFactory.getMovies();
    useCase.execute(new MoviesObserver(), new GetMovies.Params(refresh));
  }

  private final class MoviesObserver extends Observer<List<Movie>> {
    @Override
    public void onSuccess(List<Movie> movieList) {
      saveMovies(movieList);
      MovieListView movieListView = view.get();
      if(movieListView!=null) {
        movieListView.cancelRefreshDialog();
        movieListView.refreshList();
      }
    }

    @Override
    public void onError(Throwable exception) {
      MovieListView movieListView = view.get();
      if(movieListView!=null) {
        movieListView.cancelRefreshDialog();
        movieListView.showErrorMessage(exception.getMessage());
      }
    }
  }

  // code...
}

Y para cerrar el círculo…

UseCase RxJava

Este diagrama creo que representa muy bien el flujo Presenter-UseCase:

  1. execute(): ejecutamos un caso de uso enviando el Observer como parámetro.
  2. buildUseCaseObservable(): delegamos en la subclase de UseCase la creación del Observable.
  3. observeOn(): definimos el hilo (Scheduler) donde escucharemos los datos emitidos.
  4. subscribeOn(): definimos el hilo de trabajo asíncrono.
  5. subscribeWith(): suscribimos el Observer recibido como parámetro al Observable creado por la subclase de UseCase (en este momento se lanza el trabajo definido en el Observable).
  6. Emitimos a través de onSuccess() o onError().
  7. Escuchamos respuesta en onSuccess() o onError().

Un último apunte

Se puede dar la situación de que nuestro Presenter utilice más de un caso de uso, esto seguramente nos llevaría a tener varios Observers ligados a sus respectivos Observables. Para evitar mantener suscripciones innecesarias, se considera una buena práctica desligar todos nuestros Observers de sus Observables una vez la vista ha desaparecido, es decir, cuando se destruya la Activity o el Fragment.

Debido a que mi Observer es de tipo DisposableSingleObserver y no simplemente SingleObserver, el método subscribeWith() retornará un Disposable, esto me permite utilizar la clase contenedor CompositeDisposable para gestionar las subscripciones.

public abstract class BasePresenter {

  private final CompositeDisposable disposables = new CompositeDisposable();

  void addDisposable(Disposable disposable) {
    disposables.add(disposable);
  }

  public void destroy() {
    if (!disposables.isDisposed()) {
      disposables.dispose();
    }
  }
}

A través del método addDisposable() iré añadiendo las subscripciones (Disposable) al contenedor. Y una vez que la vista se destruya, no tendremos más que notificar al Presenter para que las elimine.

public class Presenter extends BasePresenter {

  // code...

  private void invokeUseCase1() {
    UseCase useCase = useCaseFactory.getUseCase1();
    addDisposable(useCase.execute(new Observer1(), new UseCase1.Params()));
  }

  private void invokeUseCase2() {
    UseCase useCase = useCaseFactory.getUseCase2();
    addDisposable(useCase.execute(new Observer2(), new UseCase2.Params()));
  }

  // code...
}

Conclusión

En este artículo me he centrado en la utilización del framework RxJava dentro de una arquitectura Clean. Los puntos a destacar serían:

  • Un Observable es síncrono por defecto en RxJava. Los métodos observeOn() y subscribeOn() nos sirven para definir el hilo donde escucharemos los datos emitidos y el hilo donde operará nuestro Observable, ambos de tipo Scheduler.
  • Aislar mediante abstracciones los Schedulers nos permite que nuestro código de dominio ignore sus implementaciones concretas, y por lo tanto se vea menos afectado por sus cambios.
  • Los Observers serán definidos como clases internas del Presenter, que utilizará el contenedor CompositeDisposable para eliminar todas las subscripciones cuando la vista se destruya.

Código fuente

Enlaces de referencia