Я пытаюсь получить данные из базы данных Oracle с помощью Kafka Connect. Объект по умолчанию, предоставляемый соединителями Kafka, имеет тип «GenericRecord». Это делает его слишком специфичным и приводит к ситуации, когда данные получаются путем выполнения записи.getAsString("someIDENTIFIER"). Есть ли возможность получить конкретный тип объекта вместо типа GenericRecord.
Интеграция Kafka Connect
Ответы (1)
Исходные соединители Kafka Connect работают с объектами SourceRecord
, а рабочий процесс Kafka Connect настроен на использование преобразователя, который сериализует SourceRecord
в двоичную форму, которая затем записывается в тему Kafka. Kafka Connect поставляется с преобразователем JSON, а Confluent предоставляет преобразователь Avro. Таким образом, двоичная форма сообщения, записываемого в Kafka, зависит от того, какой конвертер вы используете.
(Аналогичным образом коннекторы-приемники работают с объектами SinkRecord
, а рабочий процесс Kafka Connect использует свой преобразователь для десериализации двоичной формы сообщения, прочитанного из Kafka, в объект SinkRecord
, с которым работает коннектор.)
Похоже, вы пишете потребителя Kafka и видите там GenericRecord
объектов. Если это так, то вы, вероятно, настроили рабочий процесс Kafka Connect для использования конвертера Confluent Avro, который для исходных соединителей, таких как соединитель JDBC, преобразует SourceRecord
в двоичный формат Avro, который Kafka Connect затем записывает в тему Kafka. Тогда ваш клиент, вероятно, использует потребителя Kafka, настроенного с десериализатором Avro, и если вы не предоставите десериализатору схему Avro для работы с ним, он десериализует закодированное сообщение Avro в Avro GenericRecord
.
Однако вы можете настроить свое приложение так, чтобы оно знало об определенной версии схемы Avro, и заставить систему сборки генерировать код для этой версии схемы Avro, чтобы создать специальный код, который будет десериализовать сообщение, закодированное в Avro, в сообщение в формате in. - форма памяти, описываемая схемой. В Java это означает, что вы должны сгенерировать класс из схемы, а затем использовать этот сгенерированный класс в своем коде для копирования GenericRecord
в экземпляр вашего класса. См. этот полный пример потребителя, и, в частности, эту строку для преобразования из GenericRecord
. В этом примере LogLine
— это класс, сгенерированный из схемы Avro:
GenericRecord genericEvent = (GenericRecord) messageAndMetadata.message();
LogLine event = (LogLine) SpecificData.get().deepCopy(LogLine.SCHEMA$, genericEvent);
Одним из значительных преимуществ Avro является то, что он напрямую поддерживает эволюцию схемы, и реестр схем Confluent использует это преимущество. Таким образом, в то время как исходный коннектор может изменять сгенерированную схему Avro для таблицы в ответ на изменение структуры таблицы в базе данных, пока схема базы данных изменяется таким образом, что схемы Avro будут совместимы с предыдущими и/или прямыми версиями, библиотеки Avro используемое вашим клиентским приложением, будет автоматически преобразовываться из схемы Avro сообщений в схему Avro, используемую вашим приложением.
Конечно, в какой-то момент вы измените свое приложение, чтобы использовать новую схему Avro, но это не обязательно должно происходить одновременно. На самом деле, если вы настроите реестр схемы таким образом, чтобы версии схемы были совместимы как с прямой, так и с обратной совместимостью, вы можете изменить свое клиентское приложение до или после изменения базы данных и Соединитель источника JDBC начинает использовать новую версию схемы Avro.