Как дождаться завершения всех HTTP-запросов с помощью Spring WebClient?

Я хочу выполнить http-запрос для каждого элемента очереди. Эти запросы должны вызываться параллельно.
Также мне нужно дождаться завершения всех запросов.

Я разработал следующий код:

 List<Mono<MyResponseDTO>> monoList = queue.stream()
                .map(jobStatusBunch -> webClient
                        .post()
                        .uri("localhost:8080/api/some/url")
                        .bodyValue(convertToRequestDto(someBean))
                        .retrieve()
                        .toEntity(String.class)
                        .filter(HttpEntity::hasBody)
                        .map(stringResponseEntity -> {
                            try {
                                return objectMapper.readValue(stringResponseEntity.getBody(), MyResponseDTO.class);
                            } catch (JsonProcessingException e) {
                                log.error("Can't parse", e);
                                return null;
                            }
                        })
                        .doOnNext(myResponseDTO -> {
                            log.info("doOnNext is invoked");
                        })
                ).collect(Collectors.toList());
          //await when all MONOs are completed

log.info("Start waiting for {}", monoList);
Mono<Void> mono = Flux.fromIterable(monoList)
        .flatMap(Function.identity())
        .then();
log.info("Finished waiting for {}", monoList);

и я вижу следующий журнал, когда очередь имеет один элемент:

2019-11-19 19:17:17.733  INFO 5896 --- [   scheduling-1] c.b.m.service.MyService     : Start waiting for [MonoPeek]
2019-11-19 19:17:25.988  INFO 5896 --- [   scheduling-1] c.b.m.service.MyService     : Finished waiting for [MonoPeek]
2019-11-19 19:17:26.015 TRACE 5896 --- [   scheduling-1] o.s.w.r.f.client.ExchangeFunctions       : [c42c1c2] HTTP POST localhost:8080/api/some/url, headers={}
2019-11-19 19:17:48.230  INFO 5896 --- [tor-http-nio-11] c.b.m.service.MyService     : doOnNext is invoked

Таким образом, этот код не позволяет ждать завершения запроса.

Как я мог этого добиться?

P.S.

похоже, Flux.merge(monoList).blockLast() это то, что мне нужно. Будет ли работать правильно?


person gstackoverflow    schedule 19.11.2019    source источник
comment
Почему нужно ждать окончания запроса? Вы также должны впоследствии выполнить дополнительную работу реактивным способом.   -  person Joker    schedule 20.11.2019
comment
@Joker, это выходит за рамки - мне это действительно нужно.   -  person gstackoverflow    schedule 20.11.2019
comment
blockLast - это то, что вы тогда ищете для Flux. Но нет необходимости собирать Mono ‹MyResponseDTO› в список. Просто позвоните block из полученного вами Mono.   -  person Joker    schedule 21.11.2019
comment
@Joker, если я вызываю блок для каждого моно, это вызовет последовательное выполнение запросов. Я хочу делать запросы параллельно   -  person gstackoverflow    schedule 21.11.2019


Ответы (2)


Вы можете попробовать следующее:

Flux<MyResponseDTO> responses = queue.stream()
     .flatMap(jobStatusBunch -> webClient
                    .post()
                    .uri("localhost:8080/api/some/url")
                    .bodyValue(convertToRequestDto(someBean))
                    .retrieve()
                    .toEntity(MyResponseDTO.class));

 Mono<Void> workDone = response.then();

Это просто и должно работать. По умолчанию (если я не ошибаюсь) подписчик будет запрашивать 256 элементов, что означает, что вы получите максимум 256 HTTP-запросов, обрабатываемых параллельно. Это может зависеть от пула соединений, настроенного на HTTP-клиенте; по умолчанию в Reactor Netty максимальное количество TCP-каналов больше указанного.

Различные операторы Reactor, включая flatMap, предлагают варианты с параметром метода concurrency для управления максимальным параллелизмом.

Ваше решение с Flux.merge со списком Mono было бы эквивалентным. С другой стороны, использование Flux.concat не будет тем, что вы ищете, поскольку оно будет подписываться на Mono по мере запроса элементов, поэтому вы можете не получить желаемый максимальный параллелизм.

person Brian Clozel    schedule 04.12.2019
comment
Должен ли я вызывать что-нибудь на workDone для ожидания завершения? - person gstackoverflow; 13.12.2019

Простой случай

Используйте это для параллельного выполнения запросов и ожидания их завершения:

List<Mono<MyResponseDTO>> monoList = queue
        .stream()
        .map(requestDTO ->
                webClient
                    .post()
                    .uri("localhost:8080/api/some/url")
                    .bodyValue(requestDTO)
                    .retrieve()
                    .bodyToMono(MyResponseDTO.class))
        .collect(Collectors.toList());

// This will execute all requests in parallel and wait until they complete,
// or throw an exception if any request fails.
List<MyResponseDTO> responses = Flux.merge(monoList).collectList().block();

Проверка

Вы можете установить логирование reactor.netty.http.client на DEBUG, чтобы не было дополнительных запросов. Это может произойти, например, если вы случайно используете и mono#subscribe, и mono#block.

Более сложный случай с CompletableFuture

Если вы хотите разделить обработку ответов и ожидание завершения запросов, можно использовать CompletableFutures:

List<Mono<MyResponseDTO>> webClientMonos = getMonos();

// Start executing requests in parallel.
List<CompletableFuture<MyResponseDTO>> futures = webClientMonos.stream()
        .map(mono -> mono.toFuture())
        .collect(toList());
for (CompletableFuture<MyResponseDTO> future : futures) {
    future.thenAccept(responseDTO -> {
        // Do something with a response when it arrives at some point.
        // ...
    });
}

// ...

// Block until all requests have completed.
for (CompletableFuture<MyResponseDTO> future : futures) {
    try {
        // Maybe WebClient has been configured with timeouts,
        // but it doesn't hurt to have a timeout here, too.
        future.get(60, TimeUnit.SECONDS);
    } catch (InterruptedException ex) {
        Thread.currentThread().interrupt();
        throw new RuntimeException(ex);
    } catch (ExecutionException | TimeoutException ex) {
        // ExecutionException is thrown if HTTP request fails.
        throw new RuntimeException(ex);
    }
}
person Milanka    schedule 26.03.2021