Что означает исключение kafka.common.OffsetOutOfRangeException

Я пытаюсь загрузить данные через Apache Kafka и постоянно получаю эту ошибку:

kafka.common.OffsetOutOfRangeException: смещение 1003786 вне допустимого диапазона в kafka.log.Log $ .findRange (Log.scala: 46) в kafka.log.Log.read (Log.scala: 264) в kafka.server.KafkaRequestHandlers. kafka $ server $ KafkaRequestHandlers $$ readMessageSet (KafkaRequestHandlers.scala: 112) на kafka.server.KafkaRequestHandlers $$ anonfun $ 2.apply (KafkaRequestHandlers.scala: 101) на kafka.server.Kafka. : 100) в scala.collection.TraversableLike $$ anonfun $ map $ 1.apply (TraversableLike.scala: 206) в scala.collection.TraversableLike $$ anonfun $ map $ 1.apply (TraversableLike.scala: 206) в scala.collection. IndexedSeqOptimized $ class.foreach (IndexedSeqOptimized.scala: 34) в scala.collection.mutable.ArrayOps.foreach (ArrayOps.scala: 34) в scala.collection.TraversableLike $ class.map (TraversableLike.scala: 206) в scala.collection .mutable.ArrayOps.map (ArrayOps.scala: 34) на kafka.server.KafkaRequestHandlers.handleMultiFetchRequest (KafkaRequestHandlers.scala: 100) на kafka.server.KafkaRequestHandlers $$ anonfun $ handlerFor $ 3.apply (KafkaRequestHandlers.scala: 40) на kaflerska. (KafkaRequestHandlers.scala: 40) в kafka.network.Processor.handle (SocketServer.scala: 296) в kafka.network.Processor.read (SocketServer.scala: 319) в kafka.network.Processor.run (SocketServer.scala: 214) на java.lang.Thread.run (Thread.java:724)

Что означает это исключение и как его исправить?


person Ris90    schedule 09.04.2014    source источник
comment
что будет на выходе после выполнения команды $KAFKA_ROOT_DIR/bin/kafka-console-consumer.sh --zookeeper xx.xx.xx.xx:2181 --topic _YOUR_TOPIC_ --from-beginning .. вы видите какие-нибудь сообщения?   -  person user2720864    schedule 09.04.2014


Ответы (1)


OffsetOutOfRangeException обычно означает, что клиент запросил диапазон, который больше не доступен на сервере.
Это может произойти, поскольку журнал темы больше не существует в зависимости от политики хранения в вашей настройке Kafka.
если вы используете SimpleConsumer вам нужно будет обработать исключение OffsetOutOfRange в вашем коде. В идеале ваш потребитель должен выдать OffsetRequest, чтобы получить самое последнее / самое раннее смещение, доступное в настоящее время на сервере, а затем использовать его в вашем FetchRequest (в качестве параметра)

person user2720864    schedule 09.04.2014