У меня есть Источник как холодный Observable<List<T>>
который эмитирует элементы чанками (списками), я хочу каждый отдельный элемент из чанка обработать в отдельном потоке, пока эмиттер (источник) ждет окончания обработки всех элементов из эмитированного чанка перейти к следующему и так далее.
Этот код (rxjava 2.0.6) делает то же самое, но только в одном потоке. Если я хочу разветвить вычисление наблюдателя во многих потоках с observeOn(Schedulers.io)
, исходный поток продолжит испускать все до завершения и не блокировать каждый фрагмент.
Observable<List<T>> lazy_source = Observable.create((ObservableEmitter<List<T>> e)
-> {
for (int i = 0; i < 1000; i++) {
List<T> chunk = produceChunkOf(10);
e.onNext(chunk);
}
e.onComplete();
});
lazy_source
.subscribeOn(Schedulers.io())
.flatMap(chunk ->
Observable.fromIterable(chunk)
// .observeOn(Schedulers.io()) // Uncommenting this will flat all 1000 chunks at once.
.doOnNext(item -> consume(item))
, 10) // Number of concurent Threads
.subscribe();
Я буду признателен за любую помощь.