Прочтите тему Kafka в пакетном задании Spark

Я пишу пакетное задание Spark (v1.6.0), которое читает из темы Kafka.
Для этого я могу использовать org.apache.spark.streaming.kafka.KafkaUtils#createRDD, однако мне нужно установить смещения для всех разделов, а также нужно где-то их хранить (ZK ? HDFS?), Чтобы знать, с чего начать следующее пакетное задание.

Как правильно читать данные Kafka в пакетном задании?

Я также подумываю написать вместо этого задание потоковой передачи, которое читает из auto.offset.reset=smallest и сохраняет контрольную точку в HDFS, а затем при следующем запуске запускается с нее.

Но в этом случае, как я могу получить только один раз и прекратить потоковую передачу после первого пакета?


person Bruckwald    schedule 25.06.2016    source источник
comment
Лучше составить два отдельных вопроса.   -  person maasg    schedule 26.06.2016


Ответы (1)


createRDD - правильный подход для чтения пакета из кафки.

Чтобы запросить информацию о последних / самых ранних доступных смещениях, просмотрите KafkaCluster.scala методы getLatestLeaderOffsets и getEarliestLeaderOffsets. Этот файл был private, но должен быть public в последних версиях spark.

person Cody Koeninger    schedule 05.07.2016
comment
Спасибо за ответ также в списке искр! Вы имеете в виду, что эти методы дают рекомендации о том, как реализовать мой поиск смещения с соответствующим хранилищем в бэкэнде? - person Bruckwald; 07.07.2016
comment
Нет, эти методы предназначены для получения последних доступных смещений от kafka. См. github.com/koeninger / kafka-точно-once / blob / master / src / main / для примера поиска подтвержденных смещений из магазина. - person Cody Koeninger; 07.07.2016
comment
@CodyKoeninger Типа KafkaCuster не существует в версии 0.10. Я что-то упускаю? Я хочу выполнить пакетную обработку всех существующих записей в теме, но нет возможности получить самые ранние и последние смещения из Kafka. - person InfinitiesLoop; 01.03.2018
comment
@InfinitiesLoop новый API-интерфейс потребителя kafka позволит вам перейти к началу или концу темы, чтобы выяснить, какие смещения самые ранние и последние, см. kafka.apache.org/0100/javadoc/index.html?org/apache/kafka/. - person Cody Koeninger; 02.03.2018
comment
@CodyKoeninger Да, но с API искрового потока вам нужно сначала начать использовать из темы, чтобы получить эти смещения. Не существует общего API-интерфейса потребителя kafka. Я просто хочу запустить пакетное задание с минимальным и максимальным смещениями для каждого раздела, а не с потоковыми данными. Вы хотите сказать, что я должен просто вставить потребительский артефакт кафки? В этом не было необходимости с клиентом Spark Streaming 0.8, но отсутствие класса KafkaCluster в 0.10 требует чего-то еще. - person InfinitiesLoop; 04.03.2018
comment
Не добавляйте потребительский артефакт Kafka, он уже включен в качестве транзитивной зависимости. Просто используйте это. - person Cody Koeninger; 05.03.2018
comment
Удалось ли вам заставить эти методы работать с Spark 1.6? Я столкнулся с той же проблемой, когда мне теперь нужно выполнить поиск смещения, чтобы предоставить его в качестве параметров в createRDD. - person Havnar; 13.04.2018