Правила агрегации Apache Storm для отсутствующих ожидаемых событий в скользящем периоде времени

Мой вариант использования — идентифицировать объекты, от которых ожидаемые события не были получены по прошествии X времени в режиме реального времени, а не с использованием пакетных заданий. Например:

Если мы получили событие PaymentInitiated в момент времени T, но не получили ни PaymentFailed, ни PaymentAborted, ни PaymentSucedded по T+X, то активируем триггер с сообщением PaymentStuck вместе с подробностями события PaymentIntitiated.

Как я могу смоделировать такие варианты использования в Apache Storm, поскольку для каждого события используется период времени X, а не фиксированный интервал времени.

Спасибо, Хариш




Ответы (1)


Для Storm потребуется поместить всю вашу логику в ваш код UDF, используя низкоуровневый Java API (я сомневаюсь, что Trindent полезен). Я никогда не работал с Samza и не могу помочь (или решить, какая система лучше подойдет для решения вашей проблемы).

Например, в Storm вы можете назначить временную метку каждому кортежу в Spout.nextTuple() и буферизовать все кортежи незавершенного платежа в Bolt в порядке убывания временной метки. Каждый раз, когда вызывается Bolt.execute(), вы можете сравнить отметку времени нового кортежа с начальным (т. е. самым старым кортежем) вашей очереди. Если входной кортеж имеет большую временную метку, чем head-T плюс X, вы знаете, что время ожидания вашего головного кортежа истекло, и вы можете поднять для него триггер.

Конечно, вам нужно выполнить fieldsGrouping(), чтобы убедиться, что все кортежи, принадлежащие одному и тому же платежу, обрабатываются одним и тем же экземпляром Bolt. Вам также может понадобиться несколько упорядочить входящие кортежи болтов по отметке времени или использовать более продвинутую логику тайм-аута для обработки неупорядоченных кортежей (в отношении увеличения отметок времени).

В зависимости от ваших требований к задержке и скорости входного потока вы также можете использовать «тиковые кортежи», чтобы инициировать сравнение головного кортежа с этим фиктивным кортежем тиков. Или, как еще более строгая реализация, проделайте всю эту логику прямо в Spout.next() (если вы знаете, что все кортежи платежа проходят через один и тот же экземпляр Spout).

person Matthias J. Sax    schedule 27.09.2015
comment
Спасибо за ваш ответ. В приведенном выше подходе проверка метки времени выполняется только при вызове Bolt.execute(), что будет выполняться только для события. Но мое требование - поднять триггер, даже если желаемое событие отсутствует... Как справиться с таким сценарием? Я что-то неправильно понял? - person Harish; 28.09.2015
comment
Ты понял. Идея состоит в том, чтобы активировать триггер, когда приходит другое событие, которое моложе (т. е. после потенциального последнего события платежа). Если у вас есть кортежи, упорядоченные по отметке времени, вы знаете, что последнее событие действительно реагировать косвенно). Вы никогда не сможете напрямую отреагировать на пропущенное событие, потому что реагировать не на что. - person Matthias J. Sax; 29.09.2015