Я не могу получать сообщения от Kafka с параметрами kafka-console-consumer
и --zookeeper
в кластере Hadoop с поддержкой Kerberos.
Вот моя тестовая среда:
- Cloudera CDH 5.7.0
- Кафка 0.10.1.0
Я работаю над кластером с одним узлом (hostname = quickstart.cloudera
) в целях тестирования. Я произвожу сообщения с kafka-console-producer
.
Для потребителей я использую эти файлы конфигурации:
jaas.conf
:
KafkaClient {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
keyTab="/home/simpleuser/simpleuser.keytab"
principal="simpleuser@CLOUDERA";
};
client.properties
:
security.protocol=SASL_PLAINTEXT
sasl.kerberos.service.name=kafka
С этими файлами я могу получать сообщения при использовании --new-consumer
и --bootstrap-server
:
export KAFKA_OPTS="-Djava.security.auth.login.config=/home/simpleuser/jaas.conf"
kafka-console-consumer \
--new-consumer \
--topic test-kafka \
--bootstrap-server quickstart.cloudera:9092 \
--consumer.config client.properties
С опцией --zookeeper
мне пришлось обновить файл конфигурации JAAS и добавить раздел Client
для Zookeeper (в противном случае он выдает предупреждение: No JAAS configuration section named 'Client' was found in specified JAAS configuration file
):
jaas_with_zk.conf
:
KafkaClient {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
keyTab="/home/simpleuser/simpleuser.keytab"
principal="simpleuser@CLOUDERA";
};
Client {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
keyTab="/home/simpleuser/simpleuser.keytab"
principal="simpleuser@CLOUDERA";
};
С этим файлом команда:
export KAFKA_OPTS="-Djava.security.auth.login.config=/home/simpleuser/jaas_with_zk.conf"
kafka-console-consumer \
--topic test-kafka \
--zookeeper quickstart.cloudera:2181 \
--consumer.config client.properties
выдает исключение (BrokerEndPointNotAvailableException
):
WARN Property sasl.kerberos.service.name is not valid (kafka.utils.VerifiableProperties)
WARN Property security.protocol is not valid (kafka.utils.VerifiableProperties)
WARN [console-consumer-40368_quickstart.cloudera-1514305767949-b4724797-leader-finder-thread], Failed to find leader for Set([test-kafka,0]) (kafka.consumer.ConsumerFetcherManager$LeaderFinderThread)
kafka.common.BrokerEndPointNotAvailableException: End point with security protocol PLAINTEXT not found for broker 35
at kafka.cluster.Broker$$anonfun$5.apply(Broker.scala:131)
at kafka.cluster.Broker$$anonfun$5.apply(Broker.scala:131)
at scala.collection.MapLike$class.getOrElse(MapLike.scala:128)
at scala.collection.AbstractMap.getOrElse(Map.scala:59)
at kafka.cluster.Broker.getBrokerEndPoint(Broker.scala:130)
at kafka.utils.ZkUtils$$anonfun$getAllBrokerEndPointsForChannel$1.apply(ZkUtils.scala:221)
at kafka.utils.ZkUtils$$anonfun$getAllBrokerEndPointsForChannel$1.apply(ZkUtils.scala:221)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at kafka.utils.ZkUtils.getAllBrokerEndPointsForChannel(ZkUtils.scala:221)
at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:65)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
35
— это значение broker.id
, указанное в kafka.properties
. Я не понимаю, почему трассировка стека говорит о протоколе безопасности PLAINTEXT
, потому что в файле kafka.properties
нет такой информации о PLAINTEXT
, только SASL_PLAINTEXT
:
security.inter.broker.protocol=SASL_PLAINTEXT
listeners=SASL_PLAINTEXT://quickstart.cloudera:9092
А в client.properties
у меня есть:
security.protocol=SASL_PLAINTEXT
Более того, sasl.kerberos.service.name
и security.protocol
кажутся неиспользованными (первые две строки трассировки стека). Передача --security-protocol PLAINTEXTSASL
(согласно этой теме а>) не работает (security-protocol is not a recognized option
).
Для информации, вот использованный kafka.properties
(я удалил свойства, которые кажутся не связанными с моей проблемой):
authenticate.zookeeper.connection=true
broker.id=35
kerberos.auth.enable=true
port=9092
security.inter.broker.protocol=SASL_PLAINTEXT
unclean.leader.election.enable=false
zookeeper.session.timeout.ms=6000
broker.id.generation.enable=false
sasl.kerberos.service.name=kafka
listeners=SASL_PLAINTEXT://quickstart.cloudera:9092
kafka-console-consumer
. @JacekLaskowski Не могли бы вы удалить повторяющуюся отметку? Спасибо за совет, теперь вопросы более понятны - person norbjd   schedule 26.12.2017