RxJS — использование windowWhen() для окон maxWait и maxElements

Я пытаюсь связать вызовы с сервером с максимальным значением maxEntries, но не хочу ждать дольше, чем maxWait ms. Раньше это было доступно как windowWithTimeOrCount() в RxJS 4, но было удалено из RxJS 5.

Все отлично работает, за исключением того, что последний элемент окна теряется. И говоря о «потерянном» — вот что я чувствую сейчас. Кто-нибудь из гуру RxJS может сказать мне, что я делаю неправильно?

 private chunk(queue: Observable<CacheEntry>, maxEntries: number, maxWait: number): Observable<Observable<CacheEntry>> {

    // We have an incoming stream of CacheEntries to be retrieved. We want to bundle these in units of max maxEntries
    // but wait no longer than max maxWait ms. We return an Observable, that emits Observables of CacheEntries that
    // complete after maxEntries / maxWait (whatever comes first).
    const toggleSubject = new Subject<void>();

    return queue

    // Start emitting a new Observable every time toggleSubject emits.
    // (bufferWhen() wouldn't work as we have to count the elements as they come and buffer only gives us the
    // complete collection)
      .windowWhen(() => toggleSubject)

      // map() is called once for every window (maxEntries/maxWait)
      // the inner do() is called for every element in the window, allowing us to set up the timeout callback and to
      // count all elements, then emitting on toggleSubject, triggering a new Observable.
      // (We have to map() here - instead of an outer do() -  because otherwise the original obs would be streamed
      // and the hooked up version with the inner do() would never be called.)
      .map((obs) => {
        // counts the number of cacheEntries already in this stream
        let count = 0;
        // flag to kill the timeout callback
        let done = false;
        // we have to return an Observable
        return obs.do(() => {
            count++;
            if (count === 1) {
              // we start counting when the first element is streamed.
              IntervalObservable.create(maxWait).first().subscribe(() => {
                if (!done) {
                  //trigger due to maxWait
                  toggleSubject.next(null);
                }
              });
            }
            if (count > (maxEntries)) {
              done = true;
              // trigger due due to maxEntries(' + maxEntries + ')');
              toggleSubject.next(null);
            }
          }
        );
      });
  }

Элемент, вызывающий toggleSubject.next(null) из-за if (count > (maxEntries)), потерян (ни в одном окне).

РЕДАКТИРОВАТЬ: maxTime начинает отсчитывать момент, когда первый элемент нового Observable нажимается. if (count === 1 ). Это а) причина, по которой я работаю изнутри оконных Observables в map() и б) важно, потому что это требуемое поведение.

Пример: maxElements: 100, maxWait: 100. 101 элемент помещается в t=99. Ожидаемое поведение: при t=99 вталкивается Observable со 100 элементами. Остался 1 элемент. Счетчик + сброс таймера. В t=199 счетчик для второго «куска» истекает и выдвигает Observable с 1 элементом.

(В этом примере код Брэндона (см. ответ) будет - если я правильно его прочитал - поместит Observable в t=99 со 100 элементами и через одну мс, в t=100, Observable с одним элементом .)


person RAlfoeldi    schedule 29.08.2016    source источник


Ответы (2)


Да, вы не хотите использовать map для таких побочных эффектов. Как вы заметили, вы в конечном итоге роняете предметы.

Вот общий метод, который, я думаю, будет делать то, что вы хотите.

Примечание. В RXJS 5 в настоящее время есть проблема с определением типа для этой перегрузки публикации. Я добавил несколько типов, которые должны позволить ему компилироваться в TypeScript.

chunk<T>(queue: Observable<T>, maxEntries: number, maxWait: number): Observable<Observable<T>> {
    // use publish() so that we can subscribe multiple times to the same stream of data.
    return queue.publish(entries => {
        // observable which will trigger after maxWait
        const timer = IntervalObservable.create(maxWait);
        // observable which will trigger after maxEntries
        const limit = entries.take(maxEntries).last();
        // observable which will trigger on either condition
        const endOfWindow = limit.takeUntil(timer);

        // use endOfWindow to close each window.
        return entries.windowWhen(() => endOfWindow) as Observable<T>;
    }) as Observable<Observable<T>>;
}

Редактировать:

Если вы не хотите, чтобы таймер запускался до тех пор, пока в каждом окне не появится первый элемент, вы можете сделать это следующим образом:

chunk<T>(queue: Observable<T>, maxEntries: number, maxWait: number): Observable<Observable<T>> {
    // use publish() so that we can subscribe multiple times to the same stream of data.
    return queue.publish(entries => {
        // observable which will trigger after maxWait after the first
        // item in this window arrives:
        const timer = entries.take(1).delay(maxWait);
        // observable which will trigger after maxEntries
        const limit = entries.take(maxEntries).last();
        // observable which will trigger on either condition
        const endOfWindow = limit.takeUntil(timer);

        // use endOfWindow to close each window.
        return entries.windowWhen(() => endOfWindow) as Observable<T>;
    }) as Observable<Observable<T>>;
}
person Brandon    schedule 29.08.2016
comment
Прохладный. Спасибо. Выглядит хорошо (и я сам глупый :-()) Я попробую это завтра. Мой подход был немного сложнее... - person RAlfoeldi; 29.08.2016
comment
Хм... выглядит хорошо, но не работает. publish()returns Observable‹T›, ожидаемое значение Observable‹Observable‹T››. Как я прочитал ваш код, таймер срабатывает независимо (каждый maxWait мс независимо от того, что происходит в потоке). Вот почему я пытаюсь получить обратную связь изнутри Observable 'windowWhen()' - я начинаю считать, когда появляются первые элементы. (См. оп ред.) - person RAlfoeldi; 30.08.2016
comment
Эта перегрузка публикации — publish(func: (Observable<T>) => Observable<U>): Observable<U>. Другими словами, publish() возвращает любой наблюдаемый тип, возвращаемый фабричным методом. В этом случае я возвращаю Observable‹Observable‹T››. - person Brandon; 30.08.2016
comment
Вы уверены, что? Из файла publish.d.ts RxJS: export declare function publish<T>(selector?: (source: Observable<T>) => Observable<T>): Observable<T> | ConnectableObservable<T>; - person RAlfoeldi; 30.08.2016
comment
Ах, они изменили сигнатуру в RxJS5, но не фактическую функциональность. Вот подпись от RXJS4: publish<TResult>(selector: (source: ConnectableObservable<T>) => Observable<TResult>): Observable<TResult>;. Я думаю, если вы его разыграете, это сработает, но я открою GH Issue, чтобы проверить. - person Brandon; 30.08.2016
comment
на самом деле проблема была открыта на прошлой неделе: github.com/ReactiveX/rxjs/issues/1905 - person Brandon; 30.08.2016
comment
Ваше решение выглядит намного лучше, чем мое, и я действительно хотел бы его использовать, и да, ваш комментарий о сложности написания подобных вещей совершенно верен. Но я даже не очень понял, как ваше решение должно работать. entries — это непрерывный, бесконечный поток элементов. Когда, где и как сбрасываются триггеры (timer, limit)? Как я прочитал код, это одноразовое решение. - person RAlfoeldi; 30.08.2016
comment
windowWhen вызывает фабричный метод для каждого нового окна, поэтому каждый раз, когда окно инициируется для закрытия, фабричный метод вызывается для создания нового триггера. Новый триггер запускает новый счетчик и новый таймер, которые заново начинают отслеживать непрерывный поток entries в текущем месте потока. Ключевой трюк здесь заключается в том, что publish дает нам многоадресную версию очереди, которую мы можем повторно использовать. Посмотрите, поможет ли это: xgrommx.github.io/rx-book /контент/наблюдаемый/ - person Brandon; 30.08.2016
comment
Сократ: «Я знаю, что ничего не знаю» :-) Спасибо за терпение. Я попробую, как только RxJS отправит новые подписи как npm. - person RAlfoeldi; 30.08.2016
comment
Я добавил в пример несколько типовых преобразований, которые должны позволить ему компилироваться. - person Brandon; 30.08.2016

Решение, которое я придумал, это переключение windowWhen() в асинхронном планировщике.

if (count === (maxEntries)) {
  done = true;
  this.LOGGER.debug(' - trigger due due to maxEntries(' + maxEntries + ')');
  Rx.Scheduler.async.schedule(()=>toggleSubject.next(null));
}

Проблема заключалась в том, что windowWhen() немедленно завершал возвращенные Observables, предотвращая получение этого последнего значения любыми нижестоящими операторами.

Извините, что задаю (и отвечаю) вопрос. Я пробовал Rx.Scheduler.async и т. д., прежде чем публиковать здесь, но почему-то это не сработало.

person RAlfoeldi    schedule 30.08.2016
comment
Обратите внимание, что в ваших окнах может оказаться более maxEntries записей, если элементы помещаются в наблюдаемое синхронно, поскольку все они будут протолкнуты через текущее окно до того, как ваш асинхронный вызов будет выполнен. Сложности, связанные с написанием подобных операций, являются одной из причин, по которой общий совет состоит в том, чтобы не использовать субъекты, а вместо этого создавать свой оператор из существующих операторов. - person Brandon; 30.08.2016