Cómo combinar la piscina.map con Array (memoria compartida) en Python multiprocesamiento?


Tengo una matriz muy grande (solo lectura) de datos que quiero que sean procesados por múltiples procesos en paralelo.

Me gusta la piscina.función de mapa y le gustaría usarla para calcular funciones en esos datos en paralelo.

Vi que se puede usar la clase Value o Array para usar datos de memoria compartida entre procesos. Pero cuando intento usar esto, obtengo un RuntimeError: 'Los objetos SynchronizedString solo deben compartirse entre procesos a través de la herencia cuando se usa el Pool.asignar función:

Aquí hay un ejemplo simplificado de lo que estoy tratando de hacer:

from sys import stdin
from multiprocessing import Pool, Array

def count_it( arr, key ):
  count = 0
  for c in arr:
    if c == key:
      count += 1
  return count

if __name__ == '__main__':
  testData = "abcabcs bsdfsdf gdfg dffdgdfg sdfsdfsd sdfdsfsdf"
  # want to share it using shared memory
  toShare = Array('c', testData)

  # this works
  print count_it( toShare, "a" )

  pool = Pool()

  # RuntimeError here
  print pool.map( count_it, [(toShare,key) for key in ["a", "b", "s", "d"]] )

¿Puede alguien decirme qué estoy haciendo mal aquí?

Así que lo que me gustaría hacer es pasar información sobre una matriz asignada de memoria compartida recién creada a los procesos después de que se hayan creado en el grupo de procesos.

Author: Jeroen Dirks, 2009-11-04

4 answers

Tratando de nuevo como acabo de ver la recompensa;)

Básicamente creo que el mensaje de error significa lo que dijo: las matrices de memoria compartida multiprocesamiento no se pueden pasar como argumentos (por decapado). No tiene sentido serializar los datos - el punto es que los datos son memoria compartida. Así que tienes que hacer que la matriz compartida sea global. Creo que es más ordenado ponerlo como el atributo de un módulo, como en mi primera respuesta, pero simplemente dejarlo como una variable global en su ejemplo también funciona bien. Asumir aborde su punto de no querer establecer los datos antes de la bifurcación, aquí hay un ejemplo modificado. Si desea tener más de una posible matriz compartida (y es por eso que desea pasar toShare como un argumento) podría hacer una lista global de matrices compartidas, y simplemente pasar el índice a count_it (que se convertiría en for c in toShare[i]:).

from sys import stdin
from multiprocessing import Pool, Array, Process

def count_it( key ):
  count = 0
  for c in toShare:
    if c == key:
      count += 1
  return count

if __name__ == '__main__':
  # allocate shared array - want lock=False in this case since we 
  # aren't writing to it and want to allow multiple processes to access
  # at the same time - I think with lock=True there would be little or 
  # no speedup
  maxLength = 50
  toShare = Array('c', maxLength, lock=False)

  # fork
  pool = Pool()

  # can set data after fork
  testData = "abcabcs bsdfsdf gdfg dffdgdfg sdfsdfsd sdfdsfsdf"
  if len(testData) > maxLength:
      raise ValueError, "Shared array too small to hold data"
  toShare[:len(testData)] = testData

  print pool.map( count_it, ["a", "b", "s", "d"] )

[EDITAR: Lo anterior no funciona en Windows debido a no usar fork. Sin embargo, lo siguiente funciona en Windows, aún usando Pool, así que creo que esto es lo más cercano a lo que quieres:

from sys import stdin
from multiprocessing import Pool, Array, Process
import mymodule

def count_it( key ):
  count = 0
  for c in mymodule.toShare:
    if c == key:
      count += 1
  return count

def initProcess(share):
  mymodule.toShare = share

if __name__ == '__main__':
  # allocate shared array - want lock=False in this case since we 
  # aren't writing to it and want to allow multiple processes to access
  # at the same time - I think with lock=True there would be little or 
  # no speedup
  maxLength = 50
  toShare = Array('c', maxLength, lock=False)

  # fork
  pool = Pool(initializer=initProcess,initargs=(toShare,))

  # can set data after fork
  testData = "abcabcs bsdfsdf gdfg dffdgdfg sdfsdfsd sdfdsfsdf"
  if len(testData) > maxLength:
      raise ValueError, "Shared array too small to hold data"
  toShare[:len(testData)] = testData

  print pool.map( count_it, ["a", "b", "s", "d"] )

No estoy seguro de por qué map no Pickea la matriz, pero Process y Pool lo harán - creo que tal vez se ha transferido en el punto de inicialización del subproceso en windows. Sin embargo, tenga en cuenta que los datos todavía se establecen después de la bifurcación.

 37
Author: robince,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2009-11-12 18:19:03

El problema que veo es que Pool no admite el decapado de datos compartidos a través de su lista de argumentos. Eso es lo que el mensaje de error quiere decir con "los objetos solo deben ser compartidos entre procesos a través de la herencia". Los datos compartidos deben ser heredados, es decir, globales si desea compartirlos usando la clase Pool.

Si necesita pasarlos explícitamente, es posible que tenga que usar multiprocesamiento.Proceso. Aquí está su ejemplo reelaborado:

from multiprocessing import Process, Array, Queue

def count_it( q, arr, key ):
  count = 0
  for c in arr:
    if c == key:
      count += 1
  q.put((key, count))

if __name__ == '__main__':
  testData = "abcabcs bsdfsdf gdfg dffdgdfg sdfsdfsd sdfdsfsdf"
  # want to share it using shared memory
  toShare = Array('c', testData)

  q = Queue()
  keys = ['a', 'b', 's', 'd']
  workers = [Process(target=count_it, args = (q, toShare, key))
    for key in keys]

  for p in workers:
    p.start()
  for p in workers:
    p.join()
  while not q.empty():
    print q.get(),

Salida: ('s', 9) ('a', 2) ('b', 3) ("d", 12)

El orden de los elementos de la cola puede variar.

Para hacer esto más genérico y similar a Pool, puede crear un número fijo de N Procesos, dividir la lista de claves en N piezas, y luego usar una función wrapper como destino del Proceso, que llamará a count_it por cada clave en la lista que se pase, como:

def wrapper( q, arr, keys ):
  for k in keys:
    count_it(q, arr, k)
 4
Author: jwilson,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2009-11-10 02:08:28

Si los datos solo se leen, simplemente hágalo una variable en un módulo antes de la bifurcación del Pool. Entonces todos los procesos hijos deberían poder acceder a él, y no se copiará siempre que no escribas en él.

import myglobals # anything (empty .py file)
myglobals.data = []

def count_it( key ):
    count = 0
    for c in myglobals.data:
        if c == key:
            count += 1
    return count

if __name__ == '__main__':
myglobals.data = "abcabcs bsdfsdf gdfg dffdgdfg sdfsdfsd sdfdsfsdf"

pool = Pool()
print pool.map( count_it, ["a", "b", "s", "d"] )

Si desea intentar usar Array, puede intentarlo con el argumento de la palabra clave lock=False (es true por defecto).

 2
Author: robince,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2009-11-04 20:37:31

El módulo multiprocessing.sharedctypes proporciona funciones para asignar objetos ctypes de memoria compartida que pueden ser heredados por procesos hijos.

Así que su uso de sharedctypes es incorrecto. ¿Desea heredar este array del proceso padre o prefiere pasarlo explícitamente? En el primer caso, debe crear una variable global como sugieren otras respuestas. Pero no necesita usar sharedctypes para pasarlo explícitamente, simplemente pase original testData.

POR cierto, su el uso de Pool.map() es incorrecto. Tiene la misma interfaz que la función builtin map() (¿lo ensuciaste con starmap()?). A continuación se muestra un ejemplo de trabajo con, pasando el array explícitamente:

from multiprocessing import Pool

def count_it( (arr, key) ):
    count = 0
    for c in arr:
        if c == key:
            count += 1
    return count

if __name__ == '__main__':
    testData = "abcabcs bsdfsdf gdfg dffdgdfg sdfsdfsd sdfdsfsdf"
    pool = Pool()
    print pool.map(count_it, [(testData, key) for key in ["a", "b", "s", "d"]])
 -1
Author: Denis Otkidach,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2009-11-12 15:26:50