Я использую Кафку 0.10.1.0.
Это мой продюсер:
val props: Properties = ...
val producer = new KafkaProducer[String, AnyRef](props)
val callback = new Callback {
override def onCompletion(md: RecordMetadata, e: Exception): Unit = ...
}
producer.send(new ProducerRecord[String, AnyRef]("topic", "hello"), callback)
Но указанный выше обратный вызов не может обработать java.net.ConnectException: Connection refused
в случае, если сервер kafka не работает.
UPD
ConnectionException
поднимается в другом потоке (в класс Sender
, который используется в KafkaProducer
). Поэтому мы не можем использовать для этого try {} catch
. Также мне не нужен механизм повтора, мне нужен способ справиться с этой ситуацией (например, если Kafka не работает, а производитель не может отправить сообщение, я собираюсь использовать какой-то другой Queue API).
Есть ли способ обработки этого исключения?