Asincronía y paralelismo con Kotlin coroutines

En este artículo explicaré una propuesta de cómo tratar la asincronía de nuestros Interactors utilizando las Kotlin coroutines.

Para ello utilizaré conceptos de Clean Architecture, si no estás muy familiarizado con esta parte, te recomiendo que antes le eches un ojo a un artículo anterior: De MVP a Clean II. ¡Hagámoslo Clean!.

Las piezas necesarias

En este artículo no voy a describir con detalle las corutinas de Kotlin, para ello dejaré al final del artículo varios enlaces que me han servido de referencia. Simplemente, y a modo de presentación, diré que es una librería creada por el equipo de Jetbrains con la intención de hacer más sencillo el trabajo asíncrono. En la documentación oficial las describen como «hilos ligeros lanzados en el contexto de un CoroutineScope».

Veamos primero en detalle las 3 funciones de extensión que he creado para gestionar la asincronía de mis Interactors:

fun launch(block: suspend CoroutineScope.() -> Unit): Job {
  return CoroutineScope(Dispatchers.Main).launch { block() }
}

suspend fun <T> asyncSeq(block: suspend CoroutineScope.() -> T): T {
  return withContext(Dispatchers.IO) { block() }
}

fun <T> async(function: () -> T): Deferred<T> {
  return CoroutineScope(Dispatchers.IO).async { function() }
}

  • launch: este método nos ayuda a envolver nuestras corutinas, que serán lanzadas siempre desde el hilo principal (Dispatchers.Main).
  • asyncSeq: este método lanza la función que recibe como parámetro en un hilo aparte y espera su respuesta; es ideal, y muy eficiente (ya que no crea una nueva corutina, simplemente cambia de hilo de ejecución) para flujos de llamadas sequenciales o dependientes.
  • async: este método crea una nueva corutina para lanzar la función que recibe como parámetro, y retorna una especie de valor futuro (Deferred). Cuando necesitemos obtener el resultado, utilizaremos la función await() para indicar que queremos esperar a que llegue la respuesta.

El interactor

abstract class Interactor<TReq, TRes> {

  abstract suspend fun execute(request: TReq): Either<Throwable, TRes>

  operator fun invoke(request: TReq, onError: (Throwable) -> Unit = {}, onSuccess: (TRes) -> Unit = {}) {
    launch {
      val response = execute(request)
      if (response.isRight) {
        onSuccess(response.rightValue)
      } else {
        onError(response.leftValue)
      }
    }
  }
}

Gran parte de la magia necesaria para poder lanzar operaciones en paralelo desde nuestros interactors sucede aquí.

A través de la herencia, obligaremos a los interactors que extiendan de esta clase a sobreescribir el método abstracto execute(). Este método es a su vez una función de suspensión (suspend), requisito indispensable para poder lanzar corutinas.

Por otro lado, utilizaremos el método invoke() para envolver la ejecución del método execute() dentro de un bloque launch{}. La respuesta (un Either, que no es más que un tipo que representa un valor con dos posibilidades) se valida antes de lanzar onSuccess() o onError().

Gestionando valores futuros

class ParallelInteractor @Inject constructor(
  private val hotelRepository: HotelRepository,
  private val reviewRepository: ReviewRepository
): Interactor<ParallelInteractor.Request, ParallelInteractor.Response>() {

  class Request(val id: Int)

  class Response(val hotel: Hotel, val review: Review)

  override suspend fun execute(request: Request): Either<Throwable, Response> {
    val hotelFuture = async { hotelRepository.getHotel(request.id) }
    val reviewFuture = async { reviewRepository.getReview(request.id) }

    val hotelResponse = hotelFuture.await()
    val reviewResponse = reviewFuture.await()

    return if (hotelResponse.isRight && reviewResponse.isRight) {
      Either.Right(Response(hotelResponse.rightValue, reviewResponse.rightValue))
    } else if (hotelResponse.isLeft) {
      Either.Left(hotelResponse.leftValue)
    } else {
      Either.Left(reviewResponse.leftValue)
    }
  }
}

Para mejorar la comprensión del artículo, he creado un proyecto con dos tipos de interactors: uno que gestiona su trabajo como un único bloque asíncrono, y otro que ejecuta en paralelo sus llamadas a los repositories.

El ejemplo de arriba ejecuta el trabajo en paralelo, agrupando cada llamada dentro de un bloque async{}. Cuando necesitamos detener el flujo y esperar por la respuesta, simplemente tendremos que llamar a await().

Testing

@Test
fun `if all goes well we compose an response with the expected data`() {
  Mockito.`when`(hotelRepository.getHotel(fakeId)).thenReturn(Either.Right(hotel))
  Mockito.`when`(reviewRepository.getReview(fakeId)).thenReturn(Either.Right(review))

  val response = runBlocking { parallelInteractor.execute(ParallelInteractor.Request(fakeId)) }

  assertEquals(hotel, response.rightValue.hotel)
  assertEquals(review, response.rightValue.review)
}

Ante cualquier decisión arquitectónica que tomemos, la testabilidad del código debe estar siempre presente. Las corutinas de Kotlin nos proporcionan un método (runBlocking) que nos ayuda a bloquear el hilo de ejecución de nuestras funciones de suspensión hasta su finalización.

Conclusión

Desde el punto de vista de la arquitectura, no considero este el escenario ideal, sino simplemente el necesario. Para mí lo idóneo sería considerar la gestión de la asincronía un detalle de implementación y ocultárselo a los clientes de los interactors; pero los interactors a menudo estarán compuestos por varias llamadas a los repositories, y en la guerra Arquitectura vs Experiencia de usuario, quizás tenga sentido saltarse algún que otro precepto de vez en cuando. 😉

Código fuente

Enlaces de referencia