Не удалось найти лидера по темам; java.lang.NullPointerException NullPointerException в org.apache.kafka.common.utils.Utils.formatAddress

Когда мы пытаемся передать данные из темы Kafka с включенным SSL, мы сталкиваемся с ошибкой ниже. Не могли бы вы помочь нам в этом вопросе?

19/11/07 13:26:54 INFO ConsumerFetcherManager: [ConsumerFetcherManager-1573151189884] Added fetcher for partitions ArrayBuffer()
19/11/07 13:26:54 WARN ConsumerFetcherManager$LeaderFinderThread: [spark-streaming-consumer_dvtcbddc101.corp.cox.com-1573151189725-d40a510f-leader-finder-thread], Failed to find leader for Set([inst_monitor_status_test,2], [inst_monitor_status_test,0], [inst_monitor_status_test,1])
java.lang.NullPointerException
        at org.apache.kafka.common.utils.Utils.formatAddress(Utils.java:408)
        at kafka.cluster.Broker.connectionString(Broker.scala:62)
        at kafka.client.ClientUtils$$anonfun$fetchTopicMetadata$5.apply(ClientUtils.scala:89)
        at kafka.client.ClientUtils$$anonfun$fetchTopicMetadata$5.apply(ClientUtils.scala:89)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
        at scala.collection.AbstractTraversable.map(Traversable.scala:104)
        at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:89)
        at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)

Код Pyspark:

from __future__ import print_function

import sys

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
from pyspark import SparkConf, SparkContext
from operator import add
import sys
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
import json
from kafka import SimpleProducer, KafkaClient
from kafka import KafkaProducer


def handler(message):
    records = message.collect()
    for record in records:
        print(record)


if __name__ == "__main__":
    if len(sys.argv) != 3:
        print("Usage: kafka_wordcount.py <zk> <topic>", file=sys.stderr)
        exit(-1)
    sc = SparkContext(appName="PythonStreamingKafkaWordCount")
    ssc = StreamingContext(sc, 10)

    zkQuorum, topic = sys.argv[1:]
    kvs = KafkaUtils.createStream(ssc, zkQuorum, "spark-streaming-consumer", {topic: 1})
    lines = kvs.map(lambda x: x[1])
    counts = lines.flatMap(lambda line: line.split(" ")).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a+b)
    counts.pprint()
    kvs.foreachRDD(handler)

    ssc.start()
    ssc.awaitTermination()

Команда отправки Spark:

Отправить Spark:

/usr/hdp/2.6.1.0-129/spark2/bin/spark-submit --packages org.apache.spark: spark-streaming-kafka-0-8_2.11: 2.1.0, org.apache.spark: spark -sql-kafka-0-10_2.11: 2.1.0, org.apache.spark: spark-sql-kafka-0-10_2.11: 2.3.0 хост dsstream2.py: 2181 inst_monitor_status_test


person Karthikeyan Rasipalayam Durai    schedule 07.11.2019    source источник
comment
at kafka.cluster.Broker.connectionString ... Похоже, вы не получили правильный адрес для кластера. Если вы print(zkQuorum), вы получили правильный адрес? Кроме того, вам действительно нужен Spark? Кажется, у вас уже есть from kafka import KafkaProducer, это собственная библиотека Python.   -  person OneCricketeer    schedule 08.11.2019
comment
Кроме того, похоже, вам не хватает каких-либо настроек, связанных с SSL для Spark   -  person OneCricketeer    schedule 08.11.2019
comment
Спасибо за вашу информацию .zkQuorum имеет правильный адрес. Но я все еще не уверен, как передать настройки, связанные с SSL, для Spark. Не могли бы вы сообщить мне, если у вас есть идеи по этому поводу. образец кода был бы отличным !!   -  person Karthikeyan Rasipalayam Durai    schedule 08.11.2019
comment
Ты видел это? spark.apache.org /docs/2.1.0/   -  person OneCricketeer    schedule 08.11.2019
comment
Судя по приведенной выше ссылке, похоже, что настройка, связанная с SSL, для Spark, мы можем включить только код Scala или Java. Я думаю, что мы не можем включить информацию, связанную с SSL, через KafkaParms в код pyspark. Не могли бы вы сообщить мне, как можно использовать функции SSL в коде pyspark для подключения кластера Kerberos.   -  person Karthikeyan Rasipalayam Durai    schedule 09.11.2019
comment
Все библиотеки Pyspark Kafka используют те же библиотеки JVM, что и Java / Scala, поэтому вы должны иметь возможность передавать те же параметры в карты Python. Проблема может заключаться в том, что вам также нужно соединение Zookeeper SASL_SSL.   -  person OneCricketeer    schedule 09.11.2019
comment
Вы можете получить более конкретные ответы от службы поддержки Cloudera. community.cloudera.com/t5/Community-Articles/   -  person OneCricketeer    schedule 09.11.2019
comment
Спасибо за ваш вклад. Найдите способ обработки SSL.   -  person Karthikeyan Rasipalayam Durai    schedule 14.11.2019


Ответы (1)


Спасибо за ваш вклад. Я передал параметры SSL следующим способом и работаю нормально, как и ожидалось.

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.streaming import StreamingContext
import time

#  Spark Streaming context :

spark = SparkSession.builder.appName('PythonStreamingDirectKafkaWordCount').getOrCreate()
sc = spark.sparkContext
ssc = StreamingContext(sc, 20)

#  Kafka Topic Details :

KAFKA_TOPIC_NAME_CONS = "topic_name"
KAFKA_OUTPUT_TOPIC_NAME_CONS = "topic_to_hdfs"
KAFKA_BOOTSTRAP_SERVERS_CONS = 'kafka_server:9093'

#  Creating  readstream DataFrame :

df = spark.readStream \
     .format("kafka") \
     .option("kafka.bootstrap.servers", KAFKA_BOOTSTRAP_SERVERS_CONS) \
     .option("subscribe", KAFKA_TOPIC_NAME_CONS) \
     .option("startingOffsets", "earliest") \
     .option("kafka.security.protocol","SASL_SSL")\
     .option("kafka.client.id" ,"Clinet_id")\
     .option("kafka.sasl.kerberos.service.name","kafka")\
     .option("kafka.ssl.truststore.location", "/home/path/kafka_trust.jks") \
     .option("kafka.ssl.truststore.password", "password_rd") \
     .option("kafka.sasl.kerberos.keytab","/home/path.keytab") \
     .option("kafka.sasl.kerberos.principal","path") \
     .load()

df1 = df.selectExpr( "CAST(value AS STRING)")

#  Creating  Writestream DataFrame :

df1.writeStream \
   .option("path","target_directory") \
   .format("csv") \
   .option("checkpointLocation","chkpint_directory") \
   .outputMode("append") \
   .start()

ssc.awaitTermination()
person Karthikeyan Rasipalayam Durai    schedule 14.11.2019
comment
каковы параметры kafkaParams при использовании KafkaUtils.createDirectStream? Если вы знаете, пожалуйста, дайте мне знать. - person hopeIsTheonlyWeapon; 23.07.2020