Я пытаюсь найти лучший способ выполнить действие с n -м событием в Kafka Streams.
Мой случай: у меня есть входной поток с некоторыми событиями. Мне нужно отфильтровать их по eventType == login и для каждого n -го входа (скажем, пятого) для одного и того же accountId отправить это Событие в выходной поток.
После некоторого расследования и различных попыток у меня есть версия кода ниже (я использую Kotlin).
data class Event(
val payload: Any = {},
val accountId: String,
val eventType: String = ""
)
// intermediate class to keep the key and value of the original event
data class LoginEvent(
val eventKey: String,
val eventValue: Event
)
fun process() {
val userLoginsStoreBuilder = Stores.keyValueStoreBuilder(
Stores.persistentKeyValueStore("logins"),
Serdes.String(),
Serdes.Integer()
)
val streamsBuilder = StreamsBuilder().addStateStore(userCheckInsStoreBuilder)
val inputStream = streamsBuilder.stream<String, String>(inputTopic)
inputStream.map { key, event ->
KeyValue(key, json.readValue<Event>(event))
}.filter { _, event -> event.eventType == "login" }
.map { key, event -> KeyValue(event.accountId, LoginEvent(key, event)) }
.transform(
UserLoginsTransformer("logins", 5),
"logins"
)
.filter { _, value -> value }
.map { key, _ -> KeyValue(key.eventKey, json.writeValueAsString(key.eventValue)) }
.to("fifth_login", Produced.with(Serdes.String(), Serdes.String()))
...
}
class UserLoginsTransformer(private val storeName: String, private val loginsThreshold: Int = 5) :
TransformerSupplier<String, CheckInEvent, KeyValue< LoginEvent, Boolean>> {
override fun get(): Transformer<String, LoginEvent, KeyValue< LoginEvent, Boolean>> {
return object : Transformer<String, LoginEvent, KeyValue< LoginEvent, Boolean>> {
private lateinit var store: KeyValueStore<String, Int>
@Suppress("UNCHECKED_CAST")
override fun init(context: ProcessorContext) {
store = context.getStateStore(storeName) as KeyValueStore<String, Int>
}
override fun transform(key: String, value: LoginEvent): KeyValue< LoginEvent, Boolean> {
val counter = (store.get(key) ?: 0) + 1
return if (counter == loginsThreshold) {
store.delete(key)
KeyValue(value, true)
} else {
store.put(key, counter)
KeyValue(value, false)
}
}
override fun close() {
}
}
}
}
Меня больше всего беспокоит то, что функция transform
в моем случае не является поточно-ориентированной. Я проверил реализацию KV-хранилища, которое используется в моем случае, и это хранилище RocksDB (не транзакционное), поэтому значение может обновляться между чтением и сравнением, и на выход будет отправлено неправильное событие.
Другие мои идеи:
- Используйте материализованные представления как магазин без преобразователя, но я застрял на реализации.
- Создайте настраиваемое постоянное хранилище KV, которое будет использовать TransactionalRocksDB (не уверен, стоит ли оно того).
- Создайте настраиваемое постоянное хранилище KV, которое будет использовать внутри ConcurrentHashMap (это может привести к высокому потреблению памяти в случае большого количества пользователей, которых мы ожидаем).
Еще одно замечание: я использую Spring Cloud Stream, поэтому, возможно, в этой структуре есть встроенное решение для моего случая, но я его не нашел.
Буду признателен за любые предложения. Заранее спасибо.