Diseño simple de Kotlin Flow

Flujo por Grant Tarrant

En una historia anterior de "Flujos fríos, canales calientes", definí flujos de datos fríos y calientes y mostré un caso de uso para Kotlin Flows: flujos asíncronos fríos. Ahora echemos un vistazo debajo del capó, examinemos su diseño y veamos cómo una combinación de características del lenguaje y una biblioteca permite una abstracción poderosa con un diseño simple.

Un flujo en Kotlin está representado por una interfaz²:

interfaz Flow  {
    suspender diversión recoger (colector: FlowCollector )
}

Todo lo que hay en un flujo es una función de recopilación única que acepta una instancia de la interfaz FlowCollector con un único método de emisión:

interfaz FlowCollector  {
    suspender diversión emitir (valor: T)
}

Un nombre de emisión debe sonar familiar para un lector de "Flujos fríos, canales calientes". De hecho, he mostrado un ejemplo de la siguiente definición de flujo:

valores: flujo  = flujo {
    para (i en 1..10) {
        retraso (100)
        emit (i) // <- emit se llama aquí
    }
}

Una firma del generador de flujo también utiliza una interfaz FlowCollector como receptor³, para que podamos emitir directamente desde el cuerpo de la lambda correspondiente:

fun  flow (bloque: suspender FlowCollector . () -> Unidad): Flow 

Para un uso simple de un flujo, cuando se recoge el flujo, así:

ints.collect {println (it)} // toma 1 segundo, imprime 10 ints

lo que sucede es que se crea una instancia de FlowCollector basada en la función lambda pasada para recopilar {...} y esta misma instancia se pasa al flujo {...} cuerpo⁴.

Por lo tanto, una interacción entre un emisor de flujo y un colector de flujo es la de una llamada a una función simple: una llamada a la función de emisión. Si alineamos mentalmente esta llamada de función, podemos entender de inmediato lo que sucede cuando ejecutamos este código⁵, será equivalente a:

para (i en 1..10) {
    retraso (100)
    println (i) // <- emit fue llamado aquí
}

Operadores

Un generador de flujo y un operador de terminal de recolección es todo lo que necesitamos saber para comenzar a escribir operadores que transformen los flujos de varias maneras. Por ejemplo, un operador de mapa básico que aplica una transformación específica a cada valor emitido puede implementarse de esta manera:

diversión  Flujo  .map (transformación: suspender (valor: T) -> R) = flujo {
    recolectar {emitir (transformar ())}
}

Usando este operador ahora podemos hacer ints.map {it * it} para definir un flujo con cuadrados de los enteros originales. Los elementos aún fluyen del emisor al colector a través de llamadas a funciones. Simplemente hay una función más entre ahora.

En realidad, la biblioteca kotlinx.coroutines ya define el mapa y una gran cantidad de otros operadores de propósito general como extensiones en el tipo Flow, siguiendo el enfoque de diseño orientado a extensiones. Lo importante en este diseño es que es bastante fácil definir operadores específicos de dominio. No hay distinción entre operadores "incorporados" y "definidos por el usuario": todos los operadores son de primera clase.

Contrapresión

La contrapresión en la ingeniería de software se define como la capacidad de un consumidor de datos que no puede mantenerse al día con los datos entrantes para enviar una señal al productor de datos para reducir la velocidad de los elementos de datos.

El diseño de las corrientes reactivas tradicionales implica un canal posterior para solicitar más datos de los productores según sea necesario. La gestión de este protocolo de solicitud conduce a implementaciones notoriamente difíciles, incluso para operadores simples. No vemos nada de esta complejidad en el diseño de los flujos de Kotlin, ni en la implementación de operadores para ellos, sin embargo, los flujos de Kotlin admiten la contrapresión. ¿Cómo?

La gestión transparente de la contrapresión se logra en los flujos de Kotlin mediante el uso de las funciones de suspensión de Kotlin. Es posible que haya notado que todas las funciones y tipos funcionales en el diseño de flujo de Kotlin están marcados con el modificador de suspensión: estas funciones tienen un súper poder para suspender la ejecución de la persona que llama sin bloquear un hilo⁹. Entonces, cuando el colector del flujo está abrumado, simplemente puede suspender el emisor y reanudarlo más tarde cuando esté listo para aceptar más elementos.

Esto es bastante similar a la gestión de contrapresión en los canales de datos síncronos tradicionales basados ​​en subprocesos, donde un consumidor lento aplica automáticamente la contrapresión al productor en virtud del bloqueo del subproceso del productor. Las funciones de suspensión lo llevan más allá de un solo subproceso y entran en el ámbito de la programación asincrónica, al administrar de forma transparente la contrapresión a través de los subprocesos sin bloquearlos. Pero eso se cuenta en otra historia.

Lecturas adicionales y notas al pie

  1. ^ Flujos fríos, canales calientes
  2. ^ Flow y los tipos y funciones relacionados todavía están en versión preliminar a partir de la versión 1.2.1 de la biblioteca kotlinx.coroutines. Leer más aquí.
  3. ^ Tipos de funciones en Kotlin
  4. ^ Esta es una ligera simplificación. No tiene en cuenta controles adicionales para garantizar la preservación del contexto, pero ese tema está fuera del alcance de esta historia. Más detalles en el contexto de Ejecución de Kotlin Flows.
  5. ^ Puede ejecutar este código a través de Kotlin Playground aquí.
  6. ^ Diseño orientado a la extensión
  7. ^ Corrientes reactivas
  8. ^ Implementación de operadores para [RxJava] 2.0
  9. ^ Bloqueo de hilos, suspensión de corutinas