Производитель Kafka: как обрабатывать java.net.ConnectException: соединение отклонено

Я использую Кафку 0.10.1.0.

Это мой продюсер:

val props: Properties = ...
val producer = new KafkaProducer[String, AnyRef](props)
val callback = new Callback {
   override def onCompletion(md: RecordMetadata, e: Exception): Unit = ...
}
producer.send(new ProducerRecord[String, AnyRef]("topic", "hello"), callback)

Но указанный выше обратный вызов не может обработать java.net.ConnectException: Connection refused в случае, если сервер kafka не работает.

UPD

ConnectionException поднимается в другом потоке (в класс Sender, который используется в KafkaProducer). Поэтому мы не можем использовать для этого try {} catch. Также мне не нужен механизм повтора, мне нужен способ справиться с этой ситуацией (например, если Kafka не работает, а производитель не может отправить сообщение, я собираюсь использовать какой-то другой Queue API).

Есть ли способ обработки этого исключения?


person John Mullins    schedule 29.11.2016    source источник


Ответы (1)


У вас есть несколько вариантов. Scala предоставляет способ перехвата исключений, который принимает следующую форму:

   try { 
   // ... 
   } 
   catch {
     case ioe: IOException => ... // more specific cases first !
     case e: Exception => ...
   }

Итак, самый простой подход:

  try { 
     producer.send(new ProducerRecord[String, AnyRef]("topic", "hello"), callback)
   } 
   catch {
     case ce: ConnectionException => // handle exception
   }

Более сложным, но более надежным был бы механизм повторных попыток:

Как реализовать Scala? повторный вызов, подобный этому?

Также обратите внимание, что в Kafka Producer встроен механизм повторных попыток, который также может оказаться полезным:

Установка значения больше нуля приведет к повторной отправке клиентом любой записи, отправка которой завершилась сбоем с потенциально временной ошибкой. Обратите внимание, что эта повторная попытка ничем не отличается от повторной отправки записи клиентом после получения ошибки. Разрешение повторных попыток без установки для max.in.flight.requests.per.connection значения 1 потенциально изменит порядок записей, потому что если два пакета отправляются в один раздел, и первый терпит неудачу и повторяется, но второй завершается успешно, то записи во второй партии может появиться первая.

person crypto    schedule 30.11.2016
comment
Привет @crypto, спасибо за ответ. Но этот способ не будет работать, потому что ConnectionException поднимается в другом потоке (в класс Sender, который используется в KafkaProducer). Поэтому мы не можем использовать для этого try {} catch. - person John Mullins; 30.11.2016
comment
да, это отдельный поток, созданный внутри метода producer.send - person John Mullins; 05.12.2016
comment
Итак, исключение выдается до того, как будет вызван ваш обратный вызов? - person crypto; 05.12.2016
comment
Когда возникает исключение, обратный вызов не вызывается. И это исключение появляется только в журналах. И мы не можем его поймать. - person John Mullins; 06.12.2016
comment
Не могу воспроизвести ошибку. metadata.fetch.timeout.ms = 60000 происходит до того, как я получаю исключение ConnectionException. Я попробовал неверную комбинацию хост: порт, и время выборки метаданных истекло, прежде чем я увидел какое-либо ConnectionException - person crypto; 06.12.2016