Как ограничить количество открытых сокетов в Spring-WebFlux WebClient?

У меня есть сервис RESTful, и у меня возникла идея подготовить простой тест производительности с помощью Reactor и Spring WebClient. Бенчмарк просто создает N пользователей, а затем за каждый созданный пользовательский пост M голосует.

К сожалению, следующий код превышает максимальное количество открытых файлов на моей Linux-машине, равное 1024 (ulimit -n 1024).

    RestService restService = ...
    int N_ITERATIONS = 100;
    int M_VOTES = 100;

    Flux.range(0, N_ITERATIONS)
            .parallel()
            .runOn(Schedulers.parallel())
            .flatMap(iteration -> restService.postUserRegistration(User.builder().build()))
            .flatMap(user -> Flux.range(0, M_VOTES)
                    .flatMap(vote -> restService.postUserVote(Vote.builder().build()))
                    .collectList()
                    .map(votes -> Tuple.of(user, votes))
            ).doOnNext(userVotes -> log.info("User: {} voted: {}", userVotes._1(), userVotes._2()))
            .sequential()
            .toIterable();

RestService реализован с помощью стандартного WebClient от Spring Webflux.

Есть ли способ ограничить количество создаваемых сокетов на основе системного лимита?

Трассировки стека:

Caused by: io.netty.channel.unix.Errors$NativeIoException: newSocketStream(..) failed: Too many open files
    at io.netty.channel.unix.Errors.newIOException(Errors.java:122) ~[netty-transport-native-unix-common-4.1.27.Final.jar:4.1.27.Final]
    ... 98 common frames omitted

person Michał Mielec    schedule 02.09.2018    source источник


Ответы (1)


Я не думаю, что есть. Но вы можете предпринять шаги, чтобы предотвратить это.

Во-первых, почему у вас такой низкий лимит файлового дескриптора? Linux открывает файловый дескриптор для каждого открытого сокета, поэтому 1024 очень мало, если вы собираетесь иметь много открытых сокетов одновременно. Я бы подумал о значительном увеличении этого лимита.

Во-вторых, вы оставляете конфигурацию параллелизма на усмотрение планировщика. Вы должны знать, что существует вариант оператора flatMap, который позволяет вам контролировать, на сколько Publisher можно подписаться и объединить параллельно:

Flux<V> flatMap(
            Function<? super T,? extends Publisher<? extends V>> mapper,
            int concurrency)

Используя параметр concurrency, вы сможете определить, сколько последовательностей в полете вы хотите разрешить.

person ESala    schedule 02.09.2018
comment
Хорошо, спасибо, я не знал об этой версии flatMap, но, чтобы быть строгим, должно быть .flatMap(mapper, false, concurrency);, по крайней мере, это то, что у меня есть в Reactor 3 - person Michał Mielec; 02.09.2018
comment
Без проблем! Вы можете найти версию только с сопоставителем и параллелизмом здесь: projectreactor.io/docs/core/release/api/reactor/core/publisher/ - person ESala; 02.09.2018