Я пытаюсь настроить коннектор Debezium для нескольких таблиц в базе данных MySQL (я использую debezium 1.4 в MySQL 8.0). У моей компании есть шаблон номенклатуры, которому нужно следовать при создании тем в kafka, и этот шаблон не позволяет использовать символы подчеркивания (_), поэтому мне пришлось заменить их дефисами (-)
Итак, названия моих тем:
Тема 1
fjf.db.top-domain.domain.sub-domain.transaction-search.order-status
WHERE
- transaction-search = schema "transaction_search"
- order-status = table "order_status".
- All changes in that table, must go to that topic.
Тема 2
fjf.db.top-domain.domain.sub-domain.transaction-search.shipping-tracking
WHERE
- transaction-search = schema "transaction_search"
- shipping-tracking = table "shipping_tracking"
- All changes in that table, must go to that topic.
Тема 3
fjf.db.top-domain.domain.sub-domain.transaction-search.proposal
WHERE
- transaction-search = schema "transaction_search"
- proposal = table "proposal"
- All changes in that table, must go to that topic.
Я пытаюсь использовать преобразования ByLogicalTableRouter, но не могу найти решение с регулярным выражением, которое решает мой случай.
{ "name": "debezium.connector",
"config":
{
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"tasks.max": "1",
"database.hostname": "myhostname",
"database.port": "3306",
"database.user": "debezium",
"database.password": "password",
"database.server.id": "1000",
"database.server.name": "fjf.db.top-domain.domain.sub-domain.transaction-search",
"schema.include.list": "transaction_search",
"table.include.list": "transaction_search.order_status,transaction_search.shipping_tracking,transaction_search.proposal",
"database.history.kafka.bootstrap.servers": "kafka.intranet:9097",
"database.history.kafka.topic": "fjf.db.top-domain.domain.sub-domain.transaction-search.schema-history",
"snapshot.mode": "schema_only",
"transforms":"RerouteName,RerouteUnderscore",
"transforms.RerouteName.type":"io.debezium.transforms.ByLogicalTableRouter",
"transforms.RerouteName.topic.regex":"(.*)transaction_search(.*)",
"transforms.RerouteName.topic.replacement": "$1$2"
"transforms.RerouteUnderscore.type":"io.debezium.transforms.ByLogicalTableRouter",
"transforms.RerouteUnderscore.topic.regex":"(.*)_(.*)",
"transforms.RerouteUnderscore.topic.replacement": "$1-$2"
}
}
- В первых преобразованиях я пытаюсь удалить повторяющееся имя схемы в маршрутизации темы.
- Во втором преобразовании, чтобы заменить все остатки подчеркивания _ для бедер -
Но при этом я получаю сообщение об ошибке ниже, которое указывает на то, что он пытается отправить все в ту же тему.
Caused by: org.apache.kafka.connect.errors.SchemaBuilderException: Cannot create field because of field name duplication __dbz__physicalTableIdentifier
Как я могу сделать преобразование, которое перенаправит события каждой таблицы в соответствующую тему?