ReactiveCommand не обрабатывает наблюдаемый вызов, как ожидалось

У меня проблемы с тем, как ReactiveCommand работает с ObserveOn и SubscribeOn.

У меня есть API, который возвращает наблюдаемую последовательность строк, и это выглядит так:

 public IObservable<string> GetDocumentObservable(int numParagraphs, int latency)
 {
     return Observable.Create<string>(obs =>
     {
         for (int i = 0; i < numParagraphs; i++)
         {
             Console.WriteLine("Service On thread {0}", Thread.CurrentThread.ManagedThreadId);
             Thread.Sleep(1000);
             obs.OnNext("Some String");
         }
         obs.OnCompleted();
         return Disposable.Empty;
     });
 }

Я использую ReactiveCommand.CreateAsyncObservable для вызова этого, используя SubscribeOn(RxApp.TaskpoolScheduler) (чтобы убедиться, что Thread.Sleep не происходит в потоке пользовательского интерфейса) и ObserveOn(RxApp.MainThreadScheduler) для рисования строк в моем потоке пользовательского интерфейса.

К сожалению, все это выполняется синхронно (в одном потоке), и я не уверен, почему. Вот как выглядит код виртуальной машины:

DownloadDocument = ReactiveCommand
.CreateAsyncObservable(_ =>
{
    Console.WriteLine("ViewModel Invoking On thread {0}", Thread.CurrentThread.ManagedThreadId);
    return _documentService.GetDocumentObservable(NumParagraphs, 0);
});

DownloadDocument
    .SubscribeOn(RxApp.TaskpoolScheduler)
    .ObserveOn(RxApp.MainThreadScheduler)
    .Subscribe(p =>
    {
        Console.WriteLine("ViewModel OnNext thread {0}", Thread.CurrentThread.ManagedThreadId);
        Document.Add(p);
    },
    x => { },
    () => { Console.WriteLine("ViewModel OnComplete thread {0}", Thread.CurrentThread.ManagedThreadId); });

Все выполняется в одном потоке и блокирует поток пользовательского интерфейса. Если я вызову его «по старинке», все будет работать так, как ожидалось (как показано ниже):

Something = ReactiveCommand.Create();
Something.Subscribe(x =>
{
    _documentService.GetDocumentObservable(NumParagraphs, 0)
    .SubscribeOn(RxApp.TaskpoolScheduler)
    .ObserveOn(RxApp.MainThreadScheduler)
    .Subscribe(p =>
    {
        Console.WriteLine("ViewModel OnNext thread {0}", Thread.CurrentThread.ManagedThreadId);
        Document.Add(p);
    },
    ex => { },
    () => { Console.WriteLine("ViewModel OnComplete thread {0}", Thread.CurrentThread.ManagedThreadId); });
});

Здесь нет заблокированных тем.

Есть ли что-то, чего мне не хватает, когда дело доходит до использования ReactiveCommands с Observable API?


person enomam    schedule 06.03.2015    source источник
comment
Я заметил, что это чем-то похоже на этот пост: stackoverflow.com/questions/25223340/ . Там был применен обходной путь (путем явного вызова кода в другом потоке), а не обнаружение ошибки в ReactiveUI.   -  person enomam    schedule 06.03.2015


Ответы (2)


Сам ReactiveCommand подписывается на ваш источник при вызове команды, которая не подхватывает ваш SubscribeOn. Самый простой способ исправить это — просто обернуть код в Task.Run:

 return Observable.Create<string>(obs =>
 {
     bool stopEarly;

     Task.Run(() => {
         for (int i = 0; i < numParagraphs; i++)
         {
             Console.WriteLine("Service On thread {0}", Thread.CurrentThread.ManagedThreadId);
             Thread.Sleep(1000);
             obs.OnNext("Some String");
             if (stopEarly) return;
         }
         obs.OnCompleted();
     });

     return Disposable.Create(() => stopEarly = true);
 });
person Ana Betts    schedule 07.03.2015
comment
Добавление Task.Run — это то, чего я хотел избежать, учитывая, что SubscribeOn (в обычном сценарии Rx) все равно дал бы мне это. Кроме того, я должен был быть более ясным - API, который я предоставил, был только для экспериментов. Цель состояла в том, чтобы рассмотреть способы использования синхронного API наблюдаемым образом. - person enomam; 07.03.2015
comment
По сути, я не хочу, чтобы API решал, в каком потоке он работает, я хочу, чтобы это решал потребитель. - person enomam; 07.03.2015
comment
@enomam Тогда вам нужно использовать либо Observable.Start, либо Task.Run - person Ana Betts; 08.03.2015
comment
Это работает, но на самом деле мне не подходит. Observable.Start не предоставляет мне интерфейс IObserver (это означает, что я не могу передавать данные потребителю), а Task.Run по-прежнему будет обернут Observable. Возможно, мне нужно будет разделить потребление API на два отдельных домена — те, которые имеют дело с одним запросом — несколько результатов с течением времени, и более простой материал типа запрос/ответ (который можно красиво обернуть в ReactiveCommand). - person enomam; 10.03.2015
comment
... также (извините, что продолжаю беспокоить вас!), Disposable.Create lamba не вызывается до тех пор, пока задача не завершится (после obs.OnComplete) DownloadDocument.Subscribe(...) дает мне одноразовый - но утилизация что сразу не распоряжаться. - person enomam; 10.03.2015
comment
@enomam Верно. Вы могли бы привести аргумент, что если никто не слушает команду, мы должны убить все задачи в полете, но это также будет Удивительно для людей, слушающих результат ExecuteAsync. - person Ana Betts; 11.03.2015

Ваш метод GetDocumentObservable не предоставляет Rx для запуска кода в другом потоке, поэтому сразу во время подписки он запустит все значения и сигнализирует о завершении до завершения вызова .Subscribe(...).

Некоторые ключевые вещи, которых следует опасаться при написании кода, подобного этому, — это использование return Disposable.Empty; и Thread.Sleep(...);. Они должны быть для вас красными флажками.

Вместо этого вы всегда должны сначала пытаться реализовать свои методы, используя другие встроенные методы, и только тогда переходить к Create "самостоятельно", когда это необходимо.

К счастью, есть очень мощный встроенный оператор, который идеально подходит для этого — Generate. Этот метод очень полезен для создания последовательностей, включающих «сон».

Вот как это будет выглядеть:

public IObservable<string> GetDocumentObservable(int numParagraphs, int latency)
{
    return
        Observable
            .Generate(
                0,
                i => i < numParagraphs,
                i => i + 1,
                i => "Some String",
                i => i == 0
                    ? TimeSpan.Zero
                    : TimeSpan.FromSeconds(1.0))
            .Do(x => Console.WriteLine(
                "Service On thread {0}",
                Thread.CurrentThread.ManagedThreadId));
}

Это делает все, что делает ваш метод, и должен вести себя так, как вы этого хотели.

person Enigmativity    schedule 07.03.2015
comment
Спасибо за это. Я должен быть ясным - вещи Thread.Sleep и Disposable предназначены для экспериментов, чтобы смоделировать что-то, что занимает много времени, что может или не может передать результаты потребителю. Конечная цель состоит в том, чтобы использовать синхронный API в ReactiveManner, поэтому на самом деле это не генератор (хотя в некоторых случаях это может быть) - person enomam; 07.03.2015