Flink SQL (V 1.12.1) не может прочитать журнал изменений debezium из потока Kinesis

У меня возникли проблемы с чтением журнала изменений Debezium из потока Kinesis. Могу ли я получить представление о том, как я могу анализировать события журнала изменений с помощью Flink SQL.

Ниже представлена ​​моя попытка проанализировать поток через клиент Flink SQL.

Flink SQL> CREATE TABLE test_table (
>   city_id INT,
>   country_id INT,
>   city STRING,
>   last_update timestamp
> )
> WITH (
>   'connector' = 'kinesis',
>   'stream' = 'kinesis.sakila.city',
>   'aws.region' = 'us-east-1',
>   'scan.stream.initpos' = 'TRIM_HORIZON',
>   'format' = 'debezium-json'
> );
[INFO] Table has been created.

Flink SQL> select * from test_table;
[ERROR] Could not execute SQL statement. Reason:
java.lang.IllegalArgumentException: Kinesis consumer does not support DeserializationSchema that implements deserialization with a Collector. Unsupported DeserializationSchema: org.apache.flink.formats.json.debezium.DebeziumJsonDeserializationSchema


person bk123456    schedule 22.02.2021    source источник


Ответы (1)


В документации Flink есть таблица, которая показывает какие коннекторы поддерживают каждый из форматов. Вы увидите, что формат журнала изменений Debezium не поддерживается коннектором Kinesis.

person David Anderson    schedule 22.02.2021
comment
Большое спасибо, Дэвид. Наверное, мне следовало глубже покопаться в документации. Вы знаете, есть ли в ближайшее время какие-либо планы по поддержке формата журнала изменений для Kinesis? - person bk123456; 22.02.2021
comment
Я не знаю, каковы планы и в чем дело. Вы можете спросить в списке рассылки пользователей. - person David Anderson; 23.02.2021
comment
Вы можете повысить свой голос в issues.apache.org/jira/browse/FLINK-20060, чтобы получить более высокий приоритет для этой проблемы. - person twalthr; 23.02.2021