Поддержка коннектора kafka-connect-hdfs для сохранения массива байтов и разделения полей с использованием схемы FlatBuffer

Я искал поддержку коннектора kafka-connect-hdfs (Confluent) для сохранения массива байтов и разделения полей с использованием схемы FlatBuffer.

Я получаю данные в байтовом массиве от кафки. Этот байтовый массив создается из FlatBuffer. Необходимо сохранить его в HDFS по пути, например, Field1 / Field2 / Field3. Все эти поля необходимо извлечь из массива байтов с помощью схемы FlatBuffer. Кроме того, данные для сохранения в HDFS должны быть только байтами. Преобразование данных не требуется.

Я проверил оба:

  1. FieldPartitioner: https://github.com/confluentinc/kafka-connect-storage-common/blob/master/partitioner/src/main/java/io/confluent/connect/storage/partitioner/FieldPartitioner.java
  2. Поддерживаемые форматы: Json, Avro, Parquet. В https://github.com/confluentinc/kafka-connect-storage-cloud/blob/master/kafka-connect-s3/src/main/java/io/confluent/connect/s3/format/json/JsonRecordWriterProvider.java, хотя я обнаружил, что массив байтов сохранен в HDFS, если данные относятся к типу Kafka Struct.

Я не мог найти способ использовать их для своих целей.

Кто-нибудь знает о такой встроенной поддержке. Если нет, то, пожалуйста, направьте меня к ресурсу (если есть), чтобы создать индивидуальную поддержку для обоих.


person KRS    schedule 17.02.2019    source источник


Ответы (1)


FlatBuffers не является (в настоящее время) поддерживаемым форматом сериализации, а ByteArrayFormat доступен только для S3 Connect, но не для HDFS, и просто выгружает формат ByteArraySerializer из Kafka (который будет объектом Struct после преобразователя, да.

Что касается разделения, поскольку данные представляют собой только байты, он не проверяет значения записей для поддержки разделителей, поэтому вам также потребуется добавить настраиваемый один из них, который потребует десериализации сообщения для проверки полей.

Я не уверен, почему вы связались с кодом подключения S3, но если вы хотите добавить свой собственный формат, посмотрите PR, добавивший StringFormat для подключения HDFS


Чтобы собрать проект, просмотрите FAQ

person OneCricketeer    schedule 21.02.2019