Потоковая передача сообщений Kafka в базу данных MySQL

Я хочу писать сообщения Kafka в базу данных MySQL. Пример приведен в этом ссылка. В этом примере apache flume используется для приема сообщений и их записи в MySQL. Я использую тот же код, и когда я запускаю flume-ng agent и event, всегда становится null

И мой файл flume.conf.properties:

agent.sources=kafkaSrc
agent.channels=channel1
agent.sinks=jdbcSink

agent.channels.channel1.type=org.apache.flume.channel.kafka.KafkaChannel
agent.channels.channel1.brokerList=localhost:9092
agent.channels.channel1.topic=kafkachannel
agent.channels.channel1.zookeeperConnect=localhost:2181
agent.channels.channel1.capacity=10000
agent.channels.channel1.transactionCapacity=1000
agent.channels.channel1.parseAsFlumeEvent=false


agent.sources.kafkaSrc.type = org.apache.flume.source.kafka.KafkaSource
agent.sources.kafkaSrc.channels = channel1
agent.sources.kafkaSrc.zookeeperConnect = localhost:2181
agent.sources.kafkaSrc.topic = kafka-mysql

agent.sinks.jdbcSink.type = com.stratio.ingestion.sink.jdbc.JDBCSink
agent.sinks.jdbcSink.connectionString = jdbc:mysql://127.0.0.1:3306/test?useSSL=false
agent.sinks.jdbcSink.username=root
agent.sinks.jdbcSink.password=pass
agent.sinks.jdbcSink.batchSize = 10
agent.sinks.jdbcSink.channel =channel1
agent.sinks.jdbcSink.sqlDialect=MYSQL
agent.sinks.jdbcSink.driver=com.mysql.jdbc.Driver
agent.sinks.jdbcSink.sql=INSERT INTO kafkamsg(msg) VALUES(${body:varchar})

Где я не прав?

Спасибо.


person fena coder    schedule 28.09.2017    source источник


Ответы (1)


В моем эталонном примере Flume слушает kafka для kafka-mysql темы. Но этот код работает для темы kafkachannel. Итак, нам нужно создавать сообщения в kafkachannel теме, я не знаю, почему.

person fena coder    schedule 06.10.2017