¿Cuál es la mejor manera de esperar en múltiples variables de condición en C++11?


Primero un poco contexto : Estoy en el proceso de aprender sobre el subproceso en C++11 y para este propósito, estoy tratando de construir una pequeña clase actor, esencialmente (dejé el manejo de excepciones y cosas de propagación) así:

class actor {
    private: std::atomic<bool> stop;
    private: std::condition_variable interrupt;
    private: std::thread actor_thread;
    private: message_queue incoming_msgs;

    public: actor() 
    : stop(false), 
      actor_thread([&]{ run_actor(); })
    {}

    public: virtual ~actor() {
        // if the actor is destroyed, we must ensure the thread dies too
        stop = true;
        // to this end, we have to interrupt the actor thread which is most probably
        // waiting on the incoming_msgs queue:
        interrupt.notify_all();
        actor_thread.join();
    }

    private: virtual void run_actor() {
        try {
            while(!stop)
                // wait for new message and process it
                // but interrupt the waiting process if interrupt is signaled:
                process(incoming_msgs.wait_and_pop(interrupt));
        } 
        catch(interrupted_exception) {
            // ...
        }
    };

    private: virtual void process(const message&) = 0;
    // ...
};

Cada actor se ejecuta en su propio actor_thread, espera un nuevo mensaje entrante en incoming_msgs y when cuando llega un mensaje it lo procesa.

El actor_thread se crea junto con el actor y tiene que morir junto con él, por lo que necesito algún tipo de mecanismo de interrupción en el message_queue::wait_and_pop(std::condition_variable interrupt).

Esencialmente, requiero que wait_and_pop bloques hasta que a) un nuevo message llega o b) hasta que se dispare el interrupt, en cuyo caso ideally idealmente an se lanzará un interrupted_exception.

La llegada de un nuevo mensaje en el message_queue es actualmente modelada también por un std::condition_variable new_msg_notification:

// ...
// in class message_queue:
message wait_and_pop(std::condition_variable& interrupt) {
    std::unique_lock<std::mutex> lock(mutex);

    // How to interrupt the following, when interrupt fires??
    new_msg_notification.wait(lock,[&]{
        return !queue.empty();
    });
    auto msg(std::move(queue.front()));
    queue.pop();
    return msg;
}

Para acortar la historia, la pregunta es esta: ¿Cómo interrumpo la espera de un nuevo mensaje en new_msg_notification.wait(...) cuando se activa el interrupt (sin introducir un tiempo fuera)?

Alternativamente, la pregunta puede leerse como: ¿Cómo espero hasta que se señale cualquiera de dos std::condition_variable?

Un enfoque ingenuo parece ser no usar std::condition_variable en absoluto para la interrupción y en su lugar usar una bandera atómica std::atomic<bool> interrupted y luego esperar ocupado en new_msg_notification con un tiempo de espera muy pequeño hasta que haya llegado un nuevo mensaje o hasta true==interrupted. Sin embargo, me gustaría mucho evitar la espera ocupada.


EDITAR:

De la comentarios y la respuesta de pilcrow, parece que hay básicamente dos enfoques posibles.

  1. Enqueue un mensaje especial de "Terminación", propuesto por Alan, mukunda y pilcrow. Me decidí en contra de esta opción porque no tengo idea sobre el tamaño de la cola en el momento en que quiero que el actor termine. Puede muy bien ser (como es sobre todo el caso cuando quiero que algo termine rápidamente) que hay miles de mensajes a la izquierda para procesar en la cola y parece inaceptable esperar a que se procesen hasta que finalmente el mensaje de terminación reciba su turno.
  2. Implementa una versión personalizada de una variable de condición, que puede ser interrumpida por otro subproceso reenviando la notificación a la variable de condición en la que está esperando el primer subproceso. Opté por este enfoque.

Para aquellos de ustedes interesados, mi implementación es la siguiente. La variable de condición en mi caso es en realidad un semaphore (porque me gustan más y porque me gustó el ejercicio de hacerlo). Equipé este semáforo con un interrupt asociado que se puede obtener del semáforo vía semaphore::get_interrupt(). Si ahora un subproceso bloquea semaphore::wait(), otro subproceso tiene la posibilidad de llamar semaphore::interrupt::trigger() a la interrupción del semáforo, haciendo que el primer subproceso desbloquee y propague un interrupt_exception.

struct
interrupt_exception {};

class
semaphore {
    public: class interrupt;
    private: mutable std::mutex mutex;

    // must be declared after our mutex due to construction order!
    private: interrupt* informed_by;
    private: std::atomic<long> counter;
    private: std::condition_variable cond;

    public: 
    semaphore();

    public: 
    ~semaphore() throw();

    public: void 
    wait();

    public: interrupt&
    get_interrupt() const { return *informed_by; }

    public: void
    post() {
        std::lock_guard<std::mutex> lock(mutex);
        counter++;
        cond.notify_one(); // never throws
    }

    public: unsigned long
    load () const {
        return counter.load();
    }
};

class
semaphore::interrupt {
    private: semaphore *forward_posts_to;
    private: std::atomic<bool> triggered;

    public:
    interrupt(semaphore *forward_posts_to) : triggered(false), forward_posts_to(forward_posts_to) {
        assert(forward_posts_to);
        std::lock_guard<std::mutex> lock(forward_posts_to->mutex);
        forward_posts_to->informed_by = this;
    }

    public: void
    trigger() {
        assert(forward_posts_to);
        std::lock_guard<std::mutex>(forward_posts_to->mutex);

        triggered = true;
        forward_posts_to->cond.notify_one(); // never throws
    }

    public: bool
    is_triggered () const throw() {
        return triggered.load();
    }

    public: void
    reset () throw() {
        return triggered.store(false);
    }
};

semaphore::semaphore()  : counter(0L), informed_by(new interrupt(this)) {}

// must be declared here because otherwise semaphore::interrupt is an incomplete type
semaphore::~semaphore() throw()  {
    delete informed_by;
}

void
semaphore::wait() {
    std::unique_lock<std::mutex> lock(mutex);
    if(0L==counter) {
        cond.wait(lock,[&]{
            if(informed_by->is_triggered())
                throw interrupt_exception();
            return counter>0;
        });
    }
    counter--;
}

Usando este semaphore, mi implementación de la cola de mensajes ahora se ve así (usando el semáforo en lugar del std::condition_variable podría deshacerme del std::mutex:

class
message_queue {    
    private: std::queue<message> queue;
    private: semaphore new_msg_notification;

    public: void
    push(message&& msg) {
        queue.push(std::move(msg));
        new_msg_notification.post();
    }

    public: const message
    wait_and_pop() {
        new_msg_notification.wait();
        auto msg(std::move(queue.front()));
        queue.pop();
        return msg;
    }

    public: semaphore::interrupt&
    get_interrupt() const { return new_msg_notification.get_interrupt(); }
};

Mi actor, ahora es capaz de interrumpir su hilo con muy baja latencia en su hilo. La implementación actualmente como esta:

class
actor {
    private: message_queue
    incoming_msgs;

    /// must be declared after incoming_msgs due to construction order!
    private: semaphore::interrupt&
    interrupt;

    private: std::thread
    my_thread;

    private: std::exception_ptr
    exception;

    public:
    actor()
    : interrupt(incoming_msgs.get_interrupt()), my_thread(
        [&]{
            try {
                run_actor();
            }
            catch(...) {
                exception = std::current_exception();
            }
        })
    {}

    private: virtual void
    run_actor() {
        while(!interrupt.is_triggered())
            process(incoming_msgs.wait_and_pop());
    };

    private: virtual void
    process(const message&) = 0;

    public: void
    notify(message&& msg_in) {
        incoming_msgs.push(std::forward<message>(msg_in));
    }

    public: virtual
    ~actor() throw (interrupt_exception) {
        interrupt.trigger();
        my_thread.join();
        if(exception)
            std::rethrow_exception(exception);
    }
};
Author: Cœur, 2014-12-07

3 answers

Usted pregunta,

¿Cuál es la mejor manera de esperar en múltiples variables de condición en C++11?

No puedes, y debes rediseñar. Un subproceso puede esperar solo en una variable de condición (y su mutex asociado) a la vez. En este sentido, las instalaciones de Windows para la sincronización son bastante más ricas que las de la familia de primitivas de sincronización "al estilo POSIX".

El enfoque típico con colas seguras para hilos es hacer un " all done!" mensaje, o para diseñar una cola" rompible "(o" capaz de apagar"). En este último caso, la variable de condición interna de la cola protege un predicado complejo: un elemento está disponible o la cola se ha roto.

En un comentario observas que

una notify_all() no tendrá efecto si no hay nadie esperando

Eso es cierto, pero probablemente no es relevante. wait()ing en una variable de condición también implica comprobar un predicado, y comprobar se antes de en realidad el bloqueo de una notificación. Por lo tanto, un hilo de trabajo ocupado procesando un elemento de la cola que "pierde" un notify_all() verá, la próxima vez que inspeccione la condición de la cola, que el predicado (un nuevo elemento está disponible, o, la cola está todo hecho) ha cambiado.

 13
Author: pilcrow,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2014-12-09 22:34:13

Recientemente resolví este problema con la ayuda de una sola variable de condición y una variable booleana separada para cada productor/trabajador. El predicado dentro de la función wait en consumer thread puede verificar estos indicadores y tomar la decisión de qué productor/trabajador ha satisfecho la condición.

 3
Author: Aditya Kumar,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2016-04-01 00:48:14

Tal vez esto puede funcionar:

Deshazte de la interrupción.

 message wait_and_pop(std::condition_variable& interrupt) {
    std::unique_lock<std::mutex> lock(mutex);
    {
        new_msg_notification.wait(lock,[&]{
            return !queue.empty() || stop;
        });

        if( !stop )
        {
            auto msg(std::move(queue.front()));
            queue.pop();
            return msg;
        }
        else
        {
            return NULL; //or some 'terminate' message
        }
}

En destructor, sustitúyase interrupt.notify_all() por new_msg_notification.notify_all()

 0
Author: Davidb,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2017-09-20 03:01:09