Как я могу отправлять большие сообщения с Kafka (более 15 МБ)?

Я отправляю String-сообщения в Kafka V. 0.8 с помощью Java Producer API. Если размер сообщения составляет около 15 МБ, я получаю MessageSizeTooLargeException. Я попытался установить message.max.bytesto 40 МБ, но все равно получаю исключение. Мелкие сообщения работали без проблем.

(Исключение появляется у производителя, у меня нет потребителя в этом приложении.)

Что я могу сделать, чтобы избавиться от этого исключения?

Мой пример конфигурации производителя

private ProducerConfig kafkaConfig() {
    Properties props = new Properties();
    props.put("metadata.broker.list", BROKERS);
    props.put("serializer.class", "kafka.serializer.StringEncoder");
    props.put("request.required.acks", "1");
    props.put("message.max.bytes", "" + 1024 * 1024 * 40);
    return new ProducerConfig(props);
}

Журнал ошибок:

4709 [main] WARN  kafka.producer.async.DefaultEventHandler  - Produce request with correlation id 214 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException
4869 [main] WARN  kafka.producer.async.DefaultEventHandler  - Produce request with    correlation id 217 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException
5035 [main] WARN  kafka.producer.async.DefaultEventHandler  - Produce request with   correlation id 220 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException
5198 [main] WARN  kafka.producer.async.DefaultEventHandler  - Produce request with correlation id 223 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException
5305 [main] ERROR kafka.producer.async.DefaultEventHandler  - Failed to send requests for topics datasift with correlation ids in [213,224]

kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.
at kafka.producer.async.DefaultEventHandler.handle(Unknown Source)
at kafka.producer.Producer.send(Unknown Source)
at kafka.javaapi.producer.Producer.send(Unknown Source)

person Sonson123    schedule 09.01.2014    source источник
comment
Моим первым побуждением было бы попросить вас разделить это огромное сообщение на несколько более мелких: - / Я предполагаю, что это невозможно по какой-то причине, но вы, тем не менее, можете пересмотреть это: огромные сообщения обычно означают, что есть недостаток дизайна где-то, что действительно нужно исправить.   -  person Aaron Digulla    schedule 09.01.2014
comment
Спасибо, но это значительно усложнило бы мою логику. Почему это плохая идея использовать Kafka для сообщений размером около 15 МБ? Является ли 1 МБ максимально допустимым размером сообщения? Я не так много нашел в документации Kafka об ограничении размера сообщения.   -  person Sonson123    schedule 09.01.2014
comment
Это совершенно не связано с Kafka или какой-либо другой системой обработки сообщений. Мое рассуждение: если что-то пойдет не так с вашим 15-мегабайтным файлом, то потом убрать беспорядок будет очень дорого. Вот почему я обычно разбиваю большие файлы на множество более мелких заданий (которые обычно также могут выполняться параллельно).   -  person Aaron Digulla    schedule 09.01.2014
comment
вы использовали какое-либо сжатие? не могли бы вы поделиться некоторыми подробностями, сложно что-то угадать из одного-единственного слова   -  person user2720864    schedule 10.01.2014
comment
Для тех, кто наткнулся на этот вопрос, но использует librdkafka для связи с Kafka, см. Также: stackoverflow.com/questions/60739858/   -  person Miljen Mikic    schedule 20.10.2020


Ответы (9)


Вам нужно настроить три (или четыре) свойства:

  • Сторона потребителя: fetch.message.max.bytes - определяет максимальный размер сообщения, которое может получить потребитель.
  • Сторона брокера: replica.fetch.max.bytes - это позволит репликам в брокерах отправлять сообщения в кластере и обеспечивать правильную репликацию сообщений. Если это слишком мало, то сообщение никогда не будет реплицировано, и, следовательно, потребитель никогда не увидит сообщение, потому что сообщение никогда не будет зафиксировано (полностью реплицировано).
  • Сторона брокера: message.max.bytes - это наибольший размер сообщения, которое может быть получено брокером от производителя.
  • Сторона брокера (для каждой темы): max.message.bytes - это наибольший размер сообщения, которое брокер разрешает добавлять в тему. Этот размер подтвержден предварительным сжатием. (По умолчанию message.max.bytes брокера.)

Я узнал о номере 2 на собственном горьком опыте - вы не получаете НИКАКИХ исключений, сообщений или предупреждений от Kafka, поэтому обязательно учитывайте это, когда отправляете большие сообщения.

person laughing_man    schedule 24.01.2014
comment
Хорошо, вы и user2720864 были правы. Я только установил message.max.bytes в исходном коде. Но я должен установить эти значения в конфигурации сервера Kafka config/server.properties. Теперь работают и большие сообщения :). - person Sonson123; 03.02.2014
comment
Есть ли какие-либо известные недостатки при установке слишком высоких значений? - person Ivan Balashov; 19.08.2014
comment
да. Со стороны потребителя вы выделяете fetch.message.max.bytes памяти для КАЖДОГО раздела. Это означает, что если вы используете огромное количество для fetch.message.max.bytes в сочетании с большим количеством разделов, это потребует много памяти. Фактически, поскольку процесс репликации между брокерами также является специализированным потребителем, он также потребляет память на брокерах. - person laughing_man; 19.08.2014
comment
Опубликуйте предлагаемые изменения, потребитель не может использовать сообщение stackoverflow.com/questions/32231095/ есть идеи? - person Kedar Parikh; 26.08.2015
comment
fetch.message.max.bytes должен быть добавлен в consumer.properties - person Kedar Parikh; 27.08.2015
comment
Обратите внимание, что существует также max.message.bytes конфигурация для каждой темы, которая может быть ниже, чем message.max.bytes брокера. - person Peter Davis; 20.05.2016
comment
Согласно официальному документу, параметры на стороне потребителя и параметры репликации между брокерами /.*fetch.*bytes/ не кажутся жесткими ограничениями: это не абсолютный максимум, если [...] больше этого значения, пакет записи будет по-прежнему должны быть возвращены, чтобы убедиться, что может быть достигнут прогресс. - person Bluu; 14.12.2018
comment
Начиная с Kafka 0.10.1.0, ограничения на ответ и раздел не должны быть равными или превышающими ограничение на размер сообщения, поскольку логика выборки теперь принимает отдельные сообщения, даже если они превышают лимит размера выборки. . См. kafka.apache.org/documentation/#upgrade_1010_notable. - person Raman; 29.04.2019
comment
Пожалуйста, подумайте о редактировании вашего ответа. Я почти уверен, что fetch.message.max.bytes - это не ограничение на чтение, а скорее конфигурация размера пакета. Сообщения большего размера по-прежнему будут возвращены. - person Gray; 28.08.2019
comment
Привет, вам также нужно установить batch.size и linger.ms на стороне производителя? cloudera.com/documentation/kafka/latest/topics/ - person jack; 28.08.2019

Незначительные изменения необходимы для Kafka 0.10 и нового потребителя по сравнению с ответ смеющегося_мана:

  • Брокер: Без изменений, вам все равно нужно увеличить свойства message.max.bytes и replica.fetch.max.bytes. message.max.bytes должен быть равен или меньше (*) replica.fetch.max.bytes.
  • Производитель: увеличьте max.request.size, чтобы отправить более крупное сообщение.
  • Потребитель: увеличьте max.partition.fetch.bytes, чтобы получать сообщения большего размера.

(*) Прочтите комментарии, чтобы узнать больше о _7 _ ‹= _ 8_

person Sascha Vetter    schedule 18.08.2016
comment
Вы знаете, почему message.max.bytes должен быть меньше replica.fetch.max.bytes? - person Kostas; 23.03.2017
comment
replica.fetch.max.bytes (по умолчанию: 1 МБ) - максимальный размер данных, которые может реплицировать брокер. Он должен быть больше message.max.bytes, иначе брокер примет сообщения и не сможет их реплицировать. Приводит к потенциальной потере данных. Источник: handle-large-messages-kafka. - person Sascha Vetter; 23.03.2017
comment
Спасибо, что связались со мной. Похоже, это перекликается с тем, что предлагает руководство по Cloudera. Однако оба из них неверны - обратите внимание, что они не предлагают никаких технических причин, по которым почему replica.fetch.max.bytes должно быть строго больше, чем message.max.bytes. Сотрудник Confluent - person Kostas; 24.03.2017
comment
Фу. Я только что понял, что ваша ссылка указывает на статью, написанную Гвен, которая также работает на Confluent. Так что у нас, кажется, есть странные разногласия. Я отправлю сообщение в список рассылки kafka-users и посмотрю, что происходит. Я обновлю эту ветку своими выводами. - person Kostas; 24.03.2017
comment
Есть ли обновления относительно message.max.bytes<replica.fetch.max.bytes или message.max.bytes=replica.fetch.max.bytes @Kostas? - person Sascha Vetter; 28.04.2017

Ответ от @laughing_man довольно точен. Но все же я хотел дать рекомендацию, которую я узнал от эксперта Kafka Стефана Маарека. Мы активно применяем это решение в наших живых системах.

Kafka не предназначен для обработки больших сообщений.

Ваш API должен использовать облачное хранилище (Ex AWS S3) и просто отправить в Kafka или любой брокер сообщений ссылку на S3. Вы должны найти место для хранения ваших данных, может быть, это сетевой диск, может быть, что угодно, но это не должен быть брокер сообщений.

Теперь, если вы не хотите использовать вышеуказанное решение

Максимальный размер сообщения - 1 МБ (настройка в ваших брокерах называется message.max.bytes) Apache Kafka. Если вам это действительно нужно, вы можете увеличить этот размер и обязательно увеличить сетевые буферы для ваших производителей и потребителей.

И если вы действительно заботитесь о разделении вашего сообщения, убедитесь, что каждое разделение сообщения имеет один и тот же ключ, чтобы оно было перенесено в один и тот же раздел, а содержимое вашего сообщения должно сообщать «идентификатор части», чтобы ваш потребитель мог полностью восстановить сообщение. .

Вы также можете изучить сжатие, если ваше сообщение основано на тексте (сжатие gzip, snappy, lz4), что может уменьшить размер данных, но не волшебным образом.

Опять же, вам нужно использовать внешнюю систему для хранения этих данных и просто отправить внешнюю ссылку на Kafka. Это очень распространенная архитектура, и вам следует придерживаться ее, и она широко распространена.

Имейте в виду, что Kafka лучше всего работает, только если сообщения огромны по объему, но не по размеру.

Источник: https://www.quora.com/How-do-I-send-Large-messages-80-MB-in-Kafka

person Player_Neo    schedule 15.11.2018
comment
Kafka работает с большими сообщениями, абсолютно без проблем. На начальной странице домашней страницы Kafka он даже упоминается как система хранения. - person calloc_org; 09.07.2020
comment
@Bhanu Hoysala - Я считаю, что большие сообщения следует сохранять в хранилище, а затем отправлять ссылку в сообщении. При этом, как вы гарантируете, что и данные будут записаны, и ссылочное сообщение будет отправлено атомарно? Оба добиваются успеха или ни того, ни другого. - person Jeremy; 02.10.2020
comment
@Jeremy Нам нужен еще один список тем / очередей для изменений, внесенных в ведро (мы можем настроить получение уведомления только для события создания). В случае успеха мы получим сообщение в соответствии с конфигурацией (вы не получаете уведомления о событиях из-за неудачных операций в S3). В случае сбоя служба загрузки файлов будет знать, была ли запись успешной или нет (это синхронная операция). docs.aws.amazon.com/AmazonS3/latest/dev/NotificationHowTo. html В зависимости от сочетания брокера и хранилища могут быть выполнены различные виды интеграции. - person Player_Neo; 03.10.2020

Идея состоит в том, чтобы иметь одинаковый размер сообщения, отправляемого от Kafka Producer в Kafka Broker, а затем полученного Kafka Consumer, т.е.

Производитель Kafka -> Брокер Kafka -> Потребитель Kafka

Предположим, что если требуется отправить 15 МБ сообщения, то нужно синхронизировать Producer, Broker и Consumer. .

Kafka Producer отправляет 15 МБ -> Kafka Broker разрешает / сохраняет 15 МБ -> Kafka Потребитель получает 15 МБ.

Таким образом, настройка должна быть:

а) о Брокере:

message.max.bytes=15728640 
replica.fetch.max.bytes=15728640

б) на Потребителе:

fetch.message.max.bytes=15728640
person Ravi    schedule 11.09.2015
comment
может быть, fetch.message.max.bytes заменен на max.partition.fetch.bytes в ConsumerConfig? - person s_bei; 17.06.2016

Вам необходимо переопределить следующие свойства:

Конфигурации брокера ($ KAFKA_HOME / config / server.properties)

  • replica.fetch.max.bytes
  • message.max.bytes

Конфигурации потребителей ($ KAFKA_HOME / config / consumer.properties)
Этот шаг у меня не сработал. Я добавляю его в потребительское приложение, и оно работает нормально

  • fetch.message.max.bytes

Перезагрузите сервер.

дополнительную информацию см. в этой документации: http://kafka.apache.org/08/configuration.html

person user2550587    schedule 17.02.2014
comment
для потребителя командной строки мне нужно использовать флаг --fetch-size = ‹bytes›. Кажется, он не читает файл consumer.properties (kafka 0.8.1). Я бы также порекомендовал включить сжатие со стороны производителя, используя параметр compress.codec. - person Ziggy Eunicien; 09.05.2014
comment
Комментарий Зигги работал у меня kafka 0.8.1.1. Спасибо! - person James; 04.04.2015
comment
может быть, fetch.message.max.bytes заменен на max.partition.fetch.bytes в ConsumerConfig? - person s_bei; 17.06.2016

Важно помнить, что атрибут message.max.bytes должен быть синхронизирован со свойством потребителя fetch.message.max.bytes. размер выборки должен быть не меньше максимального размера сообщения, в противном случае может возникнуть ситуация, когда производители могут отправлять сообщения большего размера, чем потребитель может потреблять / извлекать. Возможно, стоит взглянуть на это.
Какую версию Kafka вы используете? Также предоставьте более подробную информацию о трассировке, которую вы получаете. есть ли что-то вроде ... payload size of xxxx larger than 1000000 в журнале?

person user2720864    schedule 09.01.2014
comment
Я обновил свой вопрос дополнительной информацией: Kafka Version 2.8.0-0.8.0; теперь мне нужен только продюсер. - person Sonson123; 10.01.2014

Для людей, использующих landoop kafka: вы можете передавать значения конфигурации в переменных среды, например:

docker run -d --rm -p 2181:2181 -p 3030:3030 -p 8081-8083:8081-8083  -p 9581-9585:9581-9585 -p 9092:9092
 -e KAFKA_TOPIC_MAX_MESSAGE_BYTES=15728640 -e KAFKA_REPLICA_FETCH_MAX_BYTES=15728640  landoop/fast-data-dev:latest `

И если вы используете rdkafka, то передайте message.max.bytes в конфигурации производителя, например:

  const producer = new Kafka.Producer({
        'metadata.broker.list': 'localhost:9092',
        'message.max.bytes': '15728640',
        'dr_cb': true
    });

Аналогичным образом для потребителя

  const kafkaConf = {
   "group.id": "librd-test",
   "fetch.message.max.bytes":"15728640",
   ... .. }                                                                                                                                                                                                                                                      
person informer    schedule 02.06.2020

Я думаю, что большинство ответов здесь устарели или не совсем полны.

Чтобы обратиться к ответу Саши Веттер (с обновлением для Kafka 0.10), я Я хочу предоставить дополнительную информацию и ссылки на официальную документацию.


Конфигурация производителя:

  • max.request.size (Link) необходимо увеличить для файлов размером более 1 МБ, в противном случае они отклоняются

Конфигурация брокера / темы:

  • message.max.bytes (Link) может быть установлен, если вы хотите увеличить сообщение размер на уровне брокера. Но из документации: Это можно установить для каждой темы с конфигурацией max.message.bytes на уровне темы.
  • max.message.bytes (Link) можно увеличить, если только одна тема должна поддерживать принимать лагерные файлы. Конфигурацию брокера изменять нельзя.

Я всегда предпочитаю конфигурацию с ограничением темы, потому что я могу настроить тему самостоятельно как клиент для кластера Kafka (например, с помощью клиент администратора). Я не могу иметь никакого влияния на саму конфигурацию брокера.


В ответах выше при необходимости упоминаются еще несколько конфигураций:

Из документации: Это не абсолютный максимум, если первый пакет записей в первом непустом разделе выборки больше этого значения, пакет записей все равно будет возвращен, чтобы гарантировать, что прогресс может быть достигнут.

Из документации: Записи загружаются заказчиком партиями. Если первый пакет записей в первом непустом разделе выборки превышает этот предел, пакет все равно будет возвращен, чтобы гарантировать, что потребитель может продолжить работу.

  • fetch.max.bytes (Link) (Consumer config; не упоминается выше, но из той же категории )

Из документации: Записи выбираются в пакетах потребителем, и если первый пакет записей в первом непустом разделе выборки превышает это значение, пакет записей все равно будет возвращен, чтобы гарантировать, что потребитель может добиться прогресса.


Заключение. Не нужно изменять конфигурации получения сообщений для обработки сообщений, больше, чем значения по умолчанию для этой конфигурации (если это было протестировано в небольшой установке). Вероятно, потребитель всегда может получить пакеты размером 1. Однако необходимо установить две конфигурации из первого блока, как упоминалось в ответах ранее.

Это пояснение не должно ничего рассказывать о производительности и не должно быть рекомендацией устанавливать или не устанавливать эту конфигурацию. Лучшие значения необходимо оценивать индивидуально в зависимости от конкретной запланированной пропускной способности и структуры данных.

person MichaelCkr    schedule 27.05.2021

Вот как я успешно отправил данные размером до 100 МБ с помощью kafka-python==2.0.2:

Маклер:

consumer = KafkaConsumer(
    ...
    max_partition_fetch_bytes=max_bytes,
    fetch_max_bytes=max_bytes,         
)

Производитель (см. Окончательное решение в конце):

producer = KafkaProducer(
    ...
    max_request_size=KafkaSettings.MAX_BYTES,
)

Потом:

producer.send(topic, value=data).get()

После отправки таких данных появилось следующее исключение:

MessageSizeTooLargeError: The message is n bytes when serialized which is larger than the total memory buffer you have configured with the buffer_memory configuration.

Наконец, я увеличил buffer_memory (по умолчанию 32 МБ), чтобы получать сообщение на другом конце.

producer = KafkaProducer(
    ...
    max_request_size=KafkaSettings.MAX_BYTES,
    buffer_memory=KafkaSettings.MAX_BYTES * 3,
)
person Tobias Ernst    schedule 09.07.2021