Kafka Streams: действие на n-м событии

Я пытаюсь найти лучший способ выполнить действие с n -м событием в Kafka Streams.

Мой случай: у меня есть входной поток с некоторыми событиями. Мне нужно отфильтровать их по eventType == login и для каждого n -го входа (скажем, пятого) для одного и того же accountId отправить это Событие в выходной поток.

После некоторого расследования и различных попыток у меня есть версия кода ниже (я использую Kotlin).

data class Event(
    val payload: Any = {},
    val accountId: String,
    val eventType: String = ""
)
// intermediate class to keep the key and value of the original event
data class LoginEvent(
    val eventKey: String,
    val eventValue: Event
)
fun process() {
        val userLoginsStoreBuilder = Stores.keyValueStoreBuilder(
            Stores.persistentKeyValueStore("logins"),
            Serdes.String(),
            Serdes.Integer()
        )
        val streamsBuilder = StreamsBuilder().addStateStore(userCheckInsStoreBuilder)
        val inputStream = streamsBuilder.stream<String, String>(inputTopic)

        inputStream.map { key, event ->
            KeyValue(key, json.readValue<Event>(event))
        }.filter { _, event -> event.eventType == "login" }
             .map { key, event -> KeyValue(event.accountId, LoginEvent(key, event)) }
             .transform(
                    UserLoginsTransformer("logins", 5),
                    "logins"
                )
             .filter { _, value -> value }
             .map { key, _ -> KeyValue(key.eventKey, json.writeValueAsString(key.eventValue)) }
             .to("fifth_login", Produced.with(Serdes.String(), Serdes.String()))

        ...
    }
class UserLoginsTransformer(private val storeName: String, private val loginsThreshold: Int = 5) :
    TransformerSupplier<String, CheckInEvent, KeyValue< LoginEvent, Boolean>> {

    override fun get(): Transformer<String, LoginEvent, KeyValue< LoginEvent, Boolean>> {
        return object : Transformer<String, LoginEvent, KeyValue< LoginEvent, Boolean>> {
            private lateinit var store: KeyValueStore<String, Int>

            @Suppress("UNCHECKED_CAST")
            override fun init(context: ProcessorContext) {
                store = context.getStateStore(storeName) as KeyValueStore<String, Int>
            }

            override fun transform(key: String, value: LoginEvent): KeyValue< LoginEvent, Boolean> {
                val counter = (store.get(key) ?: 0) + 1
                return if (counter == loginsThreshold) {
                    store.delete(key)
                    KeyValue(value, true)
                } else {
                    store.put(key, counter)
                    KeyValue(value, false)
                }
            }

            override fun close() {
            }
        }
    }
}

Меня больше всего беспокоит то, что функция transform в моем случае не является поточно-ориентированной. Я проверил реализацию KV-хранилища, которое используется в моем случае, и это хранилище RocksDB (не транзакционное), поэтому значение может обновляться между чтением и сравнением, и на выход будет отправлено неправильное событие.

Другие мои идеи:

  1. Используйте материализованные представления как магазин без преобразователя, но я застрял на реализации.
  2. Создайте настраиваемое постоянное хранилище KV, которое будет использовать TransactionalRocksDB (не уверен, стоит ли оно того).
  3. Создайте настраиваемое постоянное хранилище KV, которое будет использовать внутри ConcurrentHashMap (это может привести к высокому потреблению памяти в случае большого количества пользователей, которых мы ожидаем).

Еще одно замечание: я использую Spring Cloud Stream, поэтому, возможно, в этой структуре есть встроенное решение для моего случая, но я его не нашел.

Буду признателен за любые предложения. Заранее спасибо.




Ответы (1)


Меня больше всего беспокоит то, что в моем случае функция преобразования не является потокобезопасной. Я проверил реализацию KV-хранилища, которое используется в моем случае, и это хранилище RocksDB (не транзакционное), поэтому значение может обновляться между чтением и сравнением, и на выход будет отправлено неправильное событие.

Нет причин для беспокойства. Если вы работаете с несколькими потоками, каждый поток будет иметь свою собственную базу данных RocksDB, в которой будет храниться один сегмент общих данных (обратите внимание, что общее состояние сегментировано на основе разделов входной темы, и один сегмент никогда не обрабатывается разными потоками). Следовательно, ваш код будет работать правильно. Единственное, что вам нужно убедиться, это то, что данные разделены по accountId, так что события входа в систему для одной учетной записи отправляются в один и тот же сегмент.

Если вы вводите данные, которые уже разделены accountId при записи в вашу тему ввода, вам не нужно ничего делать. Если нет, и вы можете управлять вышестоящим приложением, может быть проще всего использовать настраиваемый разделитель в вышестоящем разработчике приложения, чтобы получить нужное вам разделение. Если вы не можете изменить вышестоящее приложение, вам нужно будет заново разделить данные после того, как вы установили accountId в качестве нового ключа, т. Е. Выполнив through() перед вызовом transform().

person Matthias J. Sax    schedule 08.12.2019
comment
Я не могу контролировать восходящий поток, поэтому мне нужно перераспределение. Еще одно замечание - мы говорим не о нескольких потоках (потоках JVM), а о нескольких потребителях, которые читают одну и ту же тему. - person ninja; 11.12.2019
comment
Разные потребители будут всегда работать в разных потоках, используя Kafka Streams. - person Matthias J. Sax; 11.12.2019