¿Cómo definirías un grupo de goroutines para ser ejecutados de inmediato en Golang?


TL;TR: Por favor, solo ve a la última parte y dime cómo resolverías este problema.

He empezado a usar Golang esta mañana viniendo de Python. Quiero llamar a un ejecutable de código cerrado desde Go varias veces, con un bit de concurrencia, con diferentes argumentos de la línea de comandos. Mi código resultante está funcionando bien, pero me gustaría obtener su entrada con el fin de mejorarlo. Como estoy en una etapa temprana de aprendizaje, también explicaré mi flujo de trabajo.

Por el bien por simplicidad, supongamos aquí que este" programa externo de código cerrado " es zenity, una herramienta de línea de comandos de Linux que puede mostrar cuadros de mensajes gráficos desde la línea de comandos.

Llamando a un archivo ejecutable desde Go

Así que, en Go, yo iría así: {[18]]}

package main
import "os/exec"
func main() {
    cmd := exec.Command("zenity", "--info", "--text='Hello World'")
    cmd.Run()
}

Esto debería estar funcionando bien. Tenga en cuenta que .Run() es un equivalente funcional de .Start() seguido de .Wait(). Esto es genial, pero si quisiera ejecutar este programa solo una vez, todo el material de programación no sería pena. Así que vamos a hacerlo varias veces.

Llamando a un ejecutable varias veces

Ahora que tengo esto funcionando, me gustaría llamar a mi programa varias veces, con argumentos de línea de comandos personalizados (aquí solo i en aras de la simplicidad).

package main    
import (
    "os/exec"
    "strconv"
)

func main() {
    NumEl := 8 // Number of times the external program is called
    for i:=0; i<NumEl; i++ {
        cmd := exec.Command("zenity", "--info", "--text='Hello from iteration n." + strconv.Itoa(i) + "'")
        cmd.Run()
    }
}

Ok, lo hicimos! Pero todavía no puedo ver la ventaja de ir sobre Python This Este fragmento de código se ejecuta en serie. Tengo una CPU de varios núcleos y me gustaría aprovecharla. Así que vamos a añadir cierta concurrencia con goroutines.

Goroutines, o una manera de hacer mi programa paralelo

A) Primer intento: simplemente agregue "go"s en todas partes

Reescribamos nuestro código para hacer las cosas más fáciles de llamar y reutilizar y agreguemos la famosa palabra clave go:

package main
import (
    "os/exec"
    "strconv"
)

func main() {
    NumEl := 8 
    for i:=0; i<NumEl; i++ {
        go callProg(i)  // <--- There!
    }
}

func callProg(i int) {
    cmd := exec.Command("zenity", "--info", "--text='Hello from iteration n." + strconv.Itoa(i) + "'")
    cmd.Run()
}

Nada! ¿Cuál es el problema? Todos los goroutines son ejecutados a la vez. Realmente no sé por qué zenity no se ejecuta, pero AFAIK, el programa Go salió antes de que el programa externo zenity pudiera incluso ser inicializado. Esto fue confirmado por el uso de time.Sleep: esperar un par de segundos fue suficiente para permitir que la instancia 8 de zenity se lanzara. Aunque no se si esto puede ser considerado un error.

Para empeorar las cosas, el programa real al que me gustaría llamar tarda un tiempo en ejecutarse. Si ejecuto8 instancias de este programa en paralelo en mi CPU de 4 núcleos, va a perder algún tiempo haciendo un montón de cambio de contexto't No se cómo se comportan los goroutines, pero exec.Command lanzará zenity 8 veces en 8 hilos diferentes. Para empeorar las cosas, quiero ejecutar este programa más de 100.000 veces. Hacer todo eso a la vez en goroutines no será eficiente en absoluto. Aún así, me gustaría aprovechar mi CPU de 4 núcleos!

B) Segundo intento: usar charcos de goroutinas

Los recursos en línea tienden a recomendar el uso de sync.WaitGroup para este tipo de trabajo. El problema con ese enfoque es que básicamente estás trabajando con lotes de goroutines: si crear de WaitGroup de 4 miembros, el programa Go esperará a que todos los 4 programas externos terminen antes de llamar a un nuevo lote de 4 programas. Esto no es eficiente: la CPU se desperdicia, una vez más.

Algunos otros recursos recomendaron el uso de un canal buffered para hacer el trabajo:

package main
import (
    "os/exec"
    "strconv"
)

func main() {
    NumEl := 8               // Number of times the external program is called
    NumCore := 4             // Number of available cores
    c := make(chan bool, NumCore - 1) 
    for i:=0; i<NumEl; i++ {
        go callProg(i, c)
        c <- true            // At the NumCoreth iteration, c is blocking   
    }
}

func callProg(i int, c chan bool) {
    defer func () {<- c}()
    cmd := exec.Command("zenity", "--info", "--text='Hello from iteration n." + strconv.Itoa(i) + "'")
    cmd.Run()
}

Esto parece feo. Los canales no estaban destinados para este propósito: estoy explotando un efecto secundario. Me encanta el concepto de defer pero odio tener que declarar una función (incluso una lambda) para saca un valor del canal ficticio que creé. Y por supuesto, usar un canal falso es, por sí mismo, feo.

C) Tercer intento: morir cuando todos los niños están muertos

Ahora estamos casi terminados. Solo tengo que tener en cuenta otro efecto secundario: el programa Go se cierra antes de que todas las ventanas emergentes zenity se cierren. Esto se debe a que cuando se finaliza el bucle (en la 8ª iteración), nada impide que el programa termine. Esta vez, sync.WaitGroup será utilidad.

package main
import (
    "os/exec"
    "strconv"
    "sync"
)

func main() {
    NumEl := 8               // Number of times the external program is called
    NumCore := 4             // Number of available cores
    c := make(chan bool, NumCore - 1) 
    wg := new(sync.WaitGroup)
    wg.Add(NumEl)            // Set the number of goroutines to (0 + NumEl)
    for i:=0; i<NumEl; i++ {
        go callProg(i, c, wg)
        c <- true            // At the NumCoreth iteration, c is blocking   
    }
    wg.Wait() // Wait for all the children to die
    close(c)
}

func callProg(i int, c chan bool, wg *sync.WaitGroup) {
    defer func () {
        <- c
        wg.Done() // Decrease the number of alive goroutines
    }()
    cmd := exec.Command("zenity", "--info", "--text='Hello from iteration n." + strconv.Itoa(i) + "'")
    cmd.Run()
}

Hecho.

Mis preguntas

  • ¿Conoce alguna otra forma adecuada de limitar el número de goroutines ejecutados a la vez?

No me refiero a hilos; cómo Go administra goroutines internamente no es relevante. Realmente me refiero a limitar el número de goroutines lanzados a la vez: exec.Command crea un nuevo hilo cada vez que se llama, por lo que debería controlar el número de veces que se llama.

  • ¿Ese código se ve bien para ¿tú?
  • ¿Sabes cómo evitar el uso de un canal ficticio en ese caso?

No puedo convencerme de que tales canales ficticios son el camino a seguir.

Author: MaxVT, 2013-08-23

3 answers

Generaría 4 goroutines de trabajadores que leen las tareas desde un canal común. Las goroutinas que son más rápidas que otras (porque están programadas de manera diferente o porque reciben tareas simples) recibirán más tareas de este canal que otras. Además de eso, usaría una sincronización .WaitGroup para esperar a que todos los trabajadores terminen. La parte restante es solo la creación de las tareas. Puede ver un ejemplo de implementación de ese enfoque aquí:

package main

import (
    "os/exec"
    "strconv"
    "sync"
)

func main() {
    tasks := make(chan *exec.Cmd, 64)

    // spawn four worker goroutines
    var wg sync.WaitGroup
    for i := 0; i < 4; i++ {
        wg.Add(1)
        go func() {
            for cmd := range tasks {
                cmd.Run()
            }
            wg.Done()
        }()
    }

    // generate some tasks
    for i := 0; i < 10; i++ {
        tasks <- exec.Command("zenity", "--info", "--text='Hello from iteration n."+strconv.Itoa(i)+"'")
    }
    close(tasks)

    // wait for the workers to finish
    wg.Wait()
}

Probablemente hay otros enfoques posibles, pero creo que esta es una solución muy limpia y fácil de entender.

 83
Author: tux21b,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2014-06-23 18:00:13

Un enfoque simple para la limitación (ejecutar f() N veces pero máximo maxConcurrency simultáneamente), solo un esquema:

package main

import (
        "sync"
)

const maxConcurrency = 4 // for example

var throttle = make(chan int, maxConcurrency)

func main() {
        const N = 100 // for example
        var wg sync.WaitGroup
        for i := 0; i < N; i++ {
                throttle <- 1 // whatever number
                wg.Add(1)
                go f(i, &wg, throttle)
        }
        wg.Wait()
}

func f(i int, wg *sync.WaitGroup, throttle chan int) {
        defer wg.Done()
        // whatever processing
        println(i)
        <-throttle
}

Parque infantil

Probablemente no llamaría al canal throttle "maniquí". En mi humilde opinión es una manera elegante (no es mi invención, por supuesto), cómo limitar la concurrencia.

Por cierto: Tenga en cuenta que está ignorando el error devuelto de cmd.Run().

 31
Author: zzzz,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2013-08-23 14:33:57

Prueba esto: https://github.com/korovkin/limiter

 limiter := NewConcurrencyLimiter(10)
 limiter.Execute(func() {
        zenity(...) 
 })
 limiter.Wait()
 1
Author: korovkin,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2017-05-13 18:39:32