Spring-Integration обработка исключений Webflux

Если исключение возникает в потоке webflux весенней интеграции, само исключение (с stacktrace) отправляется обратно вызывающей стороне в виде полезной нагрузки через MessagePublishingErrorHandler, который использует канал ошибки из заголовка errorChannel, а не канал ошибки по умолчанию.

Как настроить обработчик ошибок, подобный WebExceptionHandler? Я хочу создать код состояния Http и, возможно, объект DefaultErrorAttributes в качестве ответа.

Простое определение потока, который начинается с errorChannel, не работает, сообщение об ошибке там не появится. Я попытался определить свой собственный fluxErrorChannel, но оказалось, что он также не используется в качестве канала ошибок, ошибки не попадают в мой errorFlow:

@Bean
public IntegrationFlow fooRestFlow() {
    return IntegrationFlows.from(
            WebFlux.inboundGateway("/foo")
                    .requestMapping(r -> r.methods(HttpMethod.POST))
                    .requestPayloadType(Map.class)
                    .errorChannel(fluxErrorChannel()))
            .channel(bazFlow().getInputChannel())
            .get();
}

@Bean
public MessageChannel fluxErrorChannel() {
    return MessageChannels.flux().get();
}

@Bean
public IntegrationFlow errorFlow() {
    return IntegrationFlows.from(fluxErrorChannel())
            .transform(source -> source)
            .enrichHeaders(h -> h.header(HttpHeaders.STATUS_CODE, HttpStatus.BAD_GATEWAY))
            .get();
}

@Bean
public IntegrationFlow bazFlow() {
    return f -> f.split(Map.class, map -> map.get("items"))
            .channel(MessageChannels.flux())
            .<String>handle((p, h) -> throw new RuntimeException())
            .aggregate();
}

ОБНОВЛЕНИЕ

В MessagingGatewaySupport.doSendAndReceiveMessageReactive мой канал ошибки, определенный в WebFlux.inboundGateway, никогда не используется для установки канала ошибки, скорее канал ошибки всегда replyChannel, который создается здесь

FutureReplyChannel replyChannel = new FutureReplyChannel();

Message<?> requestMessage = MutableMessageBuilder.fromMessage(message)
    .setReplyChannel(replyChannel)
    .setHeader(this.messagingTemplate.getSendTimeoutHeader(), null)
    .setHeader(this.messagingTemplate.getReceiveTimeoutHeader(), null)
    .setErrorChannel(replyChannel)
    .build();

Канал ошибки в конечном итоге сбрасывается на originalErrorChannelHandler в Mono.fromFuture, но в моем случае этот канал ошибки «нулевой». Кроме того, лямбда onErrorResume никогда не вызывается:

return Mono.fromFuture(replyChannel.messageFuture)
    .doOnSubscribe(s -> {
        if (!error && this.countsEnabled) {
            this.messageCount.incrementAndGet();
        }
    })
    .<Message<?>>map(replyMessage ->
        MessageBuilder.fromMessage(replyMessage)
            .setHeader(MessageHeaders.REPLY_CHANNEL, originalReplyChannelHeader)
            .setHeader(MessageHeaders.ERROR_CHANNEL, originalErrorChannelHeader)
            .build())
    .onErrorResume(t -> error ? Mono.error(t) : handleSendError(requestMessage, t));

Как это должно работать?


person dschulten    schedule 06.10.2018    source источник
comment
соответствующая часть справки: docs.spring.io/spring-integration/reference/html/   -  person dschulten    schedule 07.10.2018
comment
В разделе обработки ошибок справочника docs.spring.io/spring-integration/docs/current/reference/html/ Ошибки WebFlux не упоминаются - отличаются ли они от sync / async?   -  person dschulten    schedule 07.10.2018


Ответы (1)


Это ошибка; ErrorMessage, созданный для исключения обработчиком ошибок, отправляется в заголовок errorChannel (который должен быть replyChannel, чтобы шлюз получил результат). Затем шлюз должен вызвать поток ошибок (если он есть) и вернуть результат.

https://jira.spring.io/browse/INT-4541

person Gary Russell    schedule 08.10.2018
comment
отлично: o) Спасибо, что изучили это. - person dschulten; 10.10.2018
comment
5.0.9 должна быть выпущена в понедельник. - person Gary Russell; 10.10.2018