Установите Observable для завершения при логическом условии

Я должен обеспечить завершение Observable. Это наблюдение за потоковыми данными, изменение которых не входит в мои обязанности. Потоковые данные могут вообще не тикать. Я столкнулся с использованием предметного члена, который я могу использовать для ручного завершения наблюдаемого.

private readonly Subject<bool> _mySubject = new Subject<bool>();

// relevant property that should be completed
public IObservable<myModel> StreamObservable { get; private set; }

IObservable<myModel> someDataObservable = ... ; // may not tick in scenario

// this doesn't work            
StreamObservable = someDataObservable
                .TakeUntil(_mySubject)
                .Finally(() => logger.LogInformation("completed!"));
_mySubject.OnNext(true);

// I tried Amb.. still not going into Finally(), also when someDataObservable doesn't tick
StreamObservable = Observable.Amb(_mySubject.SelectMany(value => Observable.Empty<myModel>()), someDataObservable)
                .Finally(() => logger.LogInformation("completed!"));
_mySubject.OnNext(true);

Как я могу завершить, предпочтительно используя Subject.OnNext() ?

Триггер темы - это просто идея. Я предпочитаю оставаться в мире наблюдения и преобразовывать последовательность в Observable.Empty(), например. когда вызывается такой обратный вызов:

private Task<bool> FinishStreamCallback()
{
    _mySubject.OnNext(true); // my first approach
    return Task.FromResult(true);
}

.. нацеливание с использованием OnCompleted()-подписок на соответствующие общедоступные наблюдаемые объекты в другом классе.


person deafjeff    schedule 26.06.2020    source источник


Ответы (1)


Использование такого предмета не заканчивается наблюдаемым. Это не похоже на источник наблюдаемого под названием OnCompleted. Использование TakeUntil приводит к удалению подписки наблюдаемого источника.

Это как этот код:

var source = new Subject<int>();
var subscription = source.Materialize().Subscribe(x => Console.WriteLine(x.Kind));
source.OnNext(1);
subscription.Dispose();

Это отображает только OnNext.

Если я запускаю это:

var source = Observable.Return(42);
var subscription = source.Materialize().Subscribe(x => Console.WriteLine(x.Kind));

Затем я получаю:

OnNext
OnCompleted

Вызов .Dispose(), явно или через TakeUntil, приводит к тому, что наблюдаемый источник останавливается там, где он есть. OnCompleted не производится.

Если вы хотите знать, когда наблюдаемая отписывается, попробуйте этот метод расширения:

public static IObservable<T> OnUnsubscribe<T>(this IObservable<T> source, Action unsubscribe) =>
    Observable
        .Create<T>(o =>
            new CompositeDisposable(
                source.Subscribe(o),
                Disposable.Create(unsubscribe)));
                

Попробуйте использовать его следующим образом:

var source = new Subject<int>();
var trigger = new Subject<Unit>();
var subscription =
    source
        .Materialize()
        .OnUnsubscribe(() => Console.WriteLine("Unsubscribed!"))
        .TakeUntil(trigger)
        .Subscribe(x => Console.WriteLine(x.Kind));
        
source.OnNext(1);
trigger.OnNext(Unit.Default);

Это дает мне:

OnNext
Unsubscribed!
person Enigmativity    schedule 29.06.2020
comment
хорошее объяснение. Но тогда мне пришлось бы использовать свойство типа bool unsubscribed в OnUnsusbscribe, оставив наблюдаемое удаленным. Я предпочитаю действительно модифицировать наблюдаемое до завершенного. Пожалуйста, смотрите мое редактирование выше. Спасибо. - person deafjeff; 29.06.2020
comment
@deafjeff - я не уверен, что понимаю, что вы подразумеваете под свойством типа bool unsubscribed в OnUnsusbscribe, оставляя наблюдаемое удаленным? - person Enigmativity; 29.06.2020
comment
OnSubscribed() работает и срабатывает. Но на самом деле я нахожусь в своего рода подписке на событие наблюдаемого, которое только что отметилось. Продолжая здесь, мне пришлось бы работать с новым общедоступным логическим свойством, указывающим другим классам, что что-то изменилось. Но Observale — это свойство, открытое для других классов, и часть более крупного конвейера. И мне нравится, когда он трансформируется (здесь: в OnCompleted()) - person deafjeff; 29.06.2020
comment
@deafjeff - Если ваш наблюдаемый полностью инкапсулирован, то, возможно, Subject<T> - это все, что вам нужно в классе - выставьте это как общедоступный наблюдаемый. Тогда у вас есть полный контроль. - person Enigmativity; 29.06.2020