Запись соединения KStream-KTable в KTable: как синхронизировать соединение с записью ktable?

У меня проблема с тем, как ведет себя следующая топология:

String topic = config.topic();

KTable<UUID, MyData> myTable = topology.builder().table(UUIDSerdes.get(), GsonSerdes.get(MyData.class), topic);

// Receive a stream of various events
topology.eventsStream()
    // Only process events that are implementing MyEvent
    .filter((k, v) -> v instanceof MyEvent)
    // Cast to ease the code
    .mapValues(v -> (MyEvent) v)
    // rekey by data id
    .selectKey((k, v) -> v.data.id)
    .peek((k, v) -> L.info("Event:"+v.action))
    // join the event with the according entry in the KTable and apply the state mutation
    .leftJoin(myTable, eventHandler::handleEvent, UUIDSerdes.get(), EventSerdes.get())
    .peek((k, v) -> L.info("Updated:" + v.id + "-" + v.id2))
    // write the updated state to the KTable.
    .to(UUIDSerdes.get(), GsonSerdes.get(MyData.class), topic);

Моя проблема возникает, когда я получаю разные события одновременно. Поскольку моя мутация состояния выполняется leftJoin, а затем записывается методом to. У меня может произойти следующее, если события 1 и 2 получены одновременно с одним и тем же ключом:

event1 joins with state A => state A mutated to state X
event2 joins with state A => state A mutated to state Y
state X written to the KTable topic
state Y written to the KTable topic

Из-за этого в состоянии Y нет изменений с event1, поэтому я потерял данные.

Вот что я вижу с точки зрения журналов (часть Processing:... регистрируется внутри средства объединения значений):

Event:Event1
Event:Event2
Processing:Event1, State:none
Updated:1-null
Processing:Event2, State:none
java.lang.IllegalStateException: Event2 event received but we don't have data for id 1

Event1 можно рассматривать как событие создания: он создаст запись в KTable, поэтому не имеет значения, является ли состояние пустым. Event2 хотя необходимо применить изменения к существующему состоянию, но он не находит их, потому что первая мутация состояния все еще не была записана в KTable (она все еще не была обработана методом to)

Есть ли способ убедиться, что мой leftJoin и мои записи в ktable выполняются атомарно?

Спасибо

Обновление и текущее решение

Благодаря ответу @Matthias я смог найти решение, используя Transformer.

Вот как выглядит код:

Это трансформатор

public class KStreamStateLeftJoin<K, V1, V2> implements Transformer<K, V1, KeyValue<K, V2>> {

    private final String                    stateName;
    private final ValueJoiner<V1, V2, V2>   joiner;
    private final boolean                   updateState;

    private KeyValueStore<K, V2>            state;

    public KStreamStateLeftJoin(String stateName, ValueJoiner<V1, V2, V2> joiner, boolean updateState) {
        this.stateName = stateName;
        this.joiner = joiner;
        this.updateState = updateState;
    }

    @Override
    @SuppressWarnings("unchecked")
    public void init(ProcessorContext context) {
        this.state = (KeyValueStore<K, V2>) context.getStateStore(stateName);
    }

    @Override
    public KeyValue<K, V2> transform(K key, V1 value) {
        V2 stateValue = this.state.get(key); // Get current state
        V2 updatedValue = joiner.apply(value, stateValue); // Apply join
        if (updateState) {
            this.state.put(key, updatedValue); // write new state
        }
        return new KeyValue<>(key, updatedValue);
    }

    @Override
    public KeyValue<K, V2> punctuate(long timestamp) {
        return null;
    }

    @Override
    public void close() {}
}

А вот адаптированная топология:

String topic = config.topic();
String store = topic + "-store";

KTable<UUID, MyData> myTable = topology.builder().table(UUIDSerdes.get(), GsonSerdes.get(MyData.class), topic, store);

// Receive a stream of various events
topology.eventsStream()
    // Only process events that are implementing MyEvent
    .filter((k, v) -> v instanceof MyEvent)
    // Cast to ease the code
    .mapValues(v -> (MyEvent) v)
    // rekey by data id
    .selectKey((k, v) -> v.data.id)
    // join the event with the according entry in the KTable and apply the state mutation
    .transform(() -> new KStreamStateLeftJoin<UUID, MyEvent, MyData>(store, eventHandler::handleEvent, true), store)
    // write the updated state to the KTable.
    .to(UUIDSerdes.get(), GsonSerdes.get(MyData.class), topic);

Поскольку мы используем KV StateStore KTable и применяем изменения непосредственно в нем через put события метода, shoudl всегда выбирает обновленное состояние. Одна вещь, которую я до сих пор не понимаю: что, если у меня будет постоянная высокая пропускная способность событий.

Может ли все еще существовать состояние гонки между помещениями, которые мы выполняем в хранилище KTable KTable, и записями, которые выполняются в теме KTable?


person Crystark    schedule 14.09.2017    source источник


Ответы (1)


KTable разделяется на несколько физических хранилищ, и каждое хранилище обновляется только одним потоком. Таким образом, описанный вами сценарий не может произойти. Если у вас есть 2 записи с одинаковой меткой времени, которые обновляют один и тот же сегмент, они будут обрабатываться одна за другой (в порядке смещения). Таким образом, второе обновление будет видеть состояние после первого обновления.

Так, может быть, вы просто неправильно описали свой сценарий?

Обновить

Вы не можете изменить состояние при объединении. Таким образом, ожидание того, что

event1 joins with state A => state A mutated to state X

неправильно. Независимо от порядка обработки, когда event1 соединяется с state A, он будет обращаться к state A в режиме только для чтения, и state A не будет изменен.

Таким образом, когда event2 присоединяется, он будет видеть то же состояние, что и event1. Для объединения потоковой таблицы состояние таблицы обновляется только тогда, когда новые данные считываются из раздела table-input-topic.

Если вы хотите иметь общее состояние, которое обновляется с обоих входов, вам нужно будет создать собственное решение, используя transform():

builder.addStore(..., "store-name");
builder.stream("table-topic").transform(..., "store-name"); // will not emit anything downstream
KStream result = builder.stream("stream-topic").transform(..., "store-name");

Это создаст одно хранилище, которое используется обоими процессорами, и оба могут читать / писать по своему усмотрению. Таким образом, для входной таблицы вы можете просто обновить состояние, не отправляя ничего в нисходящем направлении, в то время как для входного потока вы можете выполнить соединение, обновить состояние и отправить результат в нисходящий поток.

Обновление 2

Что касается решения, то между обновлениями не будет состояния гонки, которое Transformer применяет к состоянию и записывает Transformer процессы после обновления состояния. Эта часть будет выполняться в одном потоке, а записи будут обрабатываться в порядке смещения от входной темы. Таким образом, гарантируется, что обновление состояния будет доступно для последующих записей.

person Matthias J. Sax    schedule 14.09.2017
comment
Я добавил несколько журналов, чтобы проиллюстрировать мою проблему. может это понятнее? - person Crystark; 14.09.2017
comment
Я не уверен на 100%, понимаю ли я журнал. У вас есть два типа событий? Одни для заполнения таблицы, а другие - для потока (если да, то какие)? Где вы регистрируете Updated и что означает -null-null? Также обратите внимание, что при объединении потока и таблицы состояние таблицы обновляется только из раздела ввода таблицы, а поток только читает из таблицы (он никогда не обновляет таблицу). - person Matthias J. Sax; 14.09.2017
comment
Я обновил свою проблему, надеясь, что она проясняет ситуацию. Обновленная часть регистрируется сразу после leftJoin. И да, у меня бывает несколько типов событий. Все сделано в режиме Event Sourced. - person Crystark; 14.09.2017
comment
Обновил свой ответ. - person Matthias J. Sax; 15.09.2017
comment
Под мутацией я имел в виду то, что мой ValueJoiner отвечает за компиляцию нового состояния. Я понимаю, что он не изменяет его сразу, но на самом деле я хотел бы сделать это: читать-изменять-писать из / в мою ktable атомарным способом. Насколько я понимаю, вы создаете третье состояние, но мне нужно, чтобы данные заканчивались в table-topic. Так что, может быть, все, что мне нужно, это builder.addStore(..., "table-topic"); builder.stream("stream-topic").transform(..., "table-topic");, чтобы трансформатор имел доступ table-topic для чтения? Имеет ли это смысл ? Я не знаком с API преобразования / процесса - person Crystark; 15.09.2017
comment
Думаю, я мог бы получить что-нибудь удовлетворяющее. У меня больше нет проблемы. Теперь просто интересно, может ли там быть более редкое состояние гонки. Буду рад услышать ваше мнение по вопросу, который я добавил в конце моего обновления. - person Crystark; 15.09.2017
comment
Как уже упоминалось, ValueJoiner не имеет доступа на запись. Ваш подход кажется осуществимым (он похож на мое предложение, но на самом деле лучше, если вы сохраните код). Я обновил свой ответ относительно вашего последнего вопроса. - person Matthias J. Sax; 15.09.2017
comment
@ MatthiasJ.Sax могу ли я добиться такой же атомарной «записи» в GlobalKTable? У меня аналогичный случай с событиями и обновлениями состояния, но состояние не помещается в одно сообщение kafka, поэтому я рассматриваю GlobalKTable - person Артём zэ Капитан; 24.09.2020
comment
Не знаете, что вы имеете в виду под атомарной записью? GlobalKTable предназначен для чтения раздела журнала изменений и применения обновлений запись за записью. - person Matthias J. Sax; 25.09.2020