Интеграция Kafka Connect

Я пытаюсь получить данные из базы данных Oracle с помощью Kafka Connect. Объект по умолчанию, предоставляемый соединителями Kafka, имеет тип «GenericRecord». Это делает его слишком специфичным и приводит к ситуации, когда данные получаются путем выполнения записи.getAsString("someIDENTIFIER"). Есть ли возможность получить конкретный тип объекта вместо типа GenericRecord.


person Achilleus    schedule 25.07.2017    source источник
comment
Какой подключаемый модуль Connect вы используете? JDBC? DBПосетить? Золотые ворота?   -  person Robin Moffatt    schedule 26.07.2017
comment
Я использую слитный JDBC. коннектор.класс: io.confluent.connect.jdbc.JdbcSourceConnector   -  person Achilleus    schedule 26.07.2017


Ответы (1)


Исходные соединители Kafka Connect работают с объектами SourceRecord, а рабочий процесс Kafka Connect настроен на использование преобразователя, который сериализует SourceRecord в двоичную форму, которая затем записывается в тему Kafka. Kafka Connect поставляется с преобразователем JSON, а Confluent предоставляет преобразователь Avro. Таким образом, двоичная форма сообщения, записываемого в Kafka, зависит от того, какой конвертер вы используете.

(Аналогичным образом коннекторы-приемники работают с объектами SinkRecord, а рабочий процесс Kafka Connect использует свой преобразователь для десериализации двоичной формы сообщения, прочитанного из Kafka, в объект SinkRecord, с которым работает коннектор.)

Похоже, вы пишете потребителя Kafka и видите там GenericRecord объектов. Если это так, то вы, вероятно, настроили рабочий процесс Kafka Connect для использования конвертера Confluent Avro, который для исходных соединителей, таких как соединитель JDBC, преобразует SourceRecord в двоичный формат Avro, который Kafka Connect затем записывает в тему Kafka. Тогда ваш клиент, вероятно, использует потребителя Kafka, настроенного с десериализатором Avro, и если вы не предоставите десериализатору схему Avro для работы с ним, он десериализует закодированное сообщение Avro в Avro GenericRecord.

Однако вы можете настроить свое приложение так, чтобы оно знало об определенной версии схемы Avro, и заставить систему сборки генерировать код для этой версии схемы Avro, чтобы создать специальный код, который будет десериализовать сообщение, закодированное в Avro, в сообщение в формате in. - форма памяти, описываемая схемой. В Java это означает, что вы должны сгенерировать класс из схемы, а затем использовать этот сгенерированный класс в своем коде для копирования GenericRecord в экземпляр вашего класса. См. этот полный пример потребителя, и, в частности, эту строку для преобразования из GenericRecord. В этом примере LogLine — это класс, сгенерированный из схемы Avro:

GenericRecord genericEvent = (GenericRecord) messageAndMetadata.message();
LogLine event = (LogLine) SpecificData.get().deepCopy(LogLine.SCHEMA$, genericEvent);

Одним из значительных преимуществ Avro является то, что он напрямую поддерживает эволюцию схемы, и реестр схем Confluent использует это преимущество. Таким образом, в то время как исходный коннектор может изменять сгенерированную схему Avro для таблицы в ответ на изменение структуры таблицы в базе данных, пока схема базы данных изменяется таким образом, что схемы Avro будут совместимы с предыдущими и/или прямыми версиями, библиотеки Avro используемое вашим клиентским приложением, будет автоматически преобразовываться из схемы Avro сообщений в схему Avro, используемую вашим приложением.

Конечно, в какой-то момент вы измените свое приложение, чтобы использовать новую схему Avro, но это не обязательно должно происходить одновременно. На самом деле, если вы настроите реестр схемы таким образом, чтобы версии схемы были совместимы как с прямой, так и с обратной совместимостью, вы можете изменить свое клиентское приложение до или после изменения базы данных и Соединитель источника JDBC начинает использовать новую версию схемы Avro.

person Randall Hauch    schedule 27.07.2017
comment
Большое спасибо за ваш ответ. Но я использую Scala для написания Consumers. Кроме того, мы используем Avro для создания всей нашей схемы и создания соответствующих классов. Можно ли этого добиться даже при использовании scala или есть другой способ имитировать это. - person Achilleus; 27.07.2017
comment
См. этот вопрос для начала о том, как вы можете сделать это для Scala, используя библиотека avro4s для создания вашего класса. Могут быть и другие библиотеки. - person Randall Hauch; 28.07.2017