Java: ExecutorService que bloquea el envío después de un cierto tamaño de cola


Estoy tratando de codificar una solución en la que un solo subproceso produce tareas intensivas de E/S que se pueden realizar en paralelo. Cada tarea tiene datos significativos en la memoria. Así que quiero ser capaz de limitar el número de tareas que están pendientes en un momento.

Si creo ThreadPoolExecutor así:

    ThreadPoolExecutor executor = new ThreadPoolExecutor(numWorkerThreads, numWorkerThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>(maxQueue));

Entonces el executor.submit(callable) lanza RejectedExecutionException cuando la cola se llena y todos los hilos ya están ocupados.

¿Qué puedo hacer para hacer executor.submit(callable) bloque cuando la cola está llena y todos los hilos ¿estás ocupado?

EDITAR : He intentado esto :

executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());

Y de alguna manera logra el efecto que quiero lograr pero de una manera poco elegante (básicamente los hilos rechazados se ejecutan en el hilo de llamada, por lo que esto bloquea el hilo de llamada de enviar más).

EDITAR: (5 años después de hacer la pregunta)

Para cualquiera que lea esta pregunta y sus respuestas, por favor no tome la respuesta aceptada como una solución correcta. Por favor, lea todo respuestas y comentarios.

Author: rogerdpack, 2010-12-23

7 answers

He hecho lo mismo. El truco es crear un BlockingQueue donde el método offer () es realmente un put (). (puede utilizar cualquier impl bloque base que desee).

public class LimitedQueue<E> extends LinkedBlockingQueue<E> 
{
    public LimitedQueue(int maxSize)
    {
        super(maxSize);
    }

    @Override
    public boolean offer(E e)
    {
        // turn offer() and add() into a blocking calls (unless interrupted)
        try {
            put(e);
            return true;
        } catch(InterruptedException ie) {
            Thread.currentThread().interrupt();
        }
        return false;
    }

}

Tenga en cuenta que esto solo funciona para el grupo de subprocesos donde corePoolSize==maxPoolSize así que tenga cuidado allí (ver comentarios).

 52
Author: jtahlborn,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2017-07-27 02:57:33

Así es como resolví esto de mi parte:

(nota: esta solución bloquea el subproceso que envía el Invocable, por lo que evita que se lance RejectedExecutionException)

public class BoundedExecutor extends ThreadPoolExecutor{

    private final Semaphore semaphore;

    public BoundedExecutor(int bound) {
        super(bound, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
        semaphore = new Semaphore(bound);
    }

    /**Submits task to execution pool, but blocks while number of running threads 
     * has reached the bound limit
     */
    public <T> Future<T> submitButBlockIfFull(final Callable<T> task) throws InterruptedException{

        semaphore.acquire();            
        return submit(task);                    
    }


    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);

        semaphore.release();
    }
}
 14
Author: cvacca,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2014-06-26 00:51:16

La respuesta actualmente aceptada tiene un problema potencialmente significativo: cambia el comportamiento de ThreadPoolExecutor.ejecute de tal manera que si tiene un corePoolSize < maxPoolSize, la lógica ThreadPoolExecutor nunca agregará trabajadores adicionales más allá del núcleo.

De ThreadPoolExecutor.ejecutar (Ejecutable):

    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false))
        reject(command);

Específicamente, ese último bloque 'else' nunca será golpeado.

Una mejor alternativa es hacer algo similar a lo que OP ya está haciendo-use un RechacedExecutionHandler para hacer lo mismo put lógica:

public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
    try {
        if (!executor.isShutdown()) {
            executor.getQueue().put(r);
        }
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new RejectedExecutionException("Executor was interrupted while the task was waiting to put on work queue", e);
    }
}

Hay algunas cosas a tener en cuenta con este enfoque, como se señala en los comentarios (refiriéndose a esta respuesta):

  1. Si corePoolSize==0, entonces hay una condición de carrera donde todos los subprocesos en el grupo pueden morir antes de que la tarea sea visible
  2. El uso de una implementación que envuelva las tareas de cola (no aplicable a ThreadPoolExecutor) dará lugar a problemas a menos que el controlador también lo envuelva de la misma manera manera.

Teniendo en cuenta esos puntos clave, esta solución funcionará para la mayoría de los ejecutores típicos de ThreadPool, y manejará correctamente el caso en el que corePoolSize < maxPoolSize.

 9
Author: Krease,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2017-05-23 12:18:24

Creo que es tan simple como usar un ArrayBlockingQueue en lugar de LinkedBlockingQueue.

Ignórame... eso está totalmente mal. ThreadPoolExecutor llama Queue#offer no put que tendría el efecto que necesita.

Se podría extender ThreadPoolExecutor y proporcionar una implementación de execute(Runnable) que llama put en lugar de offer.

Me temo que esa no parece una respuesta completamente satisfactoria.

 2
Author: Gareth Davis,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2010-12-23 20:08:31

Tuve el problema similar y lo implementé usando beforeExecute/afterExecute hooks de ThreadPoolExecutor:

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/**
 * Blocks current task execution if there is not enough resources for it.
 * Maximum task count usage controlled by maxTaskCount property.
 */
public class BlockingThreadPoolExecutor extends ThreadPoolExecutor {

    private final ReentrantLock taskLock = new ReentrantLock();
    private final Condition unpaused = taskLock.newCondition();
    private final int maxTaskCount;

    private volatile int currentTaskCount;

    public BlockingThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
            long keepAliveTime, TimeUnit unit,
            BlockingQueue<Runnable> workQueue, int maxTaskCount) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
        this.maxTaskCount = maxTaskCount;
    }

    /**
     * Executes task if there is enough system resources for it. Otherwise
     * waits.
     */
    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
        taskLock.lock();
        try {
            // Spin while we will not have enough capacity for this job
            while (maxTaskCount < currentTaskCount) {
                try {
                    unpaused.await();
                } catch (InterruptedException e) {
                    t.interrupt();
                }
            }
            currentTaskCount++;
        } finally {
            taskLock.unlock();
        }
    }

    /**
     * Signalling that one more task is welcome
     */
    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        taskLock.lock();
        try {
            currentTaskCount--;
            unpaused.signalAll();
        } finally {
            taskLock.unlock();
        }
    }
}

Esto debería ser suficiente para ti. Por cierto, la implementación original se basó en el tamaño de la tarea porque una tarea podría ser más grande 100 veces que otra y enviar dos tareas enormes estaba matando la caja, pero ejecutar una grande y muchas pequeñas estaba bien. Si sus tareas intensivas de E/S son aproximadamente del mismo tamaño, podría usar esta clase, de lo contrario, hágamelo saber y publicaré según el tamaño aplicación.

P.d. Usted querría comprobar ThreadPoolExecutor javadoc. Es una guía de usuario muy agradable de Doug Lea sobre cómo se podría personalizar fácilmente.

 2
Author: Petro Semeniuk,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2010-12-24 02:25:20

Sé que esta es una vieja pregunta, pero tenía un problema similar que la creación de nuevas tareas era muy rápido y si había demasiados un OutOfMemoryError ocurrir porque la tarea existente no se completaron lo suficientemente rápido.

En mi caso, Callables se envían y necesito el resultado, por lo tanto, necesito almacenar todos los Futures devueltos por executor.submit(). Mi solución fue poner el Futures en un BlockingQueue con un tamaño máximo. Una vez que la cola está llena, no se generan más tareas hasta que se completen algunas (elementos eliminados de cola). En pseudo-código:

final ExecutorService executor = Executors.newFixedThreadPool(numWorkerThreads);
final LinkedBlockingQueue<Future> futures = new LinkedBlockingQueue<>(maxQueueSize);
try {   
    Thread taskGenerator = new Thread() {
        @Override
        public void run() {
            while (reader.hasNext) {
                Callable task = generateTask(reader.next());
                Future future = executor.submit(task);
                try {
                    // if queue is full blocks until a task
                    // is completed and hence no future tasks are submitted.
                    futures.put(compoundFuture);
                } catch (InterruptedException ex) {
                    Thread.currentThread().interrupt();         
                }
            }
        executor.shutdown();
        }
    }
    taskGenerator.start();

    // read from queue as long as task are being generated
    // or while Queue has elements in it
    while (taskGenerator.isAlive()
                    || !futures.isEmpty()) {
        Future compoundFuture = futures.take();
        // do something
    }
} catch (InterruptedException ex) {
    Thread.currentThread().interrupt();     
} catch (ExecutionException ex) {
    throw new MyException(ex);
} finally {
    executor.shutdownNow();
}
 2
Author: beginner_,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2013-07-22 06:38:47

He implementado una solución siguiendo el patrón decorador y utilizando un semáforo para controlar el número de tareas ejecutadas. Puedes usarlo con cualquier Executor y:

  • Especificar el máximo de tareas en curso
  • Especifique el tiempo máximo de espera para un permiso de ejecución de tareas (si el tiempo de espera pasa y no se adquiere ningún permiso, se lanza un RejectedExecutionException)
import static java.util.concurrent.TimeUnit.MILLISECONDS;

import java.time.Duration;
import java.util.Objects;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.Semaphore;

import javax.annotation.Nonnull;

public class BlockingOnFullQueueExecutorDecorator implements Executor {

    private static final class PermitReleasingDecorator implements Runnable {

        @Nonnull
        private final Runnable delegate;

        @Nonnull
        private final Semaphore semaphore;

        private PermitReleasingDecorator(@Nonnull final Runnable task, @Nonnull final Semaphore semaphoreToRelease) {
            this.delegate = task;
            this.semaphore = semaphoreToRelease;
        }

        @Override
        public void run() {
            try {
                this.delegate.run();
            }
            finally {
                // however execution goes, release permit for next task
                this.semaphore.release();
            }
        }

        @Override
        public final String toString() {
            return String.format("%s[delegate='%s']", getClass().getSimpleName(), this.delegate);
        }
    }

    @Nonnull
    private final Semaphore taskLimit;

    @Nonnull
    private final Duration timeout;

    @Nonnull
    private final Executor delegate;

    public BlockingOnFullQueueExecutorDecorator(@Nonnull final Executor executor, final int maximumTaskNumber, @Nonnull final Duration maximumTimeout) {
        this.delegate = Objects.requireNonNull(executor, "'executor' must not be null");
        if (maximumTaskNumber < 1) {
            throw new IllegalArgumentException(String.format("At least one task must be permitted, not '%d'", maximumTaskNumber));
        }
        this.timeout = Objects.requireNonNull(maximumTimeout, "'maximumTimeout' must not be null");
        if (this.timeout.isNegative()) {
            throw new IllegalArgumentException("'maximumTimeout' must not be negative");
        }
        this.taskLimit = new Semaphore(maximumTaskNumber);
    }

    @Override
    public final void execute(final Runnable command) {
        Objects.requireNonNull(command, "'command' must not be null");
        try {
            // attempt to acquire permit for task execution
            if (!this.taskLimit.tryAcquire(this.timeout.toMillis(), MILLISECONDS)) {
                throw new RejectedExecutionException(String.format("Executor '%s' busy", this.delegate));
            }
        }
        catch (final InterruptedException e) {
            // restore interrupt status
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }

        this.delegate.execute(new PermitReleasingDecorator(command, this.taskLimit));
    }

    @Override
    public final String toString() {
        return String.format("%s[availablePermits='%s',timeout='%s',delegate='%s']", getClass().getSimpleName(), this.taskLimit.availablePermits(),
                this.timeout, this.delegate);
    }
}
 1
Author: Grzegorz Lehmann,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2018-03-19 21:53:21