Как объединить одно сообщение в несколько групп с помощью агрегата верблюда?

Я пытаюсь создать совокупное представление последовательных рыночных данных, что означает, что нам нужно вычислять значение суммы каждые 2 сообщения. сказать, что данные поступают как:

(V0,T0),(V1,T1),(V2,T2),(V3,T3)....

V означает значение T означает отметку времени, когда мы получаем данные.

Нам нужно сгенерировать сумму для каждых 2 точек, скажем:

(R1=Sum(V0,V1),T1),(R2=Sum(V1,V2),T2),(R3=Sum(V2,V3),T3),....

Любое предложение, как мы можем сделать это, используя aggregator2, или нам нужно написать для этого процессор?


person user2956246    schedule 05.11.2013    source источник


Ответы (2)


Вы правы, лучше всего использовать компонент aggregator2. Я бы попробовал что-то вроде этого:

from("somewhere").split(body().tokenize("),")).streaming()
    .aggregate(new ValueAggregationStrategy()).completionTimeout(1500)
    .to("whatYouWant");

class ValueAggregationStrategy implements AggregationStrategy {

    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
        if (oldExchange == null) {
            return newExchange;
        }

        String oldBody = oldExchange.getIn().getBody(String.class);
        String newBody = newExchange.getIn().getBody(String.class);

        oldExchange.getIn().setBody(extractValue(oldBody) + extractValue(newBody));
        return oldExchange;
    }

    public int extractValue(String body) {
        // Do the work "(V0,T0" -> "V0"
    }
}

Примечание: было бы проще анализировать, если бы у вас был такой формат: V0,T0;V1,T1...

Для получения дополнительной информации: здесь статья Клауса Ибсена о разборе больших файлов с помощью Camel

person Pith    schedule 05.11.2013
comment
Я считаю, что после метода aggregate() вам нужно сообщить верблюду, когда агрегированное сообщение будет готово, используя такие методы, как completionSize() или completionTimeout(). - person hveiga; 06.11.2013
comment
спасибо за ваш ответ, может быть, я не ясно выразился, на самом деле входные данные представляют собой файл CSV, я уже проанализировал его в потоке в объекты POJO, несущие данные в виде поля, и нам нужно сгенерировать совокупный результат (сумма, среднее) для каждых последовательных n записей (полные по размеру могут лучше подходить в этом случае: P), что приводит к тому, что (VX, TX) должны быть объединены в несколько последовательных групп (X, X + 1,..., x + n-1 ), в соответствии с документом, похоже, нам нужно предоставить идентификатор корреляции, который будет помещать (VX, TX) только в одну группу (скажем, использовать X как corID). любые другие предложения? - person user2956246; 06.11.2013

После прочтения исходного кода агрегатора выясняется, что верблюд собирает только одно сообщение в одну группу, для этой цели нам нужно построить «агрегатор». вот код:

public abstract class GroupingGenerator<I> implements Processor {
private final EvictingQueue<I> queue;
private final int size;

public int getSize() {
    return size;
}

public GroupingGenerator(int size) {
    super();
    this.size = size;
    this.queue = EvictingQueue.create(size);
}

@SuppressWarnings("unchecked")
@Override
public void process(Exchange exchange) throws Exception {
    queue.offer((I) exchange.getIn().getBody());
    if (queue.size() != size) {
        exchange.setProperty(Exchange.ROUTE_STOP, true);
        return;
    } else {
        processGroup(queue, exchange);
    }
}

protected abstract void processGroup(Collection<I> items, Exchange exchange);

}
person user2956246    schedule 14.11.2013