Я новичок в Scala и Kafka, и у меня возникли проблемы.
Я пытаюсь подключить производителя scala kafka к серверу kafka, установленному на сервере cloudera express. Я уже однажды делал это на виртуальных машинах с помощью этих инструкций и проблем не было.
Когда я запускаю производителя, нужная тема создается, но ни одно из сообщений не отправляется, по крайней мере, я так думаю.
Вот часть кода:
Производитель Kafka
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
class KafkaProducerManager {
val props = new Properties()
props.put("bootstrap.servers", KafkaServer.KAFKA_ADDRESS)
props.put("acks", "all")
props.put("retries", "2")
props.put("auto.commit.interval.ms", "1000")
props.put("linger.ms", "1")
props.put("block.on.buffer.full", "true")
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("auto.create.topics.enable", "true")
val producer = new KafkaProducer[String, String](props)
def startCounter() {
println("Start Producer Counter")
for (i <- 1 to 100) {
producer.send(new ProducerRecord("test-counter", i.toString, "Package " + i))
println("Producer - Send: " + i)
}
println("Closing producer")
producer.close()
}
}
Когда я выполняю метод run, я вижу «Producer - Send: #» в качестве вывода и не получаю ошибок. Итак, я предполагаю, что этот фрагмент кода отправил сообщения Кафке.
Я запустил следующее на сервере kafka до того, как запустил продюсер:
kafka-console-consumer --zookeeper zk:2181 --topic test-counter
Но здесь я вижу, что ничего не происходит.
Когда я проверяю, есть ли в списке тема, которую должен создать продюсер.
kafka-topics -zookeeper zk:2181 --list
У меня тоже похожая проблема с потребителем:
import java.util.{Arrays, Properties}
import org.apache.kafka.clients.consumer.KafkaConsumer
class KafkaConsumerManager {
val props = new Properties()
props.put("bootstrap.servers", KafkaServer.KAFKA_ADDRESS)
props.put("group.id", "testGroup")
props.put("enable.auto.commit", "true")
props.put("auto.commit.interval.ms", "1000")
props.put("linger.ms", "1")
props.put("session.timeout.ms", "3000")
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("zookeeper.connect", KafkaServer.ZOOKEEPER_ADDRESS)
val consumer = new KafkaConsumer[String, String](props)
def start() {
println("Start Consumer")
consumer.subscribe(Arrays.asList("test-counter"))
while (true) {
val records = consumer.poll(100)
val iterator = records.iterator()
while (iterator.hasNext) {
val record = iterator.next()
printf("Consumer: offset = %d, key = %s, value = %s \n", record.offset(), record.key(), record.value())
}
}
}
}
Когда я создаю сообщения на сервере через kafka-console-продюсер, я вижу, что они появляются в kafka-console-consumer на сервере, но не в потребителе, которого я написал.
kafka-console-producer --broker-list ks:9092 --topic test-counter
KafkaServer.ZOOKEEPER_ADDRESS совпадает с аргументом zk: 2181 с kafka-console-consumer и KafkaServer.KAFKA_ADDRESS совпадает с аргументом ks: 9092 с производителем kafka-console.