Как настроить коннектор Debezium Mysql для создания примитивного ключа вместо объекта struct или json?

Я использую Debezium для обнаружения изменений в исходных таблицах MySql. Как я могу создавать сообщения Kafka, чтобы ключом было числовое (Long) значение вместо объекта Json?

Что я получаю:

key: {"foo_id": 123} 
value: {"foo_id": 123, "bar": "blahblah", "baz": "meh......"}

Что я хочу:

key: 123
value: {"foo_id": 123, "bar": "blahblah", "baz": "meh......"}

Моя таблица FOO выглядит так:

foo_id: INT
bar: VARCHAR 
baz: VARCHAR

Обратите внимание, что я не использую avro, и я экспериментировал с несколькими комбинациями, указанными ниже (с и без ключевых трансформаторов), но не смог получить ключ Long.

"transforms": "unwrap,insertKey,extractKey",
"transforms.unwrap.type":"io.debezium.transforms.UnwrapFromEnvelope",
"transforms.unwrap.drop.tombstones":"false",
"transforms.insertKey.type":"org.apache.kafka.connect.transforms.ValueToKey",
"transforms.insertKey.fields":"foo_id",
"transforms.extractKey.type":"org.apache.kafka.connect.transforms.ExtractField$Key",
"transforms.extractKey.field":"foo_id",        
"key.converter" : "org.apache.kafka.connect.converters.LongConverter",
"key.converter.schemas.enable": "false", 
"value.converter" : "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false"

Я не уверен, что ValueToKey или ExtractField работают для (MySQL) Source, но у меня NPE ниже.

Caused by: java.lang.NullPointerException
        at org.apache.kafka.connect.transforms.ValueToKey.applyWithSchema(ValueToKey.java:85)
        at org.apache.kafka.connect.transforms.ValueToKey.apply(ValueToKey.java:65)
        at org.apache.kafka.connect.runtime.TransformationChain.lambda$apply$0(TransformationChain.java:44)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162) 

person nanaboo    schedule 09.03.2019    source источник


Ответы (1)


Нашел решение, основанное на этом https://issues.jboss.org/browse/DBZ-689

{
...
    "config": {
    "transforms": "unwrap,insertKey,extractKey",
    "transforms.unwrap.type":"io.debezium.transforms.UnwrapFromEnvelope",
    "transforms.unwrap.drop.tombstones":"false",
    "transforms.insertKey.type":"org.apache.kafka.connect.transforms.ValueToKey",
    "transforms.insertKey.fields":"foo_id",
    "transforms.extractKey.type":"org.apache.kafka.connect.transforms.ExtractField$Key",
    "transforms.extractKey.field":"foo_id",        
    "key.converter" : "org.apache.kafka.connect.converters.IntegerConverter",
    "key.converter.schemas.enable": "true", 
    "value.converter" : "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "include.schema.changes": "false"  <-- this was missing
    }
}

Теперь я вижу foo_id как Integer (не беда, это не Long) :)

person nanaboo    schedule 10.03.2019