Производитель Kafka создает тему, но не может отправлять сообщения

Я новичок в Scala и Kafka, и у меня возникли проблемы.

Я пытаюсь подключить производителя scala kafka к серверу kafka, установленному на сервере cloudera express. Я уже однажды делал это на виртуальных машинах с помощью этих инструкций и проблем не было.

Когда я запускаю производителя, нужная тема создается, но ни одно из сообщений не отправляется, по крайней мере, я так думаю.

Вот часть кода:

Производитель Kafka

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

class KafkaProducerManager {

    val props = new Properties()
    props.put("bootstrap.servers", KafkaServer.KAFKA_ADDRESS)
    props.put("acks", "all")
    props.put("retries", "2")
    props.put("auto.commit.interval.ms", "1000")
    props.put("linger.ms", "1")
    props.put("block.on.buffer.full", "true")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("auto.create.topics.enable", "true")

    val producer = new KafkaProducer[String, String](props)

   def startCounter() {
       println("Start Producer Counter")
       for (i <- 1 to 100) {
           producer.send(new ProducerRecord("test-counter", i.toString, "Package " + i))
           println("Producer - Send: " + i)
       }

       println("Closing producer")
       producer.close()
   }
}

Когда я выполняю метод run, я вижу «Producer - Send: #» в качестве вывода и не получаю ошибок. Итак, я предполагаю, что этот фрагмент кода отправил сообщения Кафке.

Я запустил следующее на сервере kafka до того, как запустил продюсер:

 kafka-console-consumer --zookeeper zk:2181 --topic test-counter

Но здесь я вижу, что ничего не происходит.

Когда я проверяю, есть ли в списке тема, которую должен создать продюсер.

kafka-topics -zookeeper zk:2181 --list

У меня тоже похожая проблема с потребителем:

import java.util.{Arrays, Properties}
import org.apache.kafka.clients.consumer.KafkaConsumer

class KafkaConsumerManager {

    val props = new Properties()
    props.put("bootstrap.servers", KafkaServer.KAFKA_ADDRESS)
    props.put("group.id", "testGroup")
    props.put("enable.auto.commit", "true")
    props.put("auto.commit.interval.ms", "1000")
    props.put("linger.ms", "1")
    props.put("session.timeout.ms", "3000")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("zookeeper.connect", KafkaServer.ZOOKEEPER_ADDRESS)

    val consumer = new KafkaConsumer[String, String](props)

    def start() {
        println("Start Consumer")
        consumer.subscribe(Arrays.asList("test-counter"))

        while (true) {
            val records = consumer.poll(100)
            val iterator = records.iterator()

            while (iterator.hasNext) {
                val record = iterator.next()
                printf("Consumer: offset = %d, key = %s, value = %s \n", record.offset(), record.key(), record.value())
            }
        }
    }
}

Когда я создаю сообщения на сервере через kafka-console-продюсер, я вижу, что они появляются в kafka-console-consumer на сервере, но не в потребителе, которого я написал.

kafka-console-producer --broker-list ks:9092 --topic test-counter

KafkaServer.ZOOKEEPER_ADDRESS совпадает с аргументом zk: 2181 с kafka-console-consumer и KafkaServer.KAFKA_ADDRESS совпадает с аргументом ks: 9092 с производителем kafka-console.


person Lorenz Verschingel    schedule 21.04.2016    source источник


Ответы (1)


Я попробовал код и обнаружил, что:

  • в потребительских свойствах необходимо указать ключ и значение десериализаторы:

     props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
     props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    
  • # P3 #
    # P4 #
    # P5 #

После внесения исправлений код работает.

person Aliaxander    schedule 21.04.2016
comment
Я попробую это немедленно - person Lorenz Verschingel; 23.04.2016
comment
С этими изменениями проблема все еще существует. Но теперь я вижу причину java.net.ConnectException: Connection timed out: no further information.. По какой-то причине и производитель, и потребитель не могут подключиться к ks: 9092. - person Lorenz Verschingel; 23.04.2016
comment
Я пробовал этот код с другим сервером kafka, и там он отлично работает. Кажется, какая-то проблема с нашим кафкасервером на cloudera и конфигурацией. - person Lorenz Verschingel; 25.04.2016