Cola de bloqueo y consumidor multiproceso, cómo saber cuándo detenerse


Tengo un único productor de subprocesos que crea algunos objetos de tarea que luego se agregan a un ArrayBlockingQueue (que es de tamaño fijo).

También comienzo un consumidor multihilo. Esto se construye como un grupo de subprocesos fijo (Executors.newFixedThreadPool(threadCount);). A continuación, presento algunas iniciativas de ConsumerWorker a este ThreadPool, cada ConsumerWorker teniendo una referencia a la instancia ArrayBlockingQueue mencionada anteriormente.

Cada trabajador hará un take() en la cola y se ocupará de la tarea.

Mi problema es, ¿qué es la mejor manera de que un trabajador sepa cuándo no habrá más trabajo por hacer. En otras palabras, cómo le digo a los Trabajadores que el productor ha terminado de agregar a la cola, y a partir de este punto, cada trabajador debe detenerse cuando vea que la Cola está vacía.

Lo que tengo ahora es una configuración donde mi Productor se inicializa con una devolución de llamada que se activa cuando termina su trabajo (de agregar cosas a la cola). También tengo una lista de todos los ConsumerWorkers que he creado y presentado al ThreadPool. Cuando la devolución de llamada del Productor me dice que el productor ha terminado, puedo decirle esto a cada uno de los trabajadores. En este punto, simplemente deben seguir comprobando si la cola no está vacía, y cuando se vuelve vacía deben detenerse, lo que me permite apagar con gracia el grupo de subprocesos ExecutorService. Es algo como esto

public class ConsumerWorker implements Runnable{

private BlockingQueue<Produced> inputQueue;
private volatile boolean isRunning = true;

public ConsumerWorker(BlockingQueue<Produced> inputQueue) {
    this.inputQueue = inputQueue;
}

@Override
public void run() {
    //worker loop keeps taking en element from the queue as long as the producer is still running or as 
    //long as the queue is not empty:
    while(isRunning || !inputQueue.isEmpty()) {
        System.out.println("Consumer "+Thread.currentThread().getName()+" START");
        try {
            Object queueElement = inputQueue.take();
            //process queueElement
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

//this is used to signal from the main thread that he producer has finished adding stuff to the queue
public void setRunning(boolean isRunning) {
    this.isRunning = isRunning;
}

}

El problema aquí es que tengo una condición de carrera obvia donde a veces el productor terminará, lo señalará, y el ConsumerWorkers se detendrá ANTES de consumir todo en la cola.

Mi pregunta es ¿cuál es la mejor manera de sincronizar esto para que todo funcione bien? ¿Debo sincronizar toda la parte donde comprueba si el productor está ejecutando plus si la cola está vacía plus tomar algo de la cola en un bloque (en el objeto de la cola)? ¿Debo sincronizar la actualización del booleano isRunning en la instancia de ConsumerWorker? ¿Alguna otra sugerencia?

ACTUALIZAR, AQUÍ ESTÁ EL TRABAJO IMPLEMENTACIÓN QUE HE TERMINADO USANDO:

public class ConsumerWorker implements Runnable{

private BlockingQueue<Produced> inputQueue;

private final static Produced POISON = new Produced(-1); 

public ConsumerWorker(BlockingQueue<Produced> inputQueue) {
    this.inputQueue = inputQueue;
}

@Override
public void run() {
    //worker loop keeps taking en element from the queue as long as the producer is still running or as 
    //long as the queue is not empty:
    while(true) {
        System.out.println("Consumer "+Thread.currentThread().getName()+" START");
        try {
            Produced queueElement = inputQueue.take();
            Thread.sleep(new Random().nextInt(100));
            if(queueElement==POISON) {
                break;
            }
            //process queueElement
        } catch (Exception e) {
            e.printStackTrace();
        }
        System.out.println("Consumer "+Thread.currentThread().getName()+" END");
    }
}

//this is used to signal from the main thread that he producer has finished adding stuff to the queue
public void stopRunning() {
    try {
        inputQueue.put(POISON);
    } catch (InterruptedException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    }
}

}

Esto fue inspirado en gran medida por la respuesta de JohnVint a continuación, con solo algunas modificaciones menores.

=== Actualización debido al comentario de @vendhan.

Gracias por su observación. Tienes razón, el primer fragmento de código en esta pregunta tiene (entre otras cuestiones) el que el while(isRunning || !inputQueue.isEmpty()) no tiene realmente sentido.

En mi implementación final real de esto, hago algo que está más cerca de su sugerencia de reemplazar | / / "(o) por "& & " (y), en el sentido de que cada trabajador (consumidor) ahora solo comprueba si el elemento que tiene de la lista es una píldora venenosa, y si es así se detiene (así que teóricamente podemos decir que el trabajador tiene que estar corriendo y la cola no debe estar vacía).

Author: Shivan Dragon, 2012-01-23

6 answers

Debe continuar a take() desde la cola. Puede usar una píldora venenosa para decirle al trabajador que deje de hacerlo. Por ejemplo:

private final Object POISON_PILL = new Object();

@Override
public void run() {
    //worker loop keeps taking en element from the queue as long as the producer is still running or as 
    //long as the queue is not empty:
    while(isRunning) {
        System.out.println("Consumer "+Thread.currentThread().getName()+" START");
        try {
            Object queueElement = inputQueue.take();
            if(queueElement == POISON_PILL) {
                 inputQueue.add(POISON_PILL);//notify other threads to stop
                 return;
            }
            //process queueElement
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

//this is used to signal from the main thread that he producer has finished adding stuff to the queue
public void finish() {
    //you can also clear here if you wanted
    isRunning = false;
    inputQueue.add(POISON_PILL);
}
 80
Author: John Vint,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2012-01-23 16:32:35

Enviaría a los trabajadores un paquete de trabajo especial para señalar que deben cerrar:

public class ConsumerWorker implements Runnable{

private static final Produced DONE = new Produced();

private BlockingQueue<Produced> inputQueue;

public ConsumerWorker(BlockingQueue<Produced> inputQueue) {
    this.inputQueue = inputQueue;
}

@Override
public void run() {
    for (;;) {
        try {
            Produced item = inputQueue.take();
            if (item == DONE) {
                inputQueue.add(item); // keep in the queue so all workers stop
                break;
            }
            // process `item`
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

}

Para detener a los trabajadores, simplemente agregue ConsumerWorker.DONE a la cola.

 14
Author: NPE,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2012-01-23 16:15:21

En su bloque de código donde intenta recuperar un elemento de la cola , use poll(time,unit) en lugar del take().

try { 
    Object queueElement = inputQueue.poll(timeout,unit);
     //process queueElement        
 } catch (InterruptedException e) {
        if(!isRunning && queue.isEmpty())
         return ; 
 } 

Al especificar los valores apropiados de tiempo de espera, se asegura de que los hilos no seguirán bloqueando en caso de que haya una secuencia desafortunada de

  1. isRunning es verdadero
  2. La cola se vacía, por lo que los hilos entran en espera bloqueada ( si se usa take()
  3. isRunning se establece en false
 1
Author: Bhaskar,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2012-01-23 16:53:26

Hay una serie de estrategias que podría utilizar, pero una simple es tener una subclase de tarea que señala el final del trabajo. El productor no envía esta señal directamente. En su lugar, pone en cola una instancia de esta subclase task. Cuando uno de sus consumidores realiza esta tarea y la ejecuta, eso hace que se envíe la señal.

 0
Author: Kaelin Colclasure,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2012-01-23 16:11:10

Tuve que usar un productor multihilo y un consumidor multihilo. Terminé con un esquema Scheduler -- N Producers -- M Consumers, cada dos se comunican a través de una cola (dos colas en total). El Planificador llena la primera cola con solicitudes para producir datos, y luego la llena con N "píldoras venenosas". Hay un contador de productores activos (atomic int), y el último productor que recibe la última píldora venenosa envía M píldoras venenosas a la cola del consumidor.

 0
Author: 18446744073709551615,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2017-02-13 09:32:18

No podemos hacerlo usando un CountDownLatch, donde el tamaño es el número de registros en el productor. Y cada consumidor countDown después de procesar un registro. Y cruza el método awaits() cuando todas las tareas terminaron. Entonces detén a todos tus consumidores. Como todos los registros se procesan.

 0
Author: Arpan Das,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2017-11-15 06:40:14