Как агрегировать KStream в список фиксированного размера?

Аналогично, но немного иначе, чем этот вопрос: окна пакетной обработки KStream, я хочу группировать сообщения из KStream перед тем, как отправить его потребителям.

Однако это нажатие должно быть запланировано не на фиксированное временное окно, а на фиксированное пороговое значение количества сообщений для каждого ключа.

Для начала приходят в голову 2 вопроса:

1) Является ли пользовательский AbstractProcessor способом обработки этого? Что-то вроде:

@Override
public void punctuate(long streamTime) {
    KeyValueIterator<String, Message[]> it = messageStore.all();
    while (it.hasNext()) 
        KeyValue<String, Message[]> entry = it.next();
        if (entry.value.length > 10) {
            this.context.forward(entry.key, entry.value);
            entry.value = new Message[10]();
        }
    }
}

2) Поскольку StateStore потенциально может взорваться (в случае, если значение записи никогда не достигнет порогового значения для пересылки), каков наилучший способ «сбора мусора»? Я мог бы составить расписание на основе времени и удалить слишком старые ключи... но это выглядит очень самодельным и подверженным ошибкам.


person Raf    schedule 02.12.2016    source источник


Ответы (1)


Думаю, это сработает. Применение «сборки мусора» на основе времени тоже звучит разумно. И да, использование Processor API вместо DSL имеет некоторый оттенок DIY - это не цель PAPI в первую очередь (предоставить пользователю возможность делать все, что необходимо).

Однако несколько комментариев:

  • Вам понадобится более сложная структура данных: поскольку punctuate() вызывается на основе прогресса во время потока, может случиться так, что у вас будет более 10 записей для одного ключа между двумя вызовами. Таким образом, вам понадобится что-то вроде KeyValueIterator<String, List<Message[]>> it = messageStore.all();, чтобы иметь возможность хранить несколько пакетов для каждого ключа.
  • Я предполагаю, что вам нужно будет точно настроить расписание для пунктуации, что будет сложно - если ваше расписание слишком плотное, многие пакеты могут быть еще не завершены, и вы тратите ресурсы ЦП - если ваше расписание слишком свободное, вам понадобится много памяти, и ваши нижестоящие операторы получат много данных, поскольку вы одновременно выдаете много материала. Отправка пакета данных вниз по течению может стать проблемой.
  • Сканирование всего хранилища стоит дорого — кажется хорошей идеей попытаться «сортировать» пары «ключ-значение» в соответствии с размером их пакета. Это должно позволить вам касаться только тех клавиш, у которых есть завершенные пакеты, а не всех клавиш. Возможно, вы можете хранить в памяти список ключей, для которых были завершены пакеты, и выполнять поиск только для них (в случае неудачи вам нужно выполнить один проход по всем ключам из хранилища, чтобы воссоздать этот список в памяти).
person Matthias J. Sax    schedule 02.12.2016
comment
Спасибо за эти ценные комментарии. Более конкретные вопросы последуют после реализации, без сомнения. - person Raf; 03.12.2016