Удаление реактивной наблюдаемой подписки

Если у меня есть доступ к IObservable, который, как я знаю, когда-либо будет возвращать только один элемент, будет ли это работать и является ли это лучшим шаблоном использования?

IDisposable disposable = null;
disposable = myObservable.Subscribe(x =>
  {
     DoThingWithItem(x);
     if (disposable != null)
     {
       disposable.Dispose();
     }
  });

person Noob    schedule 09.10.2011    source источник
comment
ИМХО тот факт, что удаленный объект находится в области видимости, делает его плохим стилем. Я что-то упускаю?   -  person Miserable Variable    schedule 09.10.2011
comment
Под этим вы подразумеваете, что одноразовая переменная может быть удалена, если myObservable сработает до того, как одноразовая переменная выйдет из области видимости? Какой лучший шаблон для избавления от этого объекта?   -  person Noob    schedule 09.10.2011


Ответы (4)


Отказ от ответственности: я также все еще изучаю Rx. Так что я на самом деле не эксперт, но я считаю, что одноразовый товар, возвращенный Subscribe, только отменит подписку. Также, если источник завершается, как в вашем случае, отписка выполняется автоматически. Поэтому я думаю, что Dispose здесь избыточен и может быть безопасно удален.

См. ответ на этот вопрос для получения дополнительной информации.

person Ilian    schedule 09.10.2011

Одноразовый объект, возвращаемый методами расширения Subscribe, возвращается исключительно для того, чтобы позволить вам вручную отказаться от подписки на наблюдаемый объект до естественного завершения наблюдаемого.

Если наблюдаемое завершается — либо с OnCompleted, либо с OnError — тогда подписка уже готова для вас.

Попробуйте этот код:

var xs = Observable.Create<int>(o =>
{
    var d = Observable.Return(1).Subscribe(o);
    return Disposable.Create(() =>
    {
        Console.WriteLine("Disposed!");
        d.Dispose();
    });
});

var subscription = xs.Subscribe(x => Console.WriteLine(x));

Если вы запустите приведенное выше, вы увидите, что «Disposed!» записывается в консоль, когда наблюдаемое завершается без необходимости вызова .Dispose() для подписки.

Следует отметить одну важную вещь: сборщик мусора никогда не вызывает .Dispose() для наблюдаемых подписок, поэтому вы должны удалять свои подписки, если они не завершились (или могли не закончиться) естественным образом до того, как ваша подписка выйдет из области действия.

Возьмите это, например:

var wc = new WebClient();

var ds = Observable
    .FromEventPattern<
        DownloadStringCompletedEventHandler,
        DownloadStringCompletedEventArgs>(
            h => wc.DownloadStringCompleted += h,
            h => wc.DownloadStringCompleted -= h);

var subscription =
    ds.Subscribe(d =>
        Console.WriteLine(d.EventArgs.Result));

Наблюдаемый объект ds будет присоединяться к обработчику событий только тогда, когда у него есть подписка, и будет отсоединяться только после завершения наблюдаемого объекта или удаления подписки. Поскольку это обработчик событий, наблюдаемый никогда не завершится, потому что он ожидает большего количества событий, и, следовательно, удаление — единственный способ отсоединиться от события (для приведенного выше примера).

Когда у вас есть наблюдаемая FromEventPattern, которая, как вы знаете, когда-либо вернет только одно значение, разумно добавить метод расширения .Take(1) перед подпиской, чтобы позволить обработчику событий автоматически отсоединяться, и тогда вам не нужно вручную удалять подписку.

Вот так:

var ds = Observable
    .FromEventPattern<
        DownloadStringCompletedEventHandler,
        DownloadStringCompletedEventArgs>(
            h => wc.DownloadStringCompleted += h,
            h => wc.DownloadStringCompleted -= h)
    .Take(1);

Надеюсь, это поможет.

person Enigmativity    schedule 10.10.2011
comment
Спасибо! У меня точно такая же проблема (событие, из которого я хочу только одно), и Take(1) — идеальное решение. - person moswald; 09.01.2014
comment
@moswald Note FirstAsync() делает то же самое. - person lobsterism; 17.01.2014
comment
the garbage collector never calls .Dispose() on observable subscriptions - Это то, ради чего я здесь. Спасибо! - person Sebastian Graf; 22.08.2014
comment
@Sebastian - я просто хотел убедиться, что вы понимаете, что сборщик мусора никогда не вызывает .Dispose() для любого объекта - в наблюдаемых подписках нет ничего особенного. - person Enigmativity; 22.08.2014
comment
Да, но мне было интересно, есть ли в конкретной реализации финализатор, который вызывал бы Dispose(). - person Sebastian Graf; 22.08.2014
comment
@ Себастьян - это не так. Но также помните, что нет никакой гарантии, что финализатор когда-либо запустится. Вы должны убедиться, что вы явно вызываете .Dispose(), если это важно. - person Enigmativity; 22.08.2014
comment
Итак, нам нужно вызвать subscription.Dispose(), чтобы отсоединиться от события (я имею в виду наблюдаемое FromEventPattern)? - person zwcloud; 29.12.2018
comment
@zwcloud - Да, именно поэтому подписки возвращают IDisposable. Хотя .Take(1) автоматически отсоединяется от события после возврата первого значения. - person Enigmativity; 29.12.2018
comment
it is wise to add the .Take(1) extension method before subscribing to allow the event handler to automatically detach - Какая легенда, Спасибо! - person pejno; 16.11.2019
comment
Удивительный ответ, чувак. - person Electron; 01.02.2021

Функция Take сделает именно то, что вы ищете. В данном случае Take(1).

person Gideon Engelberth    schedule 09.10.2011
comment
То, что вы говорите, может быть доказано? - person J. Lennon; 08.12.2012

В отличие от некоторых комментариев, совсем не редкость избавиться от подписки изнутри OnNext.

Хотя верно, что удаление в OnCompleted и OnError выполняется за вас с помощью обернутой подписки, которую создает метод расширения Subscribe, вы может захотеть отказаться от подписки на основе значения, которое вы наблюдаете (например, в вашем случае: 1-е). У вас не всегда есть наблюдаемая, которая, как известно, производит только одно значение.

Проблема в том, что вы получаете IDisposable только после подписки. Наблюдаемый объект может перезвонить вам по OnNext еще до того, как он вернет вам IDisposable для отказа от подписки (в зависимости от таких вещей, как IScheduler, которые он использует).

System.Reactive.Disposables.SingleAssignmentDisposable пригодится в этом случае. Он упаковывает IDisposable, который вы можете назначить поздно, и немедленно утилизирует его при назначении, если SingleAssignmentDisposable к тому времени уже был удален. Также он содержит свойство IsDisposed, которое изначально равно false и устанавливается в true при вызове Dispose().

So:

IObservable<string> source = ...;

var subscription = new SingleAssignmentDisposable();
subscription.Disposable = source.Subscribe(x =>
{
    if (subscription.IsDisposed) // getting notified though I've told it to stop
        return;
    DoThingsWithItem(x);
    if (x == "the last item I'm interested in")
        subscription.Dispose();
});
person tinudu    schedule 26.10.2016