Я пытаюсь связать вызовы с сервером с максимальным значением maxEntries, но не хочу ждать дольше, чем maxWait ms. Раньше это было доступно как windowWithTimeOrCount()
в RxJS 4, но было удалено из RxJS 5.
Все отлично работает, за исключением того, что последний элемент окна теряется. И говоря о «потерянном» — вот что я чувствую сейчас. Кто-нибудь из гуру RxJS может сказать мне, что я делаю неправильно?
private chunk(queue: Observable<CacheEntry>, maxEntries: number, maxWait: number): Observable<Observable<CacheEntry>> {
// We have an incoming stream of CacheEntries to be retrieved. We want to bundle these in units of max maxEntries
// but wait no longer than max maxWait ms. We return an Observable, that emits Observables of CacheEntries that
// complete after maxEntries / maxWait (whatever comes first).
const toggleSubject = new Subject<void>();
return queue
// Start emitting a new Observable every time toggleSubject emits.
// (bufferWhen() wouldn't work as we have to count the elements as they come and buffer only gives us the
// complete collection)
.windowWhen(() => toggleSubject)
// map() is called once for every window (maxEntries/maxWait)
// the inner do() is called for every element in the window, allowing us to set up the timeout callback and to
// count all elements, then emitting on toggleSubject, triggering a new Observable.
// (We have to map() here - instead of an outer do() - because otherwise the original obs would be streamed
// and the hooked up version with the inner do() would never be called.)
.map((obs) => {
// counts the number of cacheEntries already in this stream
let count = 0;
// flag to kill the timeout callback
let done = false;
// we have to return an Observable
return obs.do(() => {
count++;
if (count === 1) {
// we start counting when the first element is streamed.
IntervalObservable.create(maxWait).first().subscribe(() => {
if (!done) {
//trigger due to maxWait
toggleSubject.next(null);
}
});
}
if (count > (maxEntries)) {
done = true;
// trigger due due to maxEntries(' + maxEntries + ')');
toggleSubject.next(null);
}
}
);
});
}
Элемент, вызывающий toggleSubject.next(null)
из-за if (count > (maxEntries))
, потерян (ни в одном окне).
РЕДАКТИРОВАТЬ: maxTime начинает отсчитывать момент, когда первый элемент нового Observable нажимается. if (count === 1 )
. Это а) причина, по которой я работаю изнутри оконных Observables в map()
и б) важно, потому что это требуемое поведение.
Пример: maxElements: 100, maxWait: 100. 101 элемент помещается в t=99. Ожидаемое поведение: при t=99 вталкивается Observable со 100 элементами. Остался 1 элемент. Счетчик + сброс таймера. В t=199 счетчик для второго «куска» истекает и выдвигает Observable с 1 элементом.
(В этом примере код Брэндона (см. ответ) будет - если я правильно его прочитал - поместит Observable в t=99 со 100 элементами и через одну мс, в t=100, Observable с одним элементом .)