Cómo esperar a que termine un ThreadPoolExecutor


Mi pregunta: Cómo ejecutar un montón de objetos enhebrados en un ThreadPoolExecutor ¿y esperar a que terminen antes de seguir adelante?

Soy nuevo en ThreadPoolExecutor. Así que este código es una prueba para aprender cómo funciona. Ahora ni siquiera me llene el BlockingQueue con los objetos porque no entiendo cómo iniciar la cola sin llamar execute() con otro RunnableObject. De todos modos, ahora mismo solo llamo awaitTermination() pero creo que todavía me falta algo. Cualquier consejo sería genial! Gracias.

public void testThreadPoolExecutor() throws InterruptedException {
  int limit = 20;
  BlockingQueue q = new ArrayBlockingQueue(limit);
  ThreadPoolExecutor ex = new ThreadPoolExecutor(limit, limit, 20, TimeUnit.SECONDS, q);
  for (int i = 0; i < limit; i++) {
    ex.execute(new RunnableObject(i + 1));
  }
  ex.awaitTermination(2, TimeUnit.SECONDS);
  System.out.println("finished");
}

La clase RunnableObject:

package playground;

public class RunnableObject implements Runnable {

  private final int id;

  public RunnableObject(int id) {
    this.id = id;
  }

  @Override
  public void run() {
    System.out.println("ID: " + id + " started");
    try {
      Thread.sleep(2354);
    } catch (InterruptedException ignore) {
    }
    System.out.println("ID: " + id + " ended");
  }
}
Author: kentcdodds, 2012-06-07

6 answers

Usted debe bucle en awaitTermination

ExecutorService threads;
// ...
// Tell threads to finish off.
threads.shutdown();
// Wait for everything to finish.
while (!threads.awaitTermination(10, TimeUnit.SECONDS)) {
  log.info("Awaiting completion of threads.");
}
 46
Author: OldCurmudgeon,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2014-05-06 13:48:34

Su problema parece ser que no está llamando a shutdown después de haber enviado todos los trabajos a su grupo. Sin shutdown() su awaitTermination siempre devolverá false.

ThreadPoolExecutor ex =
    new ThreadPoolExecutor(limit, limit, 20, TimeUnit.SECONDS, q);
for (int i = 0; i < limit; i++) {
  ex.execute(new RunnableObject(i + 1));
}
// you are missing this line!!
ex.shutdown();
ex.awaitTermination(2, TimeUnit.SECONDS);

También puedes hacer algo como lo siguiente para esperar a que terminen todos tus trabajos:{[19]]}

List<Future<Object>> futures = new ArrayList<Future<Object>>();
for (int i = 0; i < limit; i++) {
  futures.add(ex.submit(new RunnableObject(i + 1), (Object)null));
}
for (Future<Object> future : futures) {
   // this joins with the submitted job
   future.get();
}
...
// still need to shutdown at the end
ex.shutdown();

También, porque estás durmiendo durante 2354 milisegundos, pero solo esperando la terminación de todos los trabajos para 2 SECONDS, awaitTermination siempre regresará false.

Por último, suena como si estuvieras preocuparse por crear un nuevo ThreadPoolExecutor y en su lugar desea reutilizar el primero. No lo sientas. La sobrecarga de GC va a ser extremadamente mínima en comparación con cualquier código que escriba para detectar si los trabajos están terminados.


Para citar de los javadocs, ThreadPoolExecutor.shutdown():

Inicia un apagado ordenado en el que se ejecutan las tareas enviadas anteriormente, pero no se aceptarán tareas nuevas. La invocación no tiene ningún efecto adicional si ya está apagada.

En el ThreadPoolExecutor.awaitTermination(...) método, está a la espera de que el estado del ejecutor para ir a TERMINATED. Pero primero el estado debe ir a SHUTDOWN si se llama shutdown() o STOP si se llama shutdownNow().

 4
Author: Gray,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2012-06-07 15:35:25

No tiene nada que ver con el ejecutor en sí. Simplemente use el java.util.concurrent.ExecutorService.invokeAll(Collection<? extends Callable<T>>) de la interfaz. Se bloqueará hasta que todos los Callables estén terminados.

Los ejecutores están destinados a ser de larga duración; más allá de la vida útil de un grupo de tareas. shutdown es para cuando la aplicación está terminada y la limpieza.

 3
Author: artbristol,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2012-06-07 15:13:15

Aquí hay una variante de la respuesta aceptada que maneja reintentos si / cuando se lanza una excepción interrumpida:

executor.shutdown();

boolean isWait = true;

while (isWait)
{
    try
    {             
        isWait = !executor.awaitTermination(10, TimeUnit.SECONDS);
        if (isWait)
        {
            log.info("Awaiting completion of bulk callback threads.");
        }
    } catch (InterruptedException e) {
        log.debug("Interruped while awaiting completion of callback threads - trying again...");
    }
}
 2
Author: Justin,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2015-02-08 06:38:55

Otro enfoque es usar CompletionService, muy útil si tiene que intentar cualquier resultado de tarea:

//run 3 task at time
final int numParallelThreads = 3;

//I used newFixedThreadPool for convenience but if you need you can use ThreadPoolExecutor
ExecutorService executor = Executors.newFixedThreadPool(numParallelThreads);
CompletionService<String> completionService = new ExecutorCompletionService<String>(executor);

int numTaskToStart = 15;

for(int i=0; i<numTaskToStart ; i++){
    //task class that implements Callable<String> (or something you need)
    MyTask mt = new MyTask();

    completionService.submit(mt);
}

executor.shutdown(); //it cannot be queued more task

try {
    for (int t = 0; t < numTaskToStart ; t++) {
        Future<String> f = completionService.take();
        String result = f.get();
        // ... something to do ...
    }
} catch (InterruptedException e) {
    //termination of all started tasks (it returns all not started tasks in queue)
    executor.shutdownNow();
} catch (ExecutionException e) {
    // ... something to catch ...
}
 1
Author: fl4l,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2014-06-15 10:09:35

Prueba esto,

ThreadPoolExecutor ex =
    new ThreadPoolExecutor(limit, limit, 20, TimeUnit.SECONDS, q);
for (int i = 0; i < limit; i++) {
  ex.execute(new RunnableObject(i + 1));
}

Líneas a añadir

ex.shutdown();
ex.awaitTermination(timeout, unit)
 -1
Author: Kumar Vivek Mitra,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2012-06-07 15:15:37