Наблюдаемые операторы, которые делают что-то по прошествии некоторого времени

В цепочке наблюдаемых rxjs, как я могу что-то сделать с доступом к текущему значению наблюдаемого по прошествии заданного времени? По сути, я ищу что-то вроде оператора касания, но он выполняется, только если установленное количество времени прошло без просмотра значения из наблюдаемого. Так что на самом деле это похоже на комбинацию нажатия и тайм-аута.

Я представляю себе что-то вроде следующего

observable$.pipe(
  first(x => x > 5),
  tapAfterTime(2000, x => console.log(x)),
  map(x => x + 1)
).subscribe(...);

Это выдуманный пример, а функция «tapAfterTime» нереальна. Но основная идея заключается в том, что если после подписки прошло 2000 мс, а наблюдаемый объект не увидел значение больше 5, тогда выполните функцию обратного вызова tapAfterTime для любого текущего значения наблюдаемого. Если мы увидим значение больше 5 до 2000 мс, то обратный вызов tapAfterTime никогда не будет запущен, но функция карты всегда будет работать, как ожидалось.

Есть ли оператор для этого или любая комбинация операторов?


person Rich McCluskey    schedule 27.09.2018    source источник
comment
Должен ли он запускаться каждые 2000 мс интервалов тишины или только один раз после предыдущего x ›5 выданного значения?   -  person Alexander Poshtaruk    schedule 27.09.2018
comment
О, поскольку он содержит "первый", он должен запускаться, как только я предполагаю   -  person Alexander Poshtaruk    schedule 27.09.2018
comment
debounceTime learnrxjs.io/operators/filtering/debouncetime.html или debounce learnrxjs.io/operators/filtering/debounce.html?   -  person Eliseo    schedule 27.09.2018


Ответы (3)


Может быть, это действительно слишком сложно, может, стоит поискать.

Идея состоит в том, чтобы иметь 2 разных наблюдаемых, созданные преобразованием источника observable$, а затем в конечном итоге объединенные.

Первый Observable, назовем его obsFilterAndMapped, - это тот, где вы выполняете фильтрацию и отображение.

Второй Observable, назовем его obsTapDelay, - это Observable, который запускает новый таймер с определенной задержкой каждый раз, когда излучает первый Observable, то есть obsFilterAndMapped - как только delayTime передано, то вы выполняете свое tapAfterTime действие - если первый Observable испускает новое значение до того, как будет передано delayTime, тогда будет создан новый таймер.

Это код, который реализует эту идею

const stop = new Subject<any>();
const obsShared = observable$.pipe(
    finalize(() => {
        console.log('STOP');
        stop.next();
        stop.complete()
    }),
    share()
);
const delayTime = 300;
const tapAfterTime = (value) => {
    console.log('tap with delay', value)
}; 

let valueEmitted;

const obsFilterAndMapped = obsShared.pipe(
    tap(val => valueEmitted = val),
    filter(i => i > 7),
    map(val => val + ' mapped')
);

const startTimer = merge(of('START'), obsFilterAndMapped);

const obsTapDelay = startTimer.pipe(
    switchMap(val => timer(delayTime).pipe(
        tap(() => tapAfterTime(valueEmitted)),
        switchMap(() => empty()),
    )),
    takeUntil(stop),
)

merge(obsFilterAndMapped, obsTapDelay)
.subscribe(console.log, null, () => console.log('completed'))

При таком подходе вы будете выполнять свое tapAfterTime действие каждый раз, когда источник observable$ ничего не испускает в течение времени, превышающего delayTime. Другими словами, это сработает не только для первого выброса observable$, но и для всей его жизни.

Вы можете протестировать такой код с помощью следующего ввода

const obs1 = interval(100).pipe(
    take(10),
);
const obs2 = timer(2000, 100).pipe(
    take(10),
    map(val => val + 200),
);
const observable$ = merge(obs1, obs2);

Приложив еще немного усилий, мы можем даже подумать о том, чтобы скрыть глобальную переменную valueEmitted внутри замыкания, но это увеличило бы сложность кода и, вероятно, этого не стоит.

person Picci    schedule 28.09.2018

Взгляни на

А возможно что-то в этом роде?

--

initiateTimer() {
    if (this.timerSub) {
        this.timerSub.unsubscribe();
    }

    this.timerSub = Rx.Observable.timer(2000)
        .take(1)
        .subscribe(this.showPopup.bind(this));
}
person Ryan    schedule 27.09.2018

Это может быть примерно так:

let cancel;
observable$.pipe(
  tap((x)=>clear=setTimeout(()=>console.log(x), 2000)),
   filter(x => x > 5),
   tap(x => clearTimeout(clear)),
   map(x => x + 1)
);
person Alexander Poshtaruk    schedule 27.09.2018
comment
Подробнее об операторах в RxJS в моем видеокурсе: packtpub.com / web-development / hands-rxjs-web-development-video - person Alexander Poshtaruk; 24.04.2019