доступ к IObservable внутри той же подписки IObservable

Вот простой пример того, что я пытаюсь сделать с помощью Reactive Extensions, но он не работает.

Добавить не работает в этом простом примере

    public static void Main(string[] args)
    {
        var list = new List<int> { 1, 2, 3 };
        var obs = list.ToObservable();
        IDisposable subscription = obs.SubscribeOn(Scheduler.NewThread).Subscribe(p =>
        {
            Console.WriteLine(p.ToString());
            Console.WriteLine(Add(obs).ToString());
        },
        err => Console.WriteLine("Error"),
        () => Console.WriteLine("Sequence Completed")
        );
        Console.ReadLine();
        subscription.Dispose();
    }

    private static int Add(IObservable<int> wholeList)
    {
        int sum = 0;
        wholeList.ForEach(i => sum = sum + i);
        return sum;
    }

Фактический результат

1
_

Желаемый результат

1
6
2
6
3
6
Sequence Completed
_

т. е. я хотел бы выполнять метод Add(obs) внутри каждой итерации, где obs сам является холодным IObservable, проходящим итерацию?


person Cel    schedule 10.10.2011    source источник
comment
Скорее всего, вы делаете это неправильно. Можете ли вы описать свой сценарий и то, чего вы пытаетесь достичь на более высоком уровне?   -  person Ana Betts    schedule 11.10.2011
comment
@Paul, пожалуйста, посмотрите комментарий, который я только что добавил в Enigmativity ..   -  person Cel    schedule 11.10.2011


Ответы (2)


Что касается вашего комментария, я бы посоветовал вам сделать наблюдаемое для создания элементов по мере необходимости и не делать этого после подписки. В вашем примере вы можете сделать что-то вроде:

var list = new List<int> { 1, 2, 3 };
var obs = list.ToObservable().Select(i => new Tuple<int,IObservable<int>>(i,list.ToObservable()));

obs.SubscribeOn(Scheduler.NewThread).Subscribe(t => {
  Console.WriteLine(t.Item1);
  SaveItems(t.Item2);
});
person Ankur    schedule 12.10.2011

Измените это:

IDisposable subscription = obs.SubscribeOn(Scheduler.NewThread)

к этому:

IDisposable subscription = obs.ObserveOn(Scheduler.NewThread)

Вы должны отметить, что вы делаете плохие вещи в отношении Rx. Вы входите и выходите из наблюдаемых. Вы должны избегать этого, когда это возможно.

Так, например, избегайте этого:

    var list = new List<int> { 1, 2, 3 };
    var obs = list.ToObservable();

когда это одно и то же:

    var obs = Observable.Range(1, 3);

Также весь метод static int Add(IObservable<int> wholeList) плохой. Он вызывает ForEach (что обычно должно быть предупреждением о том, что вы делаете что-то не так), чтобы извлечь значения из наблюдаемого. Здесь может произойти мертвая блокировка.

Уже существует наблюдаемое расширение под названием Sum, которое возвращает IObservble<int>, и это не выводит вас из наблюдаемого.

Итак, попробуйте написать свой код следующим образом:

var obs = Observable.Range(1, 3);

var query =
    from n in obs
    from s in obs.Sum()
    select new
    {
        Number = n.ToString(),
        Sum = s.ToString(),
    };

using (var subscription = query.SubscribeOn(Scheduler.NewThread).Subscribe(
    x =>
        {
            Console.WriteLine(x.Number);
            Console.WriteLine(x.Sum);
        },
    err =>
        Console.WriteLine("Error"),
    () =>
        Console.WriteLine("Sequence Completed")))
{
    Console.ReadLine();
}

Надеюсь, это поможет.

person Enigmativity    schedule 10.10.2011
comment
Это прекрасный ответ, и я принимаю ваши очки! К сожалению, мне нужно использовать .SubscribeOn(Scheduler.NewThread) по причинам, не показанным здесь, есть ли другой способ сделать MyMethod(obs) в конце каждой итерации? - person Cel; 11.10.2011
comment
@Cel - я изменил свой код примера, чтобы использовать SubscribeOn, а не ObserveOn, и он все еще работает. Это пример того, как пребывание в наблюдаемых работает при выходе за их тупиковые блокировки, поэтому вам следует избегать вызова вашего MyMethod(obs). - person Enigmativity; 11.10.2011
comment
я думаю, что я должен был опубликовать предполагаемый MyMethod(obs) в первую очередь, потому что Add(obs) на самом деле не очень хорошо моделирует сценарий. Желаемым MyMethod на самом деле является SaveMessages (obs), поэтому я хотел бы не суммировать целые числа, а в конце каждой отдельной итерации сообщения я хотел бы сохранить ВСЕ сообщения, содержащиеся в obs. и я не хочу сохранять все элементы в obs только в OnCompleted, мне нужно делать это в конце каждой итерации? - person Cel; 11.10.2011
comment
@Cel - Затем измените подпись на static IObservable<int> Add(IObservable<int> wholeList), а затем используйте стандартные операторы или Observable.Create<T>(...) внутри метода. - person Enigmativity; 11.10.2011