У меня проблемы с тем, как ReactiveCommand работает с ObserveOn и SubscribeOn.
У меня есть API, который возвращает наблюдаемую последовательность строк, и это выглядит так:
public IObservable<string> GetDocumentObservable(int numParagraphs, int latency)
{
return Observable.Create<string>(obs =>
{
for (int i = 0; i < numParagraphs; i++)
{
Console.WriteLine("Service On thread {0}", Thread.CurrentThread.ManagedThreadId);
Thread.Sleep(1000);
obs.OnNext("Some String");
}
obs.OnCompleted();
return Disposable.Empty;
});
}
Я использую ReactiveCommand.CreateAsyncObservable для вызова этого, используя SubscribeOn(RxApp.TaskpoolScheduler) (чтобы убедиться, что Thread.Sleep не происходит в потоке пользовательского интерфейса) и ObserveOn(RxApp.MainThreadScheduler) для рисования строк в моем потоке пользовательского интерфейса.
К сожалению, все это выполняется синхронно (в одном потоке), и я не уверен, почему. Вот как выглядит код виртуальной машины:
DownloadDocument = ReactiveCommand
.CreateAsyncObservable(_ =>
{
Console.WriteLine("ViewModel Invoking On thread {0}", Thread.CurrentThread.ManagedThreadId);
return _documentService.GetDocumentObservable(NumParagraphs, 0);
});
DownloadDocument
.SubscribeOn(RxApp.TaskpoolScheduler)
.ObserveOn(RxApp.MainThreadScheduler)
.Subscribe(p =>
{
Console.WriteLine("ViewModel OnNext thread {0}", Thread.CurrentThread.ManagedThreadId);
Document.Add(p);
},
x => { },
() => { Console.WriteLine("ViewModel OnComplete thread {0}", Thread.CurrentThread.ManagedThreadId); });
Все выполняется в одном потоке и блокирует поток пользовательского интерфейса. Если я вызову его «по старинке», все будет работать так, как ожидалось (как показано ниже):
Something = ReactiveCommand.Create();
Something.Subscribe(x =>
{
_documentService.GetDocumentObservable(NumParagraphs, 0)
.SubscribeOn(RxApp.TaskpoolScheduler)
.ObserveOn(RxApp.MainThreadScheduler)
.Subscribe(p =>
{
Console.WriteLine("ViewModel OnNext thread {0}", Thread.CurrentThread.ManagedThreadId);
Document.Add(p);
},
ex => { },
() => { Console.WriteLine("ViewModel OnComplete thread {0}", Thread.CurrentThread.ManagedThreadId); });
});
Здесь нет заблокированных тем.
Есть ли что-то, чего мне не хватает, когда дело доходит до использования ReactiveCommands с Observable API?