Отказоустойчивые для Kafka Direct Stream не работают. Каталог контрольной точки не существует

Я пишу приложение для чтения данных из темы Kafka. И не могу добиться отказоустойчивости в случае сбоя драйвера. Приложение работает в кластере k8s с использованием spark submit. Когда я запускаю свое приложение в первый раз, все идет хорошо, но когда я удаляю pod из кластера, перезапуск приложения приводит к ошибке. Checkpoint directory does not exist: file:/opt/spark/.../rdd-541. Я использую отказоустойчивое хранилище. Ниже фрагмент кода и ошибка более подробно. Спасибо за помощь. Дайте мне знать, если мало деталей.

def functionToCreateContext():
    sc = SparkContext("spark-master", "kafka_spark")
    sc.setLogLevel('ERROR')
    ssc = StreamingContext(sc, 20)
    kafkaParams = {'bootstrap.servers': 'kafka.cluster'}
    kafkaStream = KafkaUtils.createDirectStream(ssc, ['topic'], kafkaParams)
    statistic_window = kafkaStream.transform(parse_reduce).reduceByKeyAndWindow(lambda x, y: x + y, 
                     lambda x, y: x - y,60, 20)
    top = statistic_window.transform(found)
    top.pprint()
    ssc.checkpoint(cpd)
    return ssc

if __name__ == "__main__":
    ssc = StreamingContext.getOrCreate(cpd, functionToCreateContext)
    ssc.start()
    ssc.awaitTermination()

проследить:

20/03/10 14:05:19 INFO SparkContext: Created broadcast 1 from checkpointFile at DStreamCheckpointData.scala:114
Traceback (most recent call last):
  File "/app-spark/kafka_spark.py", line 75, in <module>
    ssc = StreamingContext.getOrCreate(cpd, functionToCreateContext)
  File "/opt/spark/python/lib/pyspark.zip/pyspark/streaming/context.py", line 105, in getOrCreate
  File "/opt/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
  File "/opt/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o2.tryRecoverFromCheckpoint.
: java.lang.IllegalArgumentException: requirement failed: Checkpoint directory does not exist: file:/opt/spark/.../rdd-541
        at scala.Predef$.require(Predef.scala:224)

person Дмитрий Киселёв    schedule 10.03.2020    source источник


Ответы (1)


Я предполагаю, что вы получаете эту ошибку, потому что ваш каталог контрольной точки не является постоянным томом. Поэтому, когда вы удаляете модуль, указанный каталог также удаляется, и вы получаете эту ошибку Checkpoint directory does not exist.

Решением будет использование Persistent Volume для каталога контрольных точек.

Здесь вы также можете найти пример в той же теме.

person alpert    schedule 10.03.2020
comment
Спасибо, забыл добавить, папка в поде существует, но с другим названием. например /opt/spark/.../rdd-205. И потоковый контекст хочет начать работать с /opt/spark/.../rdd-183 после удаления модуля. Папка /opt/spark/.../rdd-183 ранее существовала и исчезла. - person Дмитрий Киселёв; 11.03.2020