Дублированные промежуточные результаты соединений KStream-KStream в Kafka Streams

У меня такой сценарий:

  1. Таблица A и Таблица B соединены с помощью FK.
  2. Транзакционная вставка / обновление как в A, так и в B.
  3. Debezium генерирует одно событие a для таблицы A и одно событие b для таблицы B.
  4. Kafka Streams создает KStream для таблиц A и B.
  5. Приложение Kafka Streams leftJoin KStreams A и B. (Предположим, что записи a и b имеют одинаковые ключи и попадают в окно соединения).
  6. Выходные записи будут [a, null], [a, b].

Как отказаться от [a, null]?

Можно выполнить innerJoin, но это все равно будет проблемой в случае update запросов.

Мы попытались использовать метку времени события для фильтрации (т.е. сохранить событие с последней меткой времени), но уникальность метки времени не гарантируется.

т.е. Конечная цель - иметь возможность идентифицировать последний агрегированный показатель, чтобы мы могли отфильтровывать промежуточные результаты во время запроса (либо в Athena / Presto, либо в какой-либо СУБД).


person Ashhar Hasan    schedule 10.05.2019    source источник
comment
Можете ли вы просто filter() после присоединения? Также обратите внимание, что поддержка соединений по внешнему ключу - это WIP atm: cwiki.apache.org/confluence/display/KAFKA/   -  person Matthias J. Sax    schedule 12.05.2019
comment
Я могу использовать filter, но не хочу полагаться на детали реализации, что вставка всегда будет транзакционной. Подумайте о двух последовательных вставках, это будут два события, и для соединения KStream-KStream они выведут 2 записи вместо одной, которую я хочу.   -  person Ashhar Hasan    schedule 13.05.2019
comment
@ MatthiasJ.Sax, в частности, я хочу выяснить, как идентифицировать последнее сообщение для каждого ключевого нисходящего потока при использовании чего-то вроде приемника S3. В настоящее время я добавляю поле для eventCreatedAt, но это, очевидно, то же самое (и не гарантируется, что оно будет увеличиваться) для событий в той же транзакции.   -  person Ashhar Hasan    schedule 13.05.2019
comment
Очень похоже на stackoverflow.com/questions/47495299/.   -  person Ashhar Hasan    schedule 13.05.2019


Ответы (1)


На данный момент лучший рабочий подход, который я нашел, - это использовать смещения Kafka из выходных записей.

Подход можно резюмировать следующим образом:

  1. Выполняйте всю логику, которую хотите, и не беспокойтесь о нескольких записях для одного и того же ключа.
  2. Запишите результаты в промежуточную тему с минимальным удержанием (1 час и т. Д.)
  3. Прочтите промежуточные темы с помощью процессора и внутри процессора обогатите сообщение смещением Kafka с помощью context.offset().
  4. Выпишите сообщения в тему вывода.

Теперь ваша тема вывода содержит несколько сообщений для одного и того же ключа, но каждое с разным смещением.

Теперь во время запроса вы можете выбрать максимальные смещения для каждого ключа с помощью подзапроса.

Пример TransformerSupplier можно увидеть ниже.

/**
 * @param <K> key type
 * @param <V> value type
 */
public class OutputTransformSupplier<K, V> implements TransformerSupplier<K, V, KeyValue<String, String>> {
  @Override
  public Transformer<K, V, KeyValue<String, String>> get() {
    return new OutputTransformer<>();
  }

  private class OutputTransformer<K, V> implements Transformer<K, V, KeyValue<String, String>> {
    private ProcessorContext context;

    @Override
    public void init(ProcessorContext context) {
      this.context = context;
    }

    /**
     * @param key   the key for the record
     * @param value the value for the record
     */
    @Override
    public KeyValue<String, String> transform(K key, V value) {
      if (value != null) {
        value.setKafkaOffset(context.offset());
      }
      return new KeyValue<>(key, value);
    }

    @Override
    public KeyValue<String, String> punctuate(long timestamp) {
      return null;
    }

    @Override
    public void close() {
      // nothing to close
    }
  }
}
person Ashhar Hasan    schedule 13.05.2019