Как использовать Spring WebClient для выполнения неблокирующих вызовов и отправки электронной почты после завершения всех вызовов?

Я использую Spring WebClient и реактор проекта, чтобы делать неблокирующие вызовы к списку URL-адресов. Мои требования:

  1. Асинхронно вызывать GET для списка URL-адресов
  2. Регистрируйте URL-адрес при вызове каждого URL-адреса
  3. Зарегистрируйте URL-адрес вызова, который приводит к исключению
  4. Зарегистрируйте URL-адрес успешного вызова
  5. Зарегистрируйте URL-адрес вызова, который приводит к статусу HTTP, отличному от 2xx.
  6. Отправьте электронное письмо, содержащее список URL-адресов, вызов которых привел к исключению или статусу HTTP, отличному от 2xx.

Вот моя попытка сделать это:

List<Mono<ClientResponse>> restCalls = new ArrayList<>();
List<String> failedUrls = new ArrayList<>();    
for (String serviceUrl : serviceUrls.getServiceUrls()) {
        
        restCalls.add(
                webClientBuilder
                    .build()
                    .get()
                    .uri(serviceUrl)
                    .exchange()
                    .doOnSubscribe(c -> log.info("calling service URL {}", serviceUrl))
                    .doOnSuccess(response -> log.info("{} success status {}", serviceUrl, response.statusCode().toString()))
                    .doOnError(response -> {log.info("{} error status {}", serviceUrl, response); failedUrls.add(serviceUrl);}));
    }
    
    Flux.fromIterable(restCalls)
        .map((data) -> data.subscribe())
        .onErrorContinue((throwable, e) -> {
             
            log.info("Exception for URL {}", ((WebClientResponseException) throwable).getRequest().getURI());
          
            failedUrls.add(serviceUrl);
        })
        .collectList()
        .subscribe((data) -> {
            log.info("all called");
            email.send("Failed URLs are {}", failedUrls);
    });

Проблема в том, что письмо отправляется до ответа на звонок. Как я могу дождаться завершения всех вызовов URL-адресов до вызова email.send?


person James    schedule 26.03.2021    source источник
comment
Следует как минимум заменить .map((data) -> data.subscribe()) на .flatMap(data -> data), потому что подписка запускает действие, но не позволяет наблюдать его результат (возвращаемое значение только для отмены запущенного задания). Также остерегайтесь onErrorContinue (дополнительная информация по этой проблеме). Я постараюсь добавить более развернутый ответ позже, но, возможно, этих подсказок вам будет достаточно для решения ваших проблем.   -  person amanin    schedule 31.03.2021


Ответы (2)


Как указано в комментарии, основная ошибка в вашем примере - это использование «подписки», которая запускает запросы, но в контексте, не зависящем от основного потока, поэтому вы не можете вернуть ошибки или результаты.

subscribe - это своего рода триггерная операция в конвейере, она не используется для связывания.

Вот полный пример (кроме электронной почты, замененной журналированием):

package fr.amanin.stackoverflow;

import org.springframework.http.HttpStatus;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.util.Arrays;
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.stream.Collectors;

public class WebfluxURLProcessing {

    private static final Logger LOGGER = Logger.getLogger("example");

    public static void main(String[] args) {

        final List<String> urls = Arrays.asList("https://www.google.com", "https://kotlinlang.org/kotlin/is/wonderful/", "https://stackoverflow.com", "http://doNotexists.blabla");

        final Flux<ExchangeDetails> events = Flux.fromIterable(urls)
                // unwrap request async operations
                .flatMap(url -> request(url))
                // Add a side-effect to log results
                .doOnNext(details -> log(details))
                // Keep only results that show an error
                .filter(details -> details.status < 0 || !HttpStatus.valueOf(details.status).is2xxSuccessful());

        sendEmail(events);
    }

    /**
     * Mock emails by collecting all events in a text and logging it.
     * @param report asynchronous flow of responses
     */
    private static void sendEmail(Flux<ExchangeDetails> report) {
        final String formattedReport = report
                .map(details -> String.format("Error on %s. status: %d. Reason: %s", details.url, details.status, details.error.getMessage()))
                // collecting (or reducing, folding, etc.) allows to gather all upstream results to use them as a single value downstream.
                .collect(Collectors.joining(System.lineSeparator(), "REPORT:"+System.lineSeparator(), ""))
                // In a real-world scenario, replace this with a subscribe or chaining to another reactive operation.
                .block();
        LOGGER.info(formattedReport);
    }

    private static void log(ExchangeDetails details) {
        if (details.status >= 0 && HttpStatus.valueOf(details.status).is2xxSuccessful()) {
            LOGGER.info("Success on: "+details.url);
        } else {
            LOGGER.log(Level.WARNING,
                    "Status {0} on {1}. Reason: {2}",
                    new Object[]{
                            details.status,
                            details.url,
                            details.error == null ? "None" : details.error.getMessage()
                    });
        }
    }

    private static Mono<ExchangeDetails> request(String url) {
        return WebClient.create(url).get()
                .retrieve()
                // workaround to counter fail-fast behavior: create a special error that will be converted back to a result
                .onStatus(status -> !status.is2xxSuccessful(), cr -> cr.createException().map(err -> new RequestException(cr.statusCode(), err)))
                .toBodilessEntity()
                .map(response -> new ExchangeDetails(url, response.getStatusCode().value(), null))
                // Convert back custom error to result
                .onErrorResume(RequestException.class, err -> Mono.just(new ExchangeDetails(url, err.status.value(), err.cause)))
                // Convert errors that shut connection before server response (cannot connect, etc.) to a result
                .onErrorResume(Exception.class, err -> Mono.just(new ExchangeDetails(url, -1, err)));
    }

    public static class ExchangeDetails {
        final String url;
        final int status;
        final Exception error;

        public ExchangeDetails(String url, int status, Exception error) {
            this.url = url;
            this.status = status;
            this.error = error;
        }
    }

    private static class RequestException extends RuntimeException {
        final HttpStatus status;
        final Exception cause;

        public RequestException(HttpStatus status, Exception cause) {
            this.status = status;
            this.cause = cause;
        }
    }
}

person amanin    schedule 31.03.2021

Я не тестировал это, но это должно работать

public void check() {
    
    List<Flux<String>> restCalls = new ArrayList<>();
    
    for (String serviceUrl : serviceUrls.getServiceUrls()) {
        restCalls.add(rest.getForEntity(serviceUrl, String.class));
    }
    
   Flux.fromIterable(restCalls)
            .map((data) -> data.blockFirst())
            .onErrorContinue((throwable, e) -> {
                ((WebClientResponseException) throwable).getRequest().getURI(); // get the failing URI
                // do whatever you need with the failed service

            })
            .collectList() // Collects all the results into a list
            .subscribe((data) -> {
                // from here do whatever is needed from the results
            });
}

Поэтому, если вы этого не сделали, вызов службы должен быть неблокирующим, поэтому вам следует преобразовать тип в Flux. Итак, внутри вашего restService ваш метод должен быть примерно таким

public Flux<String> getForEntity(String name) {
    return this.webClient.get().uri("url", name)
            .retrieve().bodyToFlux(String.class);
}

Я надеюсь это поможет

person user3353167    schedule 27.03.2021
comment
Спасибо. Но этот blockFirst метод недопустим. Когда я меняю его на block, он блокируется после каждого вызова URL. Я хочу, чтобы мои URL-вызовы не блокировали. Как только все URL-адреса вернут ответ, я хочу отправить электронное письмо. - person James; 30.03.2021