kafka-python (1.0.0) выдает ошибку при подключении к брокеру. При этом / usr / bin / kafka-console-продюсер и / usr / bin / kafka-console-consumer работают нормально.
Приложение Python также работало хорошо, но после перезапуска zookeeper оно больше не может подключаться.
Я использую пример с голыми костями из документации:
from kafka import KafkaProducer
from kafka.common import KafkaError
producer = KafkaProducer(bootstrap_servers=['hostname:9092'])
# Asynchronous by default
future = producer.send('test-topic', b'raw_bytes')
Я получаю эту ошибку:
Traceback (most recent call last): File "pp.py", line 4, in <module>
producer = KafkaProducer(bootstrap_servers=['hostname:9092']) File "/usr/lib/python2.6/site-packages/kafka/producer/kafka.py", line 246, in __init__
self.config['api_version'] = client.check_version() File "/usr/lib/python2.6/site-packages/kafka/client_async.py", line 629, in check_version
connect(node_id) File "/usr/lib/python2.6/site-packages/kafka/client_async.py", line 592, in connect
raise Errors.NodeNotReadyError(node_id) kafka.common.NodeNotReadyError: 0 Exception AttributeError: "'KafkaProducer' object has no attribute '_closed'" in <bound method KafkaProducer.__del__ of <kafka.producer.kafka.KafkaProducer object at 0x7f6171294c50>> ignored
При переходе через (/usr/lib/python2.6/site-packages/kafka/client_async.py) я заметил, что строка 270 оценивается как ложная:
270 if not self._metadata_refresh_in_progress and not self.cluster.ttl() == 0:
271 if self._can_send_request(node_id):
272 return True
273 return False
В моем случае self._metadata_refresh_in_progress имеет значение False, но ttl () = 0;
В то же время kafka-console- * с радостью рассылает сообщения:
/usr/bin/kafka-console-producer --broker-list hostname:9092 --topic test-topic
hello again
hello2
Любой совет?