Java с Project Reactor: почему блок Mono () не видит ошибку?

Учитывая следующий код, result.block () равен «xx», isError () == false, но обработчик ошибок стрелы запущен, отображается трассировка стека, обещание не выполнено. Я ожидал, что результат будет «ко».

Что я делаю неправильно ? block () Javadoc говорит

вернет null, если onComplete, T, если onNext

public class RApp {

static final Logger LOG = LoggerFactory.getLogger(RApp.class);

public static void main(String[] args) {

    MonoProcessor<String> texecute = MonoProcessor.create();
    Mono<String> result = texecute.delaySubscription(Duration.ofSeconds(2))
            .onErrorReturn("ko")
            .doOnNext(s -> parse(s)
            .doOnSuccess(p -> LOG.info("promise completed {}", p))
            .doOnTerminate((z, e) -> LOG.info("term value: {} , {}", z, e))
            .doOnError(t -> {
                LOG.error("boom", t);
            })
            .subscribe());

    texecute.onNext("xx");

    LOG.info("...............;");
    String block = result.block();
    LOG.info("r={}", block);
    boolean error = texecute.isError();
    LOG.info(error ? "error" : "no error");
    texecute.dispose();

}

public static Mono<String> parse(String s) {
    System.out.println("parse s = " + s);
    if (s.equals("xx")) {
        return Mono.error(new Exception("no xx"));
    }
    return Mono.just(s);
}
}

person Laurent Perez    schedule 04.08.2017    source источник


Ответы (1)


Отвечая себе на этот вопрос: do * - это методы побочных эффектов, не изменяющие последовательность per https://projectreactor.io/docs/core/release/reference/#error.handling, а порядок onErrorReturn имеет значение.

Правильное рабочее решение ниже, с бонусами от response.core.Exceptions.propagate для обертывания проверенных исключений и счетчика отказов java 8:

    LongAdder failureStat = new LongAdder();

    MonoProcessor<String> texecute = MonoProcessor.create();
    Mono<String> result = texecute
            .delaySubscription(Duration.ofSeconds(2))
            .map(e -> parse2(e)).doOnError(e -> {
                failureStat.increment();
            }).doOnSuccess(s -> {
                LOG.info("success {}", s);
            })
            .onErrorReturn("ko")
            .subscribe();

    texecute.onNext("xx");

    LOG.info("...............;");
    String block = result.block();
    LOG.info("r={}", block);
    System.out.println("failureStat = " + failureStat);
    texecute.dispose();

public static String parse2(String s) {
    System.out.println("parse s = " + s);
    if (s.equals("xx")) {
        try {
            throw new Exception("no xx");
        } catch (Exception e) {
            throw Exceptions.propagate(e);
        }
    }
    return s;
}
person Laurent Perez    schedule 04.08.2017