¿Cómo conseguir que el ThreadPoolExecutor aumente los hilos al máximo antes de hacer cola?


He estado frustrado durante algún tiempo con el comportamiento predeterminado de ThreadPoolExecutor que respalda los conjuntos de subprocesos ExecutorService que muchos de nosotros usamos. Para citar de los Javadocs:

Si hay más subprocesos que corePoolSize pero menos que maximumPoolSize en ejecución, se creará un nuevo subproceso solo si la cola está llena.

Lo que esto significa es que si define un grupo de subprocesos con el siguiente código, nunca iniciará el 2do subproceso porque el {[3] } es ilimitado.

ExecutorService threadPool =
    new ThreadPoolExecutor(1 /*core*/, 50 /*max*/, 60 /*timeout*/,
        TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(/* unlimited queue */));

Solo si tienes una cola acotada y la cola está llena hay cualquier hilo por encima del número de núcleo iniciado. Sospecho que un gran número de programadores júnior Java multithreaded no son conscientes de este comportamiento de la ThreadPoolExecutor.

Ahora tengo un caso de uso específico donde esto no es óptimo. Estoy buscando formas, sin escribir mi propia clase de TPE, para evitarlo.

Mis requisitos son para un servicio web que está haciendo devoluciones de llamada a una tercera parte posiblemente poco fiable.

  • No quiero realizar la devolución de llamada de forma sincrónica con la solicitud web, por lo que quiero usar un grupo de subprocesos.
  • Normalmente obtengo un par de estos por minuto, así que no quiero tener un newFixedThreadPool(...) con un gran número de hilos que en su mayoría están inactivos.
  • De vez en cuando recibo una ráfaga de este tráfico y quiero escalar el número de subprocesos a algún valor máximo (digamos 50).
  • Necesito hacer un mejor intento hacer todo callbacks así que quiero poner en cola cualquier adicional por encima de 50. No quiero abrumar al resto de mi servidor web usando un newCachedThreadPool().

¿Cómo puedo evitar esta limitación en ThreadPoolExecutor donde la cola necesita estar limitada y llena antes de que se inicien más subprocesos? ¿Cómo puedo hacer que inicie más subprocesos antes de las tareas de cola?

Editar:

@Flavio hace un buen punto sobre el uso de la ThreadPoolExecutor.allowCoreThreadTimeOut(true) para tener el tiempo de espera de los hilos de núcleo y salida. Lo consideré, pero todavía quería la función core-threads. No quería que el número de hilos en el grupo cayera por debajo del tamaño del núcleo si era posible.

Author: Gray, 2013-10-23

8 answers

¿Cómo puedo evitar esta limitación en ThreadPoolExecutor donde la cola necesita estar limitada y llena antes de que se inicien más subprocesos?

Creo que finalmente he encontrado una solución algo elegante (tal vez un poco hacky) a esta limitación con ThreadPoolExecutor. Implica extender LinkedBlockingQueue para que devuelva false para queue.offer(...) cuando ya hay algunas tareas en cola. Si los subprocesos actuales no están al día con las tareas en cola, el TPE agregará subprocesos adicionales. Si el pool ya está en el número máximo de subprocesos, entonces se llamará a RejectedExecutionHandler. Es el manejador que luego hace el put(...) en la cola.

Ciertamente es extraño escribir una cola donde offer(...) puede devolver false y put() nunca bloquea, así que esa es la parte de hackeo. Pero esto funciona bien con el uso de TPE de la cola, por lo que no veo ningún problema con hacer esto.

Aquí está el código:

// extend LinkedBlockingQueue to force offer() to return false conditionally
BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>() {
    private static final long serialVersionUID = -6903933921423432194L;
    @Override
    public boolean offer(Runnable e) {
        /*
         * Offer it to the queue if there is 0 items already queued, else
         * return false so the TPE will add another thread. If we return false
         * and max threads have been reached then the RejectedExecutionHandler
         * will be called which will do the put into the queue.
         */
        if (size() == 0) {
            return super.offer(e);
        } else {
            return false;
        }
    }
};
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(1 /*core*/, 50 /*max*/,
        60 /*secs*/, TimeUnit.SECONDS, queue);
threadPool.setRejectedExecutionHandler(new RejectedExecutionHandler() {
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        try {
            /*
             * This does the actual put into the queue. Once the max threads
             * have been reached, the tasks will then queue up.
             */
            executor.getQueue().put(r);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return;
        }
    }
});

Con este mecanismo, cuando envío tareas a la cola, el {[2] }:

  1. Escala el número de hilos hasta el tamaño del núcleo inicialmente (aquí 1).
  2. Ofrécelo a la cola. Si la cola está vacía, se pondrá en cola para ser manejada por los subprocesos existentes.
  3. Si la cola ya tiene 1 o más elementos, el offer(...) devolverá false.
  4. Si se devuelve false, aumente el número de subprocesos en el grupo hasta que alcancen el número máximo (aquí 50).
  5. Si en el máximo entonces llama a la RejectedExecutionHandler
  6. El RejectedExecutionHandler luego pone la tarea en la cola para ser procesado por el primer hilo disponible en orden FIFO.

Aunque en el código de mi ejemplo anterior, la cola no está acotada, también podría definirla como una cola acotada. Por ejemplo, si agrega una capacidad de 1000 a LinkedBlockingQueue, entonces:

  1. escala los hilos hasta max
  2. luego en cola hasta que esté lleno con 1000 tareas
  3. luego bloquea al llamante hasta que haya espacio disponible para la cola.

Además, si realmente necesita usar offer(...) en el RejectedExecutionHandler entonces podría usar el método offer(E, long, TimeUnit) en su lugar con Long.MAX_VALUE como tiempo de espera.

Editar:

He ajustado mi offer(...) método override según los comentarios de @Ralf. Esto solo escalará el número de hilos en el grupo si no se mantienen al día con la carga.

Editar:

Otro ajuste a esta respuesta podría ser realmente preguntar al TPE si hay hilos inactivos y solo encolar el elemento si lo hay. Tendrías que hacer un true class para esto y agregue un método ourQueue.setThreadPoolExecutor(tpe); en él.

Entonces su offer(...) método podría ser algo así como:

  1. Compruebe si el tpe.getPoolSize() == tpe.getMaximumPoolSize() en cuyo caso solo llame a super.offer(...).
  2. Else if tpe.getPoolSize() > tpe.getActiveCount() entonces llama a super.offer(...) ya que parece que hay hilos inactivos.
  3. De lo contrario devuelve false para bifurcar otro hilo.

Tal vez esto:

int poolSize = tpe.getPoolSize();
int maximumPoolSize = tpe.getMaximumPoolSize();
if (poolSize >= maximumPoolSize || poolSize > tpe.getActiveCount()) {
    return super.offer(e);
} else {
    return false;
}

Tenga en cuenta que los métodos get en TPE son caros ya que acceden a los campos volatile o (en el caso de getActiveCount()) bloquean TPE y caminar la lista de hilos. Además, hay condiciones de carrera aquí que pueden causar que una tarea sea encolada incorrectamente u otro hilo bifurcado cuando había un hilo inactivo.

 37
Author: Gray,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2017-08-21 13:32:19

Establezca el tamaño del núcleo y el tamaño máximo en el mismo valor, y permita que los hilos del núcleo se eliminen del grupo con allowCoreThreadTimeOut(true).

 23
Author: Flavio,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2013-10-22 21:22:34

Ya tengo otras dos respuestas a esta pregunta, pero sospecho que esta es la mejor.

Se basa en la técnica de la respuesta actualmente aceptada , a saber:

  1. Invalida el método offer() de la cola para (a veces) devolver false,
  2. que hace que el ThreadPoolExecutor genere un nuevo hilo o rechace la tarea, y
  3. establece el RejectedExecutionHandlera en realidad encola la tarea en caso de rechazo.

El problema es cuando offer() debería devuelve false. La respuesta actualmente aceptada devuelve false cuando la cola tiene un par de tareas en ella, pero como he señalado en mi comentario allí, esto causa efectos indeseables. Alternativamente, si siempre devuelves false, seguirás generando nuevos hilos incluso cuando tengas hilos esperando en la cola.

La solución es usar Java 7 LinkedTransferQueue y tienen offer() llamar tryTransfer(). Cuando hay un subproceso de consumidor en espera, la tarea simplemente se pasará a ese subproceso. De lo contrario, offer() devolverá false y el ThreadPoolExecutor generará un nuevo hilo.

    BlockingQueue<Runnable> queue = new LinkedTransferQueue<Runnable>() {
        @Override
        public boolean offer(Runnable e) {
            return tryTransfer(e);
        }
    };
    ThreadPoolExecutor threadPool = new ThreadPoolExecutor(1, 50, 60, TimeUnit.SECONDS, queue);
    threadPool.setRejectedExecutionHandler(new RejectedExecutionHandler() {
        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            try {
                executor.getQueue().put(r);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    });
 16
Author: Robert Tupelo-Schneck,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2017-12-05 14:48:24

Nota: ahora prefiero y recomiendo mi otra respuesta.

Aquí hay una versión que me parece mucho más sencilla: Aumentar el tamaño de la base de datos (hasta el límite de maximumPoolSize) cada vez que se ejecuta una nueva tarea, luego disminuir el tamaño de la base de datos (hasta el límite del "tamaño del núcleo" especificado por el usuario) cada vez que se completa una tarea.

Para decirlo de otra manera, lleve un registro del número de tareas en ejecución o en cola, y asegúrese de que el corePoolSize sea igual al número de tareas siempre y cuando esté entre el usuario especificado "core pool size" y el maximumPoolSize.

public class GrowBeforeQueueThreadPoolExecutor extends ThreadPoolExecutor {
    private int userSpecifiedCorePoolSize;
    private int taskCount;

    public GrowBeforeQueueThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
        userSpecifiedCorePoolSize = corePoolSize;
    }

    @Override
    public void execute(Runnable runnable) {
        synchronized (this) {
            taskCount++;
            setCorePoolSizeToTaskCountWithinBounds();
        }
        super.execute(runnable);
    }

    @Override
    protected void afterExecute(Runnable runnable, Throwable throwable) {
        super.afterExecute(runnable, throwable);
        synchronized (this) {
            taskCount--;
            setCorePoolSizeToTaskCountWithinBounds();
        }
    }

    private void setCorePoolSizeToTaskCountWithinBounds() {
        int threads = taskCount;
        if (threads < userSpecifiedCorePoolSize) threads = userSpecifiedCorePoolSize;
        if (threads > getMaximumPoolSize()) threads = getMaximumPoolSize();
        setCorePoolSize(threads);
    }
}

Tal como está escrito, la clase no admite cambiar el corePoolSize o maximumPoolSize especificado por el usuario después de la construcción, y no admite la manipulación de la cola de trabajo directamente o a través de remove() o purge().

 7
Author: Robert Tupelo-Schneck,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2017-12-05 14:51:59

Tenemos una subclase de ThreadPoolExecutor que toma un creationThreshold adicional y anula execute.

public void execute(Runnable command) {
    super.execute(command);
    final int poolSize = getPoolSize();
    if (poolSize < getMaximumPoolSize()) {
        if (getQueue().size() > creationThreshold) {
            synchronized (this) {
                setCorePoolSize(poolSize + 1);
                setCorePoolSize(poolSize);
            }
        }
    }
}

Tal vez eso también ayude, pero el tuyo se ve más artístico, por supuesto {

 5
Author: Ralf H,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2013-10-23 10:15:50

La respuesta recomendada resuelve solo uno (1) del problema con el grupo de subprocesos JDK:

  1. Los grupos de subprocesos JDK están sesgados hacia las colas. Así que en lugar de generar un nuevo hilo, pondrán en cola la tarea. Solo si la cola alcanza su límite, el grupo de subprocesos generará un nuevo subproceso.

  2. El retiro del hilo no ocurre cuando la carga se aligera. Por ejemplo, si tenemos una ráfaga de trabajos que golpean la piscina que hace que la piscina vaya a max, seguido de una carga ligera de max 2 tareas a la vez, el grupo utilizará todos los subprocesos para mantener la carga ligera evitando el retiro de subprocesos. (solo se necesitarían 2 hilos...)

Insatisfecho con el comportamiento anterior, seguí adelante e implementé un pool para superar las deficiencias anteriores.

Para resolver 2) El uso de la programación Lifo resuelve el problema. Esta idea fue presentada por Ben Maurer en la conferencia ACM applicative 2015: Systems @ Facebook scale

Así que una nueva implementación fue nacido:

LifoThreadPoolExecutorSQP

Hasta ahora esta implementación mejora el rendimiento de ejecución asincrónica para ZEL.

La implementación es spin capaz de reducir la sobrecarga del cambio de contexto, produciendo un rendimiento superior para ciertos casos de uso.

Espero que ayude...

PD: JDK Fork Join Pool implementa ExecutorService y funciona como un grupo de subprocesos "normal", La implementación es eficiente, utiliza la programación de subprocesos LIFO, sin embargo, no hay control sobre el tamaño de la cola interna, tiempo de espera de retiro...

 3
Author: user2179737,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2015-09-15 20:43:30

Nota: ahora prefiero y recomiendo mi otra respuesta.

Tengo otra propuesta, siguiendo la idea original de cambiar la cola para devolver false. En esta, todas las tareas pueden entrar en la cola, pero cada vez que una tarea se encuela después de execute(), la seguimos con una tarea no-op centinela que la cola rechaza, causando que aparezca un nuevo hilo, que ejecutará la no-op inmediatamente seguido por algo de la cola.

Porque los hilos de trabajo pueden estar sondeando el LinkedBlockingQueue para una tarea nueva, es posible que una tarea se ponga en cola incluso cuando hay un hilo disponible. Para evitar generar nuevos subprocesos incluso cuando hay subprocesos disponibles, necesitamos realizar un seguimiento de cuántos subprocesos están esperando nuevas tareas en la cola, y solo generar un nuevo subproceso cuando hay más tareas en la cola que subprocesos en espera.

final Runnable SENTINEL_NO_OP = new Runnable() { public void run() { } };

final AtomicInteger waitingThreads = new AtomicInteger(0);

BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>() {
    @Override
    public boolean offer(Runnable e) {
        // offer returning false will cause the executor to spawn a new thread
        if (e == SENTINEL_NO_OP) return size() <= waitingThreads.get();
        else return super.offer(e);
    }

    @Override
    public Runnable poll(long timeout, TimeUnit unit) throws InterruptedException {
        try {
            waitingThreads.incrementAndGet();
            return super.poll(timeout, unit);
        } finally {
            waitingThreads.decrementAndGet();
        }
    }

    @Override
    public Runnable take() throws InterruptedException {
        try {
            waitingThreads.incrementAndGet();
            return super.take();
        } finally {
            waitingThreads.decrementAndGet();
        }
    }
};

ThreadPoolExecutor threadPool = new ThreadPoolExecutor(1, 50, 60, TimeUnit.SECONDS, queue) {
    @Override
    public void execute(Runnable command) {
        super.execute(command);
        if (getQueue().size() > waitingThreads.get()) super.execute(SENTINEL_NO_OP);
    }
};
threadPool.setRejectedExecutionHandler(new RejectedExecutionHandler() {
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        if (r == SENTINEL_NO_OP) return;
        else throw new RejectedExecutionException();            
    }
});
 1
Author: Robert Tupelo-Schneck,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2017-12-05 14:51:04

La mejor solución que se me ocurre es extender.

ThreadPoolExecutor ofrece algunos métodos de gancho: beforeExecute y afterExecute. En su extensión puede mantener una cola limitada para alimentar las tareas y una segunda cola ilimitada para manejar el desbordamiento. Cuando alguien llama a submit, puede intentar colocar la solicitud en la cola acotada. Si te encuentras con una excepción, simplemente pegas la tarea en tu cola de desbordamiento. A continuación, podría utilizar el gancho afterExecute para ver si hay algo en el cola de desbordamiento después de terminar una tarea. De esta manera, el ejecutor se encargará de las cosas en su cola limitada primero, y automáticamente extraerá de esta cola ilimitada según lo permita el tiempo.

Parece más trabajo que su solución, pero al menos no implica dar a las colas comportamientos inesperados. También imagino que hay una mejor manera de verificar el estado de la cola y los hilos en lugar de depender de excepciones, que son bastante lentas de lanzar.

 0
Author: bstempi,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2013-10-22 21:52:53