Как вы можете создать отказоустойчивый поток Spring XD, который будет продолжать работать правильно после того, как исключение будет вызвано для одного конкретного сообщения (т. е. регистрирует ошибку, но продолжает потреблять следующие сообщения в потоке), без необходимости добавлять try catch (Throwable) в каждый шаг потока?
Есть ли простой способ сделать это с помощью модели Reactor или RxJava?
Пример потока с использованием Reactor:
@Override
public Publisher<Tuple> process(Stream<GenericMessage> inputStream) {
return inputStream
.flatMap(SomeClass::someFlatMap)
.filter(SomeClass::someFilter)
.when(Throwable.class, t -> log.error("error", t));
}