Использование Flux.buffer реактора для пакетной обработки работает только для одного элемента

Я пытаюсь использовать Flux.buffer() для пакетной загрузки из базы данных.

Вариант использования заключается в том, что загрузка записей из БД может быть «взрывной», и я хотел бы ввести небольшой буфер для группировки загрузок, где это возможно.

Мой концептуальный подход состоял в том, чтобы использовать некоторую форму процессора, публиковать его в приемнике, позволять этому буферу, а затем подписываться и фильтровать результат, который я хочу.

Я пробовал несколько разных подходов (разные типы процессоров, создание отфильтрованного Mono по-разному).

Ниже то, чего я достиг до сих пор - в основном спотыкаясь.

В настоящее время это возвращает один результат, но последующие вызовы отбрасываются (хотя я не уверен, где именно).

class BatchLoadingRepository {
    // I've tried all manner of different processors here.  I'm unsure if
    // TopicProcessor is the correct one to use.
    private val bufferPublisher = TopicProcessor.create<String>()
    private val resultsStream = bufferPublisher
            .bufferTimeout(50, Duration.ofMillis(50))
            // I'm unsure if concatMapIterable is the correct operator here, 
            // but it seems to work.
            // I'm really trying to turn the List<MyEntity> 
            // into a stream of MyEntity, published on the Flux<>
            .concatMapIterable { requestedIds ->
                // this is a Spring Data repository.  It returns List<MyEntity>
                repository.findAllById(requestedIds)
            }

    // Multiple callers will invoke this method, and then subscribe to receive
    // their entity back.
    fun findByIdAsync(id: String): Mono<MyEntity> {

        // Is there a potential race condition here, caused by a result
        // on the resultsStream, before I've subscribed?
        return Mono.create<MyEntity> { sink ->
            bufferPublisher.sink().next(id)
            resultsStream.filter { it.id == id }
                    .subscribe { next ->
                        sink.success(next)
                    }
        }
    }
}

person Marty Pitt    schedule 15.03.2019    source источник


Ответы (1)


Привет, я тестировал ваш код и думаю, что лучший способ — использовать общий EmitterProcessor. Я провел тест с emitterProcessor, и, похоже, он работает.

Flux<String> fluxi;
EmitterProcessor emitterProcessor;

@Override
public void run(String... args) throws Exception {
    emitterProcessor = EmitterProcessor.create();

    fluxi = emitterProcessor.share().bufferTimeout(500, Duration.ofMillis(500))
            .concatMapIterable(o -> o);

    Flux.range(0,1000)
            .flatMap(integer -> findByIdAsync(integer.toString()))
            .map(s -> {
                System.out.println(s);
                return s;
            }).subscribe();

}

private Mono<String> findByIdAsync(String id) {
    return Mono.create(monoSink -> {
        fluxi.filter(s -> s == id).subscribe(value -> monoSink.success(value));
        emitterProcessor.onNext(id);
    });
}
person Ricard Kollcaku    schedule 20.03.2019