Мы испытали следующий сценарий:
- У нас есть кластер Kafka, состоящий из 3 узлов, каждая созданная тема имеет 3 раздела.
- Сообщение отправляется через
MessageChannel.send()
, создавая запись, скажем, для раздела 1. - Брокер, действующий в качестве лидера раздела для этого раздела, не работает
По умолчанию MessageChannel.send()
возвращает true
и не генерирует никаких исключений, даже если в конечном итоге KafkaProducer не может успешно отправить сообщение. Примерно через 30 секунд после этого вызова мы наблюдаем в журналах следующее сообщение: Expiring 10 record(s) for helloworld-topic-1 due to 30008 ms has passed since batch creation plus linger time
В нашем случае это неприемлемо, поскольку мы должны быть уверены, что все сообщения в конечном итоге будут доставлены в Kafka в момент возврата вызова MessageChannel.send()
.
Мы включили spring.cloud.stream.kafka.bindings.<channelName>.producer.sync
в true
, что соответствует описанию в документации. Он блокирует вызывающую сторону для подтверждения производителем успеха или неудачи доставки (MessageTimeoutException
, InterruptedException
, ExecutionException
), все это контролируется KafkaProducerMessageHandler
. Для нас это лучший подход, поскольку в нашем случае влияние на производительность незначительно.
Но нужно ли нам заботиться о повторной попытке, если возникает исключение? (например, в нашем клиентском коде с @Retryable
)
Вот простой проект для эксперимента: https://github.com/phdezann/spring-cloud-bus-kafka-helloworld