Cómo pasar una referencia de cola a una función administrada por pool.map async()?


Quiero que un proceso de larga duración devuelva su progreso a través de una Cola (o algo similar) que alimentaré a un diálogo de barra de progreso. También necesito el resultado cuando se complete el proceso. Un ejemplo de prueba aquí falla con un RuntimeError: Queue objects should only be shared between processes through inheritance.

import multiprocessing, time

def task(args):
    count = args[0]
    queue = args[1]
    for i in xrange(count):
        queue.put("%d mississippi" % i)
    return "Done"

def main():
    q = multiprocessing.Queue()
    pool = multiprocessing.Pool()
    result = pool.map_async(task, [(x, q) for x in range(10)])
    time.sleep(1)
    while not q.empty():
        print q.get()
    print result.get()

if __name__ == "__main__":
    main()

He sido capaz de hacer que esto funcione usando objetos de proceso individuales (donde estoy permitido pasar una referencia de cola) pero entonces no tengo un grupo para administrar los muchos procesos que quiero lanzar. Cualquier consejo sobre un mejor patrón para ¿esto?

Author: tburrows13, 2010-07-10

2 answers

El siguiente código parece funcionar:

import multiprocessing, time

def task(args):
    count = args[0]
    queue = args[1]
    for i in xrange(count):
        queue.put("%d mississippi" % i)
    return "Done"


def main():
    manager = multiprocessing.Manager()
    q = manager.Queue()
    pool = multiprocessing.Pool()
    result = pool.map_async(task, [(x, q) for x in range(10)])
    time.sleep(1)
    while not q.empty():
        print q.get()
    print result.get()

if __name__ == "__main__":
    main()

Tenga en cuenta que la cola se obtiene de un administrador.Queue () en lugar de multiprocesamiento.Cola(). Gracias Alex por señalarme en esta dirección.

 43
Author: David,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2010-07-12 20:30:58

Haciendo q global funciona...:

import multiprocessing, time

q = multiprocessing.Queue()

def task(count):
    for i in xrange(count):
        q.put("%d mississippi" % i)
    return "Done"

def main():
    pool = multiprocessing.Pool()
    result = pool.map_async(task, range(10))
    time.sleep(1)
    while not q.empty():
        print q.get()
    print result.get()

if __name__ == "__main__":
    main()

Si necesita varias colas, por ejemplo, para evitar mezclar el progreso de varios procesos de pool, debería funcionar una lista global de colas (por supuesto, cada proceso necesitará saber qué index en la lista a usar, pero está bien pasar como argumento;-).

 8
Author: Alex Martelli,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2010-07-10 01:26:57