kafka-python: производитель не может подключиться

kafka-python (1.0.0) выдает ошибку при подключении к брокеру. При этом / usr / bin / kafka-console-продюсер и / usr / bin / kafka-console-consumer работают нормально.

Приложение Python также работало хорошо, но после перезапуска zookeeper оно больше не может подключаться.

Я использую пример с голыми костями из документации:

from kafka import KafkaProducer
from kafka.common import KafkaError

producer = KafkaProducer(bootstrap_servers=['hostname:9092'])

# Asynchronous by default
future = producer.send('test-topic', b'raw_bytes')

Я получаю эту ошибку:

Traceback (most recent call last):   File "pp.py", line 4, in <module>
    producer = KafkaProducer(bootstrap_servers=['hostname:9092'])   File "/usr/lib/python2.6/site-packages/kafka/producer/kafka.py", line 246, in __init__
    self.config['api_version'] = client.check_version()   File "/usr/lib/python2.6/site-packages/kafka/client_async.py", line 629, in check_version
    connect(node_id)   File "/usr/lib/python2.6/site-packages/kafka/client_async.py", line 592, in connect
    raise Errors.NodeNotReadyError(node_id) kafka.common.NodeNotReadyError: 0 Exception AttributeError: "'KafkaProducer' object has no attribute '_closed'" in <bound method KafkaProducer.__del__ of <kafka.producer.kafka.KafkaProducer object at 0x7f6171294c50>> ignored

При переходе через (/usr/lib/python2.6/site-packages/kafka/client_async.py) я заметил, что строка 270 оценивается как ложная:

270         if not self._metadata_refresh_in_progress and not self.cluster.ttl() == 0:
271             if self._can_send_request(node_id):
272                 return True
273         return False

В моем случае self._metadata_refresh_in_progress имеет значение False, но ttl () = 0;

В то же время kafka-console- * с радостью рассылает сообщения:

/usr/bin/kafka-console-producer --broker-list hostname:9092 --topic test-topic
hello again
hello2

Любой совет?


person alex_123    schedule 28.02.2016    source источник


Ответы (7)


У меня была такая же проблема, и ни одно из вышеперечисленных решений не помогло. Затем я читаю сообщения об исключениях, и кажется, что необходимо указать api_version, поэтому

producer = KafkaProducer(bootstrap_servers=['localhost:9092'],api_version=(0,1,0))

примечание: кортеж (1,0,0) соответствует версии kafka 1.0.0

работает нормально (по крайней мере, завершается без исключений, теперь придется убеждать его принимать сообщения;))

person Egor Kraev    schedule 27.10.2016
comment
но затем он останавливается после отправки первого сообщения - person Amin Alaee; 04.01.2017
comment
Это сработало для меня как обаяние. В противном случае Kafka 0.10 сломал для меня kafka-python. - person rjurney; 21.01.2017
comment
Если у кого-то возникают проблемы, когда он не отправляет больше данных после первого сообщения, вы должны сделать producer.flush(), чтобы очистить буферы отправки. - person noblerare; 30.08.2017
comment
api_version не является обязательным в версии 1.3.5. Однако, если не использовать аргумент с ключом, он попытается автоматически определить версию, которая не работает на моем текущем брокере (docker image wurstmeister / kafka-docker) - person sshow; 25.12.2017

У меня была похожая проблема. В моем случае имя хоста брокера было неразрешимым на стороне клиента. Попробуйте явно указать advertised.host.name в файле конфигурации.

person user3503929    schedule 01.03.2016

У меня такая же проблема.

Решил проблему подсказкой user3503929.

Сервер kafka был установлен на windows.

server.properties

...
host.name = 0.0.0.0
...

.

producer = KafkaProducer(bootstrap_servers='192.168.1.3:9092',         
                                         value_serializer=str.encode)
producer.send('test', value='aaa')
producer.close()
print("DONE.")

Проблем с обработкой в ​​клиенте windows kafka не было. Однако, когда я отправляю сообщение в тему с помощью kafka-python в ubuntu, возникает NoBrokersAvailable исключение.

Добавьте следующие настройки в server.properties.

...
advertised.host.name = 192.168.1.3
...

Он успешно работает в том же коде. Я потратил на это три часа.

Спасибо

person midi sampler    schedule 03.11.2017

У хоста может быть несколько псевдонимов DNS. Любой из них подойдет для теста ssh или ping. Однако соединение kafka должно использовать псевдоним, который соответствует advertised.host.name в server.properties файле брокера.

Я использовал другой псевдоним в параметре bootstrap_servers. Отсюда ошибка. Как только я изменил вызов на advertised.hostname, проблема была решена

person alex_123    schedule 05.03.2016

У меня была аналогичная проблема, и удаление порта с bootstrap_servers помогло.

consumer = KafkaConsumer('my_topic',
                     #group_id='x',
                     bootstrap_servers='kafka.com')
person Avia    schedule 31.05.2016

Установите kafka-python, используя pip install kafka-python

Шаги по созданию конвейера данных kafka: -
1. Запустите Zookeeper с помощью команды оболочки или установите zookeeperd с помощью

sudo apt-get install zookeeperd 

Это запустит zookeeper как демон и по умолчанию прослушивает порт 2181

  1. Запустите сервер kafka
  2. Запустите скрипт с Producer.py и consumer.py на разных консолях, чтобы увидеть данные в реальном времени.

Вот команды для запуска: -

cd kafka-directory
./bin/zookeeper-server-start.sh  ./config/zookeeper.properties    
./bin/kafka-server-start.sh  ./config/server.properties

Теперь, когда у вас запущены zookeeper и сервер kafka, запустите скрипт Produce.py и consumer.py.

Producer.py:

от kafka import KafkaProducer import time

producer = KafkaProducer(bootstrap_servers=['localhost:9092'])
topic = 'test'
lines = ["1","2","3","4","5","6","7","8"]
for line in lines:
  try:
    producer.send(topic, bytes(line, "UTF-8")).get(timeout=10)
  except IndexError as e:
    print(e)
  continue

Consumer.py:-

from kafka import KafkaConsumer
topic = 'test'
consumer = KafkaConsumer(topic, bootstrap_servers=['localhost:9092'])
for message in consumer:
    # message value and key are raw bytes -- decode if necessary!
    # e.g., for unicode: `message.value.decode('utf-8')`
    # print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
    #                                       message.offset, message.key,
    #                                       message.value))
    print(message)

Теперь запустите Producer.py и consumer.py в отдельных терминалах, чтобы увидеть данные в реальном времени ..!

Примечание. Вышеупомянутый скрипт Producer.py запускается только один раз, чтобы запустить его навсегда, используйте цикл while и модуль времени.

person Naseer-shaik    schedule 10.03.2018

В файле server.properties убедитесь, что IP-адрес прослушивателя установлен на IP-адрес вашего ящика, доступный для удаленного компьютера. По умолчанию это localhost

Обновите эту строку в своем server.properties:

listeners=PLAINTEXT://<Your-IP-address>:9092

Также убедитесь, что у вас нет брандмауэра, который может блокировать доступ к другим IP-адресам. Если у вас есть sudo previleges. Попробуйте отключить брандмауэр.

sudo systemctl stop firewalld
person ashdnik    schedule 22.08.2017