Выборы лидера Kafka привели к краху Kafka Streams

У меня есть приложение Kafka Streams, которое потребляет и производит кластер Kafka с 3 брокерами и коэффициентом репликации 3. За исключением тем смещения потребителя (50 разделов), все остальные темы имеют только по одному разделу.

Когда брокеры пытаются выбрать предпочтительную реплику, приложение Streams (которое работает на совершенно другом экземпляре, чем брокеры) выдает ошибку:

Caused by: org.apache.kafka.streams.errors.StreamsException: task [0_0] exception caught when producing
    at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.checkForException(RecordCollectorImpl.java:119)
    ...
    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:197)
Caused by: org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is not the leader for that topic-partition.

Нормально ли, что приложение Streams пытается быть лидером для раздела, учитывая, что оно работает на сервере, который не является частью кластера Kafka?

Я могу воспроизвести это поведение по запросу:

  1. Убийство одного из брокеров (после чего два других берут на себя роль лидера для всех разделов, в которых, как и ожидалось, был убитый брокер)
  2. Возвращение убитого брокера обратно
  3. Запуск предпочтительного выбора лидера реплики с помощью bin/kafka-preferred-replica-election.sh --zookeeper localhost

Моя проблема похожа на эту сообщение об ошибке, поэтому я Мне интересно, это новая ошибка Kafka Streams. Моя полная трассировка стека буквально совпадает с сутью сообщения об ошибке (здесь < / а>).

Еще одна потенциально интересная деталь заключается в том, что во время выборов лидера я получаю эти сообщения в controller.log брокера:

[2017-04-12 11:07:50,940] WARN [Controller-3-to-broker-3-send-thread], Controller 3's connection to broker BROKER-3-HOSTNAME:9092 (id: 3 rack: null) was unsuccessful (kafka.controller.RequestSendThread)
java.io.IOException: Connection to BROKER-3-HOSTNAME:9092 (id: 3 rack: null) failed
    at kafka.utils.NetworkClientBlockingOps$.awaitReady$1(NetworkClientBlockingOps.scala:84)
    at kafka.utils.NetworkClientBlockingOps$.blockingReady$extension(NetworkClientBlockingOps.scala:94)
    at kafka.controller.RequestSendThread.brokerReady(ControllerChannelManager.scala:232)
    at kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:185)
    at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:184)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)

Первоначально я думал, что виновата эта ошибка подключения, но после того, как выборы лидера приводят к сбою приложения Streams, если я перезапускаю приложение Streams, оно работает нормально до следующих выборов, и я вообще не касаюсь брокеров.

Все серверы (3 брокера Kafka и приложение Streams) работают на экземплярах EC2.


person Bogdan    schedule 12.04.2017    source источник


Ответы (1)


Теперь это исправлено в 0.10.2.1. Если вы не можете это подобрать, убедитесь, что эти два параметра установлены следующим образом в конфигурации ваших потоков:

final Properties props = new Properties();
...
props.put(ProducerConfig.RETRIES_CONFIG, 10);  
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, Integer.toString(Integer.MAX_VALUE));
person Eno Thereska    schedule 12.04.2017
comment
Эти два параметра вместе с props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 20000); решили проблему! Спасибо! - person Bogdan; 13.04.2017