Циклические зависимости между потоками в реактивном программировании

Занимаясь реактивным программированием, я часто сталкиваюсь с ситуациями, когда два потока зависят друг от друга. Каков идиоматический способ решения этих случаев?

Минимальный пример: есть кнопки A и B, обе отображают значение. Нажатие на A должно увеличить значение A на B. Нажатие на B должно установить значение B на A.

Первое решение, которое я смог придумать (пример на F#, но приветствуются ответы на любом языке):

let solution1 buttonA buttonB =
    let mutable lastA = 0
    let mutable lastB = 1
    let a = new Subject<_> ()
    let b = new Subject<_> ()
    (OnClick buttonA).Subscribe(fun _ -> lastA <- lastA + lastB; a.OnNext lastA) 
    (OnClick buttonB).Subscribe(fun _ -> lastB <- lastA; b.OnNext lastB)
    a.Subscribe(SetText buttonA)
    b.Subscribe(SetText buttonA)
    a.OnNext 0
    b.OnNext 1

Это решение использует изменяемое состояние и темы, оно не очень читабельно и не выглядит идиоматичным.

Второе решение, которое я пробовал, включает в себя создание метода, который связывает два зависимых потока вместе:

let dependency (aGivenB: IObservable<_> -> IObservable<_>) (bGivenA: IObservable<_> -> IObservable<_>) =
    let bProxy = new ReplaySubject<_> () 
    let a = aGivenB bProxy
    let b = bGivenA a
    b.Subscribe(bProxy.OnNext)
    a, b

let solution2 buttonA buttonB =
    let aGivenB b =
        Observable.WithLatestFrom(OnClick buttonA, b, fun click bValue -> bValue)
                  .Scan(fun acc x -> acc + x)
                  .StartWith(0)
    let bGivenA a =
        Observable.Sample(a, OnClick buttonB)
                  .StartWith(1)
    let a, b = dependency aGivenB bGivenA
    a.Subscribe(SetText buttonA)
    b.Subscribe(SetText buttonB)

Это кажется немного лучше, но поскольку в реактивной библиотеке нет такого метода, как dependency, я считаю, что существует более идиоматическое решение. Также легко ввести бесконечную рекурсию, используя второй подход.

Каков рекомендуемый подход к проблемам, связанным с циклической зависимостью между потоками, как в приведенном выше примере, в реактивном программировании?


person Steve    schedule 10.02.2017    source источник
comment
Возможно, проблема в изменяемых данных. Я думал, что реактивное программирование работает лучше всего, когда оно выполняется в функциональном стиле.   -  person duffymo    schedule 10.02.2017
comment
Функциональный подход без изменяемых данных был бы предпочтительнее, я просто еще не знаю, как это сделать в этом примере.   -  person Steve    schedule 10.02.2017


Ответы (3)


ИЗМЕНИТЬ:

Вот решение F #:

type DU = 
    | A 
    | B 

type State = { AValue : int; BValue : int }

let solution2 (aObservable:IObservable<_>, bObservable:IObservable<_>) = 

    let union = aObservable.Select(fun _ -> A).Merge(bObservable.Select(fun _ -> B))

    let result = union.Scan({AValue = 0; BValue = 1}, fun state du -> match du with
        | A -> { state with AValue = state.AValue + state.BValue }
        | B -> { state with BValue = state.AValue }
    )

    result

На самом деле F# — отличный язык для этого благодаря встроенным размеченным объединениям и записям. Вот ответ, написанный на С#, с пользовательским размеченным союзом; мой F# довольно ржавый.

Хитрость заключается в том, чтобы превратить ваши две наблюдаемые в одну наблюдаемую, используя размеченное объединение. Таким образом, в основном объединяя a и b в одну наблюдаемую размеченного союза:

a : *---*---*---**
b : -*-*--*---*---
du: ab-ba-b-a-b-aa

Как только это будет сделано, вы сможете реагировать на то, является ли элемент толчком «А» или толчком «Б».

Просто для подтверждения я предполагаю, что нет способа явно установить значение, встроенное в ButtonA/ButtonB. Если есть, эти изменения должны быть смоделированы как наблюдаемые, а также включены в размеченное объединение.

var a = new Subject<Unit>();
var b = new Subject<Unit>();
var observable = a.DiscriminatedUnion(b)
    .Scan(new State(0, 1), (state, du) => du.Unify(
        /* A clicked case */_ => new State(state.A + state.B, state.B), 
        /* B clicked case */_ => new State(state.A, state.A)
    )
);

observable.Subscribe(state => Console.WriteLine($"a = {state.A}, b = {state.B}"));
a.OnNext(Unit.Default);
a.OnNext(Unit.Default);
a.OnNext(Unit.Default);
a.OnNext(Unit.Default);
b.OnNext(Unit.Default);
a.OnNext(Unit.Default);
a.OnNext(Unit.Default);
a.OnNext(Unit.Default);
a.OnNext(Unit.Default);
b.OnNext(Unit.Default);

Вот классы, на которые это опирается в C#. Большая часть этого легко переводится во встроенные типы F#.

public class State /*easily replaced with an F# record */
{
    public State(int a, int b)
    {
        A = a;
        B = b;
    }

    public int A { get; }
    public int B { get; }
}

/* easily replaced with built-in discriminated unions and pattern matching */
public static class DiscriminatedUnionExtensions
{
    public static IObservable<DiscriminatedUnionClass<T1, T2>> DiscriminatedUnion<T1, T2>(this IObservable<T1> a, IObservable<T2> b)
    {
        return Observable.Merge(
            a.Select(t1 => DiscriminatedUnionClass<T1, T2>.Create(t1)),
            b.Select(t2 => DiscriminatedUnionClass<T1, T2>.Create(t2))
        );
    }

    public static IObservable<TResult> Unify<T1, T2, TResult>(this IObservable<DiscriminatedUnionClass<T1, T2>> source,
        Func<T1, TResult> f1, Func<T2, TResult> f2)
    {
        return source.Select(union => Unify(union, f1, f2));
    }

    public static TResult Unify<T1, T2, TResult>(this DiscriminatedUnionClass<T1, T2> union, Func<T1, TResult> f1, Func<T2, TResult> f2)
    {
        return union.Item == 1
            ? f1(union.Item1)
            : f2(union.Item2)
        ;
    }
}

public class DiscriminatedUnionClass<T1, T2>
{
    private readonly T1 _t1;
    private readonly T2 _t2;
    private readonly int _item;
    private DiscriminatedUnionClass(T1 t1, T2 t2, int item)
    {
        _t1 = t1;
        _t2 = t2;
        _item = item;
    }

    public int Item
    {
        get { return _item; }
    }

    public T1 Item1
    {
        get { return _t1; }
    }

    public T2 Item2
    {
        get { return _t2; }
    }

    public static DiscriminatedUnionClass<T1, T2> Create(T1 t1)
    {
        return new DiscriminatedUnionClass<T1, T2>(t1, default(T2), 1);
    }

    public static DiscriminatedUnionClass<T1, T2> Create(T2 t2)
    {
        return new DiscriminatedUnionClass<T1, T2>(default(T1), t2, 2);
    }
}
person Shlomo    schedule 10.02.2017
comment
Этот подход выглядит чище, спасибо! Я заметил, что можно напрямую определить aObservable.Select(fun _ state -> { state with AValue = state.AValue + state.BValue })..., что упростит оператор Scan до .Scan({AValue=0; BValue=1}, fun state f -> f state) и устранит необходимость в DU. - person Steve; 11.02.2017
comment
Как удивительно умно. Спасибо, что поделился. #ThingsYouLearnFromFunctionalProgrammers - person Shlomo; 11.02.2017

Вот очень простое решение с использованием Gjallarhorn:

#r @"..\packages\Gjallarhorn\lib\portable-net45+netcore45+wpa81+wp8+MonoAndroid1+MonoTouch1\Gjallarhorn.dll"

open Gjallarhorn

(*
    Clicking on A must increment the value of A by B. Clicking on B must set the value of B to A.
*)
let  a = Mutable.create 3
let b = Mutable.create 4

let clickA() = a.Value <- a.Value + b.Value
let clickB() = b.Value <- a.Value

let d1 = Signal.Subscription.create (fun x -> printfn "%A" <| "Clicked A: " + x.ToString()) a
let d2 = Signal.Subscription.create (fun x -> printfn "%A" <| "Clicked B: " + x.ToString()) b

clickA()
clickB()  

На самом деле он очень похож на ваш первоначальный, поэтому использует изменяемое состояние, но делает привязку к пользовательскому интерфейсу довольно простой, для более идиоматического использования см. in-wpf-2016-edition/" rel="nofollow noreferrer">сообщение в блоге.

person s952163    schedule 11.02.2017

Предполагая, что выходные данные в конечном итоге отправляются обратно в источник, вы можете сделать это с помощью основных операторов. Все, что вам нужно сделать, это дважды вызвать withLatestFrom для каждой наблюдаемой кнопки/сигнала. Мое решение на Java, но оно должно быть достаточно простым!

private static Pair<Observable<Integer>, Observable<Integer>> test(
    final Observable<Integer> aValues,
    final Observable<Integer> bValues,
    final Observable<Void> aButton,
    final Observable<Void> bButton,
    final Func2<Integer, Integer, Integer> aFunction,
    final Func2<Integer, Integer, Integer> bFunction
) {
    return new Pair<>(
        aButton.withLatestFrom(aValues, (button, a) -> a).withLatestFrom(bValues, aFunction),
        bButton.withLatestFrom(aValues, (button, a) -> a).withLatestFrom(bValues, bFunction)
    );
}

Вот тестовый код, который я использовал:

final TestScheduler scheduler = new TestScheduler();

final TestSubject<Integer> aSubject = TestSubject.create(scheduler);
final TestSubject<Integer> bSubject = TestSubject.create(scheduler);
aSubject.onNext(1);
bSubject.onNext(1);

final TestSubject<Void> aButton = TestSubject.create(scheduler);
final TestSubject<Void> bButton = TestSubject.create(scheduler);

final Pair<Observable<Integer>, Observable<Integer>> pair = test(
    aSubject, bSubject, aButton, bButton, (a, b) -> a + b, (a, b) -> a
);

pair.component1().subscribe(aSubject::onNext);
pair.component2().subscribe(bSubject::onNext);
pair.component1().map(a -> "A: " + a).subscribe(System.out::println);
pair.component2().map(b -> "B: " + b).subscribe(System.out::println);

aButton.onNext(null); scheduler.triggerActions();
bButton.onNext(null); scheduler.triggerActions();
aButton.onNext(null); scheduler.triggerActions();
aButton.onNext(null); scheduler.triggerActions();
bButton.onNext(null); scheduler.triggerActions();

Это печатает:

A: 2
B: 2
A: 4
A: 6
B: 6
person flakes    schedule 11.02.2017