rxcpp — почему функция on_next всех наблюдателей не вызывается, когда наблюдаемое испускает значение

Я пытаюсь понять, как использовать rxcpp, у меня сложилось впечатление, что когда наблюдаемый объект испускает значение, все подписанные наблюдатели будут получать уведомления, вызывая свои методы on_next(), передавая им испускаемое значение.

Это не относится к следующему примеру:

auto eventloop = rxcpp::observe_on_event_loop();

printf("Start task\n");

auto values = rxcpp::observable<>::interval(std::chrono::seconds(2)).map(
        [](int i){
            printf("Observable sending: %d\n", i);
            return i;
        }
);

values.
    subscribe_on(eventloop).
    take(2).
    as_blocking().
    subscribe(
        [](int v){printf("#1 onNext: %d\n", v);},
        [](){printf("#1 onCompleted\n");});

values.
    subscribe_on(eventloop).
    take(2).
    as_blocking().
    subscribe(
        [](int v){printf("#2 onNext: %d\n", v);},
        [](){printf("#2 onCompleted\n");});

printf("Finish task\n");

Я ожидал, что вывод будет примерно таким:

Start task
Observable sending: 1
#1 onNext: 1
#2 onNext: 1
Observable sending: 2
#1 onNext: 2
#1 onCompleted
#2 onNext: 2
#2 onCompleted
Finish task

то есть on_next вызывается для всех подписанных наблюдателей, когда приходит новое значение.

Вместо этого вывод на самом деле:

Start task
Observable sending: 1
#1 onNext: 1
Observable sending: 2
#1 onNext: 2
#1 onCompleted
Observable sending: 1
#2 onNext: 1
Observable sending: 2
#2 onNext: 2
#2 onCompleted
Finish task

person nbdy_    schedule 18.09.2016    source источник
comment
Использование as_blocking() предотвратит запуск второй подписки до тех пор, пока не завершится первая   -  person Kirk Shoop    schedule 26.09.2016


Ответы (1)


Это классическое поведение «горячее против холодного».

Горячая наблюдаемая будет делать то, что вы ожидаете. Interval — это холодная наблюдаемая величина, поэтому каждая подписка создает независимый набор значений.

Оператор публикации возьмет один холодный наблюдаемый объект и опубликует его как горячий наблюдаемый объект.

В этом случае было бы.

auto sharedvalues = values.publish().ref_count();

Затем используйте sharedvalues вместо values в выражениях подписки.

Поиск горячих и холодных наблюдаемых найдет широкое обсуждение этой темы.

person Kirk Shoop    schedule 25.09.2016