Могу ли я написать настраиваемое преобразование kafka connect для преобразования JSON в AVRO?

Я хочу использовать kafka-connect-hdfs для записи json-записей без схемы из kafka в файл hdfs. Если я использую JsonConvertor в качестве преобразователя ключа / значения, он не работает. Но если я использую StringConvertor, он записывает json как экранированную строку.

Например:

фактический json -

{"name":"test"}

данные, записанные в файл hdfs -

"{\"name\":\"test\"}"

ожидаемый вывод в файл hdfs -

{"name":"test"}

Есть ли способ или альтернатива, чтобы я мог добиться этого, или я должен использовать его только со схемой?

Ниже приведено исключение, которое я получаю, когда пытаюсь использовать JSONConvertor:

[2017-09-06 14:40:19,344] ERROR Task hdfs-sink-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:148)
org.apache.kafka.connect.errors.DataException: JsonConverter with schemas.enable requires "schema" and "payload" fields and may not contain additional fields. If you are trying to deserialize plain JSON data, set schemas.enable=false in your converter configuration.
    at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:308)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:406)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:250)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:180)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:148)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:146)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:190)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

Конфигурация quickstart-hdfs.properties:

name=hdfs-sink
connector.class=io.confluent.connect.hdfs.HdfsSinkConnector
tasks.max=1
topics=test_hdfs_avro
hdfs.url=hdfs://localhost:9000
flush.size=1
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.json.JsonConverter

Конфигурация connect-avro-standalone.properties:

bootstrap.servers=localhost:9092
schemas.enable=false
key.converter.schemas.enable=false
value.converter.schemas.enable=false

person Nandish Kotadia    schedule 05.09.2017    source источник
comment
Если я использую JsonConvertor в качестве преобразователя ключа / значения, он не работает. - ›Можете ли вы описать проблему, с которой столкнулись? ошибку вы получаете?   -  person Robin Moffatt    schedule 05.09.2017
comment
@RobinMoffatt Я добавил исключение и свойства, которые я настроил для этого коннектора.   -  person Nandish Kotadia    schedule 06.09.2017


Ответы (1)


Когда вы указываете конвертер в свойствах конфигурации вашего коннектора, вам необходимо включить все свойства, относящиеся к этому конвертеру, независимо от того, включены ли такие свойства также в конфигурацию рабочего.

В приведенном выше примере вам нужно указать оба:

value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false

в quickstart-hdfs.properties.

К вашему сведению, экспорт JSON скоро появится в коннекторе HDFS. Отслеживайте связанный запрос на перенос здесь: https://github.com/confluentinc/kafka-connect-hdfs/pull/196

Обновление: JsonFormat был объединен с основной веткой.

person Konstantine Karantasis    schedule 06.09.2017
comment
Спасибо за информацию. Не могли бы вы обновить эту ветку, когда запрос будет объединен с главным и доступен для использования? - person Nandish Kotadia; 07.09.2017
comment
JsonFormat был объединен. - person Konstantine Karantasis; 07.09.2017
comment
Большое спасибо. Итак, чтобы использовать этот код. Придется ли мне скачать мастер и собрать его вручную или есть другой способ? - person Nandish Kotadia; 08.09.2017
comment
Это правильно. Он будет включен в предстоящий выпуск платформы Confluent. До тех пор ранним последователям нужно будет строить из исходников. - person Konstantine Karantasis; 08.09.2017
comment
Хорошо, спасибо, но есть ли руководство, в котором я могу найти все проекты, которые мне нужно создать, и шаги по созданию исходного кода вручную? Я попытался собрать common и hdfs, но проект hdfs показал ошибку других зависимостей. Не удалось найти зависимости от конфлюентного репозитория maven. Любая идея? - person Nandish Kotadia; 08.09.2017
comment
Обычно эта информация находится в разделе часто задаваемых вопросов на странице github коннектора. Например. для соединителя HDFS: github.com/confluentinc/kafka-connect-hdfs/wiki / FAQ (необходимо обновить, чтобы включить kafka-connect-storage-common) и для kafka-connect-storage-common: github.com/confluentinc/kafka-connect-storage-common/wiki/FAQ - person Konstantine Karantasis; 15.09.2017
comment
Проголосовали против, потому что это приведет к тому, что каждая запись в выходных данных будет простой строкой, которая нигде не отмечена. - person Marcin; 15.06.2018