У меня возникли проблемы с чтением журнала изменений Debezium из потока Kinesis. Могу ли я получить представление о том, как я могу анализировать события журнала изменений с помощью Flink SQL.
Ниже представлена моя попытка проанализировать поток через клиент Flink SQL.
Flink SQL> CREATE TABLE test_table (
> city_id INT,
> country_id INT,
> city STRING,
> last_update timestamp
> )
> WITH (
> 'connector' = 'kinesis',
> 'stream' = 'kinesis.sakila.city',
> 'aws.region' = 'us-east-1',
> 'scan.stream.initpos' = 'TRIM_HORIZON',
> 'format' = 'debezium-json'
> );
[INFO] Table has been created.
Flink SQL> select * from test_table;
[ERROR] Could not execute SQL statement. Reason:
java.lang.IllegalArgumentException: Kinesis consumer does not support DeserializationSchema that implements deserialization with a Collector. Unsupported DeserializationSchema: org.apache.flink.formats.json.debezium.DebeziumJsonDeserializationSchema