У меня проблема с тем, как ведет себя следующая топология:
String topic = config.topic();
KTable<UUID, MyData> myTable = topology.builder().table(UUIDSerdes.get(), GsonSerdes.get(MyData.class), topic);
// Receive a stream of various events
topology.eventsStream()
// Only process events that are implementing MyEvent
.filter((k, v) -> v instanceof MyEvent)
// Cast to ease the code
.mapValues(v -> (MyEvent) v)
// rekey by data id
.selectKey((k, v) -> v.data.id)
.peek((k, v) -> L.info("Event:"+v.action))
// join the event with the according entry in the KTable and apply the state mutation
.leftJoin(myTable, eventHandler::handleEvent, UUIDSerdes.get(), EventSerdes.get())
.peek((k, v) -> L.info("Updated:" + v.id + "-" + v.id2))
// write the updated state to the KTable.
.to(UUIDSerdes.get(), GsonSerdes.get(MyData.class), topic);
Моя проблема возникает, когда я получаю разные события одновременно. Поскольку моя мутация состояния выполняется leftJoin
, а затем записывается методом to
. У меня может произойти следующее, если события 1 и 2 получены одновременно с одним и тем же ключом:
event1 joins with state A => state A mutated to state X
event2 joins with state A => state A mutated to state Y
state X written to the KTable topic
state Y written to the KTable topic
Из-за этого в состоянии Y нет изменений с event1
, поэтому я потерял данные.
Вот что я вижу с точки зрения журналов (часть Processing:...
регистрируется внутри средства объединения значений):
Event:Event1
Event:Event2
Processing:Event1, State:none
Updated:1-null
Processing:Event2, State:none
java.lang.IllegalStateException: Event2 event received but we don't have data for id 1
Event1
можно рассматривать как событие создания: он создаст запись в KTable, поэтому не имеет значения, является ли состояние пустым. Event2
хотя необходимо применить изменения к существующему состоянию, но он не находит их, потому что первая мутация состояния все еще не была записана в KTable (она все еще не была обработана методом to
)
Есть ли способ убедиться, что мой leftJoin и мои записи в ktable выполняются атомарно?
Спасибо
Обновление и текущее решение
Благодаря ответу @Matthias я смог найти решение, используя Transformer
.
Вот как выглядит код:
Это трансформатор
public class KStreamStateLeftJoin<K, V1, V2> implements Transformer<K, V1, KeyValue<K, V2>> {
private final String stateName;
private final ValueJoiner<V1, V2, V2> joiner;
private final boolean updateState;
private KeyValueStore<K, V2> state;
public KStreamStateLeftJoin(String stateName, ValueJoiner<V1, V2, V2> joiner, boolean updateState) {
this.stateName = stateName;
this.joiner = joiner;
this.updateState = updateState;
}
@Override
@SuppressWarnings("unchecked")
public void init(ProcessorContext context) {
this.state = (KeyValueStore<K, V2>) context.getStateStore(stateName);
}
@Override
public KeyValue<K, V2> transform(K key, V1 value) {
V2 stateValue = this.state.get(key); // Get current state
V2 updatedValue = joiner.apply(value, stateValue); // Apply join
if (updateState) {
this.state.put(key, updatedValue); // write new state
}
return new KeyValue<>(key, updatedValue);
}
@Override
public KeyValue<K, V2> punctuate(long timestamp) {
return null;
}
@Override
public void close() {}
}
А вот адаптированная топология:
String topic = config.topic();
String store = topic + "-store";
KTable<UUID, MyData> myTable = topology.builder().table(UUIDSerdes.get(), GsonSerdes.get(MyData.class), topic, store);
// Receive a stream of various events
topology.eventsStream()
// Only process events that are implementing MyEvent
.filter((k, v) -> v instanceof MyEvent)
// Cast to ease the code
.mapValues(v -> (MyEvent) v)
// rekey by data id
.selectKey((k, v) -> v.data.id)
// join the event with the according entry in the KTable and apply the state mutation
.transform(() -> new KStreamStateLeftJoin<UUID, MyEvent, MyData>(store, eventHandler::handleEvent, true), store)
// write the updated state to the KTable.
.to(UUIDSerdes.get(), GsonSerdes.get(MyData.class), topic);
Поскольку мы используем KV StateStore KTable и применяем изменения непосредственно в нем через put
события метода, shoudl всегда выбирает обновленное состояние. Одна вещь, которую я до сих пор не понимаю: что, если у меня будет постоянная высокая пропускная способность событий.
Может ли все еще существовать состояние гонки между помещениями, которые мы выполняем в хранилище KTable KTable, и записями, которые выполняются в теме KTable?