Kafka не потребляет все произведенные данные

У меня есть один экземпляр kafka, установленный на виртуальной машине с 8 ядрами и 32 ГБ ОЗУ.

Я пишу в него (производлю) с 10 разных машин и потребляю с одной машины, которые все в одной сети.

Размер данных, которые я выдаю, составляет ~35МБит/с.

По какой-то причине большую часть времени я не могу потреблять более ~ 10 МБит / с (в течение ограниченного периода времени мне удается потреблять все произведенные данные), даже несмотря на то, что кафка И серверы-потребители в основном простаивают (поэтому я не думаю, что это проблема удержания).

Может ли kafka игнорировать некоторые из полученных данных?

Некоторые значения параметров, которые могут быть полезны для анализа:

num.network.threads=32
num.io.threads=16
message.max.bytes=2147483647
num.partitions=10
log.retention.ms=120000 (2 minutes)

person KidCrippler    schedule 28.05.2017    source источник
comment
У вас есть acks=1 или все для вашего производителя?   -  person dawsaw    schedule 28.05.2017
comment
@dawsaw Не уверен, это свойство на стороне клиента? Не могли бы вы уточнить?   -  person KidCrippler    schedule 28.05.2017
comment
Да, это на клиенте производителя. Вы должны иметь acks=all, чтобы гарантировать, что брокер увидел сообщение. acks=1 гарантирует, что его увидел один брокер, что, вероятно, является лучшим, что вы можете сделать в вашем случае. Также код для производителя должен обрабатывать случаи, когда подтверждение не получено, чтобы предупредить вас об отсутствующих данных и/или повторить попытку отправки.   -  person dawsaw    schedule 28.05.2017
comment
@dawsaw Я только что проверил, он установлен на «все»   -  person KidCrippler    schedule 28.05.2017
comment
Итак, одна вещь, которую может быть полезно опубликовать, — это логика для производителя вокруг вызова send(). Что-то должно быть сделано с Future, предоставленным сообщением об отправке, иначе сообщения могут быть просто проигнорированы. Также было бы неплохо узнать, какая версия Kafka используется на сервере и на стороне клиента.   -  person dawsaw    schedule 28.05.2017
comment
@dawsaw По сути, логика вызова send() гарантирует, что если обратный вызов onCompletion возвращается с ненулевым исключением, он либо повторяет попытку, либо приводит к сбою потока (в зависимости от типа исключения). Излишне говорить, что поток никогда не падает. Что касается версий, клиент 0.10.2.1 и сервер 0.10.1.0.   -  person KidCrippler    schedule 28.05.2017
comment
Итак, если брокер подтверждает ответ, то идею удержания все же следует изучить с 2-минутным временем удержания. Если вы увеличите это значение, вы получите больше надежности? Есть ли у вас какие-либо переопределения конфигурации по этой теме?   -  person dawsaw    schedule 28.05.2017


Ответы (1)


Ваше время удержания слишком низкое. Если ваш потребитель отстает от любого из 10 производителей более чем на 2 минуты, сообщения будут потеряны. Попробуйте 24 часа или, по крайней мере, столько, сколько у вас есть дискового пространства для заполнения. Срок хранения по умолчанию составляет 7 дней. Хранение сообщений в течение более длительного периода также поможет вам отладить, если все они успешно попадают в тему.

person Hans Jespersen    schedule 29.05.2017
comment
Я пытался увеличить время хранения до 1 часа (~ 16 ГБ данных) безрезультатно. По прошествии часа Кафка начал бороться. То же самое произошло, когда я попробовал это раньше с 10-минутным удержанием — оно работало в течение 10 минут, а затем начало давать сбои. - person KidCrippler; 29.05.2017
comment
Похоже, ваше узкое место появляется только тогда, когда Kafka удаляет сообщения с истекшим сроком действия. Является ли log.cleanup.policy в теме, которую вы используете, компактной или удаленной? Какое хранилище вы используете? Не превышаете ли вы емкость дискового ввода-вывода при одновременном удалении сегментов журнала и записи новых? Ваши ставки не очень высоки, но если диск очень медленный, это может вызвать эту проблему. - person Hans Jespersen; 29.05.2017