Как выразить логику дедупликации событий в потоковой обработке Сиддхи

Привет: Мне нужно реализовать следующую логику дедупликации в потоковой обработке Сиддхи. Предположим, у меня есть InputStream, и я хочу создать OutputStream следующим образом:

(1) когда событие является первым (с момента запуска механизма обработки событий) в InputStream, вставьте событие в OutputStream.

(2) если событие с той же сигнатурой, например, с тем же именем, приходит в течение 2-минутных окон, мы считаем, что событие идентично, и НЕ должны вставлять событие в OutputStream. В противном случае мы должны вставить событие в OutputStream.

Я попытался использовать шаблон событий для фильтрации. Однако я не могу найти, что могу выразить «логику отрицания» на сиддхи, то есть, если (не (e1 -> e2 с той же подписью в 2-минутном окне)). Есть ли умный способ реализовать такую ​​логику дедупликации событий? Обратите внимание, что дедупликация событий - очень распространенное выражение, необходимое для обработки событий.

Если бы я реализовал это на Java, это было бы относительно просто. Я создам хеш-таблицу. Когда приходит первое событие, я регистрирую его в хэш-состоянии и устанавливаю время приема этого зарегистрированного события на 2 минуты позже. Когда приходит следующее событие, я просматриваю хэш-таблицу и сравниваю допустимое время извлеченного события с моим текущим временем события, и если текущее время события меньше допустимого времени, я не буду рассматривать его как выходное событие. Вместо реализации Java я предпочитаю реализовать декларативное решение в запросе обработки потока Siddhi, если это возможно.


person Jun Li    schedule 14.06.2018    source источник


Ответы (1)


Вы можете использовать таблицу в памяти и добиться этого; Пожалуйста, найдите образец ниже; он очень похож на ваш подход к Java.

    define stream InputStream (event_id string, data string);  
    define stream OutputStream (event_id string, data string);  
    define table ProcessedEvents (event_id string);

    from InputStream[not(ProcessedEvents.event_id == event_id in ProcessedEvents)]  
    insert into OutputStream ; 

    from OutputStream  
    select event_id  
    insert into ProcessedEvents ; 

    from OutputStream#window.time(2 sec)  
    select event_id  
    insert expired events into PurgeStream ; 

    from PurgeStream  
    delete ProcessedEvents  
       on ProcessedEvents.event_id == event_id ;
person Grainier    schedule 15.06.2018