Оператор Rx, который начинается как combLatest, но затем действует как withLatestFrom

Я ищу оператор, который объединяет два Observables, не генерируя ничего, пока оба Observables не испустят элемент (аналогично combineLatest), но затем излучает элементы только путем объединения элементов из одного Observable с последним излучаемым элементом другого Observable (аналогично к withLatestFrom). Результаты будут выглядеть так (y наблюдаемое - это «контроль»):

введите здесь описание изображения

Есть ли такой оператор?


person Scott H    schedule 03.02.2017    source источник


Ответы (2)


Я решил эту проблему на Java, но та же теория должна работать и для вас.

На самом деле у вас есть два основных паттерна; Значение combineLatest, за которым следуют withLatestFrom значения. Если withLatestFrom запускается первым, вы хотите пропустить значение combineLatest.

Начнем с того, что сделаем withLatestFrom наблюдаемым:

Observable<Result> wlf = o1.withLatestFrom(o2, f::apply);

Затем мы хотим создать наблюдаемый combineLatest, который испускает единственное значение. Мы также хотим остановить эту наблюдаемую, когда срабатывает wlf:

Observable<Result> cl = Observable.combineLatest(o1, o2, f::apply)
    .take(1).takeUntil(wlf);

Наконец, сложите эти два наблюдаемых вместе ... Для удобства я сделал вспомогательный метод для приема любых двух наблюдаемых и двухфункциональный оператор:

public static <Result,
    Param1, Source1 extends Param1,
    Param2, Source2 extends Param2>
Observable<Result> combineThenLatestFrom(
    final Observable<Source1> o1,
    final Observable<Source2> o2,
    final BiFunction<Param1, Param2, Result> f
) {
    final Observable<Result> base = o1
        .withLatestFrom(o2, f::apply);
    return Observable
        .combineLatest(o1, o2, f::apply)
        .take(1).takeUntil(base)
        .mergeWith(base);
}

А вот тестовый код, который я использовал для проверки метода:

public static void main(final String[] args) {
    final TestScheduler scheduler = new TestScheduler();
    final TestSubject<String> o1 = TestSubject.create(scheduler);
    final TestSubject<String> o2 = TestSubject.create(scheduler);
    final Observable<String> r = combineThenLatestFrom(o1, o2, (a, b) -> a + b);
    r.subscribe(System.out::println);
    o1.onNext("1");
    o1.onNext("2");
    o2.onNext("A");
    o2.onNext("B");
    o2.onNext("C");
    o2.onNext("D");
    o1.onNext("3");
    o2.onNext("E");
    scheduler.triggerActions();
}

Какие выходы:

2A
3D
person flakes    schedule 03.02.2017
comment
Благодарность! вместо того, чтобы выполнять takeUntil(wlf) и mergeWith(wlf), разве вы не можете заменить их на switchLatest? что-то вроде: Observable.of(cl.take(1), wlf).switchLatest()? - person Scott H; 04.02.2017
comment
@ScottH, я думаю, это тоже сработает, но я не уверен, есть ли у cl возможность завершить поток wlf. По крайней мере, с takeUntil мы знаем, что закончится только cl. Если переключатель работает, я согласен, он будет выглядеть чище; просто не забудьте добавить модульные тесты !!! - person flakes; 04.02.2017
comment
Мне пришлось сделать o1 и o2 подключаемыми (publish (). RefCount ()), чтобы он работал должным образом, из-за того, что, как я подозреваю, было причиной дублирования выбросов с холодными наблюдаемыми объектами. - person Chris6647; 12.07.2018

Это некрасиво, но это работает (на C #):

var xs = new Subject<string>();
var ys = new Subject<int>();

var query =
    Observable
        .Merge(
            xs.Select(x => new { xt = true, yt = false, x, y = default(int) }),
            ys.Select(y => new { xt = false, yt = true, x = default(string), y }))
        .StartWith(new { xt = false, yt = false, x = default(string), y = default(int) })
        .Scan((a, b) => new
        {
            xt = a.xt && a.yt ? false : a.xt || b.xt,
            yt = a.xt && a.yt ? false : a.yt || b.yt,
            x = b.xt ? b.x : a.x,
            y = b.yt ? b.y : a.y
        })
        .Where(z => z.xt & z.yt)
        .Select(z => z.y + z.x);

query.Subscribe(v => Console.WriteLine(v)); 

ys.OnNext(1);
ys.OnNext(2);
xs.OnNext("A");
xs.OnNext("B");
xs.OnNext("C");
xs.OnNext("D");
ys.OnNext(3);
xs.OnNext("E");

Это дает:

2A
3D
person Enigmativity    schedule 03.02.2017
comment
Я думаю, вы, возможно, немного перестроили это :-) - person flakes; 04.02.2017
comment
@flakes -Возможно. - person Enigmativity; 04.02.2017
comment
Спасибо @Enigmativity! Мне больше нравится ответ @flakes, потому что это довольно простая композиция основных операторов Rx, но я ценю усилия! - person Scott H; 04.02.2017