Я изучаю платформу Confluent (Kafka, Ksql и т. Д.). Я передаю данные в тему Kafka с помощью Debezium с Kafka Connect. Одно из полей в моей таблице базы данных «журнал» называется «регистр», которое является меткой времени, когда была добавлена запись.
Для справки структура табличного журнала (в исходных базах данных MySQL) следующая:
CREATE TABLE `log` (
`code` varchar(9) NOT NULL,
`register` datetime NOT NULL,
`entry` mediumtext NOT NULL,
PRIMARY KEY (`code`)
) ENGINE=InnoDB DEFAULT CHARSET=latin1
Я передаю данные из таблицы «журнала» в двух базах данных в одну тему Kafka, используя следующую конфигурацию, которая работает по назначению.
"transforms.topicRoute.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.topicRoute.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
"transforms.topicRoute.replacement": "merged.$3",
Я пытаюсь создать поток KSQL, который создает новый ключ, который представляет собой конкатенацию исходной базы данных (из метаданных, сгенерированных Debezium) и поля кода из таблицы журнала вместе с остальными полями из таблицы. Цель этого состоит в том, чтобы производный ключ был полностью уникальным при отправке в приемник (в настоящее время подключается к другой базе данных MySQL, которая содержит одну таблицу журнала, содержимое которой должно быть объединенной копией двух таблиц журнала исходной базы данных)
Запрос, который я пытаюсь выполнить:
SELECT source->db + '.' + after->code AS KeyValue, after->register, after->entry FROM MERGED_LOG LIMIT 1;
Однако возникает следующая ошибка:
line 1:59: mismatched input 'register' expecting {'INTEGER', 'DATE', 'TIME', 'TIMESTAMP', 'INTERVAL', 'YEAR', 'MONTH', 'DAY', 'HOUR', 'MINUTE', 'SECOND', 'ZONE', 'PARTITION', 'STRUCT', 'EXPLAIN', 'ANALYZE', 'TYPE', 'SHOW', 'TABLES', 'COLUMNS', 'COLUMN', 'PARTITIONS', 'FUNCTIONS', 'FUNCTION', 'ARRAY', 'MAP', 'SET', 'RESET', 'SESSION', 'IF', IDENTIFIER, DIGIT_IDENTIFIER, QUOTED_IDENTIFIER, BACKQUOTED_IDENTIFIER}
Statement: SELECT source->db + '.' + after->code AS KeyValue, after->register, after->entry FROM MERGED_LOG LIMIT 1;
Caused by: line 1:59: mismatched input 'register' expecting {'INTEGER', 'DATE',
'TIME', 'TIMESTAMP', 'INTERVAL', 'YEAR', 'MONTH', 'DAY', 'HOUR', 'MINUTE',
'SECOND', 'ZONE', 'PARTITION', 'STRUCT', 'EXPLAIN', 'ANALYZE', 'TYPE', 'SHOW',
'TABLES', 'COLUMNS', 'COLUMN', 'PARTITIONS', 'FUNCTIONS', 'FUNCTION', 'ARRAY',
'MAP', 'SET', 'RESET', 'SESSION', 'IF', IDENTIFIER, DIGIT_IDENTIFIER,
QUOTED_IDENTIFIER, BACKQUOTED_IDENTIFIER}
Caused by: org.antlr.v4.runtime.InputMismatchException
Я не вижу нигде, что предполагало бы, что «регистр» - это какой-то зарезервированный термин.
Кто-нибудь может помочь? Альтернатива может предложить способ изменения имени поля на пути к преобразованию, имея в виду, что я не могу сгладить сообщение, сгенерированное Debezium, поскольку мне нужно иметь возможность добраться до имени исходной базы данных