Получение только одной записи из многих, когда потребитель kafka впервые получает запись?

Я использую spring-kafka и spring-kafka-test версии 1.0.2.RELEASE.

В одном из моих тестов мое приложение отправляет 100 записей подряд в один TopicPartion на экземпляре EmbeddedKafka с использованием KafkaTemplate и в основном с настройками конфигурации по умолчанию.

Я использую метод KafkaTestUtils.getRecords (потребитель), чтобы попытаться получить записи из экземпляра Kafka и убедиться, что все они были отправлены.

В первый раз, когда я вызываю getRecords, я получаю только одну запись. Если я позвоню еще раз, я получу остальные 99.

Если я явно устанавливаю позицию потребителя в начало раздела TopicPartition, а затем вызываю getRecords, я получаю все 100.

Почему getRecords получает только одну запись в первый раз? Есть ли лучший способ получить все 100 сразу, явно вызвав seekToBeginning на потребителе?


person Joseph Downing    schedule 11.08.2016    source источник


Ответы (2)


Звучит как проблема со сроками. Вполне возможно, что при первом вызове poll() было доступно только одно сообщение - этот метод не дает никаких гарантий относительно того, сколько сообщений будет получено. При написании кода не следует предполагать, что вы сразу получите X записей. В Kafka 0.10 max.poll.records есть потребительское свойство, которое для целей тестирования может потребоваться установить в 1, а затем выполнять цикл приема, пока не будут опрошены все 100.

person Jakub Korab    schedule 13.08.2016

Скорее всего, это просто состояние гонки - потребитель сидит в poll(), и брокер отправляет первое сообщение, как только оно приходит.

См. Свойства fetch.min.bytes и fetch.max.wait.ms в документации kafka.

fetch.min.bytes по умолчанию равен 1.

ИЗМЕНИТЬ

Вы также можете попробовать flush() использовать KafkaTemplate перед вызовом getRecords().

Однако ваш тест не должен на самом деле полагаться на получение всех сообщений за одну выборку - слишком хрупкий.

person Gary Russell    schedule 11.08.2016
comment
То же самое происходит, если я сбрасываю KafkaTemplate и жду 5 секунд перед вызовом getRecords(). Увеличение значения fetch.min.bytes does увеличивает общее количество записей, возвращаемых первым вызовом, до getRecords. Я ожидаю, что 5 секунд будет достаточно времени для того, чтобы все сообщения добрались до брокера. Следующие ~ 80 сообщений доступны для следующего getRecords вызова, следующего сразу за первым. Есть ли что-нибудь еще, что можно сделать, чтобы гарантировать наличие всех сообщений до того, как потребитель прочитает или что потребитель прочитает все доступные сообщения? - person Joseph Downing; 11.08.2016
comment
Я немного удивлен, что flush() не помогло, но, как я сказал в своем последнем редактировании - ваш тест будет хрупким, если вы будете полагаться только на время, поэтому достаточно большой fetch.min.bytes с большим fetch.max.wait.ms, вероятно, единственный способ получить надежный тест. Это продлит ваш тест (если вы не сможете точно определить, сколько байтов 100 сообщений - с накладными расходами - в байтах), но он будет надежным (до тех пор, пока kafka не изменит размер накладных расходов :)). - person Gary Russell; 11.08.2016
comment
Да, я не хочу оставлять это ожидание там. Я просто пытался проверить, была ли проблема в том, отправлялись ли сообщения на Кафку или нет. Похоже, что при первом вызове потребителя все сообщения уже должны быть там. Но в первый раз кажется, что потребитель читает достаточно сообщений только для удовлетворения значения fetch.min.bytes, а затем при втором вызове он прочитает столько других сообщений, сколько доступно, независимо от размера. Почему первый вызов зависит от значения fetch.min.bytes, а второй - нет? - person Joseph Downing; 11.08.2016
comment
Не знаю - это вопрос к кафкам. Наверное, просто так работают их внутренние алгоритмы. - person Gary Russell; 11.08.2016