Estructura de datos de un solo productor y un solo consumidor con doble búfer en C++


Tengo una aplicación en work work donde tengo que moverme entre dos subprocesos en tiempo real que están programados a diferentes frecuencias. (La programación real está fuera de mi control.) La aplicación es difícil en tiempo real-ish (uno de los hilos tiene que conducir una interfaz de hardware), por lo que la transferencia de datos entre los hilos debe ser libre de bloqueo y sin espera en la medida de lo posible.

Es importante tener en cuenta que solo se necesita transferir un bloque de datos: porque los dos hilos se ejecutan en diferentes tasas, habrá veces cuando dos iteraciones del subproceso más rápido se completan entre dos wakeups del subproceso más lento; en este caso, está bien sobrescribir los datos en el búfer de escritura para que el subproceso más lento obtenga solo los últimos datos.

En otras palabras, en lugar de una cola, una solución de doble búfer es suficiente. Los dos búferes se asignan durante la inicialización, y el lector y los hilos de escritura pueden llamar a los métodos de la clase para obtener punteros a uno de estos búfer.

Código C++:

#include <mutex>

template <typename T>
class ProducerConsumerDoubleBuffer {
public:
    ProducerConsumerDoubleBuffer() {
        m_write_busy = false;
        m_read_idx = m_write_idx = 0;
    }

    ~ProducerConsumerDoubleBuffer() { }

    // The writer thread using this class must call
    // start_writing() at the start of its iteration
    // before doing anything else to get the pointer
    // to the current write buffer.
    T * start_writing(void) {
        std::lock_guard<std::mutex> lock(m_mutex);

        m_write_busy = true;
        m_write_idx = 1 - m_read_idx;

        return &m_buf[m_write_idx];
    }
    // The writer thread must call end_writing()
    // as the last thing it does
    // to release the write busy flag.
    void end_writing(void) {
        std::lock_guard<std::mutex> lock(m_mutex);

        m_write_busy = false;
    }

    // The reader thread must call start_reading()
    // at the start of its iteration to get the pointer
    // to the current read buffer.
    // If the write thread is not active at this time,
    // the read buffer pointer will be set to the 
    // (previous) write buffer - so the reader gets the latest data.
    // If the write buffer is busy, the read pointer is not changed.
    // In this case the read buffer may contain stale data,
    // it is up to the user to deal with this case.
    T * start_reading(void) {
        std::lock_guard<std::mutex> lock(m_mutex);

        if (!m_write_busy) {
            m_read_idx = m_write_idx;
        }

        return &m_buf[m_read_idx];
    }
    // The reader thread must call end_reading()
    // at the end of its iteration.
    void end_reading(void) {
        std::lock_guard<std::mutex> lock(m_mutex);

        m_read_idx = m_write_idx;
    }

private:
    T m_buf[2];
    bool m_write_busy;
    unsigned int m_read_idx, m_write_idx;
    std::mutex m_mutex;
};

Para evitar datos obsoletos en el hilo del lector, la estructura de la carga útil es versionada. Para facilitar la transferencia de datos bidireccional entre los hilos, se utilizan dos instancias de la monstruosidad anterior, en direcciones opuestas.

Preguntas:

  • ¿Es este esquema threadsafe? Si está roto, ¿dónde?
  • se Puede hacer sin el mutex? Tal vez solo con barreras de memoria o instrucciones CAS?
  • Se puede hacer mejor?
Author: user3638506, 2014-05-15

3 answers

Problema muy interesante! Mucho más complicado de lo que pensé :-) Me gustan las soluciones sin bloqueo, así que he tratado de elaborar una a continuación.

Hay muchas maneras de pensar acerca de este sistema. Puedes modelar como un buffer/cola circular de tamaño fijo (con dos entradas), pero luego se pierde la capacidad de actualizar el siguiente valor disponible para el consumo, ya que no sabes si el consumidor ha comenzado a leer el más reciente publicado valor o está todavía (potencialmente) leyendo el anterior. Tan estado extra es necesario más allá de la de un búfer de anillo estándar con el fin de alcanzar una más óptima solución.

Primero tenga en cuenta que siempre hay una celda en la que el productor puede escribir de forma segura en cualquier momento dado; si una celda está siendo leída por el consumidor, el se puede escribir a otros. Vamos a llamar a la celda que se puede escribir con seguridad a la celda" activa " (la celda de la que se puede leer potencialmente es cualquier celda que no sea el activo). La celda activa solo se puede cambiar si el otra célula no es actualmente se está leyendo de.

A diferencia de la celda activa, en la que siempre se puede escribir, la celda no activa puede solo se puede leer de si contiene un valor; una vez que ese valor se consume, desaparece. (Esto significa que livelock se evita en el caso de un productor agresivo; en algunos punto, el consumidor habrá vaciado una célula y dejará de tocar las células. Una vez eso sucede, el productor definitivamente puede publicar un valor, mientras que antes de ese punto, solo puede publicar un valor (cambiar la celda activa) si el consumidor no está en medio de una lectura.)

Si es un valor que está listo para ser consumido, solo el consumidor puede cambiar eso fact (para la célula inactiva, de todos modos); las producciones posteriores pueden cambiar qué célula está activo y el valor publicado, pero un valor siempre estará listo para ser leído hasta se consume.

Una vez que el productor haya terminado de escribir en la celda activa, puede "publicar" este valor cambiar qué celda es la activa uno (intercambiar el índice), siempre que el consumidor no en medio de la lectura de la otra celda. Si el consumidor está en el medio de al leer la otra celda, el intercambio no puede ocurrir, pero en ese caso el consumidor puede intercambiar después de es hecho de leer el valor, siempre que el productor no está en el medio de un escribe (y si lo es, el productor cambiará una vez que esté hecho). De hecho, en general, el consumidor siempre puede intercambiar después de haber terminado de leer (si es el único acceso al sistema) porque los swaps espurios por parte del consumidor son benignos: si hay algo en la otra celda, entonces el intercambio hará que se lea a continuación, y si no lo hay, el intercambio no afecta nada.

Por lo tanto, necesitamos una variable compartida para rastrear lo que es la celda activa, y también necesitamos una tanto para el productor como para el consumidor para indicar si están en el medio de un operación. Podemos almacenar estas tres piezas de estado en una variable atómica en orden para poder afectarlos a todos a la vez (atómicamente). También necesitamos una manera para que el consumidor compruebe si hay algo en la celda no activa en primer lugar, y para que ambos subprocesos modifiquen ese estado según proceda. Probé algunos otros enfoques, pero al final el más fácil fue solo incluir esta información en la otra variable atómica también. Esto hace que las cosas sean mucho más fácil de razonar, ya que todos los cambios de estado en el sistema son atómicos de esta manera.

Se me ocurrió una implementación sin espera (sin bloqueo, y todo operaciones completadas en un número limitado de instrucciones).

¡Hora código!

#include <atomic>
#include <cstdint>

template <typename T>
class ProducerConsumerDoubleBuffer {
public:
    ProducerConsumerDoubleBuffer() : m_state(0) { }
    ~ProducerConsumerDoubleBuffer() { }

    // Never returns nullptr
    T* start_writing() {
        // Increment active users; once we do this, no one
        // can swap the active cell on us until we're done
        auto state = m_state.fetch_add(0x2, std::memory_order_relaxed);
        return &m_buf[state & 1];
    }

    void end_writing() {
        // We want to swap the active cell, but only if we were the last
        // ones concurrently accessing the data (otherwise the consumer
        // will do it for us when *it's* done accessing the data)

        auto state = m_state.load(std::memory_order_relaxed);
        std::uint32_t flag = (8 << (state & 1)) ^ (state & (8 << (state & 1)));
        state = m_state.fetch_add(flag - 0x2, std::memory_order_release) + flag - 0x2;
        if ((state & 0x6) == 0) {
            // The consumer wasn't in the middle of a read, we should
            // swap (unless the consumer has since started a read or
            // already swapped or read a value and is about to swap).
            // If we swap, we also want to clear the full flag on what
            // will become the active cell, otherwise the consumer could
            // eventually read two values out of order (it reads a new
            // value, then swaps and reads the old value while the
            // producer is idle).
            m_state.compare_exchange_strong(state, (state ^ 0x1) & ~(0x10 >> (state & 1)), std::memory_order_release);
        }
    }

    // Returns nullptr if there appears to be no more data to read yet
    T* start_reading() {
        m_readState = m_state.load(std::memory_order_relaxed);
        if ((m_readState & (0x10 >> (m_readState & 1))) == 0) {
            // Nothing to read here!
            return nullptr;
        }

        // At this point, there is guaranteed to be something to
        // read, because the full flag is never turned off by the
        // producer thread once it's on; the only thing that could
        // happen is that the active cell changes, but that can
        // only happen after the producer wrote a value into it,
        // in which case there's still a value to read, just in a
        // different cell.

        m_readState = m_state.fetch_add(0x2, std::memory_order_acquire) + 0x2;

        // Now that we've incremented the user count, nobody can swap until
        // we decrement it
        return &m_buf[(m_readState & 1) ^ 1];
    }

    void end_reading() {
        if ((m_readState & (0x10 >> (m_readState & 1))) == 0) {
            // There was nothing to read; shame to repeat this
            // check, but if these functions are inlined it might
            // not matter. Otherwise the API could be changed.
            // Or just don't call this method if start_reading()
            // returns nullptr -- then you could also get rid
            // of m_readState.
            return;
        }

        // Alright, at this point the active cell cannot change on
        // us, but the active cell's flag could change and the user
        // count could change. We want to release our user count
        // and remove the flag on the value we read.

        auto state = m_state.load(std::memory_order_relaxed);
        std::uint32_t sub = (0x10 >> (state & 1)) | 0x2;
        state = m_state.fetch_sub(sub, std::memory_order_relaxed) - sub;
        if ((state & 0x6) == 0 && (state & (0x8 << (state & 1))) == 1) {
            // Oi, we were the last ones accessing the data when we released our cell.
            // That means we should swap, but only if the producer isn't in the middle
            // of producing something, and hasn't already swapped, and hasn't already
            // set the flag we just reset (which would mean they swapped an even number
            // of times).  Note that we don't bother swapping if there's nothing to read
            // in the other cell.
            m_state.compare_exchange_strong(state, state ^ 0x1, std::memory_order_relaxed);
        }
    }

private:
    T m_buf[2];

    // The bottom (lowest) bit will be the active cell (the one for writing).
    // The active cell can only be switched if there's at most one concurrent
    // user. The next two bits of state will be the number of concurrent users.
    // The fourth bit indicates if there's a value available for reading
    // in m_buf[0], and the fifth bit has the same meaning but for m_buf[1].
    std::atomic<std::uint32_t> m_state;

    std::uint32_t m_readState;
};

Tenga en cuenta que la semántica es tal que el consumidor nunca puede leer un valor dado dos veces, y un valor que lee siempre es más nuevo que el último valor que leyó. También es bastante eficiente en el uso de la memoria (dos buffers, como su solución original). Evité los bucles CAS porque generalmente son menos eficientes que una sola operación atómica bajo contención.

Si decide utilizar el código anterior, I sugiera que escriba primero algunas pruebas unitarias completas (enhebradas) para ello. Y puntos de referencia adecuados. Lo probé, pero apenas. Avísame si encuentras algún error: -)

Mi prueba unitaria:

ProducerConsumerDoubleBuffer<int> buf;
std::thread producer([&]() {
    for (int i = 0; i != 500000; ++i) {
        int* item = buf.start_writing();
        if (item != nullptr) {      // Always true
            *item = i;
        }
        buf.end_writing();
    }
});
std::thread consumer([&]() {
    int prev = -1;
    for (int i = 0; i != 500000; ++i) {
        int* item = buf.start_reading();
        if (item != nullptr) {
            assert(*item > prev);
            prev = *item;
        }
        buf.end_reading();
    }
});
producer.join();
consumer.join();

En cuanto a su implementación original, solo la miré de forma superficial (es mucho más divertido diseñar cosas nuevas, je), pero David.la respuesta de pfx parece abordar esa parte de tu pregunta.

 10
Author: Cameron,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2014-05-18 05:22:26

Sí, creo que está roto.

Si el lector hace un start/end/start en sucesión, actualizará su índice de lectura al índice de escritura, y potencialmente leerá los datos del índice de escritura, incluso si la escritura está ocupada.

El problema esencialmente es que el escritor no sabe qué búfer usará el lector, por lo que el escritor debe asegurarse de que ambos búferes sean válidos en todo momento. No puede hacer eso, si va a tomar algún tiempo escribir datos en un búfer [a menos que malinterprete algunos de la lógica que no se muestra aquí.]

Sí, creo que se puede hacer sin bloqueos, usando CAS o lógica equivalente. No voy a intentar expresar un algoritmo en este espacio. Estoy seguro de que existe, pero no que pueda escribirlo correctamente la primera vez. Y un poco de búsqueda en la web encontró algunos candidatos plausibles. La IPC sin espera que utiliza CAS parece ser un tema bastante interesante y el tema de algunas investigaciones.


Después de pensar un poco más, el algoritmo es como seguir. Necesitas:

  • 3 buffers: uno para el escritor, otro para el lector y uno extra. Los tampones están ordenados: forman un anillo (pero ver nota).
  • Un estado para cada búfer: libre, completo, escritura, lectura.
  • Una función que puede inspeccionar el estado del búfer y cambiar condicionalmente el estado a un valor diferente en una sola operación atómica. Usaré CSET para eso.

Escritor:

Find the first buffer that is FREE or FULL
  Fail: assert (should never fail, reader can only use one buffer)
  CSET buffer to WRITING
Write into the buffer
CSET buffer to FULL

Lector:

Find first buffer that is FULL
    Fail: wait (writer may be slow)
    CSET buffer to READING
Read and consume buffer
CSET buffer to FREE

Nota: Esto el algoritmo no garantiza que los búferes se traten estrictamente en orden de llegada, y ningún cambio simple hará que lo haga. Si esto es importante, el algoritmo debe ser mejorado con un número de secuencia en el búfer, establecido por el escritor para que el búfer más reciente pueda ser elegido por el lector.

Dejo el código como detalle de implementación.


La función CSET no es trivial. Tiene que probar atómicamente que una ubicación de memoria compartida en particular es igual a una esperada valor y si es así cambiarlo a un nuevo valor. Devuelve true si realizó correctamente el cambio y false de lo contrario. La implementación debe evitar condiciones de carrera si dos subprocesos acceden a la misma ubicación al mismo tiempo (y posiblemente en procesadores diferentes).

La biblioteca de operaciones atómicas estándar de C++ contiene un conjunto de funciones atomic_compare_exchange que deberían servir al propósito, si están disponibles.

 4
Author: david.pfx,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2014-05-18 04:15:46

Aquí una versión usando InterlockedExchangePointer() y SLISTs.

Esta solución no admite la relectura del último búfer. Pero si es necesario se puede hacer en el lado del lector por medio de una copia y un if( NULL == doubleBuffer.beginReader(...) ) { use backup copy ... }.
Esto no se hace porque sea difícil de añadir, sino porque no es muy realista. Imagine que su último valor conocido se vuelve más y más antiguo: segundos, días, semanas. Es poco probable que la aplicación todavía quiera usarlo. Por lo tanto, factorizar la funcionalidad de relectura en el código de doble búfer toma alejar la flexibilidad de la aplicación.

El búfer doble tiene 1 miembro puntero de lectura. Cada vez que se llama a BeginRead (), este valor se devuelve y se reemplaza atómicamente con NULL. Piense en ello como "El lector TOMA el búfer."
Con endRead(), el lector devuelve el búfer y se añade al SLIST, conteniendo los búferes disponibles para operaciones de escritura.

Inicialmente, ambos buffers se agregan a la SLIST, el puntero de lectura es NULL.

beginWrite() muestra el siguiente búfer disponible de la RANURA. Y este valor nunca puede ser NULO, debido a la forma en que endWrite() se implementa.

Por último, endWrite() intercambia atómicamente el puntero de lectura con el búfer devuelto, recién escrito y si el puntero de lectura no era NULO, lo empuja a la RANURA.

Por lo tanto, incluso si el lado del lector nunca lee, el lado del escritor nunca se queda sin búferes. Cuando el lector lee, obtiene el último valor conocido (¡una vez!).

Contra lo que esta implementación no es segura es si hay múltiples lectores o escritores concurrentes. Pero ese no era el objetivo en primer lugar.

En el lado feo, los Búferes deben ser estructuras con algún miembro SLIST_HEADER en la parte superior.

Aquí, el código, pero tenga en cuenta que no es mi culpa si su mars rover aterriza en Venus!

const size_t MAX_DATA_SIZE = 512;
typedef
//__declspec(align(MEMORY_ALLOCATION_ALIGNMENT))
struct DataItem_tag
{
    SLIST_ENTRY listNode;
    uint8_t data[MAX_DATA_SIZE];
    size_t length;
} DataItem_t;

class CDoubleBuffer
{
    SLIST_HEADER m_writePointers;
    DataItem_t m_buffers[2];
    volatile DataItem_t *m_readPointer;

public:
    CDoubleBuffer()
        : m_writePointers()
        , m_buffers()
        , m_readPointer(NULL)
    {
        InitializeSListHead(&m_writePointers);
        InterlockedPushEntrySList(&m_writePointers, &m_buffers[0].listNode);
        InterlockedPushEntrySList(&m_writePointers, &m_buffers[1].listNode);
    }
    DataItem_t *beginRead()
    {
        DataItem_t *result = reinterpret_cast<DataItem_t*>(InterlockedExchangePointer((volatile PVOID*)&m_readPointer, NULL));
        return result;
    }
    void endRead(DataItem_t *dataItem)
    {
        if (NULL != dataItem)
        {
            InterlockedPushEntrySList(&m_writePointers, &dataItem->listNode);
        }
    }
    DataItem_t *beginWrite()
    {
        DataItem_t *result = reinterpret_cast<DataItem_t*>(InterlockedPopEntrySList(&m_writePointers));
        return result;
    }
    void endWrite(DataItem_t *dataItem)
    {
        DataItem_t *oldReadPointer = reinterpret_cast<DataItem_t*>(InterlockedExchangePointer((volatile PVOID*)&m_readPointer, dataItem));
        if (NULL != oldReadPointer)
        {
            InterlockedPushEntrySList(&m_writePointers, &oldReadPointer->listNode);
        }
    }
};

Y aquí el código de prueba para ello. (Para ambos, lo anterior y el código de prueba necesita y .)

CDoubleBuffer doubleBuffer;

DataItem_t *readValue;
DataItem_t *writeValue;

// nothing to read yet. Make sure NULL is returned.
assert(NULL == doubleBuffer.beginRead());
doubleBuffer.endRead(NULL); // we got nothing, we return nothing.

// First write without read
writeValue = doubleBuffer.beginWrite();
assert(NULL != writeValue); // if we get NULL here it is a bug.
writeValue->length = 0;
doubleBuffer.endWrite(writeValue);

// Second write without read
writeValue = doubleBuffer.beginWrite();
assert(NULL != writeValue); // if we get NULL here it is a bug.
writeValue->length = 1;
doubleBuffer.endWrite(writeValue);

// Third write without read - works because it reuses the old buffer for the new write.
writeValue = doubleBuffer.beginWrite();
assert(NULL != writeValue); // if we get NULL here it is a bug.
writeValue->length = 2;
doubleBuffer.endWrite(writeValue);

readValue = doubleBuffer.beginRead();
assert(NULL != readValue); // NULL would obviously be a terrible bug.
assert(2 == readValue->length); // We got the latest and greatest?
doubleBuffer.endRead(readValue);

readValue = doubleBuffer.beginRead();
assert(NULL == readValue); // We expect NULL here. Re-reading is not a feature of this implementation!
doubleBuffer.endRead(readValue);
 0
Author: BitTickler,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2015-01-25 04:37:49