Ограничение скорости запросов с помощью Reactor

Я использую проектный реактор для загрузки данных из веб-службы с помощью отдыха. Это делается параллельно с несколькими потоками. Я начинаю сталкиваться с ограничениями скорости в веб-службе, поэтому я хотел бы отправлять не более 10 запросов в секунду, чтобы избежать этих ошибок. Как бы я сделал это, используя реактор?

Используя zipWith(Mono.delayMillis(100))? Или есть лучший способ?

Спасибо


person Mavo    schedule 05.05.2017    source источник
comment
Текущее решение: Flux.range(1, 10) .zipWith(Flux.interval(Duration.of(1, ChronoUnit.SECONDS))) .map(Tuple2::getT1) .toIterable() .forEach(i -> logger. информация (Получено: {}, я));   -  person Mavo    schedule 05.05.2017


Ответы (2)


Вы можете использовать delayElements вместо всего zipwith.

person Simon Baslé    schedule 09.05.2017
comment
можно ли получить Response из Flux‹Response›? - person Don Code; 05.10.2020

Код ниже будет выполнять GET на https://www.google.com/ со скоростью 10 запросов в секунду. Вам придется внести дополнительные изменения, чтобы поддержать ситуацию, когда ваш сервер не может обработать в 1с все ваши 10 запросов; вы можете просто пропустить отправку запросов, когда запросы, заданные в предыдущую секунду, все еще обрабатываются вашим сервером.

@Test
void parallelHttpRequests() {
    // this is just for limiting the test running period otherwise you don't need it
    int COUNT = 2;

    // use whatever (blocking) http client you desire;
    // when using e.g. WebClient (Spring, non blocking client)
    // the example will slightly change for no longer use
    // subscribeOn(Schedulers.elastic())
    RestTemplate client = new RestTemplate();
    
    var exit = new AtomicBoolean(false);
    var lock = new ReentrantLock();
    var condition = lock.newCondition();

    MessageFormat message = new MessageFormat("#batch: {0}, #req: {1}, resultLength: {2}");
    Flux.interval(Duration.ofSeconds(1L))
            .take(COUNT) // this is just for limiting the test running period otherwise you don't need it
            .doOnNext(batch -> debug("#batch", batch)) // just for debugging
            .flatMap(batch -> Flux.range(1, 10) // 10 requests per 1 second
                            .flatMap(i -> Mono.fromSupplier(() ->
                                    client.getForEntity("https://www.google.com/", String.class).getBody()) // your request goes here (1 of 10)
                                    .map(s -> message.format(new Object[]{batch, i, s.length()})) // here the request's result will be the output of message.format(...)
                                    .doOnSubscribe(s -> debug("doOnSubscribe: #batch = " + batch + ", i = " + i)) // just for debugging
                                    .subscribeOn(Schedulers.elastic()) // one I/O thread per request
                            )
            )
            .subscribe(
                    s -> debug("received", s) // do something with the above request's result
                    e -> {
                        debug("error", e.getMessage());
                        signalAll(exit, condition, lock);
                    },
                    () -> {
                        debug("done");
                        signalAll(exit, condition, lock);
                    }
            );

    await(exit, condition, lock);
}

// most probably you won't need the "await" and "signalAll" methods below but
// I created them anyway just to be easier for one to run this in a test class

private void await(AtomicBoolean exit, Condition condition, Lock lock) {
    lock.lock();
    while (!exit.get()) {
        try {
            condition.await();
        } catch (InterruptedException e) {
            // maybe spurious wakeup
            e.printStackTrace();
        }
    }
    lock.unlock();
    debug("exit");
}

private void signalAll(AtomicBoolean exit, Condition condition, Lock lock) {
    exit.set(true);
    try {
        lock.lock();
        condition.signalAll();
    } finally {
        lock.unlock();
    }
}
person adrhc    schedule 14.07.2019