Я хочу использовать kafka-connect-hdfs для записи json-записей без схемы из kafka в файл hdfs. Если я использую JsonConvertor в качестве преобразователя ключа / значения, он не работает. Но если я использую StringConvertor, он записывает json как экранированную строку.
Например:
фактический json -
{"name":"test"}
данные, записанные в файл hdfs -
"{\"name\":\"test\"}"
ожидаемый вывод в файл hdfs -
{"name":"test"}
Есть ли способ или альтернатива, чтобы я мог добиться этого, или я должен использовать его только со схемой?
Ниже приведено исключение, которое я получаю, когда пытаюсь использовать JSONConvertor:
[2017-09-06 14:40:19,344] ERROR Task hdfs-sink-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:148)
org.apache.kafka.connect.errors.DataException: JsonConverter with schemas.enable requires "schema" and "payload" fields and may not contain additional fields. If you are trying to deserialize plain JSON data, set schemas.enable=false in your converter configuration.
at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:308)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:406)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:250)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:180)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:148)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:146)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:190)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Конфигурация quickstart-hdfs.properties:
name=hdfs-sink
connector.class=io.confluent.connect.hdfs.HdfsSinkConnector
tasks.max=1
topics=test_hdfs_avro
hdfs.url=hdfs://localhost:9000
flush.size=1
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
Конфигурация connect-avro-standalone.properties:
bootstrap.servers=localhost:9092
schemas.enable=false
key.converter.schemas.enable=false
value.converter.schemas.enable=false