Я новичок в RxJava и пытался выполнить пример параллельного выполнения для нескольких Observables по ссылке: Параллельная выборка RxJava Observable
Хотя пример, приведенный в приведенной выше ссылке, выполняет наблюдаемые параллельно, но когда я добавил Thread.sleep(TIME_IN_MILLISECONDS) в метод forEach, система начала выполнять по одному наблюдаемому за раз. Пожалуйста, помогите мне понять, почему Thread.sleep останавливает параллельное выполнение Observables.
Ниже приведен модифицированный пример, который вызывает синхронное выполнение нескольких наблюдаемых:
import rx.Observable;
import rx.Subscriber;
import rx.schedulers.Schedulers;
public class ParallelExecution {
public static void main(String[] args) {
System.out.println("------------ mergingAsync");
mergingAsync();
}
private static void mergingAsync() {
Observable.merge(getDataAsync(1), getDataAsync(2)).toBlocking()
.forEach(x -> { try{Thread.sleep(4000);}catch(Exception ex){};
System.out.println(x + " " + Thread.currentThread().getId());});
}
// artificial representations of IO work
static Observable<Integer> getDataAsync(int i) {
return getDataSync(i).subscribeOn(Schedulers.io());
}
static Observable<Integer> getDataSync(int i) {
return Observable.create((Subscriber<? super Integer> s) -> {
// simulate latency
try {
Thread.sleep(1000);
} catch (Exception e) {
e.printStackTrace();
}
s.onNext(i);
s.onCompleted();
});
}
}
В приведенном выше примере мы используем метод subscribeOn наблюдаемого и предоставляем ThreadPool(Schedules.io) для выполнения, поэтому подписка для каждого наблюдаемого будет происходить в отдельном потоке.
Существует вероятность того, что Thread.sleep блокирует любой общий объект между потоками, но я до сих пор не понимаю этого. Пожалуйста помоги.