Вот простой пример того, что я пытаюсь сделать с помощью Reactive Extensions, но он не работает.
Добавить не работает в этом простом примере
public static void Main(string[] args)
{
var list = new List<int> { 1, 2, 3 };
var obs = list.ToObservable();
IDisposable subscription = obs.SubscribeOn(Scheduler.NewThread).Subscribe(p =>
{
Console.WriteLine(p.ToString());
Console.WriteLine(Add(obs).ToString());
},
err => Console.WriteLine("Error"),
() => Console.WriteLine("Sequence Completed")
);
Console.ReadLine();
subscription.Dispose();
}
private static int Add(IObservable<int> wholeList)
{
int sum = 0;
wholeList.ForEach(i => sum = sum + i);
return sum;
}
Фактический результат
1
_
Желаемый результат
1
6
2
6
3
6
Sequence Completed
_
т. е. я хотел бы выполнять метод Add(obs) внутри каждой итерации, где obs сам является холодным IObservable, проходящим итерацию?