Таймер самоотмены ReactiveX

Я хочу создать метод расширения формы:

IObservable<bool> CancellableTimer( this IObservable source, TimeSpan delay )
{
...
}

который создает последовательность, которая всегда ложна, когда исходная последовательность является истинной, но становится истинной, когда исходная последовательность остается истинной в течение периода, определяемого задержкой t:

source: 0---1---------0--1-0-1-0-1-0-1----------0
            t------>                 t------> 
result: 0----------1--0---------------------1---0

Я уверен, что должен быть способ сделать это с помощью примитивов Rx, но я новичок в Rx, и у меня проблемы с этим. Любые идеи, пожалуйста?


person bighairdave    schedule 22.08.2016    source источник


Ответы (1)


Итак, вот что я придумал. Я также переименовал метод в AsymetricDelay(), так как это кажется более подходящим именем:

    static public IObservable<bool> AsymetricDelay(this IObservable<bool> source, TimeSpan delay, IScheduler scheduler)
    {
        var distinct = source.DistinctUntilChanged();
        return distinct.
            Throttle(delay, scheduler) // Delay both trues and falses
            .Where(x => x)             // But we only want trues to be delayed
            .Merge(                   // Merge the trues with...
                distinct.Where(x=>!x) // non delayed falses
            )
            .DistinctUntilChanged(); // Get rid of any repeated values

    }

А вот модульный тест для подтверждения его работы:

    [Fact]
    public static void Test_AsymetricDelay()
    {
        var scheduler = new TestScheduler();

        var xs = scheduler.CreateHotObservable(
            new Recorded<Notification<bool>>(10000000, Notification.CreateOnNext(true)),
            new Recorded<Notification<bool>>(60000000, Notification.CreateOnNext(false)),
            new Recorded<Notification<bool>>(70000000, Notification.CreateOnNext(true)),
            new Recorded<Notification<bool>>(80000000, Notification.CreateOnNext(false)),
            new Recorded<Notification<bool>>(100000000, Notification.CreateOnCompleted<bool>())
        );

        var dest = xs.DelayOn( TimeSpan.FromSeconds(2), scheduler);

        var testObserver = scheduler.Start(
            () => dest,            
            0,
            0,
            TimeSpan.FromSeconds(10).Ticks);


        testObserver.Messages.AssertEqual(
            new Recorded<Notification<bool>>(30000000, Notification.CreateOnNext(true)),
            new Recorded<Notification<bool>>(60000000, Notification.CreateOnNext(false)),
            new Recorded<Notification<bool>>(100000000, Notification.CreateOnCompleted<bool>())

        );
    }
person bighairdave    schedule 23.08.2016