Cola de trabajos como tabla SQL con varios consumidores (PostgreSQL)


Tengo un problema típico productor-consumidor:

Múltiples aplicaciones de producción escriben solicitudes de trabajo en una tabla de trabajos en una base de datos PostgreSQL.

Las solicitudes de trabajo tienen un campo de estado que se inicia con la COLA en la creación.

Hay múltiples aplicaciones de consumo que son notificadas por una regla cuando un productor inserta un nuevo registro:

CREATE OR REPLACE RULE "jobrecord.added" AS
  ON INSERT TO jobrecord DO 
  NOTIFY "jobrecordAdded";

Intentarán reservar un nuevo registro estableciendo su estado en RESERVADO. Por supuesto, solo en el consumidor debe éxito. Los demás consumidores no deberían poder reservar el mismo registro. En su lugar, deberían reservar otros registros con state=EN COLA.

Ejemplo: algunos productores agregaron los siguientes registros a la tabla jobrecord :

id state  owner  payload
------------------------
1 QUEUED null   <data>
2 QUEUED null   <data>
3 QUEUED null   <data>
4 QUEUED null   <data>

Ahora, dos consumidores A, B quiere procesarlos. Empiezan a correr al mismo tiempo. Uno debe reservar id 1, el otro debe reservar id 2, luego el primero que termine debe reservar id 3 y así sucesivamente..

En a mundo multiproceso puro, usaría un mutex para controlar el acceso a la cola de trabajos, pero los consumidores son procesos diferentes que pueden ejecutarse en diferentes máquinas. Solo acceden a la misma base de datos, por lo que toda la sincronización debe ocurrir a través de la base de datos.

He leído mucha documentación sobre el acceso concurrente y el bloqueo en PostgreSQL, por ejemplo, http://www.postgresql.org/docs/9.0/interactive/explicit-locking.html Seleccionar fila desbloqueada en Postgresql PostgreSQL y bloqueo

De estos temas, aprendí que la siguiente instrucción SQL debería hacer lo que necesito:

UPDATE jobrecord
  SET owner= :owner, state = :reserved 
  WHERE id = ( 
     SELECT id from jobrecord WHERE state = :queued 
        ORDER BY id  LIMIT 1 
     ) 
  RETURNING id;  // will only return an id when they reserved it successfully

Desafortunadamente, cuando corro esto en múltiples procesos de consumo, en aproximadamente el 50% de las veces, todavía reservan el mismo registro, tanto procesándolo como sobrescribiendo uno los cambios del otro.

¿Qué me estoy perdiendo? ¿Cómo tengo que escribir la instrucción SQL para que varios consumidores no reservamos el mismo registro?

Author: Community, 2011-06-28

7 answers

Lee mi post aquí:

Consistencia en postgresql con bloqueo y selección para actualización

Si utiliza transaction y LOCK TABLE no tendrá problemas.

 4
Author: jordani,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2017-05-23 12:26:15

También uso postgres para una cola FIFO. Originalmente usé ACCESS EXCLUSIVE, que produce resultados correctos en alta concurrencia, pero tiene el desafortunado efecto de ser mutuamente excluyentes con pg_dump, que adquiere un bloqueo de ACCESO COMPARTIDO durante su ejecución. Esto hace que mi función next() se bloquee durante mucho tiempo (la duración del pg_dump). Esto no era aceptable ya que somos una tienda 24x7 y a los clientes no les gustaba el tiempo muerto en la cola en medio de la noche.

I se supone que debe haber un bloqueo menos restrictivo que seguiría siendo seguro concurrente y no bloqueado mientras pg_dump se esté ejecutando. Mi búsqueda me llevó a este post.

Luego investigué un poco.

Los siguientes modos son suficientes para una función FIFO queue NEXT() que actualizará el estado de un trabajo de queue a ejecutando sin ningún error de concurrencia, y tampoco bloqueará contra pg_dump:

SHARE UPDATE EXCLUSIVE
SHARE ROW EXCLUSIVE
EXCLUSIVE

Consulta:

begin;
lock table tx_test_queue in exclusive mode;
update 
    tx_test_queue
set 
    status='running'
where
    job_id in (
        select
            job_id
        from
            tx_test_queue
        where
            status='queued'
        order by 
            job_id asc
        limit 1
    )
returning job_id;
commit;

Se ve el resultado como:

UPDATE 1
 job_id
--------
     98
(1 row)

Aquí hay un script de shell que prueba todos los diferentes modos de bloqueo con alta concurrencia (30).

#!/bin/bash
# RESULTS, feel free to repro yourself
#
# noLock                    FAIL
# accessShare               FAIL
# rowShare                  FAIL
# rowExclusive              FAIL
# shareUpdateExclusive      SUCCESS
# share                     FAIL+DEADLOCKS
# shareRowExclusive         SUCCESS
# exclusive                 SUCCESS
# accessExclusive           SUCCESS, but LOCKS against pg_dump

#config
strategy="exclusive"

db=postgres
dbuser=postgres
queuecount=100
concurrency=30

# code
psql84 -t -U $dbuser $db -c "create table tx_test_queue (job_id serial, status text);"
# empty queue
psql84 -t -U $dbuser $db -c "truncate tx_test_queue;";
echo "Simulating 10 second pg_dump with ACCESS SHARE"
psql84 -t -U $dbuser $db -c "lock table tx_test_queue in ACCESS SHARE mode; select pg_sleep(10); select 'pg_dump finished...'" &

echo "Starting workers..."
# queue $queuecount items
seq $queuecount | xargs -n 1 -P $concurrency -I {} psql84 -q -U $dbuser $db -c "insert into tx_test_queue (status) values ('queued');"
#psql84 -t -U $dbuser $db -c "select * from tx_test_queue order by job_id;"
# process $queuecount w/concurrency of $concurrency
case $strategy in
    "noLock")               strategySql="update tx_test_queue set status='running{}' where job_id in (select job_id from tx_test_queue where status='queued' order by job_id asc limit 1);";;
    "accessShare")          strategySql="lock table tx_test_queue in ACCESS SHARE mode; update tx_test_queue set status='running{}' where job_id in (select job_id from tx_test_queue where status='queued' order by job_id asc limit 1);";;
    "rowShare")             strategySql="lock table tx_test_queue in ROW SHARE mode; update tx_test_queue set status='running{}' where job_id in (select job_id from tx_test_queue where status='queued' order by job_id asc limit 1);";;
    "rowExclusive")         strategySql="lock table tx_test_queue in ROW EXCLUSIVE mode; update tx_test_queue set status='running{}' where job_id in (select job_id from tx_test_queue where status='queued' order by job_id asc limit 1);";;
    "shareUpdateExclusive") strategySql="lock table tx_test_queue in SHARE UPDATE EXCLUSIVE mode; update tx_test_queue set status='running{}' where job_id in (select job_id from tx_test_queue where status='queued' order by job_id asc limit 1);";;
    "share")                strategySql="lock table tx_test_queue in SHARE mode; update tx_test_queue set status='running{}' where job_id in (select job_id from tx_test_queue where status='queued' order by job_id asc limit 1);";;
    "shareRowExclusive")    strategySql="lock table tx_test_queue in SHARE ROW EXCLUSIVE mode; update tx_test_queue set status='running{}' where job_id in (select job_id from tx_test_queue where status='queued' order by job_id asc limit 1);";;
    "exclusive")            strategySql="lock table tx_test_queue in EXCLUSIVE mode; update tx_test_queue set status='running{}' where job_id in (select job_id from tx_test_queue where status='queued' order by job_id asc limit 1);";;
    "accessExclusive")      strategySql="lock table tx_test_queue in ACCESS EXCLUSIVE mode; update tx_test_queue set status='running{}' where job_id in (select job_id from tx_test_queue where status='queued' order by job_id asc limit 1);";;
    *) echo "Unknown strategy $strategy";;
esac
echo $strategySql
seq $queuecount | xargs -n 1 -P $concurrency -I {} psql84 -U $dbuser $db -c "$strategySql"
#psql84 -t -U $dbuser $db -c "select * from tx_test_queue order by job_id;"
psql84 -U $dbuser $db -c "select count(distinct(status)) as should_output_100 from tx_test_queue;"
psql84 -t -U $dbuser $db -c "drop table tx_test_queue;";

El código también está aquí si desea editar: https://gist.github.com/1083936

Estoy actualizando mi aplicación para usar el modo EXCLUSIVO ya que es el modo más restrictivo que a) es correcto y b) no entra en conflicto con pg_dump. Elegí la más restrictiva ya que parece la menos arriesgada en términos de cambiar la app de ACCESS EXCLUSIVE sin ser un experto en bloqueo postgres.

Me siento bastante cómodo con mi equipo de pruebas y con las ideas generales detrás de la respuesta. Espero que compartir esto ayude a resolver este problema para otros.

 32
Author: apinstein,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2011-07-15 03:17:43

No es necesario hacer un bloqueo de tabla completo para esto :\.

Un bloqueo de fila creado con for update funciona bien.

Véase https://gist.github.com/mackross/a49b72ad8d24f7cefc32 por el cambio que hice a la respuesta de Apinstein y verifiqué que todavía funciona.

El código final es

update 
    tx_test_queue
set 
    status='running'
where
    job_id in (
        select
            job_id
        from
            tx_test_queue
        where
            status='queued'
        order by 
            job_id asc
        limit 1 for update
    )
returning job_id;
 14
Author: mackross,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2015-05-19 01:36:56

¿Qué pasa con solo seleccionar?

SELECT * FROM table WHERE status = 'QUEUED' LIMIT 10 FOR UPDATE SKIP LOCKED;

Https://www.postgresql.org/docs/9.5/static/sql-select.html#SQL-FOR-UPDATE-SHARE

 4
Author: Vladimir Filipchenko,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2018-01-08 09:28:48

Es posible que desee ver cómo lo hace queue_classic. https://github.com/ryandotsmith/queue_classic

El código es bastante corto y fácil de entender.

 2
Author: Joe Van Dyk,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2012-10-01 18:34:44

Echa un vistazo a PgQ en lugar de reinventar la rueda.

 -1
Author: Sean,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2011-06-28 18:17:14

Bien, aquí está la solución que está funcionando para mí, basada en el enlace de jordani. Como algunos de mis problemas estaban en la forma en que funciona Qt-SQL, he incluido el código Qt:

QSqlDatabase db = GetDatabase();
db.transaction();
QSqlQuery lockQuery(db);
bool lockResult = lockQuery.exec("LOCK TABLE serverjobrecord IN ACCESS EXCLUSIVE MODE; ");
QSqlQuery query(db);
query.prepare(    
"UPDATE jobrecord "
"  SET \"owner\"= :owner, state = :reserved "
"  WHERE id = ( "
"    SELECT id from jobrecord WHERE state = :queued ORDER BY id LIMIT 1 "
"  ) RETURNING id;"
);
query.bindValue(":owner", pid);
query.bindValue(":reserved", JobRESERVED);
query.bindValue(":queued", JobQUEUED); 
bool result = query.exec();

Para comprobar, si varios consumidores procesan el mismo trabajo, agregué una regla y una tabla de registro:

CREATE TABLE serverjobrecord_log
(
  serverjobrecord_id integer,
  oldowner text,
  newowner text
) WITH ( OIDS=FALSE );


CREATE OR REPLACE RULE ownerrule AS ON UPDATE TO jobrecord
WHERE old.owner IS NOT NULL AND new.state = 1 
DO INSERT INTO jobrecord_log     (id, oldowner, newowner) 
    VALUES (new.id, old.owner, new.owner);

Sin la instrucción LOCK TABLE serverjobrecord IN ACCESS EXCLUSIVE MODE;, la tabla log se llena ocasionalmente con entradas, donde un consumidor ha sobrescrito los valores de otro, pero usando la instrucción LOCK, la tabla log permanece vacía: -)

 -1
Author: code_talker,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2011-06-30 08:04:55