Java, Cómo obtener el número de mensajes en un tema en apache kafka


Estoy usando apache kafka para mensajería. He implementado el productor y el consumidor en Java. ¿Cómo podemos obtener el número de mensajes en un tema?

Author: Eric Leschinski, 2015-02-18

13 answers

La única manera que viene a la mente para esto desde el punto de vista del consumidor es consumir realmente los mensajes y luego contarlos.

El broker Kafka expone contadores JMX para el número de mensajes recibidos desde el inicio, pero no puede saber cuántos de ellos ya se han purgado.

En la mayoría de los escenarios comunes, los mensajes en Kafka se ven mejor como un flujo infinito y obtener un valor discreto de cuántos se mantienen actualmente en el disco no es relevante. Además cosas obtener más complicado cuando se trata de un grupo de corredores que todos tienen un subconjunto de los mensajes en un tema.

 16
Author: Lundahl,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2015-02-19 21:57:45

No es java, pero puede ser útil

./bin/kafka-run-class.sh kafka.tools.GetOffsetShell 
  --broker-list <broker>:  <port> 
  --topic <topic-name> --time -1 --offsets 1 
  | awk -F  ":" '{sum += $3} END {print sum}'
 54
Author: ssemichev,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2016-02-15 20:02:04

En realidad uso esto para comparar mi POC. El artículo que desea utilizar ConsumerOffsetChecker. Puede ejecutarlo usando el script bash como se muestra a continuación.

bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker  --topic test --zookeeper localhost:2181 --group testgroup

Y abajo está el resultado : introduzca la descripción de la imagen aquí Como puede ver en el cuadro rojo, 999 es el número de mensaje actualmente en el tema.

Actualización: ConsumerOffsetChecker está obsoleto desde 0.10.0, es posible que desee comenzar a usar ConsumerGroupCommand.

 13
Author: Rudy,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2017-07-17 16:25:05

Use https://prestodb.io/docs/current/connector/kafka-tutorial.html

Un motor super SQL, proporcionado por Facebook, que se conecta en varias fuentes de datos (Cassandra, Kafka, JMX, Redis ...).

PrestoDB se ejecuta como un servidor con trabajadores opcionales (hay un modo independiente sin trabajadores adicionales), luego utiliza un pequeño JAR ejecutable (llamado presto CLI) para realizar consultas.

Una vez que haya configurado bien el servidor Presto, puede usar traditionnal SQL:

SELECT count(*) FROM TOPIC_NAME;
 9
Author: Thomas Decaux,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2016-02-21 19:24:13

Para obtener todos los mensajes almacenados para el tema, puede buscar al consumidor al principio y al final del flujo para cada partición y sumar los resultados

List<TopicPartition> partitions = consumer.partitionsFor(topic).stream()
        .map(p -> new TopicPartition(topic, p.partition()))
        .collect(Collectors.toList());
    consumer.assign(partitions); 
    consumer.seekToEnd(Collections.emptySet());
Map<TopicPartition, Long> endPartitions = partitions.stream()
        .collect(Collectors.toMap(Function.identity(), consumer::position));
    consumer.seekToBeginning(Collections.emptySet());
System.out.println(partitions.stream().mapToLong(p -> endPartitions.get(p) - consumer.position(p)).sum());
 3
Author: AutomatedMike,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2016-10-27 11:02:56

Comando Apache Kafka para obtener mensajes no manejados en todas las particiones de un tema:

kafka-run-class kafka.tools.ConsumerOffsetChecker 
    --topic test --zookeeper localhost:2181 
    --group test_group

Impresiones:

Group      Topic        Pid Offset          logSize         Lag             Owner
test_group test         0   11051           11053           2               none
test_group test         1   10810           10812           2               none
test_group test         2   11027           11028           1               none

La columna 6 son los mensajes no manejados. Sumarlos así:

kafka-run-class kafka.tools.ConsumerOffsetChecker 
    --topic test --zookeeper localhost:2181 
    --group test_group 2>/dev/null | awk 'NR>1 {sum += $6} 
    END {print sum}'

Awk lee las filas, salta la línea de encabezado y suma la 6a columna y al final imprime la suma.

Imprime

5
 2
Author: Eric Leschinski,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2016-08-31 14:07:58

En las versiones más recientes de Kafka Manager, hay una columna titulada Suma las Compensaciones recientes.

introduzca la descripción de la imagen aquí

 2
Author: f01,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2018-01-05 06:38:48

A veces el interés está en conocer el número de mensajes en cada partición, por ejemplo, al probar un particionador personalizado.Los pasos siguientes han sido probados para trabajar con Kafka 0.10.2.1-2 de Confluent 3.2. Dado un tema de Kafka, kt y la siguiente línea de comandos:

$ kafka-run-class kafka.tools.GetOffsetShell \
  --broker-list host01:9092,host02:9092,host02:9092 --topic kt

Que imprime la salida de muestra que muestra el número de mensajes en las tres particiones:

kt:2:6138
kt:1:6123
kt:0:6137

El número de líneas podría ser más o menos dependiendo del número de particiones para el tema.

 2
Author: pdp,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2018-04-29 04:26:16

No he probado esto yo mismo, pero parece tener sentido.

También puede utilizar kafka.tools.ConsumerOffsetChecker (source ).

 1
Author: hba,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2016-02-02 21:30:21

Usando el cliente Java de Kafka 2.11-1.0.0, puede hacer lo siguiente:

    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    consumer.subscribe(Collections.singletonList("test"));
    while(true) {
        ConsumerRecords<String, String> records = consumer.poll(100);
        for (ConsumerRecord<String, String> record : records) {
            System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());

            // after each message, query the number of messages of the topic
            Set<TopicPartition> partitions = consumer.assignment();
            Map<TopicPartition, Long> offsets = consumer.endOffsets(partitions);
            for(TopicPartition partition : offsets.keySet()) {
                System.out.printf("partition %s is at %d\n", partition.topic(), offsets.get(partition));
            }
        }
    }

La salida es algo así:

offset = 10, key = null, value = un
partition test is at 13
offset = 11, key = null, value = deux
partition test is at 13
offset = 12, key = null, value = trois
partition test is at 13
 1
Author: Christophe Quintard,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2017-11-15 17:34:04

./kafka-console-consumer.sh your from-beginning new new-consumer your bootstrap-server yourbroker: 9092 property property print.key = true property property print.value = false property propiedad print.partition topic topic yourtopic tim timeout-ms 5000 / tail-n 10 / grep "Processed a total of"

 0
Author: Borislav Markov,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2018-08-06 14:37:06

Extractos de Kafka docs

Desaprobaciones en 0.9.0.0

El kafka-consumer-offset-checker.sh (kafka.herramienta.ConsumerOffsetChecker) ha quedado obsoleto. En el futuro, por favor use kafka-consumer-groups.sh (kafka.admin.ConsumerGroupCommand) para esta funcionalidad.

Estoy ejecutando Kafka broker con SSL habilitado tanto para el servidor como para el cliente. Debajo del comando utilizo

kafka-consumer-groups.sh --bootstrap-server Broker_IP:Port --list --command-config /tmp/ssl_config kafka-consumer-groups.sh --bootstrap-server Broker_IP:Port --command-config /tmp/ssl_config --describe --group group_name_x

Donde /tmp / ssl_config es el siguiente

security.protocol=SSL
ssl.truststore.location=truststore_file_path.jks
ssl.truststore.password=truststore_password
ssl.keystore.location=keystore_file_path.jks
ssl.keystore.password=keystore_password
ssl.key.password=key_password
 0
Author: S R Bandi,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2018-08-20 09:25:43

Si tiene acceso a la interfaz JMX del servidor, las compensaciones de inicio y fin están presentes en:

kafka.log:type=Log,name=LogStartOffset,topic=TOPICNAME,partition=PARTITIONNUMBER
kafka.log:type=Log,name=LogEndOffset,topic=TOPICNAME,partition=PARTITIONNUMBER

(necesita reemplazar TOPICNAME & PARTITIONNUMBER). Tenga en cuenta que debe verificar cada una de las réplicas de una partición dada, o debe averiguar cuál de los corredores es el líder de una partición dada (y esto puede cambiar con el tiempo).

Alternativamente, puede usar Los métodos Kafka Consumer beginningOffsets y endOffsets.

 0
Author: Adam Kotwasinski,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2018-08-20 15:18:23