Я рассмотрел этот вопрос, прежде чем публиковать его. Он использовал версию кода производителя Kafka 0.8.0.
Мы используем Kafka 0.9.0.1 version
. Поскольку наш кластер защищен с помощью Kerberos authentication
. Как программно установить детали аутентификации Kerberos?
Когда я пытаюсь отправить сообщение, оно не доходит до темы Кафки. Я подтвердил это, получив результат асинхронной операции send
от производителя Future<RecordMetadata>
, и он выдавал исключение тайм-аута.
Пример кода:
Properties props = new Properties();
props.put("bootstrap.servers", "1.2.3.4:6667");
props.put("acks", "all");
props.put("retries", 2);
// props.put("security.inter.broker.protocol","PLAINTEXTSASL");
props.put("security.protocol", "PLAINTEXTSASL");//WHEN I TRY THIS ITS THROWING ERROR
props.put("sasl.kerberos.service.name","zookeeper");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
try {
Producer<String, String> producer = new KafkaProducer<>(props);
producer.send(new ProducerRecord<String, String>("kafkaTest", "kafka test message"));
// Thread.sleep(4000);
// producer.close();
} catch (Exception e) {
e.printStackTrace();
}
Нужно ли что-то менять в файле jaas
? Я не вижу никакой документации для Java API с аутентификацией Kerberos.
Исключение тайм-аута:
java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.
at org.apache.kafka.clients.producer.KafkaProducer$FutureFailure.<init>(KafkaProducer.java:686)
at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:449)
at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:339)
at com.scb.upv.SimpleKafkaProducer.main(SimpleKafkaProducer.java:26)
Caused by: org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.