Потоки Apache Kafka: сообщения о нарушении порядка

У меня есть производитель Apache Kafka 2.6, который пишет в тему A (TA). У меня также есть приложение потоковой передачи Kafka, которое потребляет из TA и записывает в тему-B (TB). В приложении потоков у меня есть настраиваемый экстрактор отметок времени, который извлекает отметку времени из полезной нагрузки сообщения.

Для одного из моих тестовых примеров обработки сбоев я выключил кластер Kafka во время работы моих приложений.

Когда приложение-производитель пытается записать сообщения в TA, оно не может, потому что кластер не работает и, следовательно, (я предполагаю) буферизует сообщения. Допустим, он получает 4 сообщения m1, m2, m3, m4 в возрастающем временном порядке. (т.е. m1 - первое, а m4 - последнее).

Когда я возвращаю кластер Kafka в оперативный режим, производитель отправляет в тему буферизованные сообщения, но они не в порядке. Я получаю, например, m2, затем m3, затем m1, а затем m4.

Это почему ? Это потому, что буферизация в производителе является многопоточной, и каждый из них работает в теме одновременно?

Я предположил, что пользовательский экстрактор временных меток поможет упорядочить сообщения при их использовании. Но они этого не делают. Или, может быть, я неправильно понимаю экстрактор временных меток.

У меня есть одно решение от SO здесь, чтобы просто передать все события из tA в другую промежуточную тему (скажем, tA '), которая будет использовать экстрактор TimeStamp в другую тему. Но я не уверен, что это приведет к переупорядочению событий на основе извлеченной отметки времени.

Мой код для продюсера показан ниже (я использую Spring Cloud для создания продюсера): Producer.java

@Service
public class Producer {

    private String topicName = "input-topic";
        
    private ApplicationProperties appProps;
    
    @Autowired
    private KafkaTemplate<String, MyEvent> kafkaTemplate;
    
    public Producer() {
        super();        
    }
    
    @Autowired
    public void setAppProps(ApplicationProperties appProps) {
        this.appProps = appProps;
        this.topicName = appProps.getInput().getTopicName();
    }

    public void sendMessage(String key, MyEvent ce) {
        ListenableFuture<SendResult<String,MyEvent>> future = this.kafkaTemplate.send(this.topicName, key, ce); 
        
    }
}

person Neeraj    schedule 19.04.2021    source источник


Ответы (1)


Это почему ? Это потому, что буферизация в производителе является многопоточной, и каждый из них работает в теме одновременно?

По умолчанию производитель разрешает до 5 параллельных оперативных запросов к брокеру, и поэтому, если некоторые запросы терпят неудачу и повторяются, порядок запросов может измениться.

Чтобы избежать этой проблемы с переупорядочением, вы можете установить max.in.flight.requests.per.connection = 1 (что может снизить производительность) или установить enable.idempotence = true.

Кстати: вы не сказали, есть ли у вашей темы один или несколько разделов, и есть ли в ваших сообщениях ключ? Если ваша тема имеет более одного раздела, и ваши сообщения отправляются в разные разделы, в любом случае нет гарантии упорядочивания при чтении, потому что упорядочение смещения гарантируется только внутри раздела.

Я предположил, что пользовательский экстрактор временных меток поможет упорядочить сообщения при их использовании. Но они этого не делают. Или, может быть, я неправильно понимаю экстрактор временных меток.

Средство извлечения метки времени извлекает только метку времени. Kafka Streams не меняет порядок сообщений, но всегда обрабатывает сообщения в порядке смещения.

Если нет, то каковы конкретные применения экстрактора отметок времени? Просто чтобы связать метку времени с событием?

Правильный.

У меня есть одно решение от SO здесь, чтобы просто передать все события из tA в другую промежуточную тему (скажем, tA '), которая будет использовать экстрактор TimeStamp для другой темы. Но я не уверен, что это приведет к переупорядочению событий на основе извлеченной отметки времени.

Нет, переупорядочивания не будет. Другой вопрос SO как раз собирается изменить метку времени, но если вы читаете сообщения в порядке a, b, c, результат будет записан в порядке a, b, c (только с разными метками времени, но порядок смещения должен быть сохранен).

В этом выступлении объясняются некоторые дополнительные сведения: https://www.confluent.io/kafka-summit-san-francisco-2019/whats-the-time-and-why/

person Matthias J. Sax    schedule 21.04.2021
comment
На ваш вопрос о том, используем ли мы ключи, да, мы используем. Я пробовал с max.in.flight.requests.per.connection = 1, и порядок сохраняется во время повторных попыток. Итак, enable.idempotence=true также снизится производительность при сохранении порядка сообщений? - person Neeraj; 21.04.2021
comment
enable.idempotence=true также может иметь перфоманс, но он может быть меньше, чем max.in.flight.request.per.connection=1 - кроме того, если вы включите идемпотентную запись, вы также защитите повторные дубликаты в случае повторных попыток (в противном случае повторная попытка может привести к добавлению дубликатов в тему, потому что запись могла быть успешной, но при повторной попытке теряется только подтверждение). - person Matthias J. Sax; 21.04.2021
comment
Также в моей теме 2 раздела. Если у моих сообщений есть ключи, то почему Kafka не поддерживает порядок (даже если они повторяются)? Я думал, что одним из преимуществ наличия ключей сообщений было обеспечение того, чтобы все они попадали в один и тот же раздел, а также упорядочивались внутри этого раздела. Есть ли справочные документы, которые могут показать мне, как работает механизм повтора? - person Neeraj; 22.04.2021
comment
Если в вашем сообщении такой же ключ, все в порядке. Но в своем вопросе вы ничего не сказали о ключе, поэтому было неясно, может ли у них вообще быть ключ или может быть другой или тот же ключ. - person Matthias J. Sax; 22.04.2021
comment
Are there any reference docs which can show me how the retry mechanism works? - Не совсем, но все просто: производитель буферизует записи пакетами и отправляет запросы на запись пакетами. Таким образом, исходные запросы на запись отправляются по порядку. Однако, если возникает ошибка, они отправляют запрос и снова помещаются в очередь отправки FIFO в конце и, таким образом, могут впоследствии нарушить порядок: send (1), send (2), send (3) - send (1) терпит неудачу и в конце снова ставится в очередь. Не порядок отправки (если отказов больше нет) - 2/3/1. - person Matthias J. Sax; 22.04.2021
comment
Спасибо @Mathias J. Sax - person Neeraj; 22.04.2021
comment
Ваше объяснение очень интуитивно понятно. Я использую ключи, но повторяющиеся сообщения все равно не в порядке. Так что не уверен, почему это так. В моем случае, если я установил max.in.flight.requests.per.connection=1, что изменится по w.r.t. отправить (1), отправить (2), отправить (3). Означает ли это, что производитель будет продолжать попытки 1, пока не добьется успеха. И ТОЛЬКО затем попробуйте 2 и, наконец, 3. Почему в этом случае порядок сохраняется? - person Neeraj; 22.04.2021
comment
Правильный. С max.in.flight=1 длина очереди запросов установлена ​​на 1, и, таким образом, send(1) повторяется до успешного завершения, прежде чем send (2) будет помещен в очередь. - person Matthias J. Sax; 22.04.2021
comment
Я включил идемпотентность в значение true, но даже тогда сообщения в некоторых случаях генерируются вне последовательности, а в некоторых - в последовательности. Не уверен, почему это такое непоследовательное поведение. Что я могу сделать, чтобы понять эту проблему? (Кстати, я использую ключи сообщений, поэтому все сообщения имеют один и тот же ключ, но сообщения не идут последовательно, когда возникает проблема) - person Neeraj; 10.05.2021
comment
Трудно сказать - если только брокер выходит из строя, а потребитель остается, повторного заказа не должно происходить. Какая у вас конфигурация темы: коэффициент репликации должен быть 3, а min-in-sync-replicas должен быть 2. - Можете ли вы воспроизвести проблему также без идемпотентной записи, но max.in.flight=1? - person Matthias J. Sax; 10.05.2021
comment
@Mathias J. Sax Я вижу, что min.insync.replicas по умолчанию равен 1. Я также попробую установить его на 2. Коэффициент репликации нашей темы равен 3. Я также попробую с max.in.flight = 1 . (и измените идемпотентность на ложную) - person Neeraj; 11.05.2021
comment
Я изменил min.insync.replicas на 2, и это вызывает следующую ошибку после того, как производитель (повторно) пытается опубликовать сообщение после запуска кластера Kafka. NOT_ENOUGH_REPLICAS - person Neeraj; 11.05.2021
comment
Это может произойти во время запуска: если подключен только один брокер, он еще не может принять запись. Однако производитель должен повторить внутреннюю попытку, но не должен потерпеть неудачу (если настроен правильно). - person Matthias J. Sax; 11.05.2021
comment
Я проверил, что внутренние темы, такие как __consumer_offsets и темы состояния транзакции, были настроены с коэффициентом репликации по умолчанию 1 и, следовательно, с указанной выше ошибкой NOT_ENOUGH_REPLICAS. Я изменил коэффициент репликации внутренней темы на 3 и теперь таких ошибок нет. Теперь я попробую с этой конфигурацией и посмотрю, есть ли проблемы с порядком для моего сценария отказа. Вопрос: Почему min-insync-replicas должно быть больше 1, чтобы достичь идемпотентности? Разве брокер автоматически не устанавливает это значение на 1 меньше, чем коэффициент репликации, когда мы включаем идемпотентность? - person Neeraj; 11.05.2021
comment
В общем, чтобы убедиться, что у вас есть отказоустойчивая конфигурация, вам нужно установить min-insync больше единицы - в противном случае нет гарантии, и вы все равно можете потерять данные. - Порядковые номера, которые использует идемпотентный производитель, хранятся в темах, и поэтому, если вы потеряете данные и порядковые номера, идемпотентная запись не может быть гарантирована. - А нет, брокер не будет перенастраивать тему. Вы несете ответственность за правильную настройку тем. - person Matthias J. Sax; 11.05.2021
comment
Я пробовал использовать обе конфигурации, но производитель по-прежнему публикует сообщения вне очереди. Я пробовал это даже с одним кластером брокера с одним разделом и репликой на тему. Все, что я делаю, это использую producer.send для асинхронной отправки запроса. Не знаю, почему это происходит. Меня не волнует, какой результат я получу от асинхронной отправки (это всего лишь тестовый код). Я обновил вопрос своим образцом кода. - person Neeraj; 12.05.2021
comment
У меня заканчиваются идеи. Возможно, стоит отправить отчет об ошибке. - person Matthias J. Sax; 12.05.2021
comment
Поднят KAFKA-12776 (issues.apache.org/jira/browse/ КАФКА-12776? Filter = -2) - person Neeraj; 12.05.2021
comment
Кроме того, почему KafkaProducer.send блокирует max.block.ms, когда кластер недоступен? Я предположил, что это был асинхронный вызов, который немедленно вернется. - person Neeraj; 13.05.2021
comment
Как правило, он возвращается немедленно: внутри производителя есть буфер фиксированного размера для хранения записей, и send() поместит записи в этот буфер и может вернуться после этого (фактическая отправка происходит, когда фоновый поток извлекает запись из этого буфера) . Если брокеры недоступны, этот буфер будет заполнен, и при вызове send() новая запись не может быть помещена в буфер, потому что он заполнен и, следовательно, send() блоки вызовов для этого случая. - person Matthias J. Sax; 14.05.2021