Как сессионизировать поток с помощью Apache Flink?

Я хочу разбить этот поток на сеансы: 1,1,1,2,2,2,2,2,3,3,3,3,3,3,3,0,3,3,3,5,... на эти сеансы:

1,1,1
2,2,2,2,2
3,3,3,3,3,3,3
0
3,3,3
5

Я написал CustomTrigger, чтобы определять, когда элементы потока изменяются с 1 на 2 (с 2 на 3, с 3 на 0 и т. д.), а затем запускать триггер. Но это не решение, потому что, когда я обрабатываю первый элемент из 2 и запускаю триггер, окно будет [1,1,1,2], но мне нужно запустить триггер для последнего элемента из 1.

Вот песудо моей функции onElement в моем пользовательском классе триггера:

override def onElement(element: Session, timestamp: Long, window: W, ctx: TriggerContext): TriggerResult = {
    if (prevState == element.value) {
      prevState = element.value
      TriggerResult.CONTINUE
    } else {
      prevState = element.value
      TriggerResult.FIRE
    }
}

Как я могу решить эту проблему?


person Milad Khajavi    schedule 18.06.2017    source источник


Ответы (1)


Я думаю, что FlatMapFunction с ListState — это самый простой способ реализовать этот вариант использования.

Когда поступает новый элемент (т. е. вызывается метод flatMap()), вы проверяете, не изменилось ли значение. Если значение не изменилось, вы добавляете элемент в состояние. Если значение изменилось, вы выдаете текущее состояние списка как сеанс, очищаете список и вставляете новый элемент первым в состояние списка.

Однако вы должны иметь в виду, что это предполагает сохранение порядка элементов. Flink обеспечивает внутри раздела, т. е. до тех пор, пока элементы не перемешиваются и все операторы выполняются с одинаковым параллелизмом.

person Fabian Hueske    schedule 19.06.2017