Правила агрегирования Apache Samza для отсутствующих ожидаемых событий в скользящем периоде времени

Мой вариант использования — идентифицировать сущности, от которых ожидаемые события не были получены по прошествии X времени в режиме реального времени.

Например: если мы получили событие PaymentInitiated в момент времени T, но не получили ни PaymentFailed, ни PaymentAborted, ни PaymentSucedded по T+X, то активируем триггер с сообщением PaymentStuck вместе с подробностями события PaymentIntitiated.

Как я могу смоделировать такие варианты использования в Apache Samza, поскольку он использует период времени X для каждого события, а не фиксированный интервал времени.

Спасибо, Хариш


person Harish    schedule 28.09.2015    source источник


Ответы (1)


Я не знаю о какой-либо встроенной поддержке этого в Samza, но я могу представить обходной путь, использующий WindowableTask.

public class PaymentEvent implements Comparable<PaymentEvent> {
    // if current time > timestamp, payment is stuck
    public long timestamp; 
    // we want a corresponding PaymentFailed... event with the same id
    public long interactionId; 
    // PaymentRequest, PaymentAborted, PaymentSucceeded...
    public enum type;
    ...

    @Override
    public int compareTo(PaymentEvent o){
        return timestamp - o.timestamp;
    }
}

Теперь в вашем методе процесса у вас будет что-то вроде:

PriorityQueue<PaymentEvent> pqueue;
Map<Long, PaymentEvent> responses;

public void process(...) {
    PaymentEvent e = new PaymentEvent(envelope.getMessage());
    if (e.enum == PAYMENT_REQUEST) {
        pqueue.add(e);
    } else {
        responses.put(e.interactionId, e);
    }
}

И, наконец, во время вашего окна вы выталкиваете из очереди приоритетов все с помощью timestamp > current time и проверяете, есть ли соответствующее событие в Карте.

public void window(...) {
    while(pqueue.peek().timestamp <= currentTime) {
        if (!map.containsKey(pqueue.poll().interactionId) {
            // send the trigger via the collector
        } 
    }
}

Затем, наконец, вы должны установить время окна в своей конфигурации на то, сколько вы хотите опросить. Конфиг task.window.ms.

person Almog    schedule 21.10.2015