Я пытаюсь использовать Flux.buffer()
для пакетной загрузки из базы данных.
Вариант использования заключается в том, что загрузка записей из БД может быть «взрывной», и я хотел бы ввести небольшой буфер для группировки загрузок, где это возможно.
Мой концептуальный подход состоял в том, чтобы использовать некоторую форму процессора, публиковать его в приемнике, позволять этому буферу, а затем подписываться и фильтровать результат, который я хочу.
Я пробовал несколько разных подходов (разные типы процессоров, создание отфильтрованного Mono по-разному).
Ниже то, чего я достиг до сих пор - в основном спотыкаясь.
В настоящее время это возвращает один результат, но последующие вызовы отбрасываются (хотя я не уверен, где именно).
class BatchLoadingRepository {
// I've tried all manner of different processors here. I'm unsure if
// TopicProcessor is the correct one to use.
private val bufferPublisher = TopicProcessor.create<String>()
private val resultsStream = bufferPublisher
.bufferTimeout(50, Duration.ofMillis(50))
// I'm unsure if concatMapIterable is the correct operator here,
// but it seems to work.
// I'm really trying to turn the List<MyEntity>
// into a stream of MyEntity, published on the Flux<>
.concatMapIterable { requestedIds ->
// this is a Spring Data repository. It returns List<MyEntity>
repository.findAllById(requestedIds)
}
// Multiple callers will invoke this method, and then subscribe to receive
// their entity back.
fun findByIdAsync(id: String): Mono<MyEntity> {
// Is there a potential race condition here, caused by a result
// on the resultsStream, before I've subscribed?
return Mono.create<MyEntity> { sink ->
bufferPublisher.sink().next(id)
resultsStream.filter { it.id == id }
.subscribe { next ->
sink.success(next)
}
}
}
}