Я новичок в проектировании реакторов или реактивного программирования в целом, поэтому я, вероятно, делаю что-то не так. Я изо всех сил пытаюсь создать поток, который бы делал следующее:
Учитывая класс Entity:
Entity {
private Map<String, String> items;
public Map<String, String> getItems() {
return items;
}
}
- читать сущность из БД (
ListenableFuture<Entity> readEntity()
) - выполнить некоторую параллельную асинхронную обработку для каждого элемента (
boolean processItem(Map.Entry<String, String> item)
) - когда все закончили вызов doneProcessing (
void doneProcessing(boolean b)
)
В настоящее время мой код:
handler = this;
Mono
.fromFuture(readEntity())
.doOnError(t -> {
notifyError(“some err-msg” , t);
return;
})
.doOnSuccess(e -> log.info("Got the Entity: " + e))
.flatMap( e -> Flux.fromIterable(e.getItems().entrySet()))
.all(handler::processItem)
.consume(handler::doneProcessing);
Это работает, но вызовы handler::processItem
не выполняются одновременно для всех элементов. Я пробовал использовать dispatchOn
и publishOn
как с io
, так и с async
SchedulerGroup
и с различными параметрами, но все равно вызовы выполняются последовательно в одном потоке. Что я делаю неправильно?
Кроме того, я уверен, что в целом вышесказанное можно улучшить, поэтому любое предложение будет оценено по достоинству.
Спасибо