TcpClient возвращает только первый результат в контроллере Spring WebFlux

У меня есть HTTP-служба, открывающая конечную точку GET, которая подключается к простому эхо-серверу через TCP. Служба HTTP работает на Netty.

@RestController
public class OurTcpClient {

    private Connection connection;

    @GetMapping("echo1")
    public Mono<String> echo(@RequestParam("value") final String value) {
        this.connection.outbound()
            .sendString(Mono.just(String.format("%04d", value.length()) + value)) // prepend length
            .then()
            .subscribe();
        return this.connection.inbound()
            .receive()
            .asString()
            .next();
    }

    @PostConstruct
    public void init() {
        this.connection = TcpClient.create()
            .host("localhost")
            .port(10002)
            .wiretap(true)
            .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10000)
            .option(ChannelOption.SO_KEEPALIVE, true)
            .connectNow();
    }
}

Я ожидаю, что я могу запросить службу, например, по адресу http://localhost:8081/echo1?value=hi, сколько угодно раз, и получайте "привет" в каждом ответе. Это работает для первого запроса. Второй запрос висит на неопределенный срок. Если я затем отменю второй запрос и попытаюсь выполнить другой, я получу следующую ошибку:

{
    "timestamp": "2020-04-13T18:56:40.221+0000",
    "path": "/echo1",
    "status": 500,
    "error": "Internal Server Error",
    "message": "Only one connection receive subscriber allowed."
}

Любая помощь будет принята с благодарностью.


person Francisco Z.    schedule 13.04.2020    source источник


Ответы (1)


В примере вы используете next()

return this.connection.inbound()
            .receive()
            .asString()
            .next();

Согласно Flux#next javadoc испускать только первый элемент, выпущенный этим Flux, в новый Mono., то подписка будет отменена.

В контексте Reactor Netty, когда вы используете операторы next, timeout, take и т. Д., Отменяющие подписку, это означает, что соединение будет закрыто.

person Violeta Georgieva    schedule 14.04.2020
comment
С вашей помощью мне удалось решить проблему, добавив this.hotSource = this.connection.inbound (). Receive (). AsString (). Cache (0); в метод init () и изменив метод echo, чтобы он возвращал this.hotSource.as (Mono :: from) ;. Но мне интересно, неправильно ли это использует метод .cache (). Спасибо за вашу помощь. - person Francisco Z.; 14.04.2020