Capítulo 4. Patrones de concurrencia en Go

Este trabajo se ha traducido utilizando IA. Agradecemos tus opiniones y comentarios: translation-feedback@oreilly.com

Hemos explorado los fundamentos de las primitivas de concurrencia de Go y discutido cómo utilizarlas adecuadamente. En este capítulo, profundizaremos en cómo componer estas primitivas en patrones que te ayudarán a mantener tu sistema escalable y mantenible.

Sin embargo, antes de empezar, tenemos que tocar el formato de algunos de los patrones contenidos en este capítulo. En muchos de los ejemplos, utilizaremos canales que pasan interfaces vacías (interface{}). El uso de interfaces vacías en Go es controvertido; sin embargo, lo he hecho por un par de razones. La primera es que facilita la redacción de ejemplos concisos en el resto del libro. La segunda es que en algunos casos creo que esto es más representativo de lo que el patrón intenta conseguir. Trataremos este punto más directamente en la sección "Tuberías".

Si esto te parece demasiado polémico, recuerda que siempre puedes crear generadores Go para este código, y generar los patrones para utilizar el tipo que te interese.

Dicho esto, ¡vamos a sumergirnos y a conocer algunos patrones de concurrencia en Go!

Encierro

Cuando se trabaja con código concurrente, hay algunas opciones diferentes para un funcionamiento seguro. Hemos repasado dos de ellas:

  • Primitivas de sincronización para compartir memoria (por ejemplo, sync.Mutex)

  • Sincronización mediante comunicación (por ejemplo, canales)

Sin embargo, hay un par de opciones más que son implícitamente seguras dentro de varios procesos concurrentes:

  • Datos inmutables

  • Datos protegidos por confinamiento

En cierto sentido, los datos inmutables son ideales porque son implícitamente seguros para la concurrencia. Cada proceso concurrente puede operar sobre los mismos datos, pero no puede modificarlos. Si quiere crear nuevos datos, debe crear una nueva copia de los datos con las modificaciones deseadas. Esto no sólo permite aligerar la carga cognitiva del desarrollador, sino que también puede dar lugar a programas más rápidos si conduce a secciones críticas más pequeñas (o las elimina por completo). En Go, puedes conseguirlo escribiendo código que utilice copias de valores en lugar de punteros a valores en memoria. Algunos lenguajes admiten la utilización de punteros con valores explícitamente inmutables; sin embargo, Go no se encuentra entre ellos.

El confinamiento también puede permitir una carga cognitiva más ligera para el desarrollador y secciones críticas más pequeñas. Las técnicas para confinar valores concurrentes son un poco más complicadas que simplemente pasar copias de valores, así que en este capítulo exploraremos estas técnicas de confinamiento en profundidad.

El confinamiento es la sencilla pero poderosa idea de garantizar que la información sólo esté disponible desde un proceso concurrente. Cuando se consigue esto, un programa concurrente es implícitamente seguro y no necesita sincronización. Hay dos tipos de confinamiento posibles: ad hoc y léxico.

El confinamiento ad hoc es cuando consigues el confinamiento mediante una convención, ya sea establecida por la comunidad de lenguajes, el grupo en el que trabajas o la base de código en la que trabajas. En mi opinión, atenerse a una convención es difícil de conseguir en proyectos de cualquier tamaño, a menos que dispongas de herramientas para realizar un análisis estático de tu código cada vez que alguien confirma algo de código. He aquí un ejemplo de confinamiento ad hoc que demuestra por qué:

data := make([]int, 4)

loopData := func(handleData chan<- int) {
    defer close(handleData)
    for i := range data {
        handleData <- data[i]
    }
}

handleData := make(chan int)
go loopData(handleData)

for num := range handleData {
    fmt.Println(num)
}

Podemos ver que la porción de enteros data está disponible tanto desde la función loopData como desde el bucle sobre el canal handleData; sin embargo, por convención, sólo accedemos a ella desde la función loopData. Pero como el código lo tocan muchas personas, y los plazos se acercan, podrían cometerse errores, y el confinamiento podría romperse y causar problemas. Como ya he dicho, una herramienta de análisis estático podría detectar este tipo de problemas, pero el análisis estático en una base de código Go sugiere un nivel de madurez que no muchos equipos alcanzan. Por eso prefiero el confinamiento léxico: utiliza el compilador para imponer el confinamiento.

El confinamiento léxico consiste en utilizar el ámbito léxico para exponer sólo los datos y primitivas de concurrencia correctos para que los utilicen varios procesos concurrentes. Hace que sea imposible hacer lo incorrecto. En realidad, ya hemos tratado este tema en el Capítulo 3. Recuerda la sección sobre canales, en la que se habla de exponer sólo los aspectos de lectura o escritura de un canal a los procesos concurrentes que los necesitan. Volvamos a ver ese ejemplo:

chanOwner := func() <-chan int {
    results := make(chan int, 5) 1
    go func() {
        defer close(results)
        for i := 0; i <= 5; i++ {
            results <- i
        }
    }()
    return results
}

consumer := func(results <-chan int) { 3
    for result := range results {
        fmt.Printf("Received: %d\n", result)
    }
    fmt.Println("Done receiving!")
}

results := chanOwner()        2
consumer(results)
1

Aquí instanciamos el canal dentro del ámbito léxico de la función chanOwner. Esto limita el alcance del aspecto de escritura del canal results al cierre definido debajo de él. En otras palabras, limita el aspecto de escritura de este canal para evitar que otras goroutines escriban en él.

2

Aquí recibimos el aspecto de lectura del canal y podemos pasarlo al consumidor, que no puede hacer otra cosa que leer de él. Una vez más, esto confina la goroutina principal a una vista de sólo lectura del canal.

3

Aquí recibimos una copia de sólo lectura de un canal int. Al declarar que el único uso que requerimos es el acceso de lectura, limitamos el uso del canal dentro de la función consume a sólo lecturas.

Configurado de este modo, es imposible utilizar los canales en este pequeño ejemplo. Es una buena introducción al confinamiento, pero probablemente no sea un ejemplo muy interesante, ya que los canales son seguros para la concurrencia. Veamos un ejemplo de confinamiento que utiliza una estructura de datos que no es segura para la concurrencia, una instancia de bytes.Buffer:

printData := func(wg *sync.WaitGroup, data []byte) {
    defer wg.Done()

    var buff bytes.Buffer
    for _, b := range data {
        fmt.Fprintf(&buff, "%c", b)
    }
    fmt.Println(buff.String())
}

var wg sync.WaitGroup
wg.Add(2)
data := []byte("golang")
go printData(&wg, data[:3])     1
go printData(&wg, data[3:])     2

wg.Wait()
1

Aquí pasamos un trozo que contiene los tres primeros bytes de la estructura data.

2

Aquí pasamos un trozo que contiene los tres últimos bytes de la estructura data.

En este ejemplo, puedes ver que como printData no se cierra alrededor de la porción data, no puede acceder a ella, y necesita tomar una porción de byte para operar sobre ella. Pasamos diferentes subconjuntos de la porción, limitando así las goroutinas que iniciamos sólo a la parte de la porción que pasamos. Gracias al ámbito léxico, hemos hecho imposible1 hacer lo incorrecto, por lo que no necesitamos sincronizar el acceso a la memoria ni compartir datos mediante comunicación.

Entonces, ¿qué sentido tiene? ¿Por qué perseguir el confinamiento si tenemos a nuestra disposición la sincronización? La respuesta es la mejora del rendimiento y la reducción de la carga cognitiva de los desarrolladores. La sincronización tiene un coste, y si puedes evitarla no tendrás secciones críticas, y por tanto no tendrás que pagar el coste de sincronizarlas. También eludes toda una clase de problemas posibles con la sincronización; los desarrolladores simplemente no tienen que preocuparse por estos problemas. El código concurrente que utiliza el confinamiento léxico también tiene la ventaja de que suele ser más sencillo de entender que el código concurrente sin variables confinadas léxicamente. Esto se debe a que dentro del contexto de tu ámbito léxico puedes escribir código síncrono.

Dicho esto, puede resultar difícil establecer el confinamiento, por lo que a veces tenemos que recurrir a nuestras maravillosas primitivas de concurrencia Go.

El bucle for-select

Algo que verás una y otra vez en los programas Go es el bucle for-select. No es más que algo así

for { // Either loop infinitely or range over something
    select {
    // Do some work with channels
    }
}

Hay un par de escenarios diferentes en los que verás aparecer este patrón.

Enviar variables de iteración por un canal

A menudo querrás convertir algo sobre lo que se puede iterar en valores de un canal. Esto no es nada del otro mundo, y suele tener un aspecto parecido a éste:

for _, s := range []string{"a", "b", "c"} {
    select {
    case <-done:
        return
    case stringStream <- s:
    }
}
En bucle infinito esperando ser detenido

Es muy común crear goroutines que hagan un bucle infinito hasta que se detengan. Hay un par de variantes de ésta. La que elijas es puramente una preferencia estilística.

La primera variante mantiene la declaración select lo más breve posible:

for {
    select {
    case <-done:
        return
    default:
    }

    // Do non-preemptable work
}

Si el canal done no está cerrado, saldremos de la declaración select y continuaremos con el resto del cuerpo de nuestro bucle for.

La segunda variante incluye el trabajo en una cláusula default de la declaración select:

for {
    select {
    case <-done:
        return
    default:
        // Do non-preemptable work
    }
}

Cuando entremos en la declaración select, si el canal done no se ha cerrado, ejecutaremos en su lugar la cláusula default.

Este patrón no tiene nada más, pero aparece por todas partes, por lo que merece la pena mencionarlo.

Prevenir las fugas de goroutine

Como vimos en la sección "Goroutines", sabemos que las goroutines son baratas y fáciles de crear; es una de las cosas que hacen de Go un lenguaje tan productivo. El tiempo de ejecución se encarga de multiplexar las goroutinas en cualquier número de hilos del sistema operativo, de modo que no solemos tener que preocuparnos por ese nivel de abstracción. Pero cuestan recursos, y el tiempo de ejecución no recoge la basura de las goroutines, así que, por pequeña que sea su huella de memoria, no queremos dejarlas tiradas en nuestro proceso. Entonces, ¿cómo nos aseguramos de que se limpian?

Empecemos por el principio y pensemos en esto paso a paso: ¿por qué existiría una gorutina? En el Capítulo 2, establecimos que las gorutinas representan unidades de trabajo que pueden o no ejecutarse en paralelo entre sí. La gorutina tiene varios caminos hacia la terminación:

  • Cuando haya terminado su trabajo.

  • Cuando no puede continuar su trabajo debido a un error irrecuperable.

  • Cuando se le diga que deje de funcionar.

Obtenemos los dos primeros caminos gratis -estos caminos son tu algoritmo-, pero ¿qué pasa con la cancelación del trabajo? Esto resulta ser lo más importante debido al efecto de red: si has iniciado una gorutina, lo más probable es que esté cooperando con otras gorutinas de alguna forma organizada. Podríamos incluso representar esta interconexión como un grafo: si una goroutina hija debe o no seguir ejecutándose podría basarse en el conocimiento del estado de muchas otras goroutinas. La goroutina padre (a menudo la goroutina principal), con este conocimiento contextual completo, debería ser capaz de indicar a sus goroutinas hijas que terminen. Seguiremos estudiando la interdependencia a gran escala de las goroutinas en el próximo capítulo, pero por ahora vamos a considerar cómo asegurar que una sola goroutina hija tenga garantizada la limpieza. Empecemos con un ejemplo sencillo de fuga de una gorutina:

doWork := func(strings <-chan string) <-chan interface{} {
    completed := make(chan interface{})
    go func() {
        defer fmt.Println("doWork exited.")
        defer close(completed)
        for s := range strings {
            // Do something interesting
            fmt.Println(s)
        }
    }()
    return completed
}

doWork(nil)
// Perhaps more work is done here
fmt.Println("Done.")

Aquí vemos que la goroutina principal pasa un canal nulo a doWork. Por lo tanto, el canal strings en realidad nunca recibirá ninguna cadena escrita en él, y la gorutina que contiene doWork permanecerá en memoria durante toda la vida de este proceso (incluso nos bloquearíamos si uniéramos la gorutina dentro de doWork y la gorutina principal).

En este ejemplo, la vida del proceso es muy corta, pero en un programa real, las goroutinas podrían iniciarse fácilmente al principio de un programa de larga vida. En el peor de los casos, la goroutina principal podría seguir haciendo girar goroutinas a lo largo de su vida, provocando una fluencia en la utilización de la memoria.

La forma de mitigarlo con éxito es establecer una señal entre la gorutina padre y sus hijas que permita a la padre señalar la cancelación a sus hijas. Por convención, esta señal suele ser un canal de sólo lectura llamado done. La goroutina padre pasa este canal a la goroutina hija y luego cierra el canal cuando quiere cancelar la goroutina hija. He aquí un ejemplo:

doWork := func(
  done <-chan interface{},
  strings <-chan string,
) <-chan interface{} { 1
    terminated := make(chan interface{})
    go func() {
        defer fmt.Println("doWork exited.")
        defer close(terminated)
        for {
            select {
            case s := <-strings:
                // Do something interesting
                fmt.Println(s)
            case <-done: 2
                return
            }
        }
    }()
    return terminated
}

done := make(chan interface{})
terminated := doWork(done, nil)

go func() { 3
    // Cancel the operation after 1 second.
    time.Sleep(1 * time.Second)
    fmt.Println("Canceling doWork goroutine...")
    close(done)
}()

<-terminated 4
fmt.Println("Done.")
1

Aquí pasamos el canal done a la función doWork. Por convención, este canal es el primer parámetro.

2

En esta línea vemos el omnipresente patrón for-select en uso. Una de nuestras sentencias case comprueba si nuestro canal done ha recibido la señal. Si es así, volvemos de la goroutine.

3

Aquí creamos otra goroutina que cancelará la goroutina generada en doWork si pasa más de un segundo.

4

Aquí es donde unimos la goroutine generada desde doWork con la goroutine principal.

Y la salida resultante es

Canceling doWork goroutine...
doWork exited.
Done.

Puedes ver que, a pesar de pasar nil para nuestro canal strings, nuestra goroutina sigue saliendo con éxito. A diferencia del ejemplo anterior, en este ejemplo unimos las dos goroutinas, y sin embargo no se produce un bloqueo. Esto se debe a que antes de unir las dos goroutinas, creamos una tercera goroutina para cancelar la goroutina dentro de doWork después de un segundo. ¡Hemos eliminado con éxito la fuga de nuestra gorutina!

El ejemplo anterior maneja bien el caso de las goroutinas que reciben en un canal, pero ¿qué ocurre si nos enfrentamos a la situación inversa: una goroutina bloqueada al intentar escribir un valor en un canal? He aquí un ejemplo rápido para demostrar el problema:

newRandStream := func() <-chan int {
    randStream := make(chan int)
    go func() {
        defer fmt.Println("newRandStream closure exited.") 1
        defer close(randStream)
        for {
            randStream <- rand.Int()
        }
    }()

    return randStream
}

randStream := newRandStream()
fmt.Println("3 random ints:")
for i := 1; i <= 3; i++ {
    fmt.Printf("%d: %d\n", i, <-randStream)
}
1

Aquí imprimimos un mensaje cuando la goroutina termina con éxito.

La ejecución de este código produce:

3 random ints:
1: 5577006791947779410
2: 8674665223082153551
3: 6129484611666145821

Puedes ver en la salida que la sentencia fmt.Println diferida nunca se ejecuta. Después de la tercera iteración de nuestro bucle, nuestra goroutina se bloquea intentando enviar el siguiente entero aleatorio a un canal del que ya no se está leyendo. No tenemos forma de decirle al productor que puede parar. La solución, igual que para el caso de la recepción, es proporcionar a la goroutina productora un canal que le informe de que debe salir:

newRandStream := func(done <-chan interface{}) <-chan int {
    randStream := make(chan int)
    go func() {
        defer fmt.Println("newRandStream closure exited.")
        defer close(randStream)
        for {
            select {
            case randStream <- rand.Int():
            case <-done:
                return
            }
        }
    }()

    return randStream
}

done := make(chan interface{})
randStream := newRandStream(done)
fmt.Println("3 random ints:")
for i := 1; i <= 3; i++ {
    fmt.Printf("%d: %d\n", i, <-randStream)
}
close(done)

// Simulate ongoing work
time.Sleep(1 * time.Second)

Este código produce:

3 random ints:
1: 5577006791947779410
2: 8674665223082153551
3: 6129484611666145821
newRandStream closure exited.

Ahora vemos que la goroutina se limpia correctamente.

Ahora que sabemos cómo garantizar que las goroutinas no tengan fugas, podemos estipular una convención: Si una goroutina es responsable de crear una goroutina, también es responsable de asegurarse de que puede detener la goroutina.

Esta convención te ayudará a garantizar que tus programas sean componibles y escalables a medida que crecen. Volveremos sobre esta técnica y gobernaremos más en las secciones "Tuberías" y "El paquete contextual". La forma de garantizar que las goroutines puedan detenerse puede variar según el tipo y la finalidad de la goroutine, pero todas parten de la base de pasar un canal done.

El canal or

A veces puede que quieras combinar uno o más canales done en un único canal done que se cierre si se cierra cualquiera de los canales que lo componen. Es perfectamente aceptable, aunque verboso, escribir una sentencia select que realice este acoplamiento; sin embargo, a veces no puedes saber el número de canales done con los que estás trabajando en tiempo de ejecución. En este caso, o si simplemente prefieres una sola línea, puedes combinar estos canales utilizando el patrón o-canal.

Este patrón crea un canal compuesto done mediante recursividad y goroutines. Echemos un vistazo:

var or func(channels ...<-chan interface{}) <-chan interface{}
or = func(channels ...<-chan interface{}) <-chan interface{} { 1
    switch len(channels) {
    case 0: 2
        return nil
    case 1: 3
        return channels[0]
    }

    orDone := make(chan interface{})
    go func() { 4
        defer close(orDone)

        switch len(channels) {
        case 2: 5
            select {
            case <-channels[0]:
            case <-channels[1]:
            }
        default: 6
            select {
            case <-channels[0]:
            case <-channels[1]:
            case <-channels[2]:
            case <-or(append(channels[3:], orDone)...): 6
            }
        }
    }()
    return orDone
}
1

Aquí tenemos nuestra función, or, que toma una porción variádica de canales y devuelve un único canal.

2

Como se trata de una función recursiva, debemos establecer criterios de terminación. El primero es que si la rebanada variádica está vacía, simplemente devolvemos un canal nulo. Esto es coherente con la idea de no pasar canales; no esperaríamos que un canal compuesto hiciera nada.

3

Nuestro segundo criterio de terminación establece que si nuestra rebanada variádica sólo contiene un elemento, sólo devolvemos ese elemento.

4

Aquí está el cuerpo principal de la función, y donde se produce la recursividad. Creamos una goroutine para poder esperar mensajes en nuestros canales sin bloquearnos.

5

Debido a cómo estamos recursando, cada llamada recursiva a or tendrá al menos dos canales. Como optimización para mantener limitado el número de goroutines, colocamos aquí un caso especial para las llamadas a or con sólo dos canales.

6

Aquí creamos recursivamente un canal or a partir de todos los canales de nuestra rebanada después del tercer índice, y luego seleccionamos a partir de él. Esta relación de recurrencia desestructurará el resto de la rebanada en canales or para formar un árbol del que volverá la primera señal. También pasamos el canal orDone para que cuando salgan las goroutines del árbol superior, salgan también las goroutines del árbol inferior.

Se trata de una función bastante concisa que te permite combinar cualquier número de canales en un único canal que se cerrará en cuanto se cierre o se escriba en cualquiera de los canales que lo componen. Veamos cómo podemos utilizar esta función. He aquí un breve ejemplo que toma canales que se cierran tras una duración determinada, y utiliza la función or para combinarlos en un único canal que se cierra:

sig := func(after time.Duration) <-chan interface{}{ 1
    c := make(chan interface{})
    go func() {
        defer close(c)
        time.Sleep(after)
    }()
    return c
}

start := time.Now() 2
<-or(
    sig(2*time.Hour),
    sig(5*time.Minute),
    sig(1*time.Second),
    sig(1*time.Hour),
    sig(1*time.Minute),
)
fmt.Printf("done after %v", time.Since(start)) 3
1

Esta función simplemente crea un canal que se cerrará cuando transcurra el tiempo especificado en after.

2

Aquí llevamos la cuenta de aproximadamente cuándo empieza a bloquearse el canal de la función or.

3

Y aquí imprimimos el tiempo que tardó en producirse la lectura.

Si ejecutas este programa obtendrás

done after 1.000216772s

Observa que, a pesar de colocar varios canales en nuestra llamada a or que tardan varios tiempos en cerrarse, nuestro canal que se cierra al cabo de un segundo hace que se cierre todo el canal creado por la llamada a or. Esto se debe a que -a pesar de su lugar en el árbol que construye la función or - siempre se cerrará primero y, por lo tanto, los canales que dependen de su cierre también se cerrarán.

Conseguimos esta tersura a costa de goroutines adicionales-f(x)=⌊x/2⌋ donde x es el número de goroutines-pero recuerda que uno de los puntos fuertes de Go es la capacidad de crear, programar y ejecutar goroutines rápidamente, y el lenguaje fomenta activamente el uso de goroutines para modelar problemas correctamente. Preocuparse por el número de goroutines creadas aquí es probablemente una optimización prematura. Además, si en tiempo de compilación no sabes con cuántos canales done estás trabajando, no hay otra forma de combinar canales done.

Este patrón es útil para emplearlo en la intersección de módulos de tu sistema. En estas intersecciones, sueles tener múltiples condiciones para cancelar árboles de goroutines a través de tu pila de llamadas. Utilizando la función or, puedes simplemente combinarlas y pasarlas por la pila. Veremos otra forma de hacerlo en "El paquete de contexto" que también es muy agradable, y quizás un poco más descriptiva.

También veremos cómo podemos utilizar una variación de este patrón para formar un patrón más complicado en "Solicitudes replicadas".

Tratamiento de errores

En los programas concurrentes, la gestión de errores puede ser difícil de hacer bien. A veces, pasamos tanto tiempo pensando en cómo nuestros distintos procesos compartirán información y se coordinarán, que nos olvidamos de considerar cómo manejarán con elegancia los estados de error. Cuando Go rechazó el popular modelo de excepción de errores, hizo una declaración de que la gestión de errores era importante, y que al desarrollar nuestros programas, deberíamos prestar a nuestras rutas de error la misma atención que prestamos a nuestros algoritmos. Con ese espíritu, echemos un vistazo a cómo lo hacemos cuando trabajamos con múltiples procesos concurrentes.

La pregunta más fundamental cuando se piensa en la gestión de errores es: "¿Quién debe ser responsable de gestionar el error?". En algún momento, el programa tiene que dejar de transportar el error por la pila y hacer realmente algo con él. ¿Quién es responsable de ello?

Con procesos concurrentes, esta cuestión se vuelve un poco más compleja. Como un proceso concurrente funciona independientemente de su padre o hermanos, puede resultarle difícil razonar sobre qué es lo correcto hacer con el error. Echa un vistazo al siguiente código para ver un ejemplo de esta cuestión:

checkStatus := func(
  done <-chan interface{},
  urls ...string,
) <-chan *http.Response {
    responses := make(chan *http.Response)
    go func() {
        defer close(responses)
        for _, url := range urls {
            resp, err := http.Get(url)
            if err != nil {
                fmt.Println(err) 1
                continue
            }
            select {
            case <-done:
                return
            case responses <- resp:
            }
        }
    }()
    return responses
}

done := make(chan interface{})
defer close(done)

urls := []string{"https://www.google.com", "https://badhost"}
for response := range checkStatus(done, urls...) {
    fmt.Printf("Response: %v\n", response.Status)
}
1

Aquí vemos a la goroutina haciendo todo lo posible para señalar que hay un error. ¿Qué más puede hacer? ¡No puede devolverlo! ¿Cuántos errores son demasiados? ¿Sigue haciendo peticiones?

La ejecución de este código produce:

Response: 200 OK
Get https://badhost: dial tcp: lookup badhost on 127.0.1.1:53: no such host

Aquí vemos que a la goroutina no se le ha dado ninguna opción. No puede tragarse el error, así que hace lo único sensato: imprime el error y espera que alguien le preste atención. No pongas a tus goroutines en esta situación incómoda. Te sugiero que separes tus preocupaciones: en general, tus procesos concurrentes deberían enviar sus errores a otra parte de tu programa que tenga información completa sobre el estado de tu programa, y pueda tomar una decisión más informada sobre qué hacer. El siguiente ejemplo demuestra una solución correcta a este problema:

type Result struct { 1
    Error error
    Response *http.Response
}
checkStatus := func(done <-chan interface{}, urls ...string) <-chan Result { 2
    results := make(chan Result)
    go func() {
        defer close(results)

        for _, url := range urls {
            var result Result
            resp, err := http.Get(url)
            result = Result{Error: err, Response: resp} 3
            select {
            case <-done:
                return
            case results <- result: 4
            }
        }
    }()
    return results
}
done := make(chan interface{})
defer close(done)

urls := []string{"https://www.google.com", "https://badhost"}
for result := range checkStatus(done, urls...) {
    if result.Error != nil { 5
        fmt.Printf("error: %v", result.Error)
        continue
    }
    fmt.Printf("Response: %v\n", result.Response.Status)
}
1

Aquí creamos un tipo que engloba tanto el *http.Response como el error posibles de una iteración del bucle dentro de nuestra goroutine.

2

Esta línea devuelve un canal del que se puede leer para recuperar los resultados de una iteración de nuestro bucle.

3

Aquí creamos una instancia de Result con los campos Error y Response definidos.

4

Aquí es donde escribimos el Result a nuestro canal.

5

Aquí, en nuestra gorutina principal, podemos tratar los errores que salen de la gorutina iniciada por checkStatus de forma inteligente, y con el contexto completo del programa más amplio.

Este código produce:

Response: 200 OK
error: Get https://badhost: dial tcp: lookup badhost on 127.0.1.1:53:
no such host

Lo fundamental aquí es cómo hemos acoplado el resultado potencial con el error potencial. Esto representa el conjunto completo de posibles resultados creados a partir de la gorutina checkStatus, y permite a nuestra gorutina principal tomar decisiones sobre qué hacer cuando se producen errores. En términos más generales, hemos separado con éxito las preocupaciones de la gestión de errores de nuestra goroutina productora. Esto es deseable porque la goroutina que generó la goroutina productora -en este caso nuestra goroutina principal- tiene más contexto sobre el programa en ejecución y puede tomar decisiones más inteligentes sobre qué hacer con los errores.

En el ejemplo anterior, simplemente escribimos los errores en stdio, pero podríamos hacer otra cosa. Modifiquemos ligeramente nuestro programa para que deje de intentar comprobar el estado si se producen tres o más errores:

done := make(chan interface{})
defer close(done)

errCount := 0
urls := []string{"a", "https://www.google.com", "b", "c", "d"}
for result := range checkStatus(done, urls...) {
    if result.Error != nil {
        fmt.Printf("error: %v\n", result.Error)
        errCount++
        if errCount >= 3 {
            fmt.Println("Too many errors, breaking!")
            break
        }
        continue
    }
    fmt.Printf("Response: %v\n", result.Response.Status)
}

Este código produce este resultado:

error: Get a: unsupported protocol scheme ""
Response: 200 OK
error: Get b: unsupported protocol scheme ""
error: Get c: unsupported protocol scheme ""
Too many errors, breaking!

Puedes ver que, como los errores se devuelven desde checkStatus y no se gestionan internamente dentro de la gorutina, la gestión de errores sigue el patrón familiar de Go. Éste es un ejemplo sencillo, pero no es difícil imaginar situaciones en las que la goroutina principal coordina los resultados de varias goroutinas y construye reglas más complejas para continuar o cancelar las goroutinas hijas. Una vez más, la principal conclusión es que los errores deben considerarse ciudadanos de primera clase cuando se construyan los valores que devolverán las goroutinas. Si tu gorutina puede producir errores, esos errores deben estar estrechamente vinculados a tu tipo de resultado y transmitirse a través de las mismas líneas de comunicación, igual que las funciones síncronas normales.

Tuberías

Cuando escribes un programa, probablemente no te sientas y escribes una función larga, ¡al menos eso espero! Construyes abstracciones en forma de funciones, estructuras, métodos, etc. ¿Por qué lo hacemos? En parte para abstraer detalles que no importan para el flujo general, y en parte para poder trabajar en un área del código sin afectar a otras áreas. ¿Alguna vez has tenido que hacer un cambio en un sistema y te has visto obligado a tocar varias áreas sólo para hacer un cambio lógico? Puede que sea porque ese sistema adolece de una mala abstracción.

Una canalización no es más que otra herramienta que puedes utilizar para formar una abstracción en tu sistema. En concreto, es una herramienta muy potente para utilizar cuando tu programa necesita procesar flujos, o lotes de datos. Se cree que la palabra pipeline se utilizó por primera vez en 1856, y probablemente se refería a una línea de tuberías que transportaban líquido de un lugar a otro. Tomamos prestado este término en informática porque también transportamos algo de un lugar a otro: datos. Una tubería no es más que una serie de cosas que reciben datos, realizan una operación con ellos y los devuelven. Llamamos a cada una de estas operaciones una etapa de la cadena.

Al utilizar una canalización, separas las preocupaciones de cada etapa, lo que proporciona numerosas ventajas. Puedes modificar las etapas independientemente unas de otras, puedes mezclar y combinar cómo se combinan las etapas independientemente de la modificación de las etapas, puedes procesar cada etapa de forma concurrente con las etapas anteriores o posteriores, y puedes abrir en abanico o limitar la velocidad de partes de tu canalización. Hablaremos de la distribución en abanico en la sección "Distribución en abanico, distribución en abanico", y de la limitación de velocidad en el Capítulo 5. No tienes que preocuparte de lo que significan estos términos ahora mismo; empecemos de forma sencilla e intentemos construir una etapa de la tubería.

Como ya se ha dicho, una etapa no es más que algo que recibe datos, los transforma y los devuelve. He aquí una función que podría considerarse una etapa de la tubería:

multiply := func(values []int, multiplier int) []int {
    multipliedValues := make([]int, len(values))
    for i, v := range values {
        multipliedValues[i] = v * multiplier
    }
    return multipliedValues
}

Esta función toma una porción de enteros con un multiplicador, los recorre multiplicándolos a medida que avanza y devuelve una nueva porción transformada. Parece una función aburrida, ¿verdad? Vamos a crear otra etapa:

add := func(values []int, additive int) []int {
    addedValues := make([]int, len(values))
    for i, v := range values {
        addedValues[i] = v + additive
    }
    return addedValues
}

¡Otra función aburrida! Ésta sólo crea una nueva rebanada y añade un valor a cada elemento. Llegados a este punto, puede que te estés preguntando qué hace que estas dos funciones sean etapas de canalización y no sólo funciones. Probemos a combinarlas:

ints := []int{1, 2, 3, 4}
for _, v := range add(multiply(ints, 2), 1) {
    fmt.Println(v)
}

Este código produce:

3
5
7
9

Fíjate en cómo combinamos add y multiply dentro de la cláusula range. Son funciones como con las que trabajas a diario, pero como las hemos construido para que tengan las propiedades de una etapa de canalización, podemos combinarlas para formar una canalización. Eso es interesante; ¿cuáles son las propiedades de una etapa de canalización?

  • Una etapa consume y devuelve el mismo tipo.

  • Un escenario debe ser cosificado2 por el lenguaje para que se pueda pasar de una a otra. Las funciones en Go están reificadas y se ajustan muy bien a este propósito.

Los que estéis familiarizados con la programación funcional quizá estéis asintiendo con la cabeza y pensando en términos como funciones de orden superior y mónadas. De hecho, las etapas de canalización están muy relacionadas con la programación funcional y pueden considerarse un subconjunto de las mónadas. No voy a entrar aquí explícitamente en las mónadas ni en la programación funcional, pero son temas interesantes por derecho propio, y es útil, aunque innecesario, recurrir al conocimiento práctico de ambos temas cuando se trata de entender los pipelines.

Aquí, nuestras etapas add y multiply satisfacen todas las propiedades de una etapa pipeline: ambas consumen una porción de int y devuelven una porción de int, y como Go tiene funciones reificadas, podemos pasar add y multiple de un lado a otro. Estas propiedades dan lugar a las interesantes propiedades de las etapas pipeline que hemos mencionado antes: a saber, resulta muy fácil combinar nuestras etapas a un nivel superior sin modificar las propias etapas.

Por ejemplo, si ahora quisiéramos añadir una etapa adicional a nuestra tubería para multiplicar por dos, simplemente envolveríamos nuestra tubería anterior en una nueva etapa multiply, así:

ints := []int{1, 2, 3, 4}
for _, v := range multiply(add(multiply(ints, 2), 1), 2) {
    fmt.Println(v)
}

La ejecución de este código produce:

6
10
14
18

Fíjate en que hemos podido hacerlo sin escribir una nueva función, modificar ninguna de las existentes ni modificar lo que hacemos con el resultado de nuestra canalización. Quizá estés empezando a ver las ventajas de utilizar el patrón de canalización. Por supuesto, también podríamos escribir este código procedimentalmente:

ints := []int{1, 2, 3, 4}
for _, v := range ints {
    fmt.Println(2*(v*2+1))
}

Inicialmente, esto parece mucho más sencillo, pero como verás a medida que avancemos, el código procedimental no proporciona las mismas ventajas que una canalización cuando se trata de flujos de datos.

¿Te das cuenta de que cada etapa toma un fragmento de datos y devuelve otro? Estas etapas realizan lo que llamamos procesamiento por lotes. Esto significa que operan con trozos de datos a la vez, en lugar de con un valor discreto cada vez. Hay otro tipo de etapa de canalización que realiza un procesamiento de flujo. Esto significa que la etapa recibe y emite un elemento cada vez.

Hay pros y contras en el procesamiento por lotes frente al procesamiento por flujos, que discutiremos dentro de un momento. Por ahora, fíjate en que para que los datos originales permanezcan inalterados, cada etapa tiene que hacer una nueva rebanada de igual longitud para almacenar los resultados de sus cálculos. Eso significa que la huella de memoria de nuestro programa en un momento dado es el doble del tamaño de la rebanada que enviamos al inicio de nuestro pipeline. Convirtamos nuestras etapas para que estén orientadas al flujo y veamos qué aspecto tiene:

multiply := func(value, multiplier int) int {
    return value * multiplier
}

add := func(value, additive int) int {
    return value + additive
}

ints := []int{1, 2, 3, 4}
for _, v := range ints {
    fmt.Println(multiply(add(multiply(v, 2), 1), 2))
}

Este código produce:

6
10
14
18

Cada etapa recibe y emite un valor discreto, y la huella de memoria de nuestro programa vuelve a ser sólo del tamaño de la entrada de la tubería. Pero tuvimos que arrastrar la tubería al cuerpo del bucle for y dejar que range hiciera el trabajo pesado de alimentar nuestra tubería. Esto no sólo limita la reutilización de cómo alimentamos la tubería, sino que, como veremos más adelante en esta sección, también limita nuestra capacidad de escalado. También tenemos otros problemas. En efecto, estamos instanciando nuestra canalización en cada iteración del bucle. Aunque es barato hacer llamadas a funciones, estamos haciendo tres llamadas a funciones por cada iteración del bucle. ¿Y qué pasa con la concurrencia? Antes dije que una de las ventajas de utilizar canalizaciones era la posibilidad de procesar etapas individuales de forma concurrente, y mencioné algo sobre el fan-out. ¿Dónde entra todo eso?

Probablemente podría ampliar un poco más nuestras funciones multiply y add para introducir estos conceptos, pero ya han cumplido su función de introducir el concepto de canalización. Es hora de empezar a aprender cuáles son las buenas prácticas para construir canalizaciones en Go, y esto empieza con la primitiva canal de Go.

Buenas prácticas para la construcción de tuberías

Los canales son especialmente adecuados para construir canalizaciones en Go porque cumplen todos nuestros requisitos básicos. Pueden recibir y emitir valores, se pueden utilizar de forma concurrente con seguridad, se pueden recorrer y están reificados por el lenguaje. Tomémonos un momento y convirtamos el ejemplo anterior para utilizar canales en su lugar:

generator := func(done <-chan interface{}, integers ...int) <-chan int {
    intStream := make(chan int)
    go func() {
        defer close(intStream)
        for _, i := range integers {
            select {
            case <-done:
                return
            case intStream <- i:
            }
        }
    }()
    return intStream
}

multiply := func(
  done <-chan interface{},
  intStream <-chan int,
  multiplier int,
) <-chan int {
    multipliedStream := make(chan int)
    go func() {
        defer close(multipliedStream)
        for i := range intStream {
            select {
            case <-done:
                return
            case multipliedStream <- i*multiplier:
            }
        }
    }()
    return multipliedStream
}

add := func(
  done <-chan interface{},
  intStream <-chan int,
  additive int,
) <-chan int {
    addedStream := make(chan int)
    go func() {
        defer close(addedStream)
        for i := range intStream {
            select {
            case <-done:
                return
            case addedStream <- i+additive:
            }
        }
    }()
    return addedStream
}

done := make(chan interface{})
defer close(done)

intStream := generator(done, 1, 2, 3, 4)
pipeline := multiply(done, add(done, multiply(done, intStream, 2), 1), 2)

for v := range pipeline {
    fmt.Println(v)
}

Este código produce:

6
10
14
18

Parece que hemos replicado el resultado deseado, pero a costa de tener mucho más código. ¿Qué hemos ganado exactamente? En primer lugar, examinemos lo que hemos escrito. Ahora tenemos tres funciones en lugar de dos. Parece que todas ellas inician una gorutina dentro de sus cuerpos, y utilizan el patrón que establecimos en "Evitar las fugas de gorutinas" de tomar un canal para indicar que la gorutina debe salir. Todas parece que devuelven canales, y algunas parece que también toman un canal adicional. ¡Interesante! Empecemos a desglosar esto un poco más:

done := make(chan interface{})
defer close(done)

Lo primero que hace nuestro programa es crear un canal done y llamar a close en una sentencia defer. Como ya hemos dicho, esto garantiza que nuestro programa salga limpiamente y nunca filtre goroutines. Nada nuevo. A continuación, echemos un vistazo a la función generator:

generator := func(done <-chan interface{}, integers ...int) <-chan int {
    intStream := make(chan int)
    go func() {
        defer close(intStream)
        for _, i := range integers {
            select {
            case <-done:
                return
            case intStream <- i:
            }
        }
    }()
    return intStream
}

// ...

intStream := generator(done, 1, 2, 3, 4)

La función generator toma una porción variádica de enteros, construye un canal de enteros con una longitud igual a la porción de enteros entrante, inicia una gorutina y devuelve el canal construido. A continuación, en la goroutina que se creó, generator recorre la porción variádica que se pasó y envía los valores de las porciones por el canal que creó.

Observa que el envío en el canal comparte una sentencia select con una selección en el canal done. De nuevo, éste es el patrón que establecimos en "Prevención de fug as de goroutinas " para evitar fugas de goroutinas.

En pocas palabras, la función generator convierte un conjunto discreto de valores en un flujo de datos en un canal. Acertadamente, este tipo de función se denomina generador. Lo verás con frecuencia cuando trabajes con canalizaciones, porque al principio de la canalización siempre tendrás algún lote de datos que necesites convertir en un canal. Dentro de un rato veremos algunos ejemplos de generadores divertidos, pero antes terminemos el análisis de este programa. A continuación, construimos nuestra canalización:

pipeline := multiply(done, add(done, multiply(done, intStream, 2), 1), 2)

Es el mismo proceso con el que hemos estado trabajando todo el tiempo: para un flujo de números, los multiplicaremos por dos, sumaremos uno y multiplicaremos el resultado por dos. Esta cadena es similar a nuestra cadena que utiliza funciones en el ejemplo anterior, pero es diferente en aspectos muy importantes.

En primer lugar, estamos utilizando canales. Esto es obvio pero significativo porque permite dos cosas: al final de nuestro pipeline, podemos utilizar una sentencia range para extraer los valores, y en cada etapa podemos ejecutar de forma concurrente con seguridad porque nuestras entradas y salidas son seguras en contextos concurrentes.

Lo que nos lleva a nuestra segunda diferencia: cada etapa del pipeline se ejecuta concurrentemente. Esto significa que cualquier etapa sólo necesita esperar sus entradas, y poder enviar sus salidas. Esto tiene enormes ramificaciones , como descubriremos en la sección "Fan-Out, Fan-In", pero por ahora podemos limitarnos a señalar que permite que nuestras etapas se ejecuten independientemente unas de otras durante cierto tiempo.

Por último, en nuestro ejemplo, recorremos esta tubería y los valores son arrastrados por el sistema:

for v := range pipeline {
    fmt.Println(v)
}

Aquí tienes una tabla que muestra cómo cada uno de los valores del sistema entrará en cada canal, y cuándo se cerrarán los canales. La iteración es el recuento base-cero de en qué iteración del bucle for estamos, y el valor de cada columna es el valor tal y como entra en la etapa de canalización:

Iteración Generador Multiplica Añade Multiplica Valor

0

1

0

1

0

2

2

0

2

3

0

3

4

6

1

3

5

1

4

6

10

2

(cerrado)

4

7

2

(cerrado)

8

14

3

(cerrado)

9

3

(cerrado)

18

Examinemos también más de cerca nuestro uso del patrón para indicar a las goroutines que salgan. Cuando tratamos con múltiples goroutines interdependientes, ¿cómo acaba funcionando este patrón? ¿Qué pasaría si llamáramos a close en el canal done antes de que el programa terminara de ejecutarse?

Para responder a estas preguntas, echemos un vistazo una vez más a la construcción de nuestra tubería:

pipeline := multiply(done, add(done, multiply(done, intStream, 2), 1), 2)

Las etapas están interconectadas de dos formas: por el canal común done, y por los canales que se pasan a etapas posteriores de la tubería. En otras palabras, el canal creado por la función multiply se pasa a la función add, y así sucesivamente. Volvamos a la tabla anterior y, antes de permitir que se complete, llamemos a close en el canal done y veamos qué ocurre:

Iteración Generador Multiplica Añade Multiplica Valor

0

1

0

1

0

2

2

0

2

3

1

3

4

6

cerrar(hecho)

(cerrado)

3

5

(cerrado)

6

(cerrado)

7

(cerrado)

(rango de salida)

¿Ves cómo el cierre del canal done se produce en cascada a través de la tubería? Esto es posible gracias a dos cosas en cada etapa de la tubería:

  • Alcance sobre el canal entrante. Cuando se cierre el canal entrante, saldrá el alcance.

  • El envío que comparte una declaración select con el canal done.

Independientemente del estado en que se encuentre la etapa de canalización -esperando en el canal de entrada, o esperando en el canal de envío-, el cierre del canal done forzará la finalización de la etapa de canalización.

Aquí entra en juego una relación de recurrencia. Al principio de la cadena, hemos establecido que debemos convertir valores discretos en un canal. Hay dos puntos en este proceso que deben ser prevenibles:

  • Creación del valor discreto que no es casi instantánea.

  • Envío del valor discreto en su canal.

La primera depende de ti. En nuestro ejemplo, en la función generator, los valores discretos se generan recorriendo la rebanada variádica, lo cual es lo suficientemente instantáneo como para que no necesite ser prevenible. La segunda se gestiona a través de nuestra sentencia select y el canal done, que garantiza que generator sea preferente aunque se bloquee al intentar escribir en intStream.

En el otro extremo de la cadena, la etapa final tiene garantizada la previsibilidad por inducción. Es preferible porque el canal sobre el que nos movemos se cerrará cuando sea preferible, y por tanto nuestro alcance se romperá cuando esto ocurra. La etapa final es preferible porque el flujo del que dependemos es preferible.

Entre el principio y el final de la canalización, el código siempre está recorriendo un canal y enviando por otro canal dentro de una declaración select que contiene un canal done.

Si una etapa se bloquea al recuperar un valor del canal entrante, se desbloqueará cuando ese canal se cierre. Sabemos por inducción que el canal se cerrará porque, o bien es una etapa escrita como la etapa en la que estamos, o bien el comienzo de la tubería que hemos establecido es preferible. Si una etapa se bloquea al enviar un valor, es preemptable gracias a la declaración select.

De este modo, todo nuestro pipeline siempre se puede anticipar cerrando el canal done. Genial, ¿verdad?

Algunos generadores útiles

Antes prometí que hablaría de algunos generadores divertidos que podrían ser de gran utilidad. Como recordatorio, un generador para una canalización es cualquier función que convierte un conjunto de valores discretos en un flujo de valores en un canal. Echemos un vistazo a un generador llamado repeat:

repeat := func(
    done <-chan interface{},
    values ...interface{},
) <-chan interface{} {
    valueStream := make(chan interface{})
    go func() {
        defer close(valueStream)
        for {
            for _, v := range values {
                select {
                case <-done:
                    return
                case valueStream <- v:
                }
            }
        }
    }()
    return valueStream
}

Esta función repetirá infinitamente los valores que le pases hasta que le digas que se detenga. Echemos un vistazo a otra etapa genérica de la tubería que resulta útil cuando se utiliza en combinación con repeat, take:

take := func(
    done <-chan interface{},
    valueStream <-chan interface{},
    num int,
) <-chan interface{} {
    takeStream := make(chan interface{})
    go func() {
        defer close(takeStream)
        for i := 0; i < num; i++ {
            select {
            case <-done:
                return
            case takeStream <- <- valueStream:
            }
        }
    }()
    return takeStream
}

Esta etapa de canalización sólo tomará los primeros elementos num de su valueStream entrante y luego saldrá. Juntas, ambas pueden ser muy potentes:

done := make(chan interface{})
defer close(done)

for num := range take(done, repeat(done, 1), 10) {
    fmt.Printf("%v ", num)
}

La ejecución de este código produce:

1 1 1 1 1 1 1 1 1 1

En este ejemplo básico, creamos un generador repeat para generar un número infinito de unos, pero luego sólo tomamos los 10 primeros. Como el envío del generador repeat se bloquea en la recepción de la etapa take, el generador repeat es muy eficiente. Aunque tenemos la capacidad de generar un flujo infinito de unos, sólo generamos N+1 instancias en las que N es el número que pasamos a la etapa take.

Podemos ampliar esto. Vamos a crear otro generador repetitivo, pero esta vez, vamos a crear uno que llame repetidamente a una función. Llamémoslo repeatFn:

repeatFn := func(
    done <-chan interface{},
    fn func() interface{},
) <-chan interface{} {
    valueStream := make(chan interface{})
    go func() {
        defer close(valueStream)
        for {
            select {
            case <-done:
                return
            case valueStream <- fn():
            }
        }
    }()
    return valueStream
}

Utilicémoslo para generar 10 números aleatorios:

done := make(chan interface{})
defer close(done)

rand := func() interface{} { return rand.Int()}

for num := range take(done, repeatFn(done, rand), 10) {
    fmt.Println(num)
}

Esto produce:

5577006791947779410
8674665223082153551
6129484611666145821
4037200794235010051
3916589616287113937
6334824724549167320
605394647632969758
1443635317331776148
894385949183117216
2775422040480279449

Eso está muy bien: ¡un canal infinito de números enteros aleatorios generados según las necesidades!

Quizá te preguntes por qué todos estos generadores y etapas reciben y envían en canales de interface{}. Podríamos haber escrito estas funciones para que fueran específicas de un tipo, o quizá haber escrito un generador Go.

Las interfaces vacías son un poco tabú en Go, pero para las etapas de canalización opino que está bien tratar en canales de interface{} para que puedas utilizar una biblioteca estándar de patrones de canalización. Como hemos dicho antes, gran parte de la utilidad de una tubería proviene de las etapas reutilizables. Esto se consigue mejor cuando las etapas funcionan al nivel de especificidad adecuado para sí mismas. En los generadores repeat y repeatFn, la preocupación es generar un flujo de datos haciendo un bucle sobre una lista o un operador. Con la etapa take, la preocupación es limitar nuestra tubería. Ninguna de estas operaciones requiere información sobre los tipos con los que trabajan, sino que sólo requieren conocer la aridad de sus parámetros.

Cuando necesites tratar con tipos específicos, puedes colocar una etapa que realice la aserción de tipo por ti. La sobrecarga de rendimiento de tener una etapa de canalización adicional (y, por tanto, una gorutina) y la afirmación de tipo es insignificante, como veremos dentro de un momento. He aquí un pequeño ejemplo que introduce una etapa de canalización toString:

toString := func(
    done <-chan interface{},
    valueStream <-chan interface{},
) <-chan string {
    stringStream := make(chan string)
    go func() {
        defer close(stringStream)
        for v := range valueStream {
            select {
            case <-done:
                return
            case stringStream <- v.(string):
            }
        }
    }()
    return stringStream
}

Y un ejemplo de cómo utilizarlo:

done := make(chan interface{})
defer close(done)

var message string
for token := range toString(done, take(done, repeat(done, "I", "am."), 5)) {
    message += token
}

fmt.Printf("message: %s...", message)

Este código produce:

message: Iam.Iam.I...

Así que vamos a demostrarnos a nosotros mismos que el coste de rendimiento de generar partes de nuestro pipeline es insignificante. Escribiremos dos funciones de evaluación comparativa: una para probar las etapas genéricas y otra para probar las etapas de tipo específico:

func BenchmarkGeneric(b *testing.B) {
    done := make(chan interface{})
    defer close(done)

    b.ResetTimer()
    for range toString(done, take(done, repeat(done, "a"), b.N)) {
    }
}

func BenchmarkTyped(b *testing.B) {
    repeat := func(done <-chan interface{}, values ...string) <-chan string {
        valueStream := make(chan string)
        go func() {
            defer close(valueStream)
            for {
                for _, v := range values {
                    select {
                    case <-done:
                        return
                    case valueStream <- v:
                    }
                }
            }
        }()
        return valueStream
    }

    take := func(
        done <-chan interface{},
        valueStream <-chan string,
        num int,
    ) <-chan string {
        takeStream := make(chan string)
        go func() {
            defer close(takeStream)
            for i := num; i > 0 || i == -1; {
                if i != -1 {
                    i--
                }
                select {
                case <-done:
                    return
                case takeStream <- <-valueStream:
                }
            }
        }()
        return takeStream
    }

    done := make(chan interface{})
    defer close(done)

    b.ResetTimer()
    for range take(done, repeat(done, "a"), b.N) {
    }
}

Y los resultados de ejecutar este código son

BenchmarkGeneric-4

1000000

2266

ns/op

Punto de referenciaTipo-4

1000000

1181

ns/op

PASE

ok

argumentos de la línea de comandos

3.486s

Puedes ver que las etapas específicas de tipo son el doble de rápidas, pero sólo marginalmente más rápidas en magnitud. Generalmente, el factor limitante de tu canalización será tu generador o una de las etapas que sea intensiva en cálculo. Si el generador no está creando un flujo desde la memoria, como ocurre con los generadores repeat y repeatFn, probablemente estarás limitado por la E/S. La lectura desde el disco o la red probablemente eclipse la escasa sobrecarga de rendimiento que se muestra aquí.

Si una de tus etapas es costosa desde el punto de vista computacional, esto sin duda eclipsará esta sobrecarga de rendimiento. Si esta técnica te sigue dejando mal sabor de boca, siempre puedes escribir un generador Go para crear tus etapas generadoras. Hablando de que una etapa sea costosa computacionalmente, ¿cómo podemos ayudar a mitigarlo? ¿No limitará la velocidad de toda la tubería?

Para ayudar a mitigarlo, vamos a hablar de la técnica fan-out, fan-in.

Fan-Out, Fan-In

Así que tienes un pipeline montado. Los datos fluyen por tu sistema maravillosamente, transformándose a medida que avanzan por las etapas que has encadenado. Es como un hermoso arroyo; un hermoso y lento arroyo, y oh Dios mío, ¿por qué tarda tanto?

A veces, las etapas de tu canalización pueden ser especialmente costosas desde el punto de vista computacional. Cuando esto ocurre, las etapas anteriores de tu canalización pueden bloquearse mientras esperan a que se completen tus etapas caras. No sólo eso, sino que la propia cadena puede tardar mucho tiempo en ejecutarse en su conjunto. ¿Cómo podemos solucionar esto?

Una de las propiedades interesantes de las canalizaciones es la capacidad que te dan de operar sobre el flujo de datos utilizando una combinación de etapas separadas, a menudo reordenables. Incluso puedes reutilizar etapas de la canalización varias veces. ¿No sería interesante reutilizar una sola etapa de nuestro pipeline en múltiples goroutines en un intento de paralelizar las extracciones de una etapa anterior? Tal vez eso ayudaría a mejorar el rendimiento de la tubería.

De hecho, resulta que sí puede, y este patrón tiene un nombre: fan-out, fan-in.

Fan-out es un término para describir el proceso de iniciar múltiples goroutines para manejar la entrada del pipeline, y fan-in es un término para describir el proceso de combinar múltiples resultados en un canal.

Entonces, ¿qué hace que una etapa de un pipeline sea adecuada para utilizar este patrón? Podrías considerar la posibilidad de desplegar una de tus etapas si se dan las dos circunstancias siguientes:

  • No se basa en valores que la etapa hubiera calculado antes.

  • Tarda mucho en funcionar.

La propiedad de independencia del orden es importante porque no tienes ninguna garantía de en qué orden se ejecutarán las copias concurrentes de tu etapa, ni en qué orden volverán.

Veamos un ejemplo. En el siguiente ejemplo, he construido una forma muy ineficiente de encontrar primos. Utilizaremos muchas de las etapas que creamos en "Tuberías":

rand := func() interface{} { return rand.Intn(50000000) }

done := make(chan interface{})
defer close(done)

start := time.Now()

randIntStream := toInt(done, repeatFn(done, rand))
fmt.Println("Primes:")
for prime := range take(done, primeFinder(done, randIntStream), 10) {
    fmt.Printf("\t%d\n", prime)
}

fmt.Printf("Search took: %v", time.Since(start))

Aquí tienes los resultados de ejecutar este código:

Primes:
    24941317
    36122539
    6410693
    10128161
    25511527
    2107939
    14004383
    7190363
    45931967
    2393161
Search took: 23.437511647s

Estamos generando un flujo de números aleatorios, con un tope de 50.000.000, convirtiendo el flujo en un flujo de números enteros, y pasándolo a continuación a nuestra etapa primeFinder. primeFinder empieza ingenuamente a intentar dividir el número proporcionado en el flujo de entrada por todos los números inferiores. Si no lo consigue, pasa el valor a la siguiente etapa. Ciertamente, ésta es una forma horrible de intentar encontrar números primos, pero cumple nuestro requisito de tardar mucho tiempo.

En nuestro bucle for, recorremos los números primos encontrados, los imprimimos a medida que van llegando y -gracias a nuestra etapa take - cerramos el canal después de encontrar 10 números primos. A continuación, imprimimos el tiempo que ha durado la búsqueda, y el canal done se cierra mediante una sentencia defer y la tubería se descompone.

Para evitar duplicados en nuestros resultados, podríamos introducir otra etapa en nuestro proceso para almacenar en caché los números primos que se han encontrado en un conjunto, pero por simplicidad, los ignoraremos.

Puedes ver que tardó unos 23 segundos en encontrar 10 primos. No está muy bien. Normalmente, primero miraríamos el algoritmo en sí, quizá cogiendo un libro de cocina de algoritmos, y veríamos si podemos mejorar las cosas en cada etapa. Pero como la finalidad de esta etapa es ser lenta, veremos cómo podemos ampliar una o varias etapas para masticar las operaciones lentas más rápidamente.

Éste es un ejemplo relativamente sencillo, por lo que sólo tenemos dos etapas: la generación de números aleatorios y el cribado de primos. En un programa más grande, tu pipeline podría estar compuesto por muchas más etapas; ¿cómo sabemos cuál de ellas hay que separar? Recuerda nuestros criterios anteriores: independencia del orden y duración. Nuestro generador de números enteros aleatorios es ciertamente independiente del orden, pero no tarda mucho tiempo en ejecutarse. La etapa primeFinder también es independiente del orden -los números son primos o no- y, debido a nuestro algoritmo ingenuo, tarda mucho tiempo en ejecutarse. Parece un buen candidato para el abanico.

Afortunadamente, el proceso de abanicar una etapa en un pipeline es extraordinariamente fácil. Todo lo que tenemos que hacer es iniciar varias versiones de esa etapa. Así, en lugar de esto

primeStream := primeFinder(done, randIntStream)

Podemos hacer algo así

numFinders := runtime.NumCPU()
finders := make([]<-chan int, numFinders)
for i := 0; i < numFinders; i++ {
    finders[i] = primeFinder(done, randIntStream)
}

Aquí estamos arrancando tantas copias de esta etapa como CPUs tengamos. En mi ordenador, runtime.NumCPU() devuelve ocho, así que seguiré utilizando este número en nuestra discusión. En producción, probablemente haríamos algunas pruebas empíricas para determinar el número óptimo de CPUs, pero aquí seguiremos siendo sencillos y supondremos que una CPU se mantendrá ocupada por una sola copia de la etapa findPrimes.

Y ya está. Ahora tenemos ocho goroutines que tiran del generador de números aleatorios e intentan determinar si el número es primo. Generar números aleatorios no debería llevar mucho tiempo, por lo que cada goroutina de la etapa findPrimes debería poder determinar si su número es primo y disponer inmediatamente de otro número aleatorio.

Pero seguimos teniendo un problema: ahora que tenemos cuatro goroutines, también tenemos cuatro canales, pero nuestro rango sobre primos sólo espera un canal. Esto nos lleva a la parte de abanico del patrón.

Como ya hemos dicho antes, abrir en abanico significa multiplexar o unir varios flujos de datos en uno solo. El algoritmo para hacerlo es relativamente sencillo:

fanIn := func(
    done <-chan interface{},
    channels ...<-chan interface{},
) <-chan interface{} { 1
    var wg sync.WaitGroup 2
    multiplexedStream := make(chan interface{})

    multiplex := func(c <-chan interface{}) { 3
        defer wg.Done()
        for i := range c {
            select {
            case <-done:
                return
            case multiplexedStream <- i:
            }
        }
    }

    // Select from all the channels
    wg.Add(len(channels)) 4
    for _, c := range channels {
        go multiplex(c)
    }

    // Wait for all the reads to complete
    go func() { 5
        wg.Wait()
        close(multiplexedStream)
    }()

    return multiplexedStream
}
1

Aquí tomamos nuestro canal estándar done para permitir que nuestras goroutines se derrumben, y luego un trozo variado de canales interface{} para que se abran en abanico.

2

En esta línea creamos un sync.WaitGroup para poder esperar hasta que se hayan vaciado todos los canales.

3

Aquí creamos una función, multiplex, que, cuando se le pasa un canal, leerá del canal, y pasará el valor leído al canal multiplexedStream.

4

Esta línea incrementa el sync.WaitGroup en el número de canales que estamos multiplexando.

5

Aquí creamos una goroutina para esperar a que se vacíen todos los canales que estamos multiplexando para poder cerrar el canal multiplexedStream.

En pocas palabras, abrir en abanico implica crear el canal multiplexado del que leerán los consumidores, y luego poner en marcha una goroutina para cada canal entrante, y una goroutina para cerrar el canal multiplexado cuando se hayan cerrado todos los canales entrantes. Como vamos a crear una goroutina que está esperando a que se completen otras goroutinas en N, tiene sentido crear un sync.WaitGroup para coordinar las cosas. La función multiplex también notifica a la WaitGroup que ha terminado.

Juntemos todo esto y veamos si obtenemos alguna disminución en el tiempo de ejecución:

done := make(chan interface{})
defer close(done)

start := time.Now()

rand := func() interface{} { return rand.Intn(50000000) }

randIntStream := toInt(done, repeatFn(done, rand))

numFinders := runtime.NumCPU()
fmt.Printf("Spinning up %d prime finders.\n", numFinders)
finders := make([]<-chan interface{}, numFinders)
fmt.Println("Primes:")
for i := 0; i < numFinders; i++ {
    finders[i] = primeFinder(done, randIntStream)
}

for prime := range take(done, fanIn(done, finders...), 10) {
    fmt.Printf("\t%d\n", prime)
}

fmt.Printf("Search took: %v", time.Since(start))

Aquí tienes los resultados:

Spinning up 8 prime finders.
Primes:
    6410693
    24941317
    10128161
    36122539
    25511527
    2107939
    14004383
    7190363
    2393161
    45931967
Search took: 5.438491216s

Así que de ~23 segundos a ~5 segundos, ¡no está mal! Esto demuestra claramente las ventajas del patrón fan-out, fan-in, y reitera la utilidad de los pipelines. Hemos reducido nuestro tiempo de ejecución en un ~78% sin alterar drásticamente la estructura de nuestro programa.

El canal or-done

A veces trabajarás con canales de partes dispares de tu sistema. A diferencia de lo que ocurre con las canalizaciones, no puedes hacer afirmaciones sobre cómo se comportará un canal cuando el código con el que estás trabajando se cancele a través de su canal done. Es decir, no sabes si el hecho de que se haya cancelado tu goroutina significa que se habrá cancelado el canal del que estás leyendo. Por esta razón, tal y como expusimos en "Evitar las fugas de goroutinas", tenemos que envolver nuestra lectura del canal con una sentencia select que también seleccione de un canal done. Esto está perfectamente bien, pero hacerlo requiere un código que se lee fácilmente así:

for val := range myChan {
    // Do something with val
}

Y lo explota en esto:

loop:
for {
    select {
    case <-done:
        break loop
    case maybeVal, ok := <-myChan:
        if ok == false {
            return // or maybe break from for
        }
        // Do something with val
    }
}

Esto puede complicarse rápidamente, sobre todo si tienes bucles anidados. Siguiendo con el tema de utilizar goroutines para escribir código concurrente más claro, y no optimizar prematuramente, podemos solucionar esto con una sola goroutine. Encapsulamos la verbosidad para que otros no tengan que hacerlo:

orDone := func(done, c <-chan interface{}) <-chan interface{} {
    valStream := make(chan interface{})
    go func() {
        defer close(valStream)
        for {
            select {
            case <-done:
                return
            case v, ok := <-c:
                if ok == false {
                    return
                }
                select {
                case valStream <- v:
                case <-done:
                }
            }
        }
    }()
    return valStream
}

Hacer esto nos permite volver a los bucles sencillos de for, así:

for val := range orDone(done, myChan) {
    // Do something with val
}

Puede que encuentres casos de perímetro en tu código en los que necesites un bucle cerrado utilizando una serie de sentencias select, pero te animo a que intentes primero la legibilidad y evites una optimización prematura.

El canal en T

A veces puedes querer dividir los valores que entran por un canal para enviarlos a dos áreas distintas de tu base de código. Imagina un canal de comandos de usuario: puede que quieras recibir un flujo de comandos de usuario en un canal, enviarlos a algo que los ejecute, y también enviarlos a algo que registre los comandos para su posterior auditoría.

Tomando su nombre del comando tee de los sistemas tipo Unix, el canal-tee hace precisamente esto. Puedes pasarle un canal del que leer, y te devolverá dos canales distintos que obtendrán el mismo valor:

tee := func(
    done <-chan interface{},
    in <-chan interface{},
) (_, _ <-chan interface{}) { <-chan interface{}) {
    out1 := make(chan interface{})
    out2 := make(chan interface{})
    go func() {
        defer close(out1)
        defer close(out2)
        for val := range orDone(done, in) {
            var out1, out2 = out1, out2 1
            for i := 0; i < 2; i++ { 2
                select {
                case <-done:
                case out1<-val:
                    out1 = nil 3
                case out2<-val:
                    out2 = nil 3
                }
            }
        }
    }()
    return out1, out2
}
1

Querremos utilizar versiones locales de out1 y out2, por lo que sombreamos estas variables.

2

Vamos a utilizar una sentencia select para que las escrituras en out1 y out2 no se bloqueen entre sí. Para asegurarnos de que se escribe en ambos, realizaremos dos iteraciones de la sentencia select: una para cada canal de salida.

3

Una vez que hemos escrito en un canal, establecemos su copia oculta en nil para que las escrituras posteriores se bloqueen y el otro canal pueda continuar.

Observa que las escrituras en out1 y out2 están estrechamente acopladas. La iteración sobre in no puede continuar hasta que se haya escrito en out1 y out2. Normalmente, esto no es un problema, ya que, de todos modos, gestionar el rendimiento del proceso que lee de cada canal debería ser una preocupación de algo distinto al comando tee, pero merece la pena señalarlo. He aquí un ejemplo rápido para demostrarlo:

done := make(chan interface{})
defer close(done)

out1, out2 := tee(done, take(done, repeat(done, 1, 2), 4))

for val1 := range out1 {
    fmt.Printf("out1: %v, out2: %v\n", val1, <-out2)
}

Utilizando este patrón, es fácil seguir utilizando canales como puntos de unión de tu sistema.

El puente-canal

En algunas circunstancias, puede que quieras consumir valores de una secuencia de canales:

<-chan <-chan interface{}

Esto es ligeramente diferente de unir una serie de canales en un único canal, como vimos en "El canal or" o "Fan-Out, Fan-In". Una secuencia de canales sugiere una escritura ordenada, aunque de distintas fuentes. Un ejemplo podría ser una etapa de la tubería cuya vida es intermitente. Si seguimos las pautas que establecimos en "Confinamiento" y nos aseguramos de que los canales son propiedad de las goroutinas que escriben en ellos, cada vez que se reinicie una etapa del pipeline dentro de una nueva goroutina, se crearía un nuevo canal. Esto significa que tendríamos efectivamente una secuencia de canales. Exploraremos más este escenario en "Curar goroutinas no sanas".

Como consumidor, al código puede no importarle el hecho de que sus valores procedan de una secuencia de canales. En ese caso, tratar con un canal de canales puede resultar engorroso. En cambio, si definimos una función que pueda desestructurar el canal de canales en un canal simple -una técnica llamada puentear los canales-, al consumidor le resultará mucho más fácil centrarse en el problema que tiene entre manos. He aquí cómo podemos conseguirlo:

bridge := func(
    done <-chan interface{},
    chanStream <-chan <-chan interface{},
) <-chan interface{} {
    valStream := make(chan interface{}) 1
    go func() {
        defer close(valStream)
        for { 2
            var stream <-chan interface{}
            select {
            case maybeStream, ok := <-chanStream:
                if ok == false {
                    return
                }
                stream = maybeStream
            case <-done:
                return
            }
            for val := range orDone(done, stream) { 3
                select {
                case valStream <- val:
                case <-done:
                }
            }
        }
    }()
    return valStream
}
1

Este es el canal que devolverá todos los valores de bridge.

2

Este bucle se encarga de extraer canales de chanStream y proporcionarlos a un bucle anidado para su uso.

3

Este bucle se encarga de leer los valores del canal que se le ha dado y de repetir esos valores en valStream. Cuando el flujo sobre el que estamos haciendo el bucle se cierra, salimos del bucle que realiza las lecturas de este canal, y continuamos con la siguiente iteración del bucle, seleccionando canales de los que leer. Esto nos proporciona un flujo ininterrumpido de valores.

Este código es bastante sencillo. Ahora podemos utilizar bridge para ayudar a presentar una fachada de un solo canal sobre un canal de canales. He aquí un ejemplo que crea una serie de 10 canales, cada uno con un elemento escrito en ellos, y pasa estos canales a la función bridge:

genVals := func() <-chan <-chan interface{} {
    chanStream := make(chan (<-chan interface{}))
    go func() {
        defer close(chanStream)
        for i := 0; i < 10; i++ {
            stream := make(chan interface{}, 1)
            stream <- i
            close(stream)
            chanStream <- stream
        }
    }()
    return chanStream
}

for v := range bridge(nil, genVals()) {
    fmt.Printf("%v ", v)
}

Ejecutar esto produce:

0 1 2 3 4 5 6 7 8 9

Gracias a bridge, podemos utilizar el canal de canales desde dentro de una única sentencia de rango y centrarnos en la lógica de nuestro bucle. La desestructuración del canal de canales se deja para el código específico de este asunto.

Cola de espera

A veces es útil empezar a aceptar trabajo para tu pipeline aunque éste aún no esté preparado para más. Este proceso se llama poner en cola.

Esto significa que una vez que tu etapa ha completado algún trabajo, lo almacena en una ubicación temporal de la memoria para que otras etapas puedan recuperarlo más tarde, y tu etapa no necesita mantener una referencia a él. En la sección "Canales", hablamos de los canales con búfer, un tipo de cola, pero no hemos hecho mucho uso de ellos desde entonces, y por una buena razón.

Aunque introducir colas en tu sistema es muy útil, suele ser una de las últimas técnicas que quieres emplear al optimizar tu programa. Añadir colas prematuramente puede ocultar problemas de sincronización, como bloqueos muertos y bloqueos vivos, y además, a medida que tu programa converge hacia la corrección, puedes descubrir que necesitas más o menos colas.

Entonces, ¿para qué sirven las colas? Empecemos a responder a esa pregunta abordando uno de los errores más comunes que comete la gente cuando intenta ajustar el rendimiento de un sistema: introducir colas para intentar resolver problemas de rendimiento. Las colas casi nunca acelerarán el tiempo total de ejecución de tu programa; sólo permitirán que el programa se comporte de forma diferente.

Para entender por qué, veamos una tubería sencilla:

done := make(chan interface{})
defer close(done)

zeros := take(done, 3, repeat(done, 0))
short := sleep(done, 1*time.Second, zeros)
long := sleep(done, 4*time.Second, short)
pipeline := long

Este conducto encadena cuatro etapas:

  1. Una etapa de repetición que genera un flujo interminable de 0s.

  2. Una etapa que anula las etapas anteriores después de ver tres objetos.

  3. Una etapa "corta" que duerme un segundo.

  4. Una etapa "larga" que duerme cuatro segundos.

A efectos de este ejemplo, vamos a suponer que las etapas 1 y 2 son instantáneas, y vamos a centrarnos en cómo afectan las etapas que duermen al tiempo de ejecución de la tubería.

Aquí tienes una tabla que examina el tiempo t, la iteración i, y el tiempo que les queda a las etapas larga y corta para pasar a su siguiente valor.

Tiempo(t) i Etapa larga Etapa corta

0

0

1s

1

0

4s

1s

2

0

3s

(bloqueado)

3

0

2s

(bloqueado)

4

0

1s

(bloqueado)

5

1

4s

1s

6

1

3s

(bloqueado)

7

1

2s

(bloqueado)

8

1

1s

(bloqueado)

9

2

4s

(cerrar)

10

2

3s

11

2

2s

12

2

1s

13

3

(cerrar)

Puedes ver que esta tubería tarda aproximadamente 13 segundos en ejecutarse. La etapa corta tarda unos 9 segundos en completarse.

¿Qué ocurre si modificamos la tubería para incluir un búfer? Examinemos la misma tubería con un búfer de 2 introducido entre las etapas larga y corta:

done := make(chan interface{})
defer close(done)

zeros := take(done, 3, repeat(done, 0))
short := sleep(done, 1*time.Second, zeros)
buffer := buffer(done, 2, short)    // Buffers sends from short by 2
long := sleep(done, 4*time.Second, short)
pipeline := long

Aquí tienes el tiempo de ejecución:

Tiempo(t) i Etapa larga Tampón Etapa corta

0

0

0/2

1s

1

0

4s

0/2

1s

2

0

3s

1/2

1s

3

0

2s

2/2

(cerrar)

4

0

1s

2/2

5

1

4s

1/2

6

1

3s

1/2

7

1

2s

1/2

8

1

1s

1/2

9

2

4s

0/2

10

2

3s

0/2

11

2

2s

0/2

12

2

1s

0/2

13

3

(cerrar)

Aun así, ¡el pipeline completo tardó 13 segundos! Pero fíjate en el tiempo de ejecución de la etapa corta. Sólo tarda 3 segundos en completarse, frente a los 9 segundos que tardaba antes. ¡Hemos reducido el tiempo de ejecución de esta etapa en dos tercios! Pero si toda la cadena sigue tardando 13 segundos en ejecutarse, ¿en qué nos ayuda esto?

Imagina en su lugar la siguiente tubería:

p := processRequest(done, acceptConnection(done, httpHandler))

Aquí la canalización no sale hasta que se cancela, y la etapa que está aceptando conexiones no deja de aceptar conexiones hasta que se cancela la canalización. En este caso, no querrás que las conexiones a tu programa empiecen a caducar porque tu etapa processRequest esté bloqueando tu etapa acceptConnection. Quieres que tu etapa acceptConnection esté desbloqueada en la medida de lo posible. De lo contrario, los usuarios de tu programa podrían empezar a ver sus peticiones denegadas por completo.

Así que la respuesta a nuestra pregunta sobre la utilidad de introducir una cola no es que se haya reducido el tiempo de ejecución de una de las etapas, sino que se ha reducido el tiempo que está en estado de bloqueo. Esto permite que la etapa siga haciendo su trabajo. En este ejemplo, los usuarios probablemente experimentarían un retraso en sus solicitudes, pero no se les denegaría el servicio por completo.

De este modo, la verdadera utilidad de las colas es desacoplar etapas, de modo que el tiempo de ejecución de una etapa no repercuta en el tiempo de ejecución de otra. Desacoplar etapas de este modo altera en cascada el comportamiento en tiempo de ejecución del sistema en su conjunto, lo que puede ser bueno o malo según tu sistema.

Llegamos entonces a la cuestión de afinar tus colas. ¿Dónde deben colocarse las colas? ¿Cuál debe ser el tamaño del búfer? Las respuestas a estas preguntas dependen de la naturaleza de tu canalización.

Empecemos analizando las situaciones en las que las colas pueden aumentar el rendimiento general de tu sistema. Las únicas situaciones aplicables son:

  • Si la agrupación de solicitudes en una etapa ahorra tiempo.

  • Si los retrasos en una etapa producen un bucle de realimentación en el sistema.

Un ejemplo de la primera situación es una etapa que almacena la entrada en algo más rápido (por ejemplo, la memoria) de lo que está diseñada para enviar (por ejemplo, el disco). Éste es, por supuesto, todo el propósito del paquete bufio de Go. He aquí un ejemplo que demuestra una simple comparación de una escritura con búfer a una cola frente a una escritura sin búfer:

func BenchmarkUnbufferedWrite(b *testing.B) {
    performWrite(b, tmpFileOrFatal())
}

func BenchmarkBufferedWrite(b *testing.B) {
    bufferredFile := bufio.NewWriter(tmpFileOrFatal())
    performWrite(b, bufio.NewWriter(bufferredFile))
}

func tmpFileOrFatal() *os.File {
    file, err := ioutil.TempFile("", "tmp")
    if err != nil {
        log.Fatal("error: %v", err)
    }
    return file
}

func performWrite(b *testing.B, writer io.Writer) {
    done := make(chan interface{})
    defer close(done)

    b.ResetTimer()
    for bt := range take(done, repeat(done, byte(0)), b.N) {
        writer.Write([]byte{bt.(byte)})
    }
}
go test -bench=. src/concurrency-patterns-in-go/queuing/buffering_test.go

Y aquí están los resultados de ejecutar esta prueba comparativa:

Evaluación comparativaUnbufferedWrite-8

500000

3969

ns/op

PruebaBufferedWrite-8

1000000

1356

ns/op

PASE

ok

argumentos de la línea de comandos

3.398s

Como era de esperar, la escritura con búfer es más rápida que la escritura sin búfer. Esto se debe a que en bufio.Writer, las escrituras se ponen en cola internamente en un búfer hasta que se ha acumulado un trozo suficiente, y entonces se escribe el trozo. Por razones obvias, este proceso se denomina "chunking".

La fragmentación es más rápida porque bytes.Buffer debe hacer crecer su memoria asignada para acomodar los bytes que debe almacenar. Por diversas razones, hacer crecer la memoria es caro; por tanto, cuantas menos veces tengamos que hacerla crecer, más eficiente será el rendimiento de nuestro sistema en su conjunto. Así pues, la cola ha aumentado el rendimiento de nuestro sistema en su conjunto.

Éste es sólo un ejemplo sencillo de chunking en memoria, pero es posible que te encuentres con chunking con frecuencia sobre el terreno. Normalmente, cada vez que realizar una operación requiere una sobrecarga, el chunking puede aumentar el rendimiento del sistema. Algunos ejemplos son la apertura de transacciones de bases de datos, el cálculo de sumas de comprobación de mensajes y la asignación de espacio contiguo.

Aparte del chunking, la cola también puede ayudar si tu algoritmo puede optimizarse mediante el soporte de lookbehinds, u ordenación.

El segundo escenario, en el que un retraso en una etapa provoca más entradas en la tubería, es un poco más difícil de detectar, pero también más importante porque puede provocar un colapso sistémico de tus sistemas ascendentes.

Esta idea suele denominarse bucle de retroalimentación negativa, espiral descendente o incluso espiral de la muerte. Esto se debe a que existe una relación recurrente entre la tubería y sus sistemas ascendentes; el ritmo al que las etapas o sistemas ascendentes envían nuevas solicitudes está vinculado de alguna manera a lo eficiente que es la tubería.

Si la eficacia de la tubería cae por debajo de un determinado umbral crítico, los sistemas anteriores a la tubería empiezan a aumentar sus aportaciones a ésta, lo que hace que la tubería pierda más eficacia, y se inicia la espiral de la muerte. Sin algún tipo de mecanismo de seguridad, el sistema que utiliza la tubería nunca se recuperará.

Introduciendo una cola a la entrada de la canalización, puedes romper el bucle de retroalimentación a costa de crear un retraso para las solicitudes. Desde la perspectiva de la persona que llama al canal, parece que la solicitud se está procesando, pero tarda mucho tiempo. Mientras la persona que llama no agote el tiempo de espera, tu canalización permanecerá estable. Si se agota el tiempo de espera de la persona que llama, tienes que asegurarte de que soportas algún tipo de comprobación de disponibilidad al retirar la solicitud de la cola. Si no lo haces, puedes crear inadvertidamente un bucle de retroalimentación al procesar solicitudes muertas, disminuyendo así la eficiencia de tu canalización.

Así que de nuestros ejemplos podemos empezar a ver que surge un patrón; las colas deben implementarse bien:

  • En la entrada de tu tubería.

  • En las etapas en las que la dosificación conduzca a una mayor eficacia.

Puedes tener la tentación de añadir colas en otro lugar -por ejemplo, después de una etapa computacionalmente costosa- ¡pero evita esa tentación! Como hemos aprendido, sólo hay unas pocas situaciones en las que la puesta en cola reducirá el tiempo de ejecución de tu canalización, y ponerla en un intento de evitarlo puede tener consecuencias desastrosas.

Esto no es intuitivo al principio; para entender por qué, tenemos que hablar del rendimiento de la tubería. No te preocupes, no es tan difícil, y también nos ayudará a responder a la pregunta de cómo determinar lo grandes que deben ser nuestras colas.

En la teoría de colas, existe una ley que -con un muestreo suficiente- predice el rendimiento de tu canalización. Se llama Ley de Little, y sólo necesitas saber unas pocas cosas para entenderla y utilizarla.

Definamos primero algebraicamente la Ley de Little. Se expresa comúnmente como L=λW, donde:

  • L = el número medio de unidades del sistema.

  • λ = la tasa media de llegada de unidades.

  • W = el tiempo medio que pasa una unidad en el sistema.

Esta ecuación sólo se aplica a los llamados sistemas estables. En una tubería, un sistema estable es aquel en el que la velocidad a la que el trabajo entra en la tubería, o entrada, es igual a la velocidad a la que sale del sistema, o salida. Si el ritmo de entrada supera al de salida, tu sistema es inestable y ha entrado en una espiral de muerte. Si la tasa de entrada es inferior a la de salida, sigues teniendo un sistema inestable, pero lo único que ocurre es que tus recursos no se utilizan completamente. No es la peor situación del mundo, pero quizá te preocupe esto si la infrautilización se da a gran escala (por ejemplo, clusters o centros de datos).

Supongamos que nuestra tubería es estable. Si queremos disminuir W, el tiempo medio que pasa una unidad en el sistema en un factor de n, sólo tenemos una opción: disminuir el número medio de unidades en el sistema: L/n = λ * W/n. Y sólo podemos disminuir el número medio de unidades en el sistema si aumentamos la tasa de salida. Observa también que si añadimos colas a nuestras etapas, estamos aumentando L, lo que aumenta la tasa de llegada de unidades (nL = nλ * W) o aumenta el tiempo medio que pasa una unidad en el sistema (nL = λ * nW). Mediante la Ley de Little, hemos demostrado que las colas no ayudarán a disminuir el tiempo de permanencia en el sistema.

Observa también que, puesto que estamos observando nuestra tubería en su conjunto, la reducción de W en un factor de n se distribuye por todas las etapas de nuestra tubería. En nuestro caso, la Ley de Little debería definirse realmente así:

  • L = λΣiWi

Es otra forma de decir que tu pipeline sólo será tan rápido como tu etapa más lenta. ¡Optimiza indiscriminadamente!

¡Así que la Ley de Little es genial! Esta sencilla ecuación abre todo tipo de vías para analizar nuestra tubería. Utilicémosla para plantear algunas preguntas interesantes. Durante nuestro análisis, supongamos que nuestra tubería tiene tres etapas.

Intentemos determinar cuántas solicitudes por segundo puede gestionar nuestro canal. Supongamos que activamos el muestreo en nuestro canal y descubrimos que 1 solicitud (r) tarda aproximadamente 1 segundo en pasar por el canal. Introduzcamos esas cifras.

3r = λr/s * 1s

3r/s = λr/s

λr/s = 3r/s

Establecemos L a 3 porque cada etapa de nuestro pipeline está procesando una solicitud. A continuación, fijamos W en 1 segundo, hacemos un poco de álgebra y ¡voilà! En esta canalización, podemos gestionar tres solicitudes por segundo.

¿Qué pasa con la determinación del tamaño que debe tener nuestra cola para gestionar un número deseado de solicitudes? ¿Puede la Ley de Little ayudarnos a responder a eso?

Supongamos que nuestro muestreo indica que una solicitud tarda 1 ms en procesarse. ¿Qué tamaño tendría que tener nuestra cola para gestionar 100.000 solicitudes por segundo? De nuevo, ¡introduzcamos los números!

Lr-3r = 100,000r/s * 0.0001s

Lr-3r = 10r

Lr = 7r

De nuevo, nuestro pipeline tiene tres etapas, así que disminuiremos L en 3. Establecemos λ en 100.000 r/s, y descubrimos que si queremos atender tantas peticiones, nuestra cola debería tener una capacidad de 7. ¡Recuerda que a medida que aumentas el tamaño de la cola, tu trabajo tarda más en pasar por el sistema! De hecho, estás cambiando utilización del sistema por retraso.

Algo sobre lo que la Ley de Little no puede proporcionar una visión es el manejo del fracaso. Ten en cuenta que si por alguna razón tu canalización entra en pánico, perderás todas las solicitudes de la cola. Esto podría ser algo contra lo que protegerse si volver a crear las solicitudes es difícil o no va a ocurrir. Para mitigarlo, puedes ceñirte a un tamaño de cola de cero, o puedes pasar a una cola persistente, que es simplemente una cola que se mantiene en algún lugar y de la que se puede leer más tarde si surge la necesidad.

Las colas pueden ser útiles en tu sistema, pero debido a su complejidad, suelen ser una de las últimas optimizaciones que sugeriría implementar.

El contexto Paquete

Como hemos visto, en los programas concurrentes a menudo es necesario adelantarse a las operaciones debido a tiempos de espera, cancelaciones o fallos de otra parte del sistema. Hemos estudiado la idea de crear un canal done, que fluye a través de tu programa y cancela todas las operaciones concurrentes bloqueantes. Esto funciona bien, pero también es algo limitado.

Sería útil poder comunicar información adicional junto a la simple notificación de cancelación: por qué se produce la cancelación, o si nuestra función tiene o no un plazo en el que debe finalizar.

Resulta que la necesidad de envolver un canal done con esta información es muy común en sistemas de cualquier tamaño, por lo que los autores de Go decidieron crear un patrón estándar para hacerlo. Empezó como un experimento que vivía fuera de la biblioteca estándar, pero en Go 1.7, el paquete context se incorporó a la biblioteca estándar, convirtiéndolo en un modismo estándar de Go a tener en cuenta cuando se trabaja con código concurrente.

Si echamos un vistazo al paquete context, veremos que es muy sencillo:

var Canceled = errors.New("context canceled")
var DeadlineExceeded error = deadlineExceededError{}

type CancelFunc
type Context

func Background() Context
func TODO() Context
func WithCancel(parent Context) (ctx Context, cancel CancelFunc)
func WithDeadline(parent Context, deadline time.Time) (Context, CancelFunc)
func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc)
func WithValue(parent Context, key, val interface{}) Context

Volveremos sobre estos tipos y funciones dentro de un rato, pero por ahora vamos a centrarnos en el tipo Context. Éste es el tipo que fluirá a través de tu sistema como lo hace un canal done. Si utilizas el paquete context, cada función descendente de tu llamada concurrente de nivel superior recibirá un Context como primer argumento. El tipo tiene el siguiente aspecto:

type Context interface {

    // Deadline returns the time when work done on behalf of this
    // context should be canceled. Deadline returns ok==false when no
    // deadline is set. Successive calls to Deadline return the same
    // results.
    Deadline() (deadline time.Time, ok bool)

    // Done returns a channel that's closed when work done on behalf
    // of this context should be canceled. Done may return nil if this
    // context can never be canceled. Successive calls to Done return
    // the same value.
    Done() <-chan struct{}

    // Err returns a non-nil error value after Done is closed. Err
    // returns Canceled if the context was canceled or
    // DeadlineExceeded if the context's deadline passed. No other
    // values for Err are defined.  After Done is closed, successive
    // calls to Err return the same value.
    Err() error

    // Value returns the value associated with this context for key,
    // or nil if no value is associated with key. Successive calls to
    // Value with the same key returns the same result.
    Value(key interface{}) interface{}
}

Esto también parece bastante sencillo. Hay un método Done que devuelve un canal que se cierra cuando nuestra función va a ser adelantada. También hay algunos métodos nuevos, pero fáciles de entender: una función Deadline para indicar si una goroutina se cancelará al cabo de cierto tiempo, y un método Err que devolverá un valor no nulo si la goroutina se canceló. Pero el método Value parece un poco fuera de lugar. ¿Para qué sirve?

Los autores de Go se dieron cuenta de que uno de los principales usos de las goroutines eran los programas que atendían peticiones. Por lo general, en estos programas es necesario pasar información específica de la solicitud, además de información sobre la espera. Éste es el propósito de la función Value. Hablaremos más de esto dentro de un rato, pero por ahora sólo necesitamos saber que el paquete context sirve para dos propósitos principales:

  • Proporcionar una API para cancelar ramas de tu gráfico de llamadas.

  • Proporcionar una bolsa de datos para transportar datos solicitados a través de tu gráfico de llamadas.

Centrémonos en el primer aspecto: la cancelación.

Como aprendimos en "Evitar las fugas de goroutine", la cancelación en una función tiene tres aspectos:

  • El padre de una goroutine puede querer cancelarla.

  • Una goroutina puede querer cancelar a sus hijos.

  • Cualquier operación de bloqueo dentro de una gorutina debe ser preferible para que pueda cancelarse.

El paquete context ayuda a gestionar estos tres aspectos.

Como hemos dicho, el tipo Context será el primer argumento de tu función. Si observas los métodos de la interfaz Context, verás que no hay nada que pueda mutar el estado de la estructura subyacente. Además, no hay nada que permita a la función que acepta el Context cancelarlo. Esto protege a las funciones que están más arriba en la pila de llamadas de que los hijos cancelen el contexto. Combinado con el método Done, que proporciona un canal done, esto permite al tipo Context gestionar con seguridad la cancelación desde sus antecedentes.

Esto plantea una pregunta: si un Context es inmutable, ¿cómo afectamos al comportamiento de las cancelaciones en funciones situadas por debajo de una función actual en la pila de llamadas?

Aquí es donde cobran importancia las funciones del paquete context. Veamos algunas de ellas una vez más para refrescar la memoria:

func WithCancel(parent Context) (ctx Context, cancel CancelFunc)
func WithDeadline(parent Context, deadline time.Time) (Context, CancelFunc)
func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc)

Observa que todas estas funciones reciben un Context y también devuelven uno. Algunas de ellas también reciben otros argumentos, como deadline y timeout. Todas las funciones generan nuevas instancias de un Context con las opciones relativas a estas funciones.

WithCancel devuelve un nuevo Context que cierra su canal done cuando se llama a la función cancel devuelta. WithDeadline devuelve un nuevo Context que cierra su canal done cuando el reloj de la máquina avanza más allá de la duración deadline dada. WithTimeout devuelve un nuevo Context que cierra su canal done después de la duración timeout dada.

Si tu función necesita cancelar de algún modo funciones que están por debajo de ella en el grafo de llamadas, llamará a una de estas funciones y le pasará el Context que se le dio, y luego pasará el Context devuelto a sus hijos. Si tu función no necesita modificar el comportamiento de cancelación, la función simplemente pasa el Context que se le dio.

De este modo, las capas sucesivas del grafo de llamada pueden crear un Context que se ajuste a sus necesidades sin afectar a sus padres. Esto proporciona una solución muy componible y elegante para gestionar las ramas de tu gráfico de llamadas.

En este sentido, las instancias de un Context están pensadas para fluir a través del gráfico de llamadas de tu programa. En un paradigma orientado a objetos, es habitual almacenar referencias a datos utilizados a menudo como variables miembro, pero es importante no hacerlo con instancias de . context.Context. Las instancias de context.Context pueden parecer equivalentes desde fuera, pero internamente pueden cambiar en cada apilamiento. Por esta razón, es importante pasar siempre instancias de Context a tus funciones. De este modo, las funciones tienen el Context previsto para ella, y no el Context previsto para un stack-frame N niveles por encima de la pila.

En la parte superior de tu gráfico de llamadas asíncronas, es probable que a tu código no se le haya pasado un Context. Para iniciar la cadena, el paquete context te proporciona dos funciones para crear instancias vacías de Context:

func Background() Context
func TODO() Context

Background simplemente devuelve un Context vacío. TODO no está pensado para su uso en producción, pero también devuelve un Context vacío; TODOestá pensado para servir de marcador de posición cuando no sepas qué Context utilizar, o si esperas que tu código disponga de un Context, pero el código ascendente aún no lo ha proporcionado.

Así que pongamos todo esto en práctica. Veamos un ejemplo que utiliza el patrón de canal done, y veamos qué ventajas podemos obtener si pasamos a utilizar el paquete context. He aquí un programa que imprime simultáneamente un saludo y una despedida:

func main() {
    var wg sync.WaitGroup
    done := make(chan interface{})
    defer close(done)

    wg.Add(1)
    go func() {
        defer wg.Done()
        if err := printGreeting(done); err != nil {
            fmt.Printf("%v", err)
            return
        }
    }()

    wg.Add(1)
    go func() {
        defer wg.Done()
        if err := printFarewell(done); err != nil {
            fmt.Printf("%v", err)
            return
        }
    }()

    wg.Wait()
}

func printGreeting(done <-chan interface{}) error {
    greeting, err := genGreeting(done)
    if err != nil {
        return err
    }
    fmt.Printf("%s world!\n", greeting)
    return nil
}

func printFarewell(done <-chan interface{}) error {
    farewell, err := genFarewell(done)
    if err != nil {
        return err
    }
    fmt.Printf("%s world!\n", farewell)
    return nil
}

func genGreeting(done <-chan interface{}) (string, error) {
    switch locale, err := locale(done); {
    case err != nil:
        return "", err
    case locale == "EN/US":
        return "hello", nil
    }
    return "", fmt.Errorf("unsupported locale")
}

func genFarewell(done <-chan interface{}) (string, error) {
    switch locale, err := locale(done); {
    case err != nil:
        return "", err
    case locale == "EN/US":
        return "goodbye", nil
    }
    return "", fmt.Errorf("unsupported locale")
}

func locale(done <-chan interface{}) (string, error) {
    select {
    case <-done:
        return "", fmt.Errorf("canceled")
    case <-time.After(1*time.Minute):
    }
    return "EN/US", nil
}

La ejecución de este código produce:

goodbye world!
hello world!

Ignorando la condición de carrera (¡podríamos recibir nuestra despedida antes de ser saludados!), podemos ver que tenemos dos ramas de nuestro programa ejecutándose concurrentemente. Hemos establecido el método de anticipación estándar creando un canal done y pasándolo a través de nuestro gráfico de llamadas. Si cerramos el canal done en cualquier punto de main, ambas ramas se cancelarán.

Al introducir goroutines en main, hemos abierto la posibilidad de controlar este programa de varias formas diferentes e interesantes. Quizá queramos que genGreeting agote el tiempo de espera si tarda demasiado. Quizá no queramos que genFarewell invoque a locale si sabemos que su padre se va a cancelar pronto. En cada marco de pila, una función puede afectar a la totalidad de la pila de llamadas que tiene debajo.

Utilizando el patrón del canal done, podríamos conseguirlo envolviendo el canal done entrante en otros canales done y devolviendo si alguno de ellos se dispara, pero no tendríamos la información extra sobre plazos y errores que nos da un Context.

Para que sea más fácil comparar el patrón del canal done con el uso del paquete context, vamos a representar este programa como un árbol. Cada nodo del árbol representa una invocación a una función.

cigo 04in01

Modifiquemos nuestro programa para utilizar el paquete context en lugar de un canal done. Como ahora tenemos la flexibilidad de un context.Context, podemos introducir un escenario divertido.

Digamos que genGreeting sólo quiere esperar un segundo antes de abandonar la llamada a locale-un tiempo de espera de un segundo. También queremos incorporar cierta lógica inteligente en main. Si printGreeting no tiene éxito, también queremos cancelar nuestra llamada a printFarewell. Al fin y al cabo, ¡no tendría sentido decir adiós si no decimos hola!

Implementar esto con el paquete context es trivial:

func main() {
    var wg sync.WaitGroup
    ctx, cancel := context.WithCancel(context.Background()) 1
    defer cancel()

    wg.Add(1)
    go func() {
        defer wg.Done()

        if err := printGreeting(ctx); err != nil {
            fmt.Printf("cannot print greeting: %v\n", err)
            cancel() 2
        }
    }()

    wg.Add(1)
    go func() {
        defer wg.Done()
        if err := printFarewell(ctx); err != nil {
            fmt.Printf("cannot print farewell: %v\n", err)
        }
    }()

    wg.Wait()
}

func printGreeting(ctx context.Context) error {
    greeting, err := genGreeting(ctx)
    if err != nil {
        return err
    }
    fmt.Printf("%s world!\n", greeting)
    return nil
}

func printFarewell(ctx context.Context) error {
    farewell, err := genFarewell(ctx)
    if err != nil {
        return err
    }
    fmt.Printf("%s world!\n", farewell)
    return nil
}

func genGreeting(ctx context.Context) (string, error) {
    ctx, cancel := context.WithTimeout(ctx, 1*time.Second) 3
    defer cancel()

    switch locale, err := locale(ctx); {
    case err != nil:
        return "", err
    case locale == "EN/US":
        return "hello", nil
    }
    return "", fmt.Errorf("unsupported locale")
}

func genFarewell(ctx context.Context) (string, error) {
    switch locale, err := locale(ctx); {
    case err != nil:
        return "", err
    case locale == "EN/US":
        return "goodbye", nil
    }
    return "", fmt.Errorf("unsupported locale")
}

func locale(ctx context.Context) (string, error) {
    select {
    case <-ctx.Done():
        return "", ctx.Err() 4
    case <-time.After(1 * time.Minute):
    }
    return "EN/US", nil
}
1

Aquí main crea un nuevo Context con context.Background() y lo envuelve con context.WithCancel para permitir cancelaciones.

2

En esta línea, main cancelará la Context si se produce un error devuelto por printGreeting.

3

Aquí genGreeting envuelve su Context con context.WithTimeout. Esto cancelará automáticamente el Context devuelto después de 1 segundo, cancelando así cualquier hijo al que pase el Context, es decir, locale.

4

Esta línea devuelve la razón por la que se canceló Context. Este error llegará hasta main, lo que provocará la cancelación en 2.

Aquí tienes los resultados de ejecutar este código:

cannot print greeting: context deadline exceeded
cannot print farewell: context canceled

Utilicemos nuestro gráfico de llamadas para entender lo que está pasando. Los números que aparecen aquí corresponden a las llamadas de código del ejemplo anterior.

cigo 04in02

Podemos ver en nuestra salida que el sistema funciona perfectamente. Como nos aseguramos de que locale tarda al menos un minuto en ejecutarse, nuestra llamada en genGreeting siempre agotará el tiempo de espera, lo que significa que main siempre cancelará la llamada-gráfica debajo de printFarewell.

Fíjate en cómo genGreeting pudo construir un context.Context personalizado para satisfacer sus necesidades sin tener que afectar al Context de su padre. Si genGreeting volviera con éxito, y printGreeting necesitara hacer otra llamada, podría hacerlo sin filtrar información sobre cómo funcionaba genGreeting. Esta componibilidad te permite escribir grandes sistemas sin mezclar preocupaciones a lo largo de tu gráfico de llamadas.

Podemos introducir otra mejora en este programa: como sabemos que locale tarda aproximadamente un minuto en ejecutarse, en locale podemos comprobar si nos han dado un plazo y, en caso afirmativo, si lo cumpliremos. Este ejemplo muestra cómo utilizar el método context.ContextDeadline para ello:

func main() {
    var wg sync.WaitGroup
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    wg.Add(1)
    go func() {
        defer wg.Done()

        if err := printGreeting(ctx); err != nil {
            fmt.Printf("cannot print greeting: %v\n", err)
            cancel()
        }
    }()

    wg.Add(1)
    go func() {
        defer wg.Done()
        if err := printFarewell(ctx); err != nil {
            fmt.Printf("cannot print farewell: %v\n", err)
        }
    }()

    wg.Wait()
}

func printGreeting(ctx context.Context) error {
    greeting, err := genGreeting(ctx)
    if err != nil {
        return err
    }
    fmt.Printf("%s world!\n", greeting)
    return nil
}

func printFarewell(ctx context.Context) error {
    farewell, err := genFarewell(ctx)
    if err != nil {
        return err
    }
    fmt.Printf("%s world!\n", farewell)
    return nil
}

func genGreeting(ctx context.Context) (string, error) {
    ctx, cancel := context.WithTimeout(ctx, 1*time.Second)
    defer cancel()

    switch locale, err := locale(ctx); {
    case err != nil:
        return "", err
    case locale == "EN/US":
        return "hello", nil
    }
    return "", fmt.Errorf("unsupported locale")
}

func genFarewell(ctx context.Context) (string, error) {
    switch locale, err := locale(ctx); {
    case err != nil:
        return "", err
    case locale == "EN/US":
        return "goodbye", nil
    }
    return "", fmt.Errorf("unsupported locale")
}

func locale(ctx context.Context) (string, error) {
    if deadline, ok := ctx.Deadline(); ok { 1
        if deadline.Sub(time.Now().Add(1*time.Minute)) <= 0 {
            return "", context.DeadlineExceeded
        }
    }

    select {
    case <-ctx.Done():
        return "", ctx.Err()
    case <-time.After(1 * time.Minute):
    }
    return "EN/US", nil
}
1

Aquí comprobamos si nuestro Context ha proporcionado una fecha límite. Si lo ha hecho, y el reloj de nuestro sistema ha avanzado más allá de la fecha límite, simplemente volvemos con un error especial definido en el paquete context, DeadlineExceeded.

Aunque la diferencia en esta iteración del programa es pequeña, permite que la función locale falle rápidamente. En programas que pueden tener un coste elevado por llamar a la siguiente parte de la funcionalidad, esto puede ahorrar una cantidad de tiempo significativa, pero al menos también permite que la función falle inmediatamente en lugar de tener que esperar a que se produzca el tiempo de espera real. La única pega es que tienes que tener alguna idea de cuánto tiempo tardará tu gráfico de llamada subordinado, un ejercicio que puede resultar muy difícil.

Esto nos lleva a la otra mitad de lo que proporciona el paquete context: una bolsa de datos para que Context almacene y recupere datos de solicitudes. Recuerda que, a menudo, cuando una función crea una goroutine y Context, está iniciando un proceso que atenderá peticiones, y las funciones que están más abajo en la pila pueden necesitar información sobre la petición. He aquí un ejemplo de cómo almacenar datos dentro de Context, y cómo recuperarlos:

func main() {
    ProcessRequest("jane", "abc123")
}

func ProcessRequest(userID, authToken string) {
    ctx := context.WithValue(context.Background(), "userID", userID)
    ctx = context.WithValue(ctx, "authToken", authToken)
    HandleResponse(ctx)
}

func HandleResponse(ctx context.Context) {
    fmt.Printf(
        "handling response for %v (%v)",
        ctx.Value("userID"),
        ctx.Value("authToken"),
    )
}

Esto produce:

handling response for jane (abc123)

Es bastante sencillo. Los únicos requisitos son

  • La clave que utilices debe satisfacer la noción de comparabilidad de Go; es decir, los operadores de igualdad == y != deben devolver resultados correctos cuando se utilicen.

  • Los valores devueltos deben ser de acceso seguro desde múltiples goroutines.

Puesto que tanto la clave como el valor de Contextse definen como interface{}, perdemos la seguridad de tipos de Go al intentar recuperar valores. La clave podría ser de un tipo distinto, o ligeramente distinto, de la clave que proporcionamos. El valor podría ser de un tipo distinto al que esperamos. Por estas razones, los autores de Go recomiendan que sigas algunas reglas al almacenar y recuperar valores de un Context.

En primer lugar, recomiendan que definas un tipo de clave personalizado en tu paquete. Siempre que otros paquetes hagan lo mismo, esto evitará colisiones dentro de Context. Como recordatorio de por qué, echemos un vistazo a un breve programa que intenta almacenar claves en un mapa que tienen tipos diferentes, pero el mismo valor subyacente:

type foo int
type bar int

m := make(map[interface{}]int)
m[foo(1)] = 1
m[bar(1)] = 2

fmt.Printf("%v", m)

Esto produce:

map[1:1 1:2]

Puedes ver que, aunque los valores subyacentes son los mismos, la información de tipo diferente los diferencia dentro de un mapa. Puesto que el tipo que definas para las claves de tu paquete no se puede exportar, otros paquetes no pueden entrar en conflicto con las claves que generes dentro de tu paquete.

Como no exportamos las claves que utilizamos para almacenar los datos, debemos exportar funciones que los recuperen por nosotros. Esto funciona muy bien, ya que permite a los consumidores de estos datos utilizar funciones estáticas y seguras.

Cuando juntas todo esto, obtienes algo como el siguiente ejemplo:

func main() {
    ProcessRequest("jane", "abc123")
}

type ctxKey int

const (
    ctxUserID ctxKey = iota
    ctxAuthToken
)

func UserID(c context.Context) string {
    return c.Value(ctxUserID).(string)
}

func AuthToken(c context.Context) string {
    return c.Value(ctxAuthToken).(string)
}

func ProcessRequest(userID, authToken string) {
    ctx := context.WithValue(context.Background(), ctxUserID, userID)
    ctx = context.WithValue(ctx, ctxAuthToken, authToken)
    HandleResponse(ctx)
}

func HandleResponse(ctx context.Context) {
    fmt.Printf(
        "handling response for %v (auth: %v)",
        UserID(ctx),
        AuthToken(ctx),
    )
}

La ejecución de este código produce:

handling response for jane (auth: abc123)

Ahora tenemos una forma segura de recuperar valores del Context, y -si los consumidores estuvieran en un paquete diferente- no sabrían ni les importaría qué claves se utilizaron para almacenar la información. Sin embargo, esta técnica plantea un problema.

En el ejemplo anterior, supongamos que HandleResponse vivía en otro paquete llamado response, y supongamos que el paquete ProcessRequest vivía en un paquete llamado process. El paquete process tendría que importar el paquete response para hacer la llamada a HandleResponse, pero HandleResponse no tendría forma de acceder a las funciones accesorias definidas en el paquete process porque importar process formaría una dependencia circular. Como los tipos utilizados para almacenar las claves en Context son privados del paquete process, ¡el paquete response no tiene forma de recuperar estos datos!

Esto obliga a la arquitectura a crear paquetes centrados en tipos de datos que se importan de varios lugares. Esto no es malo, pero hay que tenerlo en cuenta.

El paquete context es bastante bueno, pero no ha sido uniformemente alabado. Dentro de la comunidad Go, el paquete context ha sido algo controvertido. El aspecto de cancelación del paquete ha sido bastante bien recibido, pero la posibilidad de almacenar datos arbitrarios en Context, y la forma insegura de almacenar los datos, han causado cierta división. Aunque hemos paliado en parte la falta de seguridad de tipos con nuestras funciones de acceso, aún podríamos introducir errores almacenando tipos incorrectos. Sin embargo, la cuestión más importante es, sin duda, la naturaleza de lo que los desarrolladores deben almacenar en las instancias de Context.

La orientación más extendida sobre lo que es apropiado es este comentario algo ambiguo del paquete context:

  Use context values only for request-scoped data that transits processes and
  API boundaries, not for passing optional parameters to functions.

Está bastante claro lo que es un parámetro opcional (no deberías utilizar un Context para satisfacer tu deseo secreto de que Go admita parámetros opcionales), pero ¿qué son los "datos de ámbito de solicitud"? Supuestamente "atraviesan los procesos y los límites de la API", pero eso podría describir muchas cosas. La mejor forma que he encontrado de definirlo es idear algunas heurísticas con tu equipo, y evaluarlas en las revisiones del código. He aquí mi heurística:

1) Los datos deben traspasar los límites del proceso o de la API.

Si generas los datos en la memoria de tu proceso, probablemente no sean buenos candidatos para ser datos de ámbito de petición, a menos que también los pases a través de un límite de API.

2) Los datos deben ser inmutables.

Si no lo es, entonces, por definición, lo que estás almacenando no procede de la solicitud.

3) Los datos deben tender hacia los tipos simples.

Si se pretende que los datos de las solicitudes transiten por los límites del proceso y de la API, es mucho más fácil para la otra parte extraer estos datos si no tiene que importar también un complejo gráfico de paquetes.

4) Los datos deben ser datos, no tipos con métodos.

Las operaciones son lógicas y pertenecen a las cosas que consumen estos datos.

5) Los datos deben ayudar a decorar las operaciones, no dirigirlas.

Si tu algoritmo se comporta de forma diferente en función de lo que se incluya o no en su Contextes probable que te hayas adentrado en el territorio de los parámetros opcionales.

No son reglas rígidas, sino heurísticas. Sin embargo, si descubres que los datos que almacenas en Context infringen estas cinco directrices, es posible que quieras revisar detenidamente lo que estás haciendo.

Otra dimensión a considerar es cuántas capas pueden tener que atravesar estos datos antes de su utilización. Si hay unos cuantos frameworks y decenas de funciones entre el lugar donde se aceptan los datos y el lugar donde se utilizan, ¿quieres inclinarte por firmas de funciones verbosas y autodocumentadas, y añadir los datos como parámetro? ¿O prefieres colocarlos en un Context y crear así una dependencia invisible? Cada enfoque tiene sus ventajas, y al final es una decisión que tú y tu equipo tendréis que tomar.

Incluso con estas heurísticas, saber si un valor es o no un dato de ámbito de solicitud sigue siendo una pregunta difícil de responder. Echa un vistazo a la siguiente tabla. En ella se recogen mis opiniones sobre si cada tipo de dato cumple o no las cinco heurísticas que he enumerado. ¿Estás de acuerdo?

Datos 1 2 3 4 5

Solicitar ID

ID de usuario

URL

Conexión al servidor API

Token de autorización

Solicitar Token

A veces está claro que algo no debe almacenarse en un contexto, como ocurre con las conexiones al servidor API, pero a veces no está tan claro. ¿Qué pasa con un token de autorización? Es inmutable, y probablemente sea un trozo de bytes, pero ¿los receptores de estos datos no los utilizarán para determinar si deben dar curso a la solicitud? ¿Pertenecen estos datos a un contexto? Para enturbiar aún más las aguas, lo que es aceptable en un equipo puede no serlo en otro.

En última instancia, aquí no hay respuestas fáciles. El paquete se ha incorporado a la biblioteca estándar, por lo que debes formarte una opinión sobre su uso, pero esa opinión podría (y probablemente debería) cambiar en función del proyecto que estés tocando. El consejo final que te dejo es que la funcionalidad de cancelación que proporciona Context es muy útil, y tus sentimientos sobre la bolsa de datos no deberían disuadirte de utilizarla.

Resumen

Hemos cubierto mucho terreno en este capítulo. Hemos combinado las primitivas de concurrencia de Go para formar patrones que ayudan a escribir código concurrente mantenible. Ahora que estás familiarizado con estos patrones, podemos hablar de cómo podemos incorporarlos a otros patrones que te ayudarán a escribir grandes sistemas. El próximo capítulo te dará una visión general de las técnicas para hacer precisamente eso.

1 Estoy ignorando la posibilidad de manipular manualmente la memoria mediante el paquete unsafe. ¡Por algo se llama unsafe!

2 En el contexto de los lenguajes, la reificación significa que el lenguaje expone un concepto a los desarrolladores para que puedan trabajar con él directamente. Se dice que las funciones en Go están reificadas porque puedes definir variables que tengan el tipo de una firma de función. Esto también significa que puedes pasar funciones por tu programa.

Get Concurrencia en Go now with the O’Reilly learning platform.

O’Reilly members experience books, live events, courses curated by job role, and more from O’Reilly and nearly 200 top publishers.