Обходные пути для сбоев RX?

Я экспериментирую с Reactive Extensions на разных платформах, и одна вещь, которая меня немного раздражает, — это глюки.

Хотя для кода пользовательского интерфейса эти сбои могут не вызывать таких проблем, и обычно можно найти оператора, который их обходит, мне все еще сложнее отлаживать код при наличии сбоев: промежуточные результаты не важны для отладки, но мой разум не знает, когда результат промежуточный или «окончательный».

Поработав немного с чистым функциональным FRP в Haskell и синхронными системами потока данных, он также «чувствует» себя неправильно, но это, конечно, субъективно.

Но при подключении RX к исполнительным механизмам без пользовательского интерфейса (например, двигателям или переключателям), я думаю, сбои становятся более проблематичными. Как убедиться, что на внешние приводы отправляется только правильное значение?

Может быть, это может быть решено каким-то «диспетчером», который знает, когда какой-то «внешний датчик» запустил инициирующее событие, так что все внутренние события обрабатываются перед отправкой окончательных результатов на исполнительные механизмы. Примерно так, как описано в бумаге по flapjax.

Вопросы, на которые я надеюсь получить ответы:

  1. Есть ли что-то в RX, что делает невозможным исправление сбоев для синхронных уведомлений?
  2. Если нет, существует ли библиотека (желательно производственного качества) или подход для RX, который исправляет синхронные сбои? Особенно для однопоточного Javascript это может иметь смысл?
  3. Если общего решения не существует, как можно использовать RX для управления внешними датчиками/исполнительными механизмами без сбоев в исполнительных механизмах?

Позвольте мне привести пример

Предположим, я хочу напечатать последовательность кортежей (a,b), где контракт

a=n    b=10 * floor(n/10)

n - поток натуральных чисел = 0,1,2....

Поэтому я ожидаю следующую последовательность

(a=0, b=0)
(a=1, b=0)
(a=2, b=0)
...
(a=9, b=0)
(a=10, b=10)
(a=11, b=10)
...

В RX, чтобы сделать вещи более интересными, я буду использовать фильтр для вычисления потока b.

var n = Observable
        .Interval(TimeSpan.FromSeconds(1))
        .Publish()
        .RefCount();

var a = n.Select(t => "a=" + t);

var b = n.Where(t => t % 10 == 0)
        .Select(t => "b=" + t);

var ab = a.CombineLatest(b, Tuple.Create);

ab.Subscribe(Console.WriteLine);

Это дает то, что я считал сбоем (временное нарушение инварианта/контракта):

(a=0, b=0)
(a=1, b=0)
(a=2, b=0)
...
(a=10, b=0) <-- glitch?
(a=10, b=10)
(a=11, b=10)

Я понимаю, что это правильное поведение CombineLatest, но я также думал, что это называется сбоем, потому что в реальной чистой системе FRP вы не получаете этих промежуточных результатов, нарушающих инвариант.

Обратите внимание, что в этом примере я бы не смог использовать Zip, а также WithLatestFrom дал бы неверный результат.

Конечно, я мог бы просто упростить этот пример до одного монадического вычисления, никогда не выполняя многоадресную рассылку n вхождений потока (это означало бы, что я не могу фильтровать, а только отображать), но это не главное: IMO в RX вы всегда получаете «глюк». ' всякий раз, когда вы разделяете наблюдаемый поток и снова присоединяетесь к нему:

    s
   / \
  a   b
   \ /
    t

Например, во FlapJAX таких проблем нет.

Есть ли в этом смысл?

Большое спасибо, Питер


person Ziriax    schedule 17.02.2016    source источник
comment
Нет никакого сбоя. Пример, который вы связали с отображением фактического вывода a0-----(b0b1)(c1c4)(d4d9)(e9e16) по сравнению с желаемым выводом a0-----b1----c4----d9----e16---, неверен. Фактический результат правильный для этого запроса. Это не сбой - это правильное функционирование этого запроса. Вам действительно нужно создать минимальный воспроизводимый пример, показывающий фактические сбои, чтобы мы могли вам ответить.   -  person Enigmativity    schedule 18.02.2016
comment
Моя вина, у меня сложилось впечатление, что CombineLatest всегда вызывает сбои, как только любой из его входных данных поступает из одного и того же источника.   -  person Ziriax    schedule 18.02.2016
comment
Нисколько. Вы должны ожидать, что CombineLatest будет работать детерминированным образом. Невозможно иметь дело с мгновенными изменениями значений. Источники должны создавать значения последовательно — один подписчик должен получить значение первым, а другой — вторым, поэтому, если два значения теоретически предназначены для изменения в одно и то же время, они не меняются сделать это в вычислительной реальности. Просто не может быть по-другому. Это особенно верно, поскольку каждый наблюдаемый источник и оператор являются черными ящиками. Нет никакого способа заглянуть внутрь и увидеть, что он теоретически должен делать.   -  person Enigmativity    schedule 18.02.2016
comment
Я думаю, что это основное различие между оригинальным FRP Haskell и RX. В оригинальной FRP у вас была концепция времени и одновременных событий. Например, если бы мы применили функцию (x, y) => (x-y) к сигналу кортежей (z, z), мы всегда получили бы 0. При использовании CombineLatest RX вы иногда получали бы 0, иногда 1. Я знал, что CombineLatest работал таким образом, я просто сбился с толку. Спасибо.   -  person Ziriax    schedule 18.02.2016
comment
Вот почему я бы сказал, что RX - менее полезный вариант собственно FRP. (Отказ от ответственности: я автор библиотеки FRP реактивного банана в Haskell)   -  person Heinrich Apfelmus    schedule 18.02.2016


Ответы (1)


Обновление: позвольте мне попытаться ответить на мой собственный вопрос в контексте RX.

Во-первых, кажется, мое понимание того, что такое «глюк», было неправильным. С точки зрения чистого FRP то, что мне казалось сбоями в RX, на самом деле кажется правильным поведением в RX.

Поэтому я предполагаю, что в RX нам нужно четко указать «время», в которое мы ожидаем активировать значения, объединенные с датчиков.

В моем же примере приводом является консоль, а датчиком интервал n.

Итак, если я изменю свой код

ab.Subscribe(Console.WriteLine);

в

ab.Sample(n).Subscribe(Console.WriteLine);

то печатаются только "правильные" значения.

Это означает, что когда мы получаем наблюдаемую последовательность, которая объединяет значения от датчиков, мы должны знать все исходные датчики, объединять их все и пробовать значения с этим объединенным сигналом, прежде чем отправлять какие-либо значения на исполнительные механизмы...

Таким образом, альтернативным подходом было бы «поднять» IObservable в структуру «Sensed», которая запоминает и объединяет исходные датчики, например, так:

public struct Sensed<T>
{
    public IObservable<T> Values;
    public IObservable<Unit> Sensors;

    public Sensed(IObservable<T> values, IObservable<Unit> sensors)
    {
        Values = values;
        Sensors = sensors;
    }

    public IObservable<Unit> MergeSensors(IObservable<Unit> sensors)
    {
        return sensors == Sensors ? Sensors : Sensors.Merge(sensors);
    }

    public IObservable<T> MergeValues(IObservable<T> values)
    {
        return values == Values ? Values : Values.Merge(values);
    }
}

И тогда мы должны перевести весь метод RX в эту структуру «Sensed»:

public static class Sensed
{
    public static Sensed<T> Sensor<T>(this IObservable<T> source)
    {
        var hotSource = source.Publish().RefCount();
        return new Sensed<T>(hotSource, hotSource.Select(_ => Unit.Default));
    }

    public static Sensed<long> Interval(TimeSpan period)
    {
        return Observable.Interval(period).Sensor();
    }

    public static Sensed<TOut> Lift<TIn, TOut>(this Sensed<TIn> source, Func<IObservable<TIn>, IObservable<TOut>> lifter)
    {
        return new Sensed<TOut>(lifter(source.Values), source.Sensors);
    }

    public static Sensed<TOut> Select<TIn, TOut>(this Sensed<TIn> source, Func<TIn, TOut> func)
    {
        return source.Lift(values => values.Select(func));
    }

    public static Sensed<T> Where<T>(this Sensed<T> source, Func<T, bool> func)
    {
        return source.Lift(values => values.Where(func));
    }

    public static Sensed<T> Merge<T>(this Sensed<T> source1, Sensed<T> source2)
    {
        return new Sensed<T>(source1.MergeValues(source2.Values), source1.MergeSensors(source2.Sensors));
    }

    public static Sensed<TOut> CombineLatest<TIn1, TIn2, TOut>(this Sensed<TIn1> source1, Sensed<TIn2> source2, Func<TIn1, TIn2, TOut> func)
    {
        return new Sensed<TOut>(source1.Values.CombineLatest(source2.Values, func), source1.MergeSensors(source2.Sensors));
    }

    public static IDisposable Actuate<T>(this Sensed<T> source, Action<T> next) 
    {
        return source.Values.Sample(source.Sensors).Subscribe(next);
    }
}

Тогда мой пример становится:

var n = Sensed.Interval(TimeSpan.FromMilliseconds(100));
var a = n.Select(t => "a=" + t);
var b = n.Where(t => t % 10 == 0).Select(t => "b=" + t);
var ab = a.CombineLatest(b, Tuple.Create);
ab.Actuate(Console.WriteLine);

И снова на исполнительный механизм передаются только «желаемые» значения, но в этой конструкции исходные датчики запоминаются в структуре Sensed.

Я не уверен, что все это имеет «смысл» (каламбур), может быть, мне следует просто отпустить свое желание получить чистый FRP и жить с этим. Ведь время относительно ;-)

Питер

person Ziriax    schedule 18.02.2016