Как ограничить количество событий, которые в данный момент обрабатываются в процессе flatMap?

Учитывая следующий фрагмент кода

public static void main(String[] args) {

    long start = System.currentTimeMillis();

    Flux.<Long>generate(s -> s.next(System.currentTimeMillis() - start))
            .flatMap(DemoApp::delayedAction)
            .doOnNext(l -> System.out.println(l + " -- " + (System.currentTimeMillis() - start)))
            .blockLast(Duration.ofSeconds(3));
}

private static Publisher<? extends Long> delayedAction(Long l) {
    return Mono.just(l).delayElement(Duration.ofSeconds(1));
}

Из вывода видно, что большое количество событий "обрабатывается" в delayedAction одновременно. В этом примере 256 событий генерируются в течение нескольких миллисекунд, а затем ждут около секунды, пока они не будут отправлены снова.

Я хочу ограничить это число, например. 10, как я могу это сделать?

Решение не должно зависеть от того, что происходит внутри delayedAction

Фон

Что на самом деле происходит в отложенном действии: я выполняю HTTP-запросы, и запуск неограниченного (или очень большого) количества запросов не кажется хорошей идеей.


person Jens Schauder    schedule 04.11.2017    source источник
comment
Можете ли вы опубликовать свое решение?   -  person Ashok Koyi    schedule 30.07.2018
comment
@AshokKoyi stackoverflow.com/a/47108564/66686   -  person Jens Schauder    schedule 30.07.2018
comment
Вы упомянули об этом в конце своего вопроса, я уже нашел решение. Но думаю, что это может помочь другим. Опубликует ответ. Такое ощущение, что у вас другой ответ, чем принятый. Не могли бы вы исправить это   -  person Ashok Koyi    schedule 30.07.2018
comment
Принятый ответ - мой ответ. Я включил ту часть, которую вы процитировали, в вопрос, чтобы другие не начали вводить ответ, а затем расстроились, когда после его сохранения заметили, что я уже написал свой собственный. Я удалил цитируемую часть, поскольку теперь она, кажется, вызывает путаницу, и она выполнила свою задачу.   -  person Jens Schauder    schedule 30.07.2018


Ответы (1)


Для этого уже существует метод: Flux.flatMap(Function> mapper, int concurrency)

Из его документации:

Аргумент параллелизма позволяет контролировать количество издателей, на которые можно подписаться и объединить параллельно.

person Jens Schauder    schedule 04.11.2017
comment
Я написал об этом немного подробнее: blog.schauderhaft.de/2017/11/08/ - person Jens Schauder; 10.11.2017
comment
я знаю, что это отвечает на вопрос, как он был задан, но название вопроса привело меня сюда по другой причине. Параллелизм — это не то же самое, что обратное давление. Параллелизм исправлен - есть ли способ решить эту проблему более динамичным (обратным давлением) способом? - person jamey graham; 30.11.2017
comment
Параллелизм превращается в обратное давление, поскольку процессор плоской карты будет запрашивать больше событий только тогда, когда у него есть доступные слоты параллелизма. Если это не отвечает на ваш вопрос, вам, вероятно, следует опубликовать правильный вопрос с более подробным описанием того, чего вы хотите достичь. - person Jens Schauder; 30.11.2017