Usando ZeroMQ junto con Boost:: ASIO


Tengo una aplicación C++ que está usando ZeroMQ para algunos mensajes. Pero también tiene que proporcionar una conexión SGCI para un servicio web basado en AJAX / Comet.

Para esto necesito un socket TCP normal. Podría hacer eso con enchufes Posix normales, pero para permanecer portátil multiplataforma y hacer mi vida más fácil (espero...) Estaba pensando en usar Boost:: ASIO.

Pero ahora tengo el choque de ZMQ queriendo usar su propio zmq_poll() y ASIO es io_service.run()...

¿Hay una manera de conseguir que ASIO trabajar junto con el 0MQ zmq_poll()?

¿O hay alguna otra forma recomendada de lograr tal configuración?

Nota: Podría resolverlo usando múltiples subprocesos, pero es solo una pequeña caja de núcleo / CPU que ejecutará ese programa con una cantidad muy baja de tráfico SCGI, por lo que multithreading sería un desperdicio de recursos...

Author: ildjarn, 2012-10-11

4 answers

Después de leer la documentación aquí y aquí , específicamente este párrafo

ZMQ_FD: Recupera el descriptor de fichero asociado al socket ZMQ_FD opción recuperará el descriptor de archivo asociado con el socket especificado. El descriptor de archivo devuelto se puede utilizar para integrar el zócalo en un bucle de eventos existente; la biblioteca ØMQ señalará cualquier evento pendiente en el zócalo en un disparo de borde moda haciendo el descriptor de archivo prepárate para leer.

Creo que puedes usar null_buffers para cada zmq_pollitem_t y diferir el bucle de eventos a un io_service, omitiendo completamente zmq_poll() por completo. Sin embargo, parece haber algunas advertencias en la documentación mencionada, en particular[11]}

La capacidad de leer desde el descriptor de archivo devuelto no indicar necesariamente que los mensajes están disponibles para ser leídos, o se puede escribir en el socket subyacente; las aplicaciones deben recuperar real estado del evento con una posterior recuperación de los ZMQ_EVENTS opcion.

Así que cuando el controlador para uno de sus sockets zmq se dispara, tendrá que hacer un poco más de trabajo antes de manejar el evento, creo. Pseudo-código sin compilar está por debajo de

const int fd = getZmqDescriptorSomehow();
boost::asio::posix::stream_descriptor socket( _io_service, fd );
socket->async_read_some(
    boost::asio::null_buffers(),
    [=](const boost::system::error_code& error)
    {
       if (!error) {
           // handle data ready to be read
       }
     }
);

Tenga en cuenta que no tiene que usar una lambda aquí, boost::bind para una función miembro sería suficiente.

 14
Author: Sam Miller,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2017-05-23 12:25:29

Al final me di cuenta de que hay dos soluciones posibles:

  • Sam Miller donde usamos el bucle de eventos de ASIO
  • El bucle de eventos de ZeroMQ obteniendo los descriptores de archivo ASIO a través de los métodos .native() de acceptor y socket e insertándolos en el array de zmq_pollitem_t

He aceptado la respuesta de Sam Miller ya que para mí es la mejor solución en el caso SCGI donde constantemente se crean y terminan nuevas conexiones. Manejando así cada cambio zmq_pollitem_t array es una gran molestia que se puede evitar mediante el uso del bucle de eventos ASIO.

 2
Author: Chris,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2012-10-14 16:09:49

Obtener el socket a ZeroMQ es la parte más pequeña de la batalla. ZeroMQ se basa en un protocolo que está en capas sobre TCP, por lo que tendrá que reimplementar ZeroMQ dentro de un Boost personalizado.Asio io_service si vas por esta ruta. Me encontré con el mismo problema al crear un servicio asíncrono ENet usando Boost.Asio primero simplemente tratando de capturar el tráfico de un cliente ENet usando un Boost.Asio UDP service. ENet es un TCP como protocolo capas sobre UDP, por lo que todo lo que he logrado en ese punto era la captura de paquetes en un estado prácticamente inútil.

Aumentar.Asio está basado en plantillas, y el io_service integrado usa plantillas para básicamente envolver la biblioteca de sockets del sistema para crear el servicio TCP y UDP. Mi solución final fue crear un io_service personalizado que envolviera la biblioteca Enet en lugar de la biblioteca de sockets de sistemas, lo que le permitía usar las funciones de transporte de ENet en lugar de tener que volver a implementarlas utilizando el transporte UDP incorporado.

Lo mismo se puede hacer para ZeroMQ, pero ZeroMQ ya es una biblioteca de red de muy alto rendimiento por derecho propio que ya proporciona E/S async.Creo que puede crear una solución viable recibiendo mensajes utilizando la API existente de ZeroMQ y pasando los mensajes a un grupo de subprocesos io_service. De esta manera, los mensajes/tareas seguirán siendo manejados asincrónicamente usando Boost.El patrón del reactor de Asio sin tener que reescribir nada. ZeroMQ proporcionará la E/S asincrónica, Boost.Asio proporcionará la tarea async manipuladores / trabajadores.

El io_service existente también se puede acoplar a un socket TCP existente, lo que permite al threadpool manejar tanto TCP (HTTP en su caso) como ZeroMQ. Es totalmente posible en esta configuración que los controladores de tareas de ZeroMQ accedan a los objetos de sesión de servicios TCP, lo que le permite enviar los resultados del mensaje/tarea de ZeroMQ a un cliente TCP.

Lo siguiente es solo para ilustrar el concepto.

// Create a pool of threads to run all of the io_services.
std::vector<boost::shared_ptr<boost::thread> > threads;
for(std::size_t i = 0; i < thread_pool_size_; ++i) {
    boost::shared_ptr<boost::thread> thread(new boost::thread(boost::bind(&boost::asio::io_service::run, &io_service_)));
    threads.push_back(thread);
}

while (1) {
    char buffer [10];
    zmq_recv (responder_, buffer, 10, 0);
    io_service_.post(boost::bind(&server::handle_zeromq_message, buffer, this));
}
 2
Author: JSON,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2014-02-10 06:06:29

La solución es sondear tu io_service también en lugar de run().

Echa un vistazo a esta soluciónpara obtener información de poll().

Usar sondeo en lugar de ejecutar le permitirá sondear las conexiones de zmq sin ningún problema de bloqueo.

 0
Author: g19fanatic,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2017-05-23 12:25:29