Я пытаюсь использовать <KStream>.process()
с TimeWindows.of("name", 30000)
, чтобы объединить некоторые значения KTable и отправить их дальше. Кажется, что 30 секунд превышают интервал времени ожидания потребителя, после которого Kafka считает этого потребителя несуществующим и освобождает раздел.
Я попытался увеличить частоту опроса и интервал фиксации, чтобы избежать этого:
config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, "5000");
config.put(StreamsConfig.POLL_MS_CONFIG, "5000");
К сожалению, эти ошибки все еще происходят:
(много таких)
ERROR o.a.k.s.p.internals.RecordCollector - Error sending record to topic kafka_test1-write_aggregate2-changelog
org.apache.kafka.common.errors.TimeoutException: Batch containing 1 record(s) expired due to timeout while requesting metadata from brokers for kafka_test1-write_aggregate2-changelog-0
Вслед за этим:
INFO o.a.k.c.c.i.AbstractCoordinator - Marking the coordinator 12.34.56.7:9092 (id: 2147483547 rack: null) dead for group kafka_test1
WARN o.a.k.s.p.internals.StreamThread - Failed to commit StreamTask #0_0 in thread [StreamThread-1]:
org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured session.timeout.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:578)
Очевидно, мне нужно чаще отправлять пульсации обратно на сервер. Как?
Моя топология:
KStreamBuilder kStreamBuilder = new KStreamBuilder();
KStream<String, String> lines = kStreamBuilder.stream(TOPIC);
KTable<Windowed<String>, String> kt = lines.aggregateByKey(
new DBAggregateInit(),
new DBAggregate(),
TimeWindows.of("write_aggregate2", 30000));
DBProcessorSupplier dbProcessorSupplier = new DBProcessorSupplier();
kt.toStream().process(dbProcessorSupplier);
KafkaStreams kafkaStreams = new KafkaStreams(kStreamBuilder, streamsConfig);
kafkaStreams.start();
KTable группирует значения по ключу каждые 30 секунд. В Processor.init() я вызываю context.schedule(30000)
.
DBProcessorSupplier предоставляет экземпляр DBProcessor. Это реализация AbstractProcessor, в которой были предоставлены все переопределения. Все, что они делают, это LOG, поэтому я знаю, когда каждый из них попадает под удар.
Это довольно простая топология, но ясно, что я где-то пропустил шаг.
Редактировать:
Я понимаю, что могу настроить это на стороне сервера, но я надеюсь, что есть решение на стороне клиента. Мне нравится, что разделы становятся доступными довольно быстро, когда клиент выходит/умирает.
Редактировать:
В попытке упростить задачу я удалил шаг агрегации из графика. Теперь это просто потребитель-›процессор(). (Если я отправлю потребителя напрямую на .print()
, это сработает v быстро, поэтому я знаю, что все в порядке). (Аналогично, если я вывожу агрегацию (KTable) через .print()
, тоже все в порядке).
Я обнаружил, что .process()
, который должен вызывать .punctuate()
каждые 30 секунд, на самом деле блокируется на переменные промежутки времени и выводит несколько случайным образом (если вообще).
В дальнейшем:
Я установил уровень отладки на «отладка» и перезапустил. Я вижу много сообщений:
DEBUG o.a.k.s.p.internals.StreamTask - Start processing one record [ConsumerRecord <info>
но точка останова в функции .punctuate()
не срабатывает. Так что он делает много работы, но не дает мне возможности его использовать.