Зарегистрировать зарезервированное ключевое слово в Ksql, и если да, то как я могу выбрать поле с таким именем

Я изучаю платформу Confluent (Kafka, Ksql и т. Д.). Я передаю данные в тему Kafka с помощью Debezium с Kafka Connect. Одно из полей в моей таблице базы данных «журнал» называется «регистр», которое является меткой времени, когда была добавлена ​​запись.

Для справки структура табличного журнала (в исходных базах данных MySQL) следующая:

CREATE TABLE `log` (
  `code` varchar(9) NOT NULL,
  `register` datetime NOT NULL,
  `entry` mediumtext NOT NULL,
  PRIMARY KEY (`code`)
) ENGINE=InnoDB DEFAULT CHARSET=latin1

Я передаю данные из таблицы «журнала» в двух базах данных в одну тему Kafka, используя следующую конфигурацию, которая работает по назначению.

"transforms.topicRoute.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.topicRoute.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
"transforms.topicRoute.replacement": "merged.$3",

Я пытаюсь создать поток KSQL, который создает новый ключ, который представляет собой конкатенацию исходной базы данных (из метаданных, сгенерированных Debezium) и поля кода из таблицы журнала вместе с остальными полями из таблицы. Цель этого состоит в том, чтобы производный ключ был полностью уникальным при отправке в приемник (в настоящее время подключается к другой базе данных MySQL, которая содержит одну таблицу журнала, содержимое которой должно быть объединенной копией двух таблиц журнала исходной базы данных)

Запрос, который я пытаюсь выполнить:

SELECT source->db + '.' + after->code AS KeyValue, after->register, after->entry FROM MERGED_LOG LIMIT 1;

Однако возникает следующая ошибка:

line 1:59: mismatched input 'register' expecting {'INTEGER', 'DATE', 'TIME', 'TIMESTAMP', 'INTERVAL', 'YEAR', 'MONTH', 'DAY', 'HOUR', 'MINUTE', 'SECOND', 'ZONE', 'PARTITION', 'STRUCT', 'EXPLAIN', 'ANALYZE', 'TYPE', 'SHOW', 'TABLES', 'COLUMNS', 'COLUMN', 'PARTITIONS', 'FUNCTIONS', 'FUNCTION', 'ARRAY', 'MAP', 'SET', 'RESET', 'SESSION', 'IF', IDENTIFIER, DIGIT_IDENTIFIER, QUOTED_IDENTIFIER, BACKQUOTED_IDENTIFIER}
Statement: SELECT source->db + '.' + after->code AS KeyValue, after->register, after->entry FROM MERGED_LOG LIMIT 1;
Caused by: line 1:59: mismatched input 'register' expecting {'INTEGER', 'DATE',
        'TIME', 'TIMESTAMP', 'INTERVAL', 'YEAR', 'MONTH', 'DAY', 'HOUR', 'MINUTE',
        'SECOND', 'ZONE', 'PARTITION', 'STRUCT', 'EXPLAIN', 'ANALYZE', 'TYPE', 'SHOW',
        'TABLES', 'COLUMNS', 'COLUMN', 'PARTITIONS', 'FUNCTIONS', 'FUNCTION', 'ARRAY',
        'MAP', 'SET', 'RESET', 'SESSION', 'IF', IDENTIFIER, DIGIT_IDENTIFIER,
        QUOTED_IDENTIFIER, BACKQUOTED_IDENTIFIER}
Caused by: org.antlr.v4.runtime.InputMismatchException

Я не вижу нигде, что предполагало бы, что «регистр» - это какой-то зарезервированный термин.

Кто-нибудь может помочь? Альтернатива может предложить способ изменения имени поля на пути к преобразованию, имея в виду, что я не могу сгладить сообщение, сгенерированное Debezium, поскольку мне нужно иметь возможность добраться до имени исходной базы данных


person pr.lwd    schedule 14.05.2019    source источник
comment
Хорошо, поэтому я поговорил с кем-то, кто знает, и регистрация - это ключевое слово, поэтому остается вопрос, можно ли использовать SMT для изменения имени поля на его пути в Kafka из сообщения Debezium   -  person pr.lwd    schedule 15.05.2019


Ответы (1)


  1. Да REGISTER - зарезервированное слово, вам следует избегать его в DDL. Возможно, вы сможете получить к нему доступ, процитировав его, стоит попробовать.

  2. Существует преобразование одиночного сообщения для удаления полей, но оно не работает с вложенными данными. Вы можете попробовать UnwrapFromEnvelope SMT в сочетании с одним для переименования поля. Я не пробовал эту конфигурацию, но что-то вроде

    "transforms": "unwrap,renameField",
    "transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope",
    "transforms.renameField.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
    "transforms.renameField.renames": "register:notareservedword",
    
person Robin Moffatt    schedule 15.05.2019
comment
Спасибо, как всегда, Робин. Я до сих пор избегал преобразования UnwrapFromEnvelope, поскольку оно усложняет фиксацию удалений, что нам и нужно сделать (по крайней мере, моя первоначальная попытка не сработала, я думаю, я перефокусируюсь на этой теме). - person pr.lwd; 16.05.2019