Настройте коннектор debezium для нескольких таблиц в базе данных

Я пытаюсь настроить коннектор Debezium для нескольких таблиц в базе данных MySQL (я использую debezium 1.4 в MySQL 8.0). У моей компании есть шаблон номенклатуры, которому нужно следовать при создании тем в kafka, и этот шаблон не позволяет использовать символы подчеркивания (_), поэтому мне пришлось заменить их дефисами (-)

Итак, названия моих тем:

Тема 1

fjf.db.top-domain.domain.sub-domain.transaction-search.order-status
WHERE
- transaction-search = schema "transaction_search"
- order-status = table "order_status". 
- All changes in that table, must go to that topic.

Тема 2

fjf.db.top-domain.domain.sub-domain.transaction-search.shipping-tracking
WHERE
- transaction-search = schema "transaction_search"
- shipping-tracking = table "shipping_tracking"
- All changes in that table, must go to that topic.

Тема 3

fjf.db.top-domain.domain.sub-domain.transaction-search.proposal
WHERE
- transaction-search = schema "transaction_search"
- proposal = table "proposal"
- All changes in that table, must go to that topic.

Я пытаюсь использовать преобразования ByLogicalTableRouter, но не могу найти решение с регулярным выражением, которое решает мой случай.

{ "name": "debezium.connector",
 "config":
    { 
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"tasks.max": "1",
"database.hostname": "myhostname",
"database.port": "3306",
"database.user": "debezium", 
"database.password": "password", 
"database.server.id": "1000", 
"database.server.name": "fjf.db.top-domain.domain.sub-domain.transaction-search",
"schema.include.list": "transaction_search",
"table.include.list": "transaction_search.order_status,transaction_search.shipping_tracking,transaction_search.proposal",
"database.history.kafka.bootstrap.servers": "kafka.intranet:9097",
"database.history.kafka.topic": "fjf.db.top-domain.domain.sub-domain.transaction-search.schema-history",
"snapshot.mode": "schema_only",
"transforms":"RerouteName,RerouteUnderscore",
"transforms.RerouteName.type":"io.debezium.transforms.ByLogicalTableRouter",
"transforms.RerouteName.topic.regex":"(.*)transaction_search(.*)",
"transforms.RerouteName.topic.replacement": "$1$2" 
"transforms.RerouteUnderscore.type":"io.debezium.transforms.ByLogicalTableRouter",
"transforms.RerouteUnderscore.topic.regex":"(.*)_(.*)",
"transforms.RerouteUnderscore.topic.replacement": "$1-$2" 
    }
}
  • В первых преобразованиях я пытаюсь удалить повторяющееся имя схемы в маршрутизации темы.
  • Во втором преобразовании, чтобы заменить все остатки подчеркивания _ для бедер -

Но при этом я получаю сообщение об ошибке ниже, которое указывает на то, что он пытается отправить все в ту же тему.

Caused by: org.apache.kafka.connect.errors.SchemaBuilderException: Cannot create field because of field name duplication __dbz__physicalTableIdentifier

Как я могу сделать преобразование, которое перенаправит события каждой таблицы в соответствующую тему?


person Malkath    schedule 17.02.2021    source источник


Ответы (1)


  1. Удаление имени схемы

В первых преобразованиях я пытаюсь удалить повторяющееся имя схемы в маршрутизации темы.

После преобразования с вашим регулярным выражением у вас будет две точки, поэтому вам нужно исправить это:

"transforms.RerouteName.topic.regex":"([^.]+)\\.transaction_search\\.([^.]+)",
"transforms.RerouteName.topic.replacement": "$1.$2" 
  1. Замените нижнее подчеркивание на бедра

Вы можете попробовать использовать ChangeCase SMT из Общие преобразования Kafka Connect .

person Iskuskov Alexander    schedule 17.02.2021
comment
Не сработало :( Сначала я попробовал такие преобразования: RerouteName, changeCase, transforms.RerouteName.type: io.debezium.transforms.ByLogicalTableRouter, transforms.RerouteName.topic.regex: ([^.] +) \\. Transaction_search \\. . ([^.] +), transforms.RerouteName.topic.replacement: $ 1. $ 2, transforms.changeCase.type: com.github.jcustenborder.kafka.connect.transform.common.ChangeCase $ Value, transforms.changeCase.from : LOWER_UNDERSCORE, transforms.changeCase.to: LOWER_HYPHEN и получение ошибки: Вызвано: org.apache.avro.SchemaParseException: недопустимый символ в: ts-ms - person Malkath; 18.02.2021
comment
transforms: RerouteName, RerouteUnder, transforms.RerouteName.type: io.debezium.transforms.ByLogicalTableRouter, transforms.RerouteName.topic.regex: (. *). card_acquisition. (. *), transforms.RerouteName.topic. $ 2, transforms.RerouteUnder.type: io.debezium.transforms.ByLogicalTableRouter, transforms.RerouteUnder.topic.regex: (. *) _ (. *), Transforms.RerouteUnder.topic.replacement: $ 1- $ 2, и возникла ошибка автор: org.apache.kafka.connect.errors.SchemaBuilderException: Невозможно создать поле из-за дублирования имени поля __dbz__physicalTableIdentifier - person Malkath; 18.02.2021
comment
Первая ошибка исходит от AvroConverter, вместо этого вы можете использовать JSONConverter. - person Iskuskov Alexander; 18.02.2021
comment
К сожалению, я не могу использовать JSON в этом проекте, мне нужно использовать Avro. - person Malkath; 18.02.2021
comment
Подскажите пожалуйста, все ли работает как хотите без трансформаций (кроме названия темы конечно)? - person Iskuskov Alexander; 18.02.2021
comment
Да, моя проблема связана только с маршрутизацией тем из-за шаблонов имен тем, которым я должен следовать. Когда я провожу локальный тест (где я могу использовать любое имя в теме и не нужно использовать какие-либо преобразования), он работает отлично. - person Malkath; 18.02.2021
comment
Эта работа для меня. Спасибо Искускову !! transforms.topicCase.type: com.github.jcustenborder.kafka.connect.transform.common.ChangeTopicCase, transforms.topicCase.from: LOWER_UNDERSCORE, transforms.topicCase.to: LOWER_HYPHEN, - person Malkath; 25.02.2021
comment
Отличные новости! Пожалуйста! Если мой ответ был полезен, проголосуйте, пожалуйста :) - person Iskuskov Alexander; 25.02.2021