kafka-console-consumer с параметром Kerberos и --zookeeper выдает BrokerEndPointNotAvailableException

Я не могу получать сообщения от Kafka с параметрами kafka-console-consumer и --zookeeper в кластере Hadoop с поддержкой Kerberos.

Вот моя тестовая среда:

  • Cloudera CDH 5.7.0
  • Кафка 0.10.1.0

Я работаю над кластером с одним узлом (hostname = quickstart.cloudera) в целях тестирования. Я произвожу сообщения с kafka-console-producer.

Для потребителей я использую эти файлы конфигурации:

jaas.conf :

KafkaClient {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
keyTab="/home/simpleuser/simpleuser.keytab"
principal="simpleuser@CLOUDERA";
};

client.properties :

security.protocol=SASL_PLAINTEXT
sasl.kerberos.service.name=kafka

С этими файлами я могу получать сообщения при использовании --new-consumer и --bootstrap-server :

export KAFKA_OPTS="-Djava.security.auth.login.config=/home/simpleuser/jaas.conf"
kafka-console-consumer \
    --new-consumer \
    --topic test-kafka \
    --bootstrap-server quickstart.cloudera:9092 \
    --consumer.config client.properties

С опцией --zookeeper мне пришлось обновить файл конфигурации JAAS и добавить раздел Client для Zookeeper (в противном случае он выдает предупреждение: No JAAS configuration section named 'Client' was found in specified JAAS configuration file):

jaas_with_zk.conf :

KafkaClient {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
keyTab="/home/simpleuser/simpleuser.keytab"
principal="simpleuser@CLOUDERA";
};

Client {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
keyTab="/home/simpleuser/simpleuser.keytab"
principal="simpleuser@CLOUDERA";
};

С этим файлом команда:

export KAFKA_OPTS="-Djava.security.auth.login.config=/home/simpleuser/jaas_with_zk.conf"
kafka-console-consumer \
    --topic test-kafka \
    --zookeeper quickstart.cloudera:2181 \
    --consumer.config client.properties

выдает исключение (BrokerEndPointNotAvailableException):

WARN Property sasl.kerberos.service.name is not valid (kafka.utils.VerifiableProperties)
WARN Property security.protocol is not valid (kafka.utils.VerifiableProperties)
WARN [console-consumer-40368_quickstart.cloudera-1514305767949-b4724797-leader-finder-thread], Failed to find leader for Set([test-kafka,0]) (kafka.consumer.ConsumerFetcherManager$LeaderFinderThread)
kafka.common.BrokerEndPointNotAvailableException: End point with security protocol PLAINTEXT not found for broker 35
        at kafka.cluster.Broker$$anonfun$5.apply(Broker.scala:131)
        at kafka.cluster.Broker$$anonfun$5.apply(Broker.scala:131)
        at scala.collection.MapLike$class.getOrElse(MapLike.scala:128)
        at scala.collection.AbstractMap.getOrElse(Map.scala:59)
        at kafka.cluster.Broker.getBrokerEndPoint(Broker.scala:130)
        at kafka.utils.ZkUtils$$anonfun$getAllBrokerEndPointsForChannel$1.apply(ZkUtils.scala:221)
        at kafka.utils.ZkUtils$$anonfun$getAllBrokerEndPointsForChannel$1.apply(ZkUtils.scala:221)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
        at scala.collection.AbstractTraversable.map(Traversable.scala:104)
        at kafka.utils.ZkUtils.getAllBrokerEndPointsForChannel(ZkUtils.scala:221)
        at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:65)
        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)

35 — это значение broker.id, указанное в kafka.properties. Я не понимаю, почему трассировка стека говорит о протоколе безопасности PLAINTEXT, потому что в файле kafka.properties нет такой информации о PLAINTEXT, только SASL_PLAINTEXT:

security.inter.broker.protocol=SASL_PLAINTEXT
listeners=SASL_PLAINTEXT://quickstart.cloudera:9092

А в client.properties у меня есть:

security.protocol=SASL_PLAINTEXT

Более того, sasl.kerberos.service.name и security.protocol кажутся неиспользованными (первые две строки трассировки стека). Передача --security-protocol PLAINTEXTSASL (согласно этой теме) не работает (security-protocol is not a recognized option).

Для информации, вот использованный kafka.properties (я удалил свойства, которые кажутся не связанными с моей проблемой):

authenticate.zookeeper.connection=true
broker.id=35
kerberos.auth.enable=true
port=9092
security.inter.broker.protocol=SASL_PLAINTEXT
unclean.leader.election.enable=false
zookeeper.session.timeout.ms=6000
broker.id.generation.enable=false
sasl.kerberos.service.name=kafka
listeners=SASL_PLAINTEXT://quickstart.cloudera:9092

person norbjd    schedule 25.12.2017    source источник
comment
Я настоятельно рекомендую разделить вопрос как минимум на 2 разных вопроса, один из которых использует Spark Streaming в качестве клиента для получения сообщений от брокера Kafka с поддержкой Kerberos. В настоящее время вопрос слишком широк.   -  person Jacek Laskowski    schedule 26.12.2017
comment
Я подумал, что было бы неплохо задать вопросы вместе, потому что вопросы кажутся связанными. Но вы правы, это широкий вопрос, поэтому я только что создал новый вопрос, сосредоточившись только на приложении Spark Streaming в режиме YARN: messages-from-a-spark-streaming-application-in-a-kerberized-hado" title="использование сообщений kafka из приложения потоковой передачи искры в Kerberized Hado">stackoverflow.com/questions/47977075/   -  person norbjd    schedule 26.12.2017
comment
Спасибо. Тогда удалите часть вопроса, связанную с Spark Streaming.   -  person Jacek Laskowski    schedule 26.12.2017
comment
Я только что обновил этот вопрос и сохранил только проблему kafka-console-consumer. @JacekLaskowski Не могли бы вы удалить повторяющуюся отметку? Спасибо за совет, теперь вопросы более понятны   -  person norbjd    schedule 26.12.2017