Обработка исключений в потоках Spring XD

Как вы можете создать отказоустойчивый поток Spring XD, который будет продолжать работать правильно после того, как исключение будет вызвано для одного конкретного сообщения (т. е. регистрирует ошибку, но продолжает потреблять следующие сообщения в потоке), без необходимости добавлять try catch (Throwable) в каждый шаг потока?

Есть ли простой способ сделать это с помощью модели Reactor или RxJava?

Пример потока с использованием Reactor:

@Override
public Publisher<Tuple> process(Stream<GenericMessage> inputStream) {
  return inputStream
      .flatMap(SomeClass::someFlatMap)
      .filter(SomeClass::someFilter)
      .when(Throwable.class, t -> log.error("error", t));
}

person andresp    schedule 09.11.2015    source источник


Ответы (1)


RxJava может использоваться процессорным модулем. При создании необходимо создать подписку, а для обработки ошибок подписчику необходимо добавить обработчик onError:

       subject = new SerializedSubject(PublishSubject.create());
        Observable<?> outputStream = processor.process(subject);
        subscription = outputStream.subscribe(new Action1<Object>() {
            @Override
            public void call(Object outputObject) {
                if (ClassUtils.isAssignable(Message.class, outputObject.getClass())) {
                    getOutputChannel().send((Message) outputObject);
                } else {
                    getOutputChannel().send(MessageBuilder.withPayload(outputObject).build());
                }
            }
        }, new Action1<Throwable>() {
            @Override
            public void call(Throwable throwable) {
                logger.error(throwable.getMessage(), throwable);
            }
        }, new Action0() {
            @Override
            public void call() {
                logger.error("Subscription close for [" + subscription + "]");
            }
        });

Посмотрите еще примеры здесь: https://github.com/spring-projects/spring-xd/tree/master/spring-xd-rxjava/src

person John Scattergood    schedule 23.12.2015