Соединитель Debezium Kafka CDC делает ключ как avro, даже если конвертер StringConverver

Это мои конфигурации соединителя:

curl -s -k -X POST http://***************:8083/connectors -H "Content-Type: application/json" -d '{
  "name": "mysql-cdc-CUSTOMER_DETAILS-007",
  "config": {
    "tasks.max":"2",
    "poll.interval.ms":"500",
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "database.hostname": "dbnode",
    "database.port": "3306",
    "database.user": "**********",
    "database.password": "###########",
    "database.server.name": "dbnode",
    "database.whitelist": "device_details",
    "database.history.kafka.bootstrap.servers": "**********:9092",
    "database.history.kafka.topic": "schema-changes.device_details",
    "include.schema.changes":"true",
    "table.whitelist":"device_details.tb_customermst",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "key.converter.schemas.enable": "false",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://************:8081",
    "internal.key.converter":"org.apache.kafka.connect.json.JsonConverter",
    "internal.value.converter":"org.apache.kafka.connect.json.JsonConverter",
    "internal.key.converter.schemas.enable":"false",
    "internal.value.converter.schemas.enable":"false"
  }
}' | jq '.'

При использовании данных из ksql это выглядит следующим образом:

ksql> print 'Device_Details.device_details.tb_customermst' from beginning;
Format:AVRO
5/2/20 2:08:34 PM IST, Struct{customerid=10001}, {"before": null, "after": {"customerid": 10001, "firstname": "Klara", "lastname": "Djokic", "emailid": "[email protected]", "mobilenumber": "+1 (480) 361-5311", "customertype": "Commercial", "emailverified": 1, "mobileverified": 1, "city": "Gilbert", "postcode": "85296", "address": "3426 E Elgin St", "latitude": 33.29840528, "longitude": -111.71571314, "UPDATE_TS": "2020-05-02T08:38:33Z"}, "source": {"version": "1.1.0.Final", "connector": "mysql", "name": "Device_Details", "ts_ms": 1588408713000, "snapshot": "false", "db": "device_details", "table": "tb_customermst", "server_id": 1, "gtid": "98557612-65ba-11ea-8dc4-000c29bcb2b4:6", "file": "binlog.000044", "pos": 4417, "row": 0, "thread": 8, "query": null}, "op": "c", "ts_ms": 1588408713761, "transaction": null}

Ключ Struct{customerid=10001}, и мне нужен ключ 10001.

Как я могу добиться этого...

При использовании протокола подключения SMT ValueToKey и ExtractField$Key возникает следующая ошибка:

org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
    at org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:50)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:315)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:240)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NullPointerException
    at org.apache.kafka.connect.transforms.ValueToKey.applyWithSchema(ValueToKey.java:85)
    at org.apache.kafka.connect.transforms.ValueToKey.apply(ValueToKey.java:65)
    at org.apache.kafka.connect.runtime.TransformationChain.lambda$apply$0(TransformationChain.java:50)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)

Подскажите, пожалуйста, что делать, чтобы получить ключ 10001.

Заранее спасибо.

PS: я использую... Confluent Platform 5.4.0....Debezium Connector for MySql 1.1.0


person bismi    schedule 02.05.2020    source источник


Ответы (1)


Ты почти там. Вам нужно использовать преобразование ExtractField$Key само по себе (то есть без ValueToKey), чтобы поднять значение из структуры.

"transforms":"extractKeyfromStruct",
"transforms.extractKeyfromStruct.type":"org.apache.kafka.connect.transforms.ExtractField$Key",
"transforms.extractKeyfromStruct.field":"customerid",
person Robin Moffatt    schedule 02.05.2020
comment
Само по себе непонятно..... Вы имеете в виду, что в конфигурации соединителя мне нужно использовать {ExtractField$Key} без ValueToKey SMT.... - person bismi; 02.05.2020
comment
По-прежнему возникает та же ошибка, что и раньше, но на этот раз для ExtractField Caused by: java.lang.NullPointerException at org.apache.kafka.connect.transforms.ExtractField.apply(ExtractField.java:61) at org.apache.kafka.connect.runtime.TransformationChain.lambda$apply$0(TransformationChain.java:50) - person bismi; 02.05.2020
comment
Как я могу сделать это со всеми таблицами только с одним подключением? - person rigobcastro; 21.05.2021