Почему параллельное выполнение не происходит для нескольких наблюдаемых RXJava?

Я новичок в RxJava и пытался выполнить пример параллельного выполнения для нескольких Observables по ссылке: Параллельная выборка RxJava Observable

Хотя пример, приведенный в приведенной выше ссылке, выполняет наблюдаемые параллельно, но когда я добавил Thread.sleep(TIME_IN_MILLISECONDS) в метод forEach, система начала выполнять по одному наблюдаемому за раз. Пожалуйста, помогите мне понять, почему Thread.sleep останавливает параллельное выполнение Observables.

Ниже приведен модифицированный пример, который вызывает синхронное выполнение нескольких наблюдаемых:

import rx.Observable;
import rx.Subscriber;
import rx.schedulers.Schedulers;

public class ParallelExecution {

    public static void main(String[] args) {
        System.out.println("------------ mergingAsync");
        mergingAsync();
    }

    private static void mergingAsync() {
        Observable.merge(getDataAsync(1), getDataAsync(2)).toBlocking()
        .forEach(x -> { try{Thread.sleep(4000);}catch(Exception ex){}; 
        System.out.println(x + " " + Thread.currentThread().getId());});
    }

    // artificial representations of IO work
    static Observable<Integer> getDataAsync(int i) {
        return getDataSync(i).subscribeOn(Schedulers.io());
    }

    static Observable<Integer> getDataSync(int i) {
        return Observable.create((Subscriber<? super Integer> s) -> {
            // simulate latency
            try {
                Thread.sleep(1000);
            } catch (Exception e) {
                e.printStackTrace();
            }
            s.onNext(i);
            s.onCompleted();
        });
    }
}

В приведенном выше примере мы используем метод subscribeOn наблюдаемого и предоставляем ThreadPool(Schedules.io) для выполнения, поэтому подписка для каждого наблюдаемого будет происходить в отдельном потоке.

Существует вероятность того, что Thread.sleep блокирует любой общий объект между потоками, но я до сих пор не понимаю этого. Пожалуйста помоги.


person Kshitij Jain    schedule 19.06.2017    source источник
comment
Откуда вы знаете, что источники не работали параллельно?   -  person akarnokd    schedule 19.06.2017
comment
Я узнаю это, распечатав текущий идентификатор потока.   -  person Kshitij Jain    schedule 19.06.2017


Ответы (1)


На самом деле, в вашем примере происходит параллельное выполнение, вы просто неправильно смотрите на это, есть разница между тем, где выполняется работа, и где выдается уведомление.

если вы поместите журнал с идентификатором потока в Observable.create, вы заметите, что каждый Observable выполняется в разных потоках одновременно. но уведомление происходит последовательно. это поведение, как и ожидалось, как часть контракта Observable, заключается в том, что наблюдаемые должны выдавать уведомления наблюдателям последовательно (а не параллельно).

person yosriz    schedule 19.06.2017