Как записать файл в Kafka Producer

Я пытаюсь загрузить простой текстовый файл вместо стандартного ввода в Kafka. Скачав Kafka, я выполнил следующие шаги:

Запущен zookeeper:

bin/zookeeper-server-start.sh config/zookeeper.properties

Запущенный сервер

bin/kafka-server-start.sh config/server.properties

Создал тему под названием «тест»:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

Пробежал продюсер:

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test 
Test1
Test2

Слушает Потребитель:

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
Test1
Test2

Вместо стандартного ввода я хочу передать файл данных или даже простой текстовый файл Производителю, который может быть виден непосредственно Потребителем. Любая помощь будет оценена по достоинству. Спасибо!


person Katie    schedule 22.10.2015    source источник


Ответы (4)


Вы можете вставить его:

kafka-console-producer.sh --broker-list localhost:9092 --topic my_topic
--new-producer < my_file.txt

Найдено здесь.

Начиная с 0.9.0:

kafka-console-producer.sh --broker-list localhost:9092 --topic my_topic < my_file.txt
person Balázs Németh    schedule 22.10.2015
comment
Я использую Kafka-0.9. --new-maker не поддерживается в kafka-console-producer.sh, вместо этого у меня работал $ kafka-console-producer.sh --broker-list localhost: 9092 --topic my_topic ‹my_file.txt. - person prabhugs; 04.03.2016
comment
Я думаю --new-продюсер и есть собственно продюсер из 0.9 :) - person Balázs Németh; 04.03.2016
comment
@ BalázsMáriaNémeth, как передать только n строк через фиксированный интервал времени (например, пройти первые 10 строк, подождать 5 секунд, передать 10-20 строк и т. Д.). После завершения всех строк он должен снова начинаться со строки 1? - person vdep; 23.06.2016
comment
Боюсь, вам придется написать собственный сценарий. Возможно, Python? - person Balázs Németh; 24.06.2016
comment
@vdep, вы можете сделать это с помощью kafkacat: github.com/edenhill/kafkacat - person Filipe Correia; 01.07.2016
comment
@FilipeCorreia, в настоящее время я использую для этого сценарий оболочки. Но у kafkacat есть более продвинутые функции, я попробую. - person vdep; 02.07.2016
comment
Как я могу свернуть данные с этого веб-сайта потоковой передачи stream.meetup.com/2/rsvps? - person Green; 08.10.2016
comment
Кто-нибудь пробовал с поворотом my_file.txt? скажем, почасовая ротация с использованием logrotate - person Albatross; 14.01.2017
comment
Действительно полезно. Может ли он поддерживать отправку нескольких файлов размером ~ 100К? Будет ли это плохой практикой? - person wipman; 13.04.2017
comment
Это не работает для 0.11 .. есть предложения по новой версии? - person inertia; 27.07.2017
comment
это одноразовое чтение правильно !, но как постепенно читать из файла? - person jack AKA karthik; 16.08.2017
comment
Это производит данные в один раздел вместо распределения по всем назначенным разделам. Есть ли возможность производить данные для всех разделов? - person Ajeesh; 05.03.2018
comment
на Kafka 0.10 отвечает, что новый производитель не является признанным вариантом - person SparkleGoat; 14.12.2018
comment
Есть еще решение для windows cmd? - person itstata; 05.09.2019

$ kafka-console-producer.sh --broker-list localhost:9092 --topic my_topic < my_file.txt

работал у меня в Kafka-0.9.0

person prabhugs    schedule 04.03.2016

Вот несколько способов, которые немного более обобщены, но могут оказаться излишними для простого файла.

хвост

tail -n0 -F my_file.txt | kafka-console-producer.sh --broker-list localhost:9092 --topic my_topic

Объяснение

  1. tail читает с конца файла по мере его роста или к нему непрерывно добавляются журналы
  2. -n0 указывает на вывод последних 0 строк, поэтому выбирается только новая строка
  3. -F следует за файлом по имени вместо дескриптора, поэтому он работает, даже если он повернут

syslog-ng

options {                                                                                                                             
    flush_lines (0);                                                                                                                
    time_reopen (10);                                                                                                               
    log_fifo_size (1000);                                                                                                          
    long_hostnames (off);                                                                                                           
    use_dns (no);                                                                                                                   
    use_fqdn (no);                                                                                                                  
    create_dirs (no);                                                                                                               
    keep_hostname (no);                                                                                                             
};

source s_file {
    file("path to my-file.txt" flags(no-parse));
}


destination loghost {
    tcp("*.*.*.*" port(5140));
} 

потребление

nc -k -l 5140 | kafka-console-producer.sh --broker-list localhost:9092 --topic my_topic

Пояснение (от man nc)

-k' Forces nc to stay listening for another connection after its current connection is completed. It is an error to use this option without the -l option.

-l' Used to specify that nc should listen for an incoming connection rather than initiate a connection to a remote host. It is an error to use this option in conjunction with the -p, -s, or -z options. Additionally, any timeouts specified with the -w option are ignored.

Ссылка

Syslog-ng

person Albatross    schedule 14.01.2017

person    schedule
comment
Хотя этот фрагмент кода может решить вопрос, включая объяснение, действительно помогает улучшить качество вашего сообщения. Помните, что вы отвечаете на вопрос для будущих читателей, а не только для человека, который задает его сейчас! Пожалуйста, отредактируйте свой ответ, чтобы добавить пояснения и указать, какие ограничения и предположения применяются. - person Toby Speight; 31.10.2016
comment
Иногда код - это документация. Ясно, что это просто вставка в какой-то текст. - person Michael; 04.10.2020