Kafka KStreams — тайм-ауты обработки

Я пытаюсь использовать <KStream>.process() с TimeWindows.of("name", 30000), чтобы объединить некоторые значения KTable и отправить их дальше. Кажется, что 30 секунд превышают интервал времени ожидания потребителя, после которого Kafka считает этого потребителя несуществующим и освобождает раздел.

Я попытался увеличить частоту опроса и интервал фиксации, чтобы избежать этого:

config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, "5000");
config.put(StreamsConfig.POLL_MS_CONFIG, "5000");

К сожалению, эти ошибки все еще происходят:

(много таких)

ERROR  o.a.k.s.p.internals.RecordCollector - Error sending record to topic kafka_test1-write_aggregate2-changelog 
org.apache.kafka.common.errors.TimeoutException: Batch containing 1 record(s) expired due to timeout while requesting metadata from brokers for kafka_test1-write_aggregate2-changelog-0

Вслед за этим:

INFO   o.a.k.c.c.i.AbstractCoordinator - Marking the coordinator 12.34.56.7:9092 (id: 2147483547 rack: null) dead for group kafka_test1
WARN   o.a.k.s.p.internals.StreamThread - Failed to commit StreamTask #0_0 in thread [StreamThread-1]: 
  org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured session.timeout.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:578)

Очевидно, мне нужно чаще отправлять пульсации обратно на сервер. Как?

Моя топология:

KStreamBuilder kStreamBuilder = new KStreamBuilder();
KStream<String, String> lines = kStreamBuilder.stream(TOPIC);
KTable<Windowed<String>, String>  kt = lines.aggregateByKey(
            new DBAggregateInit(),
            new DBAggregate(),
            TimeWindows.of("write_aggregate2", 30000));

DBProcessorSupplier dbProcessorSupplier = new DBProcessorSupplier();

kt.toStream().process(dbProcessorSupplier);
KafkaStreams kafkaStreams = new KafkaStreams(kStreamBuilder, streamsConfig);

kafkaStreams.start();

KTable группирует значения по ключу каждые 30 секунд. В Processor.init() я вызываю context.schedule(30000).

DBProcessorSupplier предоставляет экземпляр DBProcessor. Это реализация AbstractProcessor, в которой были предоставлены все переопределения. Все, что они делают, это LOG, поэтому я знаю, когда каждый из них попадает под удар.

Это довольно простая топология, но ясно, что я где-то пропустил шаг.


Редактировать:

Я понимаю, что могу настроить это на стороне сервера, но я надеюсь, что есть решение на стороне клиента. Мне нравится, что разделы становятся доступными довольно быстро, когда клиент выходит/умирает.


Редактировать:

В попытке упростить задачу я удалил шаг агрегации из графика. Теперь это просто потребитель-›процессор(). (Если я отправлю потребителя напрямую на .print(), это сработает v быстро, поэтому я знаю, что все в порядке). (Аналогично, если я вывожу агрегацию (KTable) через .print(), тоже все в порядке).

Я обнаружил, что .process(), который должен вызывать .punctuate() каждые 30 секунд, на самом деле блокируется на переменные промежутки времени и выводит несколько случайным образом (если вообще).

В дальнейшем:

Я установил уровень отладки на «отладка» и перезапустил. Я вижу много сообщений:

DEBUG  o.a.k.s.p.internals.StreamTask - Start processing one record [ConsumerRecord <info>

но точка останова в функции .punctuate() не срабатывает. Так что он делает много работы, но не дает мне возможности его использовать.


person ethrbunny    schedule 30.08.2016    source источник


Ответы (1)


Несколько уточнений:

  • StreamsConfig.COMMIT_INTERVAL_MS_CONFIG — нижняя граница интервала фиксации, т. е. после фиксации следующая фиксация происходит не раньше, чем пройдет это время. По сути, Kafka Stream пытается зафиксировать как можно скорее по прошествии этого времени, но нет никакой гарантии, сколько времени на самом деле потребуется для выполнения следующей фиксации.
  • StreamsConfig.POLL_MS_CONFIG используется для внутреннего вызова KafkaConsumer#poll(), чтобы указать максимальное время блокировки вызова poll().

Таким образом, оба значения не способствуют более частому сердцебиению.

Kafka Streams следует стратегии «сначала в глубину» при обработке записи. Это означает, что после poll() для каждой записи выполняются все операторы топологии. Предположим, у вас есть три последовательные карты, тогда все три карты будут вызываться для первой записи, прежде чем будет обработана следующая/вторая запись.

Таким образом, следующий poll() вызов будет сделан после того, как вся запись первого poll() будет полностью обработана. Если вы хотите, чтобы пульс выполнялся чаще, вам нужно убедиться, что один вызов poll() извлекает меньше записей, чтобы обработка всех записей занимала меньше времени, а следующий poll() запускался раньше.

Вы можете использовать параметры конфигурации для KafkaConsumer, которые вы можете указать через StreamsConfig, чтобы сделать это (см. https://kafka.apache.org/documentation.html#consumerconfigs):

streamConfig.put(ConsumerConfig.XXX, ЗНАЧЕНИЕ);

  • max.poll.records: если вы уменьшите это значение, будет опрошено меньше записей
  • session.timeout.ms: если вы увеличите это значение, у вас будет больше времени для обработки данных (добавьте это для полноты картины, потому что на самом деле это настройка клиента, а не конфигурация на стороне сервера/брокера, даже если вы знаете об этом решении и оно вам не нравится). :))

ИЗМЕНИТЬ

Начиная с Kafka 0.10.1 возможно (и рекомендуется) добавлять префикс к конфигам потребителей и производителей в конфигах потоков. Это позволяет избежать конфликтов параметров, поскольку некоторые имена параметров используются для потребителя и производителя и не могут быть различимы в противном случае (и будут применяться к потребителю и производителю одновременно). Для префикса параметра вы можете использовать StreamsConfig#consumerPrefix() или StreamsConfig#producerPrefix() соответственно. Например: streamsConfig.put(StreamsConfig.consumerPrefix(ConsumerConfig.PARAMETER), VALUE);

Еще одно добавление: сценарий, описанный в этом вопросе, является известной проблемой, и уже существует KIP-62, который вводит фоновый поток для KafkaConsumer, который отправляет такты, тем самым отделяя Heartbeat от вызовов poll(). Kafka Streams будет использовать эту новую функцию в следующих выпусках.

person Matthias J. Sax    schedule 30.08.2016
comment
Я добавил записи для config.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "60000"); и одну для REQUEST_TIMEOUT_MS_CONFIG, 120000. Клиент просидел › 5 минут, прежде чем был вызван .process().init(). Никакой другой деятельности. - person ethrbunny; 31.08.2016
comment
(продолжение) удалены записи выше и добавлены config.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1000");. Никаких изменений в поведении. Клиент/потребитель по-прежнему ничего не делает. НО если я изменю параметр Name в параметре Tumbling Windows, который я передам .aggregateByKey(), я получу действие, а затем сразу же снова начну получать ошибки тайм-аута. Понятно, что я не использую это правильно. - person ethrbunny; 31.08.2016
comment
Можете ли вы поделиться своим кодом DBAggregateInit, DBAggregate, DBProcessorSupplier и DBProcessor? - person Matthias J. Sax; 31.08.2016
comment
Они заглушены. Каждая функция возвращает значение по умолчанию и регистрируется в консоли, поэтому я знаю, когда она вызывается. DBAggregateInit возвращает значение null. ProcessorSupplier.get() возвращает new BatchProcessor();. DBAggregator.apply() регистрирует значения, отправленные в (s, s2 и s3) для отладки. Вот и все. - person ethrbunny; 31.08.2016
comment
Результат DBAggregate#apply() будет записан в журнал изменений. Что он возвращает? А что такое BatchProcessor? - person Matthias J. Sax; 31.08.2016
comment
См. встроенные правки. Я попытался упростить вопрос. На данный момент я не сталкиваюсь с ошибками тайм-аута, поэтому, возможно, пришло время разрешить эту запись SO и начать новую. - person ethrbunny; 31.08.2016
comment
Я отмечаю это как решенное. Тайм-ауты можно уменьшить с помощью вышеуказанных настроек. Мои проблемы с ProcessorContext и AbstractProcessor находятся здесь. - person ethrbunny; 31.08.2016