Cómo escribir un archivo a Kafka Producer


Estoy tratando de cargar un archivo de texto simple en lugar de la entrada estándar en Kafka. Después de descargar Kafka, realicé los siguientes pasos:

Comenzó zookeeper:

bin/zookeeper-server-start.sh config/zookeeper.properties

Servidor iniciado

bin/kafka-server-start.sh config/server.properties

Se creó un tema llamado "test":

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

Corrió el Productor:

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test 
Test1
Test2

Escuchado por el Consumidor:

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
Test1
Test2

En lugar de la entrada estándar, quiero pasar un archivo de datos o incluso un simple archivo de texto al Productor que puede ser visto directamente por el Consumidor. Cualquier ayuda sería realmente apreciada. ¡Gracias!

Author: Katie, 2015-10-22

4 answers

Puede canalizarlo:

kafka-console-producer.sh --broker-list localhost:9092 --topic my_topic
--new-producer < my_file.txt
{[2] Encontrado} aquí.

Desde 0.9.0:

kafka-console-producer.sh --broker-list localhost:9092 --topic my_topic < my_file.txt
 51
Author: Balázs Mária Németh,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2017-09-01 15:43:09
$ kafka-console-producer.sh --broker-list localhost:9092 --topic my_topic < my_file.txt

Funcionó para mí en Kafka-0.9.0

 5
Author: prabhugs,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2016-03-04 09:42:57

Aquí hay algunas formas que son un poco más generalizadas, pero pueden ser excesivas para simple file

Tail

tail -n0 -F my_file.txt | kafka-console-producer.sh --broker-list localhost:9092 --topic my_topic

Explicación

  1. tail lee desde el final del archivo a medida que crece o se agregan registros continuamente
  2. -n0 indica outputlast 0 líneas por lo que solo se selecciona nueva línea
  3. -F sigue el archivo por nombre en lugar del descriptor, por lo tanto funciona incluso si es rotado

Syslog-ng

options {                                                                                                                             
    flush_lines (0);                                                                                                                
    time_reopen (10);                                                                                                               
    log_fifo_size (1000);                                                                                                          
    long_hostnames (off);                                                                                                           
    use_dns (no);                                                                                                                   
    use_fqdn (no);                                                                                                                  
    create_dirs (no);                                                                                                               
    keep_hostname (no);                                                                                                             
};

source s_file {
    file("path to my-file.txt" flags(no-parse));
}


destination loghost {
    tcp("*.*.*.*" port(5140));
} 

consumir

nc -k -l 5140 | kafka-console-producer.sh --broker-list localhost:9092 --topic my_topic

Explicación(de man nc)

-k' Forces nc to stay listening for another connection after its current connection is completed. It is an error to use this option without the -l option.

-l' Used to specify that nc should listen for an incoming connection rather than initiate a connection to a remote host. It is an error to use this option in conjunction with the -p, -s, or -z options. Additionally, any timeouts specified with the -w option are ignored.

Ref

Syslog-ng

 3
Author: Confused,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2017-01-17 18:52:18
echo "Hello" | kafka-console-producer.sh --broker-list localhost:9092 --topic my_topic
 1
Author: Dheeraj R,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2016-10-28 18:37:27