¿Cómo puedo enviar mensajes grandes con Kafka (más de 15 MB)?


Envío String-messages a Kafka V. 0.8 con la API de Java Producer. Si el tamaño del mensaje es de aproximadamente 15 MB obtengo un MessageSizeTooLargeException. He intentado establecer message.max.bytesa 40 MB, pero todavía tengo la excepción. Los mensajes pequeños funcionaban sin problemas.

(La excepción aparece en el productor, no tengo un consumidor en esta aplicación.)

¿Qué puedo hacer para deshacerme de esta excepción?

Mi ejemplo producer config

private ProducerConfig kafkaConfig() {
    Properties props = new Properties();
    props.put("metadata.broker.list", BROKERS);
    props.put("serializer.class", "kafka.serializer.StringEncoder");
    props.put("request.required.acks", "1");
    props.put("message.max.bytes", "" + 1024 * 1024 * 40);
    return new ProducerConfig(props);
}

Error-Log:

4709 [main] WARN  kafka.producer.async.DefaultEventHandler  - Produce request with correlation id 214 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException
4869 [main] WARN  kafka.producer.async.DefaultEventHandler  - Produce request with    correlation id 217 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException
5035 [main] WARN  kafka.producer.async.DefaultEventHandler  - Produce request with   correlation id 220 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException
5198 [main] WARN  kafka.producer.async.DefaultEventHandler  - Produce request with correlation id 223 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException
5305 [main] ERROR kafka.producer.async.DefaultEventHandler  - Failed to send requests for topics datasift with correlation ids in [213,224]

kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.
at kafka.producer.async.DefaultEventHandler.handle(Unknown Source)
at kafka.producer.Producer.send(Unknown Source)
at kafka.javaapi.producer.Producer.send(Unknown Source)
Author: Martin Thoma, 2014-01-09

5 answers

Necesitas ajustar tres (o cuatro) propiedades:

  • Lado del consumidor:fetch.message.max.bytes - esto determinará el tamaño más grande de un mensaje que puede ser obtenido por el consumidor.
  • Broker side: replica.fetch.max.bytes - esto permitirá que las réplicas en los corredores envíen mensajes dentro del clúster y se aseguren de que los mensajes se replican correctamente. Si esto es demasiado pequeño, entonces el mensaje nunca se replicará, y por lo tanto, el consumidor nunca verá el mensaje porque el mensaje nunca estar comprometido (totalmente replicado).
  • Broker side: message.max.bytes - este es el tamaño más grande del mensaje que puede recibir el broker de un productor.
  • Lado del corredor (por tema): max.message.bytes - este es el tamaño más grande del mensaje que el corredor permitirá agregar al tema. Este tamaño se valida precompresión. (Por defecto es el broker message.max.bytes.)

Me enteré de la manera difícil sobre el número 2 - usted no recibe NINGUNA excepción, mensajes o advertencias de Kafka, así que asegúrese de tenga en cuenta esto cuando está enviando mensajes grandes.

 106
Author: laughing_man,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2016-05-20 18:34:27

Se requieren cambios menores para Kafka 0.10 y el nuevo consumidor en comparación con laughing_man's answer :

  • Broker: Sin cambios, todavía necesita aumentar las propiedades message.max.bytes y replica.fetch.max.bytes. message.max.bytes tiene que ser igual o menor(*) que replica.fetch.max.bytes.
  • Productor: Aumenta max.request.size para enviar el mensaje más grande.
  • Consumidor: Aumenta max.partition.fetch.bytes para recibir mensajes más grandes.

( * ) Lea los comentarios para obtener más información sobre message.max.bytesreplica.fetch.max.bytes

 28
Author: Sascha Vetter,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2017-05-23 10:31:16

Necesita anular las siguientes propiedades:

Broker Configs (KAF KAFKA_HOME/config/server.propiedades)

  • réplica.tráelo.max.bytes
  • mensaje.max.bytes

Consumer Configs (KAF KAFKA_HOME/config/consumer.propiedades)
Este paso no funcionó para mí. Lo agrego a la aplicación del consumidor y estaba funcionando bien

  • buscar.mensaje.max.bytes

Reinicie el servidor.

Mira esta documentación para más info: http://kafka.apache.org/08/configuration.html

 9
Author: user2550587,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2014-02-17 09:56:58

La idea es tener el mismo tamaño de mensaje enviado desde el Productor de Kafka al Corredor de Kafka y luego recibido por el Consumidor de Kafka, es decir,

Kafka producer > > Kafka Broker {>Kafka Consumer

Supongamos que si el requisito es enviar 15MB de mensaje, entonces el Productor, el Corredor y el Consumidor, los tres, deben estar sincronizados.

Kafka Productor envía 15 MB --> Kafka Corredor Permite a los/las Tiendas de 15 MB --> Kafka Consumidor recibe 15 MB

El ajuste por lo tanto debe ser A.) En El Corredor: mensaje.max.bytes=15728640 Replica.tráelo.max.bytes = 15728640

B.) Sobre el Consumidor: tráelo.mensaje.max.bytes = 15728640

 7
Author: Ravi,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2015-09-11 06:43:42

Una cosa clave para recordar que el atributo message.max.bytes debe estar sincronizado con la propiedad fetch.message.max.bytes del consumidor. el tamaño de búsqueda debe ser al menos tan grande como el tamaño máximo del mensaje de lo contrario podría haber una situación en la que los productores pueden enviar mensajes más grandes de lo que el consumidor puede consumir/buscar. Quizá valga la pena echarle un vistazo.
¿Qué versión de Kafka está utilizando? También proporcionar algunos más detalles de seguimiento que usted está recibiendo. ¿hay algo así?.. payload size of xxxx larger than 1000000 que viene en el ¿tronco?

 5
Author: user2720864,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2014-01-14 10:14:03