Я пытаюсь понять, как использовать rxcpp, у меня сложилось впечатление, что когда наблюдаемый объект испускает значение, все подписанные наблюдатели будут получать уведомления, вызывая свои методы on_next(), передавая им испускаемое значение.
Это не относится к следующему примеру:
auto eventloop = rxcpp::observe_on_event_loop();
printf("Start task\n");
auto values = rxcpp::observable<>::interval(std::chrono::seconds(2)).map(
[](int i){
printf("Observable sending: %d\n", i);
return i;
}
);
values.
subscribe_on(eventloop).
take(2).
as_blocking().
subscribe(
[](int v){printf("#1 onNext: %d\n", v);},
[](){printf("#1 onCompleted\n");});
values.
subscribe_on(eventloop).
take(2).
as_blocking().
subscribe(
[](int v){printf("#2 onNext: %d\n", v);},
[](){printf("#2 onCompleted\n");});
printf("Finish task\n");
Я ожидал, что вывод будет примерно таким:
Start task
Observable sending: 1
#1 onNext: 1
#2 onNext: 1
Observable sending: 2
#1 onNext: 2
#1 onCompleted
#2 onNext: 2
#2 onCompleted
Finish task
то есть on_next вызывается для всех подписанных наблюдателей, когда приходит новое значение.
Вместо этого вывод на самом деле:
Start task
Observable sending: 1
#1 onNext: 1
Observable sending: 2
#1 onNext: 2
#1 onCompleted
Observable sending: 1
#2 onNext: 1
Observable sending: 2
#2 onNext: 2
#2 onCompleted
Finish task