Я хочу разбить этот поток на сеансы: 1,1,1,2,2,2,2,2,3,3,3,3,3,3,3,0,3,3,3,5,... на эти сеансы:
1,1,1
2,2,2,2,2
3,3,3,3,3,3,3
0
3,3,3
5
Я написал CustomTrigger, чтобы определять, когда элементы потока изменяются с 1 на 2 (с 2 на 3, с 3 на 0 и т. д.), а затем запускать триггер. Но это не решение, потому что, когда я обрабатываю первый элемент из 2 и запускаю триггер, окно будет [1,1,1,2], но мне нужно запустить триггер для последнего элемента из 1.
Вот песудо моей функции onElement в моем пользовательском классе триггера:
override def onElement(element: Session, timestamp: Long, window: W, ctx: TriggerContext): TriggerResult = {
if (prevState == element.value) {
prevState = element.value
TriggerResult.CONTINUE
} else {
prevState = element.value
TriggerResult.FIRE
}
}
Как я могу решить эту проблему?