Как адаптировать многопоточную скорость вычислений наблюдателя к холодному Observable‹List‹T››

У меня есть Источник как холодный Observable<List<T>> который эмитирует элементы чанками (списками), я хочу каждый отдельный элемент из чанка обработать в отдельном потоке, пока эмиттер (источник) ждет окончания обработки всех элементов из эмитированного чанка перейти к следующему и так далее.

Этот код (rxjava 2.0.6) делает то же самое, но только в одном потоке. Если я хочу разветвить вычисление наблюдателя во многих потоках с observeOn(Schedulers.io), исходный поток продолжит испускать все до завершения и не блокировать каждый фрагмент.

Observable<List<T>> lazy_source = Observable.create((ObservableEmitter<List<T>> e)
        -> {
    for (int i = 0; i < 1000; i++) {
        List<T> chunk = produceChunkOf(10);
        e.onNext(chunk);
    }

    e.onComplete();
});    
lazy_source
        .subscribeOn(Schedulers.io())
        .flatMap(chunk -> 
                Observable.fromIterable(chunk)
                    // .observeOn(Schedulers.io()) // Uncommenting this will flat all 1000 chunks at once.
                    .doOnNext(item -> consume(item))
                , 10) // Number of concurent Threads
        .subscribe();

Я буду признателен за любую помощь.


person Abu Lot    schedule 22.02.2017    source источник


Ответы (2)


как насчет такого:

 Observable.range(0, 1000)
            .concatMap(new Func1<Integer, Observable<?>>() {
                @Override
                public Observable<?> call(Integer integer) {
                    return produceChunkOf(10)
                            .flatMap(new Func1<Object, Observable<?>>() {
                                @Override
                                public Observable<?> call(Object item) {
                                    return consume(item)
                                            .observeOn(Schedulers.io());
                                }
                            }, 10)
                            .toList();
                }
            });

сначала вы создаете Observable, который передает входные данные в produceChunkOf, затем для каждого входного элемента вы concatMap выполняете свое требование последовательного выполнения для каждого фрагмента, для каждого входа вы создаете фрагмент и обрабатываете его параллельно с flatMap, затем собираете это после того, как все элементы обрабатываются с помощью toList()

person yosriz    schedule 22.02.2017

вот окончательный вариант (без накладных расходов):

Observable.range(0, 1_000_000)
        .subscribeOn(Schedulers.io())
        .concatMap(i -> produceChunkOf(100) // this returs an Observable of 100 items
                .flatMap(item -> Observable
                        .just(item)
                        .observeOn(Schedulers.io())
                        .doOnNext(element -> consume(element)), 
                        50)) // Number of concurent Threads
        .subscribe();
person Abu Lot    schedule 22.02.2017