Параллельная обработка проектного потока реактора

Я новичок в проектировании реакторов или реактивного программирования в целом, поэтому я, вероятно, делаю что-то не так. Я изо всех сил пытаюсь создать поток, который бы делал следующее:

Учитывая класс Entity:

Entity {
    private Map<String, String> items;
    public Map<String, String> getItems() {
        return items;
    }
}
  1. читать сущность из БД (ListenableFuture<Entity> readEntity())
  2. выполнить некоторую параллельную асинхронную обработку для каждого элемента (boolean processItem(Map.Entry<String, String> item))
  3. когда все закончили вызов doneProcessing (void doneProcessing(boolean b))

В настоящее время мой код:

handler = this;
Mono
    .fromFuture(readEntity())
    .doOnError(t -> {
        notifyError(“some err-msg” , t);
        return;
    })
    .doOnSuccess(e -> log.info("Got the Entity: " + e))
    .flatMap( e -> Flux.fromIterable(e.getItems().entrySet()))
    .all(handler::processItem)
    .consume(handler::doneProcessing);

Это работает, но вызовы handler::processItem не выполняются одновременно для всех элементов. Я пробовал использовать dispatchOn и publishOn как с io, так и с async SchedulerGroup и с различными параметрами, но все равно вызовы выполняются последовательно в одном потоке. Что я делаю неправильно?

Кроме того, я уверен, что в целом вышесказанное можно улучшить, поэтому любое предложение будет оценено по достоинству.

Спасибо


person user5396668    schedule 21.03.2016    source источник


Ответы (1)


Вам понадобится еще одна flatMap, которая разветвляет и объединяет вычисления для каждого отдельного элемента карты:

Mono.fromFuture(readEntity())
.flatMap(v -> Flux.fromIterable(v.getItems().entrySet()))
.flatMap(v -> Flux.just(v)
                .publishOn(SchedulerGroup.io())
                .doOnNext(handler::processItem))
.consume(handler::doneProcessing);
person akarnokd    schedule 30.03.2016
comment
Спасибо @akarnokd за ваше предложение, которое звучит многообещающе, хотя из-за нехватки времени я перешел на CompletableFutures, который кажется более простым и имеет разумную документацию. - person user5396668; 31.03.2016