IObservable для получения результатов в бесконечном цикле

Это код, который я разработал до сих пор:

var observable = Observable.Create<string>(async observer =>
{
    var wc = new WebClient { UseDefaultCredentials = true };
    observer.OnNext(await wc.DownloadStringTaskAsync("http://ya.ru"));
});

observable.Subscribe(
    res => Debug.WriteLine("got result: {0}", res), 
    exc => Debug.WriteLine("exception: {0}", exc.Message)
); 

Это правильно извлекает данные веб-сайта и запускает мой обратный вызов один раз. Я хочу иметь бесконечный цикл, который действует следующим образом: ожидание результата -> вызов OnNext -> ожидание n секунд -> повторение операции.

Создание Observable.Interval и SelectMany его в моем Observable не совсем подходит, потому что это будет запрашивать веб-сайт в течение фиксированных периодов времени. Я хочу, чтобы следующий вызов запускался только после успешного или неудачного предыдущего. Какой самый элегантный и лаконичный способ добиться этого?


person src091    schedule 20.12.2013    source источник


Ответы (1)


Не изменяя слишком много вашего кода, вы могли бы эффективно конкатировать задержку после выхода, а затем повторять все это до бесконечности.

var observable = Observable.Create<string>(async observer =>
{
    var wc = new WebClient { UseDefaultCredentials = true };
    observer.OnNext(await wc.DownloadStringTaskAsync("http://ya.ru"));
});

observable
    .Concat(Observable.Empty<string>().Delay(TimeSpan.FromSeconds(1)))
    .Repeat()
    .Subscribe(
      res => Debug.WriteLine("got result: {0}", res), 
      exc => Debug.WriteLine("exception: {0}", exc.Message)
    ); 

Однако есть лучшие способы сделать это, потому что в предыдущем случае вы создавали новый WebClient каждую секунду. Вместо этого вы могли бы сделать что-то вроде этого...

using System.Reactive.Threading.Tasks;

var observable = Observable.Using(() => new WebClient(), (client) =>
    client.DownloadStringTaskAsync("http://ya.ru")
        .ToObservable()
        .Concat(Observable.Empty<string>().Delay(TimeSpan.FromSeconds(1)))
        .Repeat()
        );

И если вы хотите повторить ошибки, вы можете добавить Retry...

var observable = Observable.Using(() => new WebClient(), (client) =>
    client.DownloadStringTaskAsync("http://ya.ru")
        .ToObservable()
        .Retry(3)
        .Concat(Observable.Empty<string>().Delay(TimeSpan.FromSeconds(1)))
        .Repeat()
        );
person cwharris    schedule 20.12.2013
comment
Эта версия не повторяется при ошибках. - person James World; 20.12.2013
comment
О, должно ли это повторяться при ошибках? Это не то, что делает оригинальный пример... - person cwharris; 20.12.2013
comment
Почему за это проголосовали? Он делает именно то, что запрашивает OP, и это довольно хорошая практика. - person cwharris; 20.12.2013
comment
@ChristopherHarris Да, так и должно быть, я просто не знал, как это реализовать :) Спасибо за подробный ответ. - person src091; 20.12.2013
comment
@Антон Абсолютно. Rx может быть сложным, но как только вы понимаете все операторы, это похоже на ohhhhhhhhh... - person cwharris; 20.12.2013
comment
Я проголосовал за него, когда он не повторял ошибки. Поскольку вы отредактировали его, я проголосовал против ... Но я не голосую за, потому что на самом деле это перефразирование материала, освещенного в предыдущих ответах ... Я слишком резок? - person James World; 20.12.2013
comment
Голоса против хороши для демократии, но неоднозначны по своей природе. - person cwharris; 20.12.2013