Аналогично, но немного иначе, чем этот вопрос: окна пакетной обработки KStream, я хочу группировать сообщения из KStream
перед тем, как отправить его потребителям.
Однако это нажатие должно быть запланировано не на фиксированное временное окно, а на фиксированное пороговое значение количества сообщений для каждого ключа.
Для начала приходят в голову 2 вопроса:
1) Является ли пользовательский AbstractProcessor
способом обработки этого? Что-то вроде:
@Override
public void punctuate(long streamTime) {
KeyValueIterator<String, Message[]> it = messageStore.all();
while (it.hasNext())
KeyValue<String, Message[]> entry = it.next();
if (entry.value.length > 10) {
this.context.forward(entry.key, entry.value);
entry.value = new Message[10]();
}
}
}
2) Поскольку StateStore
потенциально может взорваться (в случае, если значение записи никогда не достигнет порогового значения для пересылки), каков наилучший способ «сбора мусора»? Я мог бы составить расписание на основе времени и удалить слишком старые ключи... но это выглядит очень самодельным и подверженным ошибкам.