Как ограничить поток событий с помощью RX?

Я хочу эффективно регулировать поток событий, чтобы мой делегат вызывался при получении первого события, но не в течение 1 секунды, если получены последующие события. По истечении этого тайм-аута (1 секунда), если было получено последующее событие, я хочу, чтобы мой делегат был вызван.

Есть ли простой способ сделать это с помощью Reactive Extensions?

Образец кода:

static void Main(string[] args)
{
    Console.WriteLine("Running...");

    var generator = Observable
        .GenerateWithTime(1, x => x <= 100, x => x, x => TimeSpan.FromMilliseconds(1), x => x + 1)
        .Timestamp();

    var builder = new StringBuilder();

    generator
        .Sample(TimeSpan.FromSeconds(1))
        .Finally(() => Console.WriteLine(builder.ToString()))
        .Subscribe(feed =>
                   builder.AppendLine(string.Format("Observed {0:000}, generated at {1}, observed at {2}",
                                                    feed.Value,
                                                    feed.Timestamp.ToString("mm:ss.fff"),
                                                    DateTime.Now.ToString("mm:ss.fff"))));

    Console.ReadKey();
}

Текущий выход:

Running...
Observed 064, generated at 41:43.602, observed at 41:43.602
Observed 100, generated at 41:44.165, observed at 41:44.602

Но я хочу наблюдать (временные метки, очевидно, изменятся)

Running...
Observed 001, generated at 41:43.602, observed at 41:43.602
....
Observed 100, generated at 41:44.165, observed at 41:44.602

person Alex    schedule 09.07.2010    source источник
comment
Это просто крутой оператор лямбда x => x <= 100 ;)   -  person Oliver    schedule 09.07.2010


Ответы (8)


Вот что я получил с помощью форума RX:

Идея состоит в том, чтобы выпустить серию «билетов» для запуска исходной последовательности. Эти «билеты» задерживаются на время ожидания, за исключением самого первого, который сразу добавляется к последовательности билетов. Когда приходит событие и есть ожидающий билет, событие срабатывает немедленно, в противном случае оно ждет до билета, а затем срабатывает. Когда он срабатывает, выдается следующий билет и так далее...

Чтобы объединить билеты и оригинальные события, нам нужен комбинатор. К сожалению, «стандартный» .CombineLatest здесь нельзя использовать, потому что он будет срабатывать для билетов и событий, которые использовались ранее. Поэтому мне пришлось создать свой собственный комбинатор, который в основном представляет собой отфильтрованный .CombineLatest, который срабатывает только тогда, когда оба элемента в комбинации "свежие" - никогда раньше не возвращались. Я называю это .CombineVeryLatest или .BrokenZip;)

Используя .CombineVeryLatest, вышеуказанную идею можно реализовать следующим образом:

    public static IObservable<T> SampleResponsive<T>(
        this IObservable<T> source, TimeSpan delay)
    {
        return source.Publish(src =>
        {
            var fire = new Subject<T>();

            var whenCanFire = fire
                .Select(u => new Unit())
                .Delay(delay)
                .StartWith(new Unit());

            var subscription = src
                .CombineVeryLatest(whenCanFire, (x, flag) => x)
                .Subscribe(fire);

            return fire.Finally(subscription.Dispose);
        });
    }

    public static IObservable<TResult> CombineVeryLatest
        <TLeft, TRight, TResult>(this IObservable<TLeft> leftSource,
        IObservable<TRight> rightSource, Func<TLeft, TRight, TResult> selector)
    {
        var ls = leftSource.Select(x => new Used<TLeft>(x));
        var rs = rightSource.Select(x => new Used<TRight>(x));
        var cmb = ls.CombineLatest(rs, (x, y) => new { x, y });
        var fltCmb = cmb
            .Where(a => !(a.x.IsUsed || a.y.IsUsed))
            .Do(a => { a.x.IsUsed = true; a.y.IsUsed = true; });
        return fltCmb.Select(a => selector(a.x.Value, a.y.Value));
    }

    private class Used<T>
    {
        internal T Value { get; private set; }
        internal bool IsUsed { get; set; }

        internal Used(T value)
        {
            Value = value;
        }
    }

Редактировать: вот еще один более компактный вариант CombineVeryLatest, предложенный Андреасом Кёпфом на форуме:

public static IObservable<TResult> CombineVeryLatest
  <TLeft, TRight, TResult>(this IObservable<TLeft> leftSource,
  IObservable<TRight> rightSource, Func<TLeft, TRight, TResult> selector)
{
    return Observable.Defer(() =>
    {
        int l = -1, r = -1;
        return Observable.CombineLatest(
            leftSource.Select(Tuple.Create<TLeft, int>),
            rightSource.Select(Tuple.Create<TRight, int>),
                (x, y) => new { x, y })
            .Where(t => t.x.Item2 != l && t.y.Item2 != r)
            .Do(t => { l = t.x.Item2; r = t.y.Item2; })
            .Select(t => selector(t.x.Item1, t.y.Item1));
    });
}
person Sergey Aldoukhov    schedule 11.07.2010

Хорошо,

у вас есть 3 сценария здесь:

1) Я хотел бы получать одно значение потока событий каждую секунду. означает: если он производит больше событий в секунду, вы получите всегда больший буфер.

observableStream.Throttle(timeSpan)

2) Я хотел бы получить последнее событие, которое было создано до того, как произошло второе, означает: другие события отбрасываются.

observableStream.Sample(TimeSpan.FromSeconds(1))

3) вы хотите получить все события, произошедшие за последнюю секунду. и это каждую секунду

observableStream.BufferWithTime(timeSpan)

4) вы хотите выбрать, что происходит между вторым со всеми значениями, пока не пройдет второе, и ваш результат будет возвращен

observableStream.CombineLatest(Observable.Interval(1000), selectorOnEachEvent)
person cRichter    schedule 09.07.2010
comment
Черт, сценарий 2 - это именно то, что я ищу, и я тоже не смог найти метод :( - person deadlydog; 23.10.2012
comment
Если кому надо: stream.Sample(TimeSpan.FromSeconds(1)) - person AlexFoxGill; 24.06.2013

Вчера вечером я боролся с этой же проблемой и считаю, что нашел более элегантное (или, по крайней мере, более короткое) решение:

var delay = Observable.Empty<T>().Delay(TimeSpan.FromSeconds(1));
var throttledSource = source.Take(1).Concat(delay).Repeat();
person LiquidAsh    schedule 29.04.2011

Это то, что я разместил в качестве ответа на этот вопрос в форум Rx:

ОБНОВЛЕНИЕ. Вот новая версия, которая больше не задерживает пересылку событий, когда события происходят с разницей во времени более одной секунды:

public static IObservable<T> ThrottleResponsive3<T>(this IObservable<T> source, TimeSpan minInterval)
{
    return Observable.CreateWithDisposable<T>(o =>
    {
        object gate = new Object();
        Notification<T> last = null, lastNonTerminal = null;
        DateTime referenceTime = DateTime.UtcNow - minInterval;
        var delayedReplay = new MutableDisposable();
        return new CompositeDisposable(source.Materialize().Subscribe(x =>
        {
            lock (gate)
            {
                var elapsed = DateTime.UtcNow - referenceTime;
                if (elapsed >= minInterval && delayedReplay.Disposable == null)
                {
                    referenceTime = DateTime.UtcNow;
                    x.Accept(o);
                }
                else
                {
                    if (x.Kind == NotificationKind.OnNext)
                        lastNonTerminal = x;
                    last = x;
                    if (delayedReplay.Disposable == null)
                    {
                        delayedReplay.Disposable = Scheduler.ThreadPool.Schedule(() =>
                        {
                            lock (gate)
                            {
                                referenceTime = DateTime.UtcNow;
                                if (lastNonTerminal != null && lastNonTerminal != last)
                                    lastNonTerminal.Accept(o);
                                last.Accept(o);
                                last = lastNonTerminal = null;
                                delayedReplay.Disposable = null;
                            }
                        }, minInterval - elapsed);
                    }
                }
            }
        }), delayedReplay);
    });
}

Это была моя предыдущая попытка:

var source = Observable.GenerateWithTime(1, 
    x => x <= 100, x => x, x => TimeSpan.FromMilliseconds(1), x => x + 1)
    .Timestamp();

source.Publish(o =>
    o.Take(1).Merge(o.Skip(1).Sample(TimeSpan.FromSeconds(1)))
).Run(x => Console.WriteLine(x));
person blueling    schedule 09.07.2010
comment
Только что нашел ссылку на этот ответ в кредитах приложения Microsoft Todo UWP :) - person CalvT; 19.09.2017
comment
@CalvT Ага! Я ожидал увидеть список проектов с открытым исходным кодом, но вот ответ на стек! Возможно, это связано с их переходом с AWS на Azure: theverge.com/2018/3/21/17146308/. - person Cosmos Gu; 03.07.2018

Хорошо, вот одно решение. Мне это особенно не нравится, но... да ладно.

Советую Джону за то, что он указал мне на SkipWhile, и cRichter за BufferWithTime. Спасибо, парни.

static void Main(string[] args)
{
    Console.WriteLine("Running...");

    var generator = Observable
        .GenerateWithTime(1, x => x <= 100, x => x, x => TimeSpan.FromMilliseconds(1), x => x + 1)
        .Timestamp();

    var bufferedAtOneSec = generator.BufferWithTime(TimeSpan.FromSeconds(1));

    var action = new Action<Timestamped<int>>(
        feed => Console.WriteLine("Observed {0:000}, generated at {1}, observed at {2}",
                                  feed.Value,
                                  feed.Timestamp.ToString("mm:ss.fff"),
                                  DateTime.Now.ToString("mm:ss.fff")));

    var reactImmediately = true;
    bufferedAtOneSec.Subscribe(list =>
                                   {
                                       if (list.Count == 0)
                                       {
                                           reactImmediately = true;
                                       }
                                       else
                                       {
                                           action(list.Last());
                                       }
                                   });
    generator
        .SkipWhile(item => reactImmediately == false)
        .Subscribe(feed =>
                       {
                           if(reactImmediately)
                           {
                               reactImmediately = false;
                               action(feed);
                           }
                       });

    Console.ReadKey();
}
person Alex    schedule 09.07.2010

Вы пробовали метод расширения Throttle?

Из документов:

Игнорирует значения из наблюдаемой последовательности, за которыми следует другое значение до dueTime.

Мне не совсем ясно, будет ли это делать то, что вы хотите, или нет - в том смысле, что вы хотите игнорировать следующие значения, а не первое значение... но я бы ожидал, что это будет то, что вы хочу. Попробуйте :)

РЕДАКТИРОВАТЬ: Хммм... нет, я не думаю, что Throttle это правильно, в конце концов. Кажется, я вижу, что вы хотите сделать, но я не вижу ничего в структуре, чтобы сделать это. Хотя вполне может быть, что я что-то упустил. Вы спрашивали на форуме Rx? Вполне может быть, что если его сейчас нет, то с удовольствием добавят :)

Я подозреваю, что вы могли бы сделать это как-то хитро с SkipUntil и SelectMany... но я думаю, что это должно быть в своем собственном методе.

person Jon Skeet    schedule 09.07.2010
comment
Спасибо, Джон. Я попробовал, но это не совсем то, что я хочу. В примере использование Throttle приводит к игнорированию всех событий, кроме последнего. Мне нужно отреагировать на первое событие (чтобы обеспечить отзывчивость системы), но затем задержать с частотой дискретизации в 1 секунду для последующих событий. - person Alex; 09.07.2010
comment
Джон, вы предложили спросить на форуме Rx. На самом деле существует открытая проблема № 395 — Реализуйте настоящий дроссель..., запрашивающий это точное поведение. - person Magnus; 10.09.2019
comment
@Magnus: Ну, теперь есть - подано через 7 лет после публикации этого ответа! - person Jon Skeet; 10.09.2019
comment
???? - ой, кажется, я не заметил '10 после даты. Что ж, ссылка на проблему здесь, чтобы другие могли ее найти. - person Magnus; 10.09.2019

То, что вы ищете, это CombineLatest.

public static IObservable<TResult> CombineLatest<TLeft, TRight, TResult>(
    IObservable<TLeft> leftSource,
    IObservable<TRight> rightSource,
    Func<TLeft, TRight, TResult> selector
)

который объединяет 2 наблюдаемых и возвращает все значения, когда селектор (время) имеет значение.

редактировать: Джон прав, возможно, это не лучшее решение

person cRichter    schedule 09.07.2010
comment
Я не понимаю, как это то, что он ищет - что вы видите здесь как две наблюдаемые? - person Jon Skeet; 09.07.2010
comment
Один из них - это события, которые он генерирует, селектор - это Observable.Interval(TimeSpan.FromSeconds(1)) - person cRichter; 09.07.2010
comment
Разве это не будет генерировать событие каждый раз, когда любой из них создает значение? - person Jon Skeet; 09.07.2010
comment
Делая это... var interval = Observable.Interval(TimeSpan.FromSeconds(1)).Start(); генератор.CombineLatest(interval, (value, gate) =› value ) ... не дал результатов. Я не знаю, как бы я использовал CombineLatest для достижения этой цели. - person Alex; 09.07.2010
comment
Верно, но вы можете выбрать, что должно произойти тогда. например когда это из потока 1, затем кешируйте его и возвращайте ноль, если из потока 2, верните кеш. единственное, что вам нужно сделать после этого, это ограничить результаты, чтобы не возвращались нулевые значения. (в основном так реализован BufferWithTime ;-) - person cRichter; 09.07.2010

Вдохновленный ответом Bluelings, я привожу здесь версию, которая компилируется с Reactive Extensions 2.2.5.

Эта конкретная версия подсчитывает количество выборок, а также предоставляет последнее выбранное значение. Для этого используется следующий класс:

class Sample<T> {

  public Sample(T lastValue, Int32 count) {
    LastValue = lastValue;
    Count = count;
  }

  public T LastValue { get; private set; }

  public Int32 Count { get; private set; }

}

Вот оператор:

public static IObservable<Sample<T>> SampleResponsive<T>(this IObservable<T> source, TimeSpan interval, IScheduler scheduler = null) {
  if (source == null)
    throw new ArgumentNullException(nameof(source));
  return Observable.Create<Sample<T>>(
    observer => {
      var gate = new Object();
      var lastSampleValue = default(T);
      var lastSampleTime = default(DateTime);
      var sampleCount = 0;
      var scheduledTask = new SerialDisposable();
      return new CompositeDisposable(
        source.Subscribe(
          value => {
            lock (gate) {
              var now = DateTime.UtcNow;
              var elapsed = now - lastSampleTime;
              if (elapsed >= interval) {
                observer.OnNext(new Sample<T>(value, 1));
                lastSampleValue = value;
                lastSampleTime = now;
                sampleCount = 0;
              }
              else {
                if (scheduledTask.Disposable == null) {
                  scheduledTask.Disposable = (scheduler ?? Scheduler.Default).Schedule(
                    interval - elapsed,
                    () => {
                      lock (gate) {
                        if (sampleCount > 0) {
                          lastSampleTime = DateTime.UtcNow;
                          observer.OnNext(new Sample<T>(lastSampleValue, sampleCount));
                          sampleCount = 0;
                        }
                        scheduledTask.Disposable = null;
                      }
                    }
                  );
                }
                lastSampleValue = value;
                sampleCount += 1;
              }
            }
          },
          error => {
            if (sampleCount > 0)
              observer.OnNext(new Sample<T>(lastSampleValue, sampleCount));
            observer.OnError(error);
          },
          () => {
            if (sampleCount > 0)
              observer.OnNext(new Sample<T>(lastSampleValue, sampleCount));
            observer.OnCompleted();
          }
        ),
        scheduledTask
      );
    }
  );
}
person Martin Liversage    schedule 27.03.2017