Не удается сохранить агрегированную таблицу KSQL в Postgres

Я пытаюсь отразить таблицу KSQL в моей базе данных Postgres, используя коннектор приемника JDBC, но, к сожалению, я не могу заставить ее работать.

Я использую Kafka 5.4.1, и у меня есть 2 темы debezium 1.0, сериализованные с помощью Avro из моей базы данных Postgres. Это конфигурация моего коннектора Debezium:

    {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "database.dbname": "xxx",
        "tasks.max": "1",
        "database.history.kafka.bootstrap.servers": "kafka-svc:9092",
        "database.history.kafka.topic": "dbhistory.xxx",
        "database.server.name": "xxx",
        "database.port": "5432",
        "plugin.name": "decoderbufs",
        "table.whitelist": "public.a,public.b",
        "database.hostname": "app-db",
        "name": "connector",
        "connection.url": "jdbc:postgresql://app-db:5432/xxx",
        "database.whitelist": "xxx",
        "transforms": "unwrap",
        "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
        "transforms.unwrap.add.source.fields": "table"
    }

Затем я использую KSQL CLI для взаимодействия с моим сервером и выполняю следующие команды:

CREATE STREAM a_dbz
WITH (KAFKA_TOPIC='xxx.public.a', VALUE_FORMAT='AVRO');

CREATE STREAM b_dbz
WITH (KAFKA_TOPIC='xxx.public.b', VALUE_FORMAT='AVRO');

CREATE STREAM a_by_b_id
WITH (KAFKA_TOPIC='a_by_b_id', VALUE_FORMAT='avro', PARTITIONS=1)
AS SELECT * FROM a_dbz PARTITION BY b_id;

CREATE STREAM b_by_id
WITH (KAFKA_TOPIC='b_by_id', VALUE_FORMAT='avro', PARTITIONS=1)
AS SELECT * FROM b_dbz PARTITION BY id;

TL; DR, я создаю 2 потока из тем debezium и перераспределяю их, чтобы подготовить их к ПРИСОЕДИНЕНИЮ. Затем я превращаю один из них (b_by_id) в таблицу, потому что в этом случае я не хочу использовать оконное соединение:

CREATE TABLE b
WITH (KAFKA_TOPIC='b_by_id', VALUE_FORMAT='avro', KEY='id');

На данный момент все работает нормально, и я могу поиграть со своими потоками, таблицами и объединениями и увидеть, что изменения в моей исходной БД немедленно отражаются в моих потоковых запросах в KSQL. Моя проблема возникает, когда я решаю выполнить некоторую агрегатную функцию для своих данных и отразить результаты в моей базе данных Postgres (такой же, как исходная база данных). Для этого я создаю новую таблицу KSQL в результате SELECT:

CREATE TABLE grouped_data AS
SELECT x, y, z, MAX(date) AS max_date
FROM a_by_b_id
INNER JOIN b ON a_by_b_id.b_id = b.id
GROUP BY x, y, z
EMIT CHANGES;

Затем я настроил соединитель приемника JDBC, чтобы выгрузить тему журнала изменений grouped_data моей новой таблицы в мою БД со следующей конфигурацией:

{
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "connection.url": "jdbc:postgresql://app-db:5432/xxx",
    "insert.mode": "upsert",
    "auto.create": true,
    "auto.evolve": true,
    "topics": "grouped_data",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://schema-registry-svc:8081",
    "pk.mode": "record_value",
    "pk.fields": "x, y, z",
    "table.name.format" : "kafka_${topic}",
    "transforms": "TimestampConverter",
    "transforms.TimestampConverter.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
    "transforms.TimestampConverter.field": "max_date",
    "transforms.TimestampConverter.target.type": "Timestamp"
}

К сожалению, ничего не происходит, ошибок и данных в моей БД приемника нет. Соединитель правильно создан и настроен, и даже если я принудительно обрабатываю новые сообщения с помощью моих потоковых запросов, данные не передаются в мою базу данных приемника, таблица назначения даже не создается. Я несколько раз пытался создать коннектор с разными именами и конфигурациями, разными значениями для pk.mode и т. Д., Но у меня не получалось заставить его работать. Создание коннектора для моей таблицы «b» выше работает отлично, и все данные передаются немедленно.

Вот дополнительные сведения о таблице KSQL, которую я пытаюсь отразить в postgres:

describe extended grouped_data;

Name                 : GROUPED_DATA
Type                 : TABLE
Key field            : 
Key format           : STRING
Timestamp field      : Not set - using <ROWTIME>
Value format         : AVRO
Kafka topic          : GROUPED_DATA (partitions: 1, replication: 1)

 Field              | Type                      
------------------------------------------------
 ROWTIME            | BIGINT           (system) 
 ROWKEY             | VARCHAR(STRING)  (system) 
 X                  | BIGINT                    
 Y                  | BIGINT                    
 Z                  | BIGINT                    
 MAX_DATE           | BIGINT                    
------------------------------------------------

Спасибо!


person Roberto Francescangeli    schedule 24.03.2020    source источник
comment
Престижность за хорошо написанный вопрос с ясными примерами, версией и т. Д. ????   -  person Robin Moffatt    schedule 26.03.2020
comment
Спасибо, @RobinMoffatt!   -  person Roberto Francescangeli    schedule 26.03.2020


Ответы (1)


Вы настроили Kafka Connect на использование имени темы в нижнем регистре.

"topics": "grouped_data",

но в соответствии с вашим DESCRIBE выводом тема, в которую записывается таблица, находится в верхнем регистре:

Kafka topic          : GROUPED_DATA (partitions: 1, replication: 1)

Если вы внимательно посмотрите журнал работника Kafka Connect, вы обнаружите следующее:

Error while fetching metadata with correlation id 2 : {grouped_data=LEADER_NOT_AVAILABLE} 

Kafka Connect не прервется, если вы укажете тему, которой не существует - возможно, это та тема, которую вы хотели указать, потому что впоследствии вы собираетесь ее заполнить.

Итак, вы можете либо изменить конфигурацию рабочего процесса Kafka Connect, чтобы использовать имя темы в верхнем регистре, либо переопределить таблицу ksqlDB и включить …WITH (KAFKA_TOPIC='grouped_data') в DDL.

person Robin Moffatt    schedule 26.03.2020
comment
Сработало (частично)! Теперь Kafka Connect получает правильные метаданные темы от Kafka, но я не вижу ни таблицу в своей целевой БД, ни какие-либо передаваемые данные, даже если я отправляю сообщения через тему GROUPED_DATA, изменяя мою таблицу a в исходной БД. - person Roberto Francescangeli; 26.03.2020
comment
Хорошо, я выяснил последний недостающий бит. Если вы используете Postgres с потоками в верхнем регистре KSQL, обязательно используйте кавычки при запросе целевой БД! В моем случае моя таблица была создана Kafka Connect с сочетанием верхнего и нижнего регистра: kafka_GROUPED_TABLE. Чтобы запросить эту таблицу, вам нужно сделать что-то вроде: SELECT * FROM kafka_GROUPED_TABLE; . TL; DR не забывайте кавычки в вашем SQL-запросе или изменяйте определение таблицы KSQL, как это было предложено Робином выше (.... WITH (KAFKA_TOPIC = 'grouped_data')) - person Roberto Francescangeli; 26.03.2020