извлекать и преобразовывать специфичные для сообщения kafka поля для коннектора приемника jdbc

У меня есть тема kafka, которая получает данные из базы данных mysql с использованием исходного соединителя Debezium mysql, ниже приведен формат одного из сообщений:

{
    "Message": {
        "schema": {
            "type": "struct",
            "fields": [
              ...
            ],
            "optional": true,
            "name": "mysql-server-1.inventory.somename"
        },
        "payload": {
            "op": "u",
            "ts_ms": 1465491411815,
            "before": {
                "id": 1004,
                "first_name": "Anne",
                "last_name": "Doof",
                "email": "[email protected]"
            },
            "after": {
                "id": 1004,
                "first_name": "Anne",
                "last_name": "Marry",
                "email": "[email protected]"
            },
            "source": {
                "db": "inventory",
                "table": "customers",
                ...
                "query": "Update customers set last_name = 'Marry' where id = 1004"
            }
        }
    }
}

Я хочу вставить столбцы ts_ms, before, after и id (из объекта / строки) в другую базу данных, используя соединитель приемника jdbc со схемой таблицы как (id,before(text),after(text),timestamp), будучи новичком в kafka, не могу понять:

  • как я могу извлечь только эти поля из сообщения, чтобы нажимать и игнорировать другие?

  • как я могу преобразовать поля до и после в строковый / сериализованный формат?

  • как я могу извлечь id из объекта? (в случае операции вставки, before будет null, для удаления, after будет null)

В приведенном выше сообщении таблица назначения приемника должна иметь в конце данные, подобные приведенным ниже:

id:     1004
before: '{"id":1004,"first_name":"Anne","last_name":"Doof","email":"[email protected]"}'
after:  '{"id":1004,"first_name":"Anne","last_name":"Marry","email":"[email protected]"}'
timestamp: 1465491411815

person Kamboh    schedule 18.04.2020    source источник


Ответы (2)


Вы можете использовать цепочку Kafka Connect Transformations, например решение.

person Iskuskov Alexander    schedule 18.04.2020
comment
спасибо @Iskuskov Александр, можете ли вы придумать что-то конкретное, что может работать для обоих требований, преобразовывая И сериализацию Json? - person Kamboh; 18.04.2020
comment
Вам нужны before и after в виде строк? - person Iskuskov Alexander; 21.04.2020
comment
@@ Искусков Александр Да - person Kamboh; 21.04.2020
comment
Я думаю, вы можете сделать следующие шаги: 1. использовать id из ключа сообщения, используя свойство record_key коннектора JDBC Sink; 2. Используйте Cast SMT для полей after и before; 3. Используйте Replace SMT, чтобы переименовать ts_ms; 4. Используйте "Заменить SMT" для фильтрации полей. - person Iskuskov Alexander; 25.04.2020

Вы можете создать DTO (объект Java для полезной нагрузки json, который вы получаете из своей темы kafka), используя эти онлайн-конвертеры, которые помогут вам преобразовать ваш json в объекты Java. [http://pojo.sodhanalibrary.com/visible[1pting

Как только вы получите сообщение из темы kafka, вы можете использовать objectmapper для преобразования этого json и сопоставления его с вашими соответствующими объектами DTO. Как только у вас будет готов объект. Вы можете использовать этот объект для извлечения нужных полей, просто вызывая getId (), getBefore () и т. Д.,

Вот некоторый справочный код, который поможет вам понять:

    @KafkaListener(topics = "test")
        public void listen(String payload)  {

            logger.info("Message Received from Kafka topic: {}", payload);

            ObjectMapper objectMapper = new ObjectMapper();
            objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);

            DTOObject dtoObject = objectMapper.readValue(payload,DTOObject.class);

                logger.info("After Convertion: {}", objectMapper.writeValueAsString(dtoObject));

                logger.info("Get Before:{}", dtoObject.getId());



        }
person TechGeek    schedule 19.04.2020
comment
это и SMT или что, как я уже упоминал, я использую соединитель приемника jdbc для целевой базы данных, как я могу использовать этот фрагмент кода, который вы упомянули, и где? Благодарность - person Kamboh; 19.04.2020