Как использовать Apache Flink CEP SQL для получения событий из уже согласованного шаблона?

Мое требование - создать триггер на основе двух событий (EVT_A и EVT_B независимо от порядка). Вот ожидание

1. EVT_A arrived. --> No action
2. EVT_B arrived  --> Should Trigger
3. EVT_B arrived  --> should Trigger since A was received previously (o/p should include A and current B)
4. EVT_A arrived  --> should Trigger since B was received previously (o/p should include current A and last B)
5. EVT_A arrived  --> should Trigger since B was received previously (o/p should include current A and last B)

Я пробовал подписаться, но безуспешно.

SELECT E.*
From MyEvents
MATCH_RECOGNIZE (
    ORDER BY procTime
    MEASURES ARRAY[
        Event(A.id, A.name, A.date),
        Event(B.id, B.name, B.date)
    ] AS Events
    AFTER MATCH SKIP TO NEXT ROW
    PATTERN (A C* B)
    DEFINE
        A AS name in ('EVT_A', 'EVT_B'),
        B AS name in ('EVT_A', 'EVT_B') AND B.name <> A.name,
        C AS name not in ('EVT_A', 'EVT_B')
) AS E;

Я также пробовал с ПОСЛЕ МАТЧА ПЕРЕЙТИ К ПЕРВОМУ A. Но это тоже одно из исключений. Любые предложения, как я могу добиться этого с помощью Flink SQL CEP или любым другим способом в Flink.


person ParagM    schedule 03.09.2020    source источник


Ответы (1)


Кажется, это тот случай, когда RichFlatMap или ProcessFunction - самый простой способ. Просто нужно немного состояния:

ValueState<Event> lastA;
ValueState<Event> lastB;

И тогда логика обработки каждого входящего события примерно такая:

if EVT_A
  store event as lastA
  emit lastB with this event unless lastB is null
if EVT_B
  store event as lastB
  emit lastA with this event unless lastA is null

Если вы ищете введение в работу с управляемым состоянием Flink, есть учебник в документации.

person David Anderson    schedule 03.09.2020
comment
Спасибо, Дэвид, за ваш ответ. В моем фактическом бизнес-сценарии я собираюсь динамически получать SQL (скорее, условие срабатывания) от другой службы. поэтому я искал варианты, доступные с CEP SQL или обычным SQL. Кроме того, я могу получить условие срабатывания от 2 до 15 событий. Есть ли способ справиться с этим динамически? - person ParagM; 09.09.2020