Estrategia efectiva para evitar mensajes duplicados en apache kafka consumidor


He estado estudiando apache kafka durante un mes. Sin embargo, estoy atascado en un punto ahora. Mi caso de uso es que tengo dos o más procesos de consumo que se ejecutan en diferentes máquinas. Realicé algunas pruebas en las que publiqué 10.000 mensajes en el servidor kafka. Luego, mientras procesaba estos mensajes, maté uno de los procesos del consumidor y lo reinicié. Los consumidores escribían mensajes procesados en un archivo. Así que después de que el consumo terminó, archivo estaba mostrando más de 10k mensajes. Así que algunos mensajes fueron duplicado.

En el proceso del consumidor he deshabilitado la confirmación automática. Los consumidores comprometen manualmente compensaciones por lotes. Así que, por ejemplo, si se escriben 100 mensajes en el archivo, consumer commits offsets. Cuando el proceso de un solo consumidor se está ejecutando y se bloquea y se recupera la duplicación se evita de esta manera. Pero cuando más de un consumidor se está ejecutando y uno de ellos se bloquea y se recupera, escribe mensajes duplicados en el archivo.

¿Existe alguna estrategia efectiva para evitar estos mensajes duplicados?

Author: Shades88, 2015-04-15

3 answers

La respuesta corta es no.

Lo que usted está buscando es exactamente-una vez el procesamiento. Si bien a menudo puede parecer factible, nunca se debe confiar en él porque siempre hay advertencias.

Incluso con el fin de intentar evitar duplicados que tendría que utilizar el consumidor simple. Cómo funciona este enfoque es para cada consumidor, cuando se consume un mensaje de alguna partición, escriba la partición y el desplazamiento del mensaje consumido en el disco. Cuando el consumidor se reinicia después de un error, lea el último offset consumido para cada partición desde el disco.

Pero incluso con este patrón, el consumidor no puede garantizar que no reprocesará un mensaje después de un fallo. ¿Qué pasa si el consumidor consume un mensaje y luego falla antes de que el desplazamiento se vacíe al disco? Si escribe en el disco antes de procesar el mensaje, ¿qué pasa si escribe el desplazamiento y luego falla antes de procesar realmente el mensaje? Este mismo problema existiría incluso si tuviera que comprometer compensaciones a ZooKeeper después de cada mensaje.

Hay algunos casos, sin embargo, donde exactamente - una vez que el procesamiento es más alcanzable, pero solo para ciertos casos de uso. Esto simplemente requiere que su desplazamiento se almacene en la misma ubicación que la salida de la aplicación de la unidad. Por ejemplo, si escribe un consumidor que cuenta mensajes, almacenando el último offset contado con cada recuento puede garantizar que el offset se almacena al mismo tiempo que el estado del consumidor. Por supuesto, con el fin de garantizar exactamente-una vez que el procesamiento de esto sería requiere que consumas exactamente un mensaje y actualices el estado exactamente una vez para cada mensaje, y eso es completamente impráctico para la mayoría de las aplicaciones de consumo de Kafka. Por su naturaleza, Kafka consume mensajes en lotes por razones de rendimiento.

Por lo general, su tiempo estará más bien empleado y su aplicación será mucho más confiable si simplemente la diseña para que sea idempotente.

 15
Author: kuujo,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2016-10-31 10:51:55

Esto es lo que Kafka FAQ tiene que decir sobre el tema de exactamente-una vez:

¿Cómo obtengo exactamente un mensaje de Kafka?

Exactamente una semántica tiene dos partes: evitar la duplicación durante la producción de datos y evitar duplicados durante el consumo de datos.

Hay dos enfoques para obtener semántica exactamente una vez durante la producción de datos:

  • Utilice un solo escritor por partición y cada vez que obtenga una comprobación de errores de red el último mensaje en esa partición para ver si su última escritura tuvo éxito
  • Incluya una clave primaria (UUID o algo así) en el mensaje y deduplice en el consumidor.

Si hace una de estas cosas, el registro que aloja Kafka estará libre de duplicados. Sin embargo, la lectura sin duplicados también depende de la cooperación del consumidor. Si el consumidor está chequeando periódicamente su posición, entonces si falla y se reinicia, se reiniciará desde el checkpointed posición. Por lo tanto, si la salida de datos y el punto de control no se escriben atómicamente, también será posible obtener duplicados aquí. Este problema es particular para su sistema de almacenamiento. Por ejemplo, si está utilizando una base de datos, podría confirmarlos juntos en una transacción. El HDFS loader Camus que LinkedIn escribió hace algo como esto para las cargas de Hadoop. La otra alternativa que no requiere una transacción es almacenar el desplazamiento con los datos cargados y deduplicar utilizando el combinación de tema / partición / desplazamiento.

Creo que hay dos mejoras que harían esto mucho más fácil:

  • La idempotencia del productor podría hacerse de forma automática y mucho más barata integrando opcionalmente el soporte para esto en el servidor.
  • El consumidor de alto nivel existente no expone mucho del control más fino de las compensaciones (por ejemplo, para restablecer su posición). Estaremos trabajando en eso pronto
 14
Author: RaGe,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2016-07-28 08:50:57

Estoy de acuerdo con el deduplicado de RaGe en el lado del consumidor. Y usamos Redis para desduplicar el mensaje de Kafka.

Supongamos que la clase Message tiene un miembro llamado 'uniqId', que es llenado por el lado productor y se garantiza que es único. Usamos una cadena aleatoria de 12 longitudes. (regexp es '^[A-Za-z0-9]{12}$')

El lado consumidor usa SETNX de Redis para deduplicar y EXPIRAR para purgar las claves caducadas automáticamente. Código de ejemplo:

Message msg = ... // eg. ConsumerIterator.next().message().fromJson();
Jedis jedis = ... // eg. JedisPool.getResource();
String key = "SPOUT:" + msg.uniqId; // prefix name at will
String val = Long.toString(System.currentTimeMillis());
long rsps = jedis.setnx(key, val);
if (rsps <= 0) {
    log.warn("kafka dup: {}", msg.toJson()); // and other logic
} else {
    jedis.expire(key, 7200); // 2 hours is ok for production environment;
}

El código anterior detectó mensajes duplicados varias veces cuando Kafka (versión 0.8.x) tuvo situaciones. Con nuestro registro de auditoría de balance de entrada/salida, no se perdió ningún mensaje o ocurrió dup.

 12
Author: peihan,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2016-05-12 10:23:06