Cola segura para subprocesos de C++11


Un proyecto en el que estoy trabajando utiliza varios subprocesos para trabajar en una colección de archivos. Cada subproceso puede agregar archivos a la lista de archivos a procesar, así que armé (lo que pensé que era) una cola segura para subprocesos. Las porciones relevantes siguen:

// qMutex is a std::mutex intended to guard the queue
// populatedNotifier is a std::condition_variable intended to
//                   notify waiting threads of a new item in the queue

void FileQueue::enqueue(std::string&& filename)
{
    std::lock_guard<std::mutex> lock(qMutex);
    q.push(std::move(filename));

    // Notify anyone waiting for additional files that more have arrived
    populatedNotifier.notify_one();
}

std::string FileQueue::dequeue(const std::chrono::milliseconds& timeout)
{
    std::unique_lock<std::mutex> lock(qMutex);
    if (q.empty()) {
        if (populatedNotifier.wait_for(lock, timeout) == std::cv_status::no_timeout) {
            std::string ret = q.front();
            q.pop();
            return ret;
        }
        else {
            return std::string();
        }
    }
    else {
        std::string ret = q.front();
        q.pop();
        return ret;
    }
}

Sin embargo, de vez en cuando estoy segfaulting dentro del bloque if (...wait_for(lock, timeout) == std::cv_status::no_timeout) { }, y la inspección en gdb indica que los segfaults están ocurriendo porque la cola está vacía. ¿Cómo es esto posible? Tenía entendido que wait_for solo devuelve cv_status::no_timeout cuando ha sido notificado, y esto solo debería suceder después de que FileQueue::enqueue haya enviado un nuevo elemento a la cola.

Author: Matt Kline, 2013-03-07

7 answers

De acuerdo con el estándar condition_variables se les permite despertar espuriosamente, incluso si el evento no ha ocurrido. En caso de un despertar espurio volverá cv_status::no_timeout (ya que se despertó en lugar de timing out), a pesar de que no ha sido notificado. La solución correcta para esto es, por supuesto, comprobar si el despertar era realmente legítimo antes de proceder.

Los detalles se especifican en la norma §30.5.1 [hilo.condición.condvar]:

- La función se desbloqueará cuando se indique mediante una llamada to notify_one (), una llamada a notify_all (), vencimiento del tiempo de espera absoluto (30.2.4) especificado por abs_time, o spuriously.

...

Devuelve: cv_status::timeout si el tiempo de espera absoluto (30.2.4) especificado por abs_time expiró, other-ise cv_status::no_timeout.

 25
Author: Grizzly,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2013-03-07 18:01:33

Solo mirándolo, cuando compruebas una variable de condición, lo mejor es usar un bucle while (de modo que si se despierta y aún no es inválido, lo compruebes de nuevo). Acabo de escribir una plantilla para una cola asíncrona, espero que esto ayude.

#ifndef SAFE_QUEUE
#define SAFE_QUEUE

#include <queue>
#include <mutex>
#include <condition_variable>

// A threadsafe-queue.
template <class T>
class SafeQueue
{
public:
  SafeQueue(void)
    : q()
    , m()
    , c()
  {}

  ~SafeQueue(void)
  {}

  // Add an element to the queue.
  void enqueue(T t)
  {
    std::lock_guard<std::mutex> lock(m);
    q.push(t);
    c.notify_one();
  }

  // Get the "front"-element.
  // If the queue is empty, wait till a element is avaiable.
  T dequeue(void)
  {
    std::unique_lock<std::mutex> lock(m);
    while(q.empty())
    {
      // release lock as long as the wait and reaquire it afterwards.
      c.wait(lock);
    }
    T val = q.front();
    q.pop();
    return val;
  }

private:
  std::queue<T> q;
  mutable std::mutex m;
  std::condition_variable c;
};
#endif
 33
Author: ChewOnThis_Trident,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2017-01-24 10:09:47

Esto es probablemente cómo debe hacerlo:

void push(std::string&& filename)
{
    {
        std::lock_guard<std::mutex> lock(qMutex);

        q.push(std::move(filename));
    }

    populatedNotifier.notify_one();
}

bool try_pop(std::string& filename, std::chrono::milliseconds timeout)
{
    std::unique_lock<std::mutex> lock(qMutex);

    if(!populatedNotifier.wait_for(lock, timeout, [this] { return !q.empty(); }))
        return false;

    filename = std::move(q.front());
    q.pop();

    return true;    
}
 11
Author: ronag,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2014-10-30 11:35:57

Agregando a la respuesta aceptada, yo diría que implementar una cola correcta de multi productores / multi consumidores es difícil (más fácil desde C++11, sin embargo)

Te sugiero que pruebes la (muy buena) lock free boost library, la estructura "queue" hará lo que quieras, con garantías de wait-free/lock-free y sin la necesidad de un compilador de C++11.

Estoy agregando esta respuesta ahora porque la biblioteca libre de bloqueos es bastante nueva para impulsar (desde 1.53 I creer)

 8
Author: quantdev,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2014-05-20 21:09:36

Reescribiría su función dequeue como:

std::string FileQueue::dequeue(const std::chrono::milliseconds& timeout)
{
    std::unique_lock<std::mutex> lock(qMutex);
    while(q.empty()) {
        if (populatedNotifier.wait_for(lock, timeout) == std::cv_status::timeout ) 
           return std::string();
    }
    std::string ret = q.front();
    q.pop();
    return ret;
}

Es más corto y no tiene código duplicado como el tuyo. Solo problema que puede esperar más tiempo que el tiempo de espera. Para evitar que tenga que recordar la hora de inicio antes del bucle, verifique el tiempo de espera y ajuste el tiempo de espera en consecuencia. O especifique tiempo absoluto en la condición de espera.

 5
Author: Slava,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2013-03-07 18:10:11

También hay una solución simplista para este caso, aún no lo intenté, pero creo que es una buena solución. https://developer.gnome.org/glib/2.36/glib-Asynchronous-Queues.html#g-async-queue-new

 1
Author: ransh,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2015-02-17 19:40:57

Puede que te guste lfqueue, https://github.com/Taymindis/lfqueue . Es bloquear la cola concurrente libre. Actualmente lo estoy usando para consumir la cola de múltiples llamadas entrantes y funciona como un encanto.

 0
Author: woon minika,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2018-07-17 23:48:58