RXJS: агрегированный дребезг

Мой пример использования следующий: я получаю события, которые иногда происходят всплесками. Если происходит всплеск, мне нужно обработать его только один раз. Debounce делает это.

Однако debounce дает мне только последний элемент пакета, но мне нужно знать обо всех элементах пакета, чтобы агрегировать их (с использованием плоской карты).

Это может быть выполнено с помощью временного окна или буфера, однако это фиксированные интервалы, поэтому тайм-аут буфера / окна может произойти в середине пакета, поэтому пакет разбивается на 2 части для обработки вместо 1.

Так что я бы хотел что-то вроде

.
.
event: a
.
. -> a
.
.
.
.
.
.event: b
.event: c
.event: d
.
.-> b,c,d
. 
.
.
.
.event : e
.
. -> e
.

person Seba Kerckhof    schedule 01.03.2016    source источник
comment
удалось ли решить вашу проблему?   -  person user3743222    schedule 12.03.2016
comment
@ user3743222 Мне подходит ответ Мэтта Бёрнелла.   -  person Seba Kerckhof    schedule 14.03.2016


Ответы (2)


Этого можно добиться с помощью buffer передавая отклоненный поток в качестве закрывающего селектора, например:

var s = Rx.Observable.of('a')
  .merge(Rx.Observable.of('b').delay(100))
  .merge(Rx.Observable.of('c').delay(150))
  .merge(Rx.Observable.of('d').delay(200))
  .merge(Rx.Observable.of('e').delay(300))
  .share()
;

s.buffer(s.debounce(75)).subscribe(x => console.log(x));

Вот исполняемая версия: https://jsbin.com/wusasi/edit?js,console,output

person Matt Burnell    schedule 02.03.2016
comment
Мне пришлось внести некоторые изменения, чтобы это работало в RxJS 5: jsbin .com / pubowum / 2 / edit? html, js, console, output - person Oliver Joseph Ash; 25.05.2017
comment
По крайней мере, для rxJava, это повторяет один и тот же список каждые 75 мс. - person Code Wiget; 22.09.2020