Rx Extensions: как сделать подписку зависимой от другой подписки?

У меня есть класс, который принимает наблюдаемое в своем конструкторе, затем подписывается на него и делает некоторые вещи, устанавливает свойства и т. д. Сам класс является наблюдаемым.

Я хочу подписаться на мой источник, наблюдаемый только в том случае, если кто-то подписан на мой класс, но я не могу понять, как это сделать.

public MyClass : IObservable<MyResult>
{
    private readonly Subject<MyResult> _subject = new Subject<MyResult>();
    private readonly IConnectableObservable<MySource> _source;

    public MyClass(IObservable<MySource> source)
    {
         _source = source
             //All my logic to set properties and such
             //goes here as a side effect, instead of in a subscription...
             .Do(...)
             //I hope that by publishing, side effects will happen only once...
             .Publish();
    }

    public IDisposable Subscribe(IObserver<MyResult> observer)
    {
        return new CompositeDisposable(
             _source.Subscribe(/* 
                  don't have anything to do here,
                  just subscribing to make sure I'm subscribed to source...
                  (this can't be the right way to do it)
             */),
             _subject.Subscribe(observer));
    }
}

ОБНОВИТЬ

@Scott: я понимаю, почему реализация IObservable будет анти-шаблоном. My Class должен потреблять один наблюдаемый объект и предоставляет 3 в качестве свойств (первоначально наиболее часто используемый наблюдаемый объект должен был быть возвращен самим MyClass, но я думаю, что иметь его в качестве свойства может быть лучше.

То, что я пытаюсь написать, - это наблюдаемая ICommand. Я знаю, что некоторые из них существуют, но это скорее способ выучить Rx...

public class ObservableCommand<T> : ICommand
{
    private readonly ISubject<T> _executeRequests = new Subject<T>();
    private readonly ISubject<T> _canExecuteRequests = new Subject<T>();

    public IObservable<bool> CanExecuteChanges { get; private set; }
    public IObservable<T> CanExecuteRequests { get; private set; }
    public IObservable<T> ExecuteRequests { get; private set; }

    public ObservableCommand(IObservable<bool> canExecute)
    {
        var source = canExecute.DistinctUntilChanged()

        //How do I dispose of subscription later?
        //I have this fear that I'm going to have a chain of references, 
        //and my entire app will never get GC'd!
        var subscription = source.Subscribe(
            o => {
                if (CanExecuteChanged != null)
                    CanExecuteChanged(this, EventArgs.Empty);
            });

        CanExecuteChanges = source;

        CanExecuteRequests = _canExecuteRequests.AsObservable();

        ExecuteRequests = _executeRequests.AsObservable();
    }

    #region ICommand Members

    public bool  CanExecute(object parameter)
    {
        _canExecuteRequests.OnNext(parameter is T ? (T)parameter : default(T));
    }

    public event EventHandler  CanExecuteChanged;

    public void  Execute(object parameter)
    {
        _executeRequests.OnNext(parameter is T ? (T)parameter : default(T));
    }

    #endregion
}

person Master Morality    schedule 16.07.2011    source источник
comment
Я знаю, что вы используете это как опыт обучения. Если вы застряли, вы можете проверить отличную версию ReactiveUI того же самого, чтобы получить некоторые подсказки. github.com/xpaulbettsx/ReactiveUI/blob/master/ReactiveUI. Xaml/   -  person Anderson Imes    schedule 19.07.2011
comment
@Anderson: На самом деле я искал ReactiveUI, но обнаружил, что недостаточно знаю о самом Rx, чтобы использовать его с комфортом, хотя выглядит как хорошая библиотека.   -  person Master Morality    schedule 20.07.2011
comment
ах да, это делает его жестким. Чтобы более детально изучить сами операторы, я использовал RxSandbox... очень помогло. Надеюсь, это поможет вам: mnajder.blogspot.com/2010/03/rxsandbox-v1 .html   -  person Anderson Imes    schedule 21.07.2011
comment
Кроме того... вы заметите, что ReactiveUI сразу подписывается на CanExecute, а не откладывает до тех пор, пока кто-то не подпишется на что-то еще. Работать таким образом довольно дешево, хотя я понимаю желание.   -  person Anderson Imes    schedule 21.07.2011


Ответы (2)


Как насчет не Doing или Publishing в конструкторе, а скорее в методе Subscribe?

Следует сказать, что явная реализация IObservable<T> является чем-то вроде анти-паттерна Rx.

Вы можете сделать подписки зависимыми от других подписчиков с помощью Defer и Create, что-то вроде

IObservable<MySource> source;
IObservable<MySource> sourceWithSubSideEffect =  Observable.Defer(() =>
{
   // Do something interesting on Subscription
   // ....
   return source;
});
person Scott Weinstein    schedule 16.07.2011
comment
Пример был бы чрезвычайно полезен. - person Master Morality; 17.07.2011
comment
Понятно - можете ли вы описать, чего вы пытаетесь достичь? - person Scott Weinstein; 17.07.2011
comment
Defer является статическим, а не методом расширения для IObservable. Его можно использовать только как Observable.Defer(). Create то же самое. - person Richard Szalay; 10.08.2011

Я подготовил для вас вырезку. MyClass реализует IObservable<T>, а также имеет методы IObserver<T>, но все они частные. С дополнительными OnInitialize и OnSubscribe вы сможете делать все, что захотите, в любом событии, на которое хотите отреагировать.

Если вы хотите сделать этот фрагмент повторно используемым, вы можете определить все методы как partial, так как все они возвращают void. Затем вы можете создать определение для всего, что хотите.

public class MyClass<T> : IObservable<T>
{
    private readonly IObservable<T> m_Source;

    public MyClass(IObservable<T> source)
    {
        if (source == null) throw new ArgumentNullException("source");
        m_Source = source.Do(OnNext, OnError, OnCompleted);
        OnInitialize();
    }

    public IDisposable Subscribe(IObserver<T> observer)
    {
        OnSubscribe();
        return m_Source.Subscribe(observer);
    }

    private void OnInitialize()
    {
        Console.WriteLine("OnInitialize");
    }
    private void OnSubscribe()
    {
        Console.WriteLine("OnSubscribe");
    }
    private void OnNext(T value)
    {
        Console.WriteLine("OnNext: {0}", value);
    }
    private void OnError(Exception error)
    {
        Console.WriteLine("OnError: {0}", error.Message);
    }
    private void OnCompleted()
    {
        Console.WriteLine("OnCompleted");
    }    
}
person StanislawSwierc    schedule 17.07.2011