У меня есть тема kafka, которая получает данные из базы данных mysql с использованием исходного соединителя Debezium mysql, ниже приведен формат одного из сообщений:
{
"Message": {
"schema": {
"type": "struct",
"fields": [
...
],
"optional": true,
"name": "mysql-server-1.inventory.somename"
},
"payload": {
"op": "u",
"ts_ms": 1465491411815,
"before": {
"id": 1004,
"first_name": "Anne",
"last_name": "Doof",
"email": "[email protected]"
},
"after": {
"id": 1004,
"first_name": "Anne",
"last_name": "Marry",
"email": "[email protected]"
},
"source": {
"db": "inventory",
"table": "customers",
...
"query": "Update customers set last_name = 'Marry' where id = 1004"
}
}
}
}
Я хочу вставить столбцы ts_ms, before, after
и id
(из объекта / строки) в другую базу данных, используя соединитель приемника jdbc со схемой таблицы как (id,before(text),after(text),timestamp)
, будучи новичком в kafka, не могу понять:
как я могу извлечь только эти поля из сообщения, чтобы нажимать и игнорировать другие?
как я могу преобразовать поля до и после в строковый / сериализованный формат?
как я могу извлечь
id
из объекта? (в случае операции вставки, before будет null, для удаления, after будет null)
В приведенном выше сообщении таблица назначения приемника должна иметь в конце данные, подобные приведенным ниже:
id: 1004
before: '{"id":1004,"first_name":"Anne","last_name":"Doof","email":"[email protected]"}'
after: '{"id":1004,"first_name":"Anne","last_name":"Marry","email":"[email protected]"}'
timestamp: 1465491411815