Нет доступных брокеров при попытке подключиться к Kafka через Cloudera Data Science Workbench

Я пытаюсь реализовать проект GitHub (https://github.com/tomatoTomahto/CDH-Sensor-Analytics) в нашем внутреннем кластере Hadoop с помощью Cloudera Data Science Workbench.

При запуске проекта в Cloudera Data Science Workbench я получаю сообщение об ошибке «Нет доступных брокеров» при попытке подключиться к Kafka через Python api KafkaProducer (bootstrap_servers = 'broker1: 9092') [Код можно найти в https://github.com/tomatoTomahto/CDH-Sensor-Analytics/blob/master/datagenerator/KafkaConnection.py].

Я прошел аутентификацию с помощью Kerberos. Я попытался указать узел брокера без номера порта, а также в виде списка. Но пока ничего не помогло.

Ниже представлена ​​трассировка стека.

NoBrokersAvailable: NoBrokersAvailable
NoBrokersAvailable                        Traceback (most recent call 
last)
in engine
----> 1 dgen = DataGenerator(config)

/home/cdsw/datagenerator/DataGenerator.py in __init__(self, config)
 39         
 40         self._kudu = KuduConnection(self._config['kudu_master'], 
self._config['kudu_port'], spark)
---> 41         self._kafka = 
KafkaConnection(self._config['kafka_brokers'], 
self._config['kafka_topic'])
 42 
 43         #self._kafka

/home/cdsw/datagenerator/KafkaConnection.py in __init__(self, brokers, 
topic)
  4 class KafkaConnection():
  5   def __init__(self, brokers, topic):
----> 6     self._kafka_producer = 
KafkaProducer(bootstrap_servers=brokers)
  7     self._topic = topic
  8     

/home/cdsw/.local/lib/python3.6/site-packages/kafka/producer/kafka.py 
in __init__(self, **configs)
333 
334         client = KafkaClient(metrics=self._metrics, 
metric_group_prefix='producer',
--> 335                              **self.config)
336 
337         # Get auto-discovered version from client if necessary

/home/cdsw/.local/lib/python3.6/site-packages/kafka/client_async.py in 
__init__(self, **configs)
208         if self.config['api_version'] is None:
209             check_timeout = 
self.config['api_version_auto_timeout_ms'] / 1000
--> 210             self.config['api_version'] = 
self.check_version(timeout=check_timeout)
211 
212     def _bootstrap(self, hosts):

/home/cdsw/.local/lib/python3.6/site-packages/kafka/client_async.py in 
check_version(self, node_id, timeout, strict)
806             try_node = node_id or self.least_loaded_node()
807             if try_node is None:
--> 808                 raise Errors.NoBrokersAvailable()
809             self._maybe_connect(try_node)
810             conn = self._conns[try_node]

NoBrokersAvailable: NoBrokersAvailable

Я также попытался подключиться за пределами рабочей среды через интерфейс командной строки с помощью VPN-подключения. У меня такая же ошибка.

Любые указатели на то, что мне не хватает? Заранее спасибо!


person Sameer    schedule 14.08.2017    source источник


Ответы (1)


Первым шагом является определение того, открыт ли сетевой маршрут, а брокер активен и прослушивает этот порт. После этого вы можете проверить аутентификацию и т. Д.

Вы пробовали telnet <broker host> 9092?

Возможно, вам придется явно установить advertised.listeners в дополнение к listeners, я иногда видел странную причуду с Java, когда он не был привязан к ожидаемому сетевому интерфейсу (или, по крайней мере, к тому, который я ожидал!), И мне пришлось заставить его использовать advertised.listeners.

person Jeff Widman    schedule 13.10.2017
comment
У меня была такая же ошибка в немного другом конексте - с использованием библиотек kafka-python - обновление server.properties от слушателей к рекламируемым слушателям работало. В этой статье подробно рассказывается о различиях rmoff.net/2018/08/02/ кафка-слушатели-объяснили - person Mark Parris; 03.05.2020