Как восстановить из исключений, отправленных производителем.send () в Spring Cloud Stream

Мы испытали следующий сценарий:

  • У нас есть кластер Kafka, состоящий из 3 узлов, каждая созданная тема имеет 3 раздела.
  • Сообщение отправляется через MessageChannel.send(), создавая запись, скажем, для раздела 1.
  • Брокер, действующий в качестве лидера раздела для этого раздела, не работает

По умолчанию MessageChannel.send() возвращает true и не генерирует никаких исключений, даже если в конечном итоге KafkaProducer не может успешно отправить сообщение. Примерно через 30 секунд после этого вызова мы наблюдаем в журналах следующее сообщение: Expiring 10 record(s) for helloworld-topic-1 due to 30008 ms has passed since batch creation plus linger time

В нашем случае это неприемлемо, поскольку мы должны быть уверены, что все сообщения в конечном итоге будут доставлены в Kafka в момент возврата вызова MessageChannel.send().

Мы включили spring.cloud.stream.kafka.bindings.<channelName>.producer.sync в true, что соответствует описанию в документации. Он блокирует вызывающую сторону для подтверждения производителем успеха или неудачи доставки (MessageTimeoutException, InterruptedException, ExecutionException), все это контролируется KafkaProducerMessageHandler. Для нас это лучший подход, поскольку в нашем случае влияние на производительность незначительно.

Но нужно ли нам заботиться о повторной попытке, если возникает исключение? (например, в нашем клиентском коде с @Retryable)

Вот простой проект для эксперимента: https://github.com/phdezann/spring-cloud-bus-kafka-helloworld


person phdezann    schedule 22.12.2017    source источник


Ответы (1)


Если send() выполняется в потоке @StreamListener и исключение отбрасывается обратно в связыватель, конфигурация повтора связывания выполнит повторные попытки.

Однако, поскольку вы выполняете отправку в потоке HTTP, вам нужно будет выполнить собственную повторную попытку (вызвать отправку в рамках RetryTemplate()) или создать метод контроллера @Retryable.

person Gary Russell    schedule 22.12.2017