RxJava: Окно по количеству или промежутку времени без отбрасывания элементов

Я хочу разделить свой поток на пакеты определенного максимального размера, и если этот размер не будет достигнут через некоторое время, закрыть пакет и начать новый. Для этого я попытался использовать окно (количество):

things.window(10)

Однако это ждет, пока не будет получено 10 элементов, чтобы создать новое окно Observable. Если я использую оператор window(timespan, unit, count):

things.window(1, TimeUnit.SECONDS, 10)

Я потеряю все элементы, которые появились после 10-го числа и до завершения временного промежутка.

Мне нужен аналогичный оператор, который вместо ожидания завершения временного промежутка выдает новое окно при достижении счетчика.

things.windowXXX(timespan = 1s, count = 2) : Observable[T]

things:   ----o--o--o-----------o----o------->
timespan: [      )[    1s    ][      )[    -->
window 1: -----o-o-|
window 2:          -o---------|
window 3:                     --o-----o-| 

person jcfandino    schedule 06.08.2014    source источник


Ответы (1)


Я нашел способ обойти эту проблему, используя buffer(timespan, timeunit, count) вместо window. Я думал, что буфер и окно имеют одинаковое поведение, только одно испускает списки, а другие наблюдаемые. Но, кажется, здесь есть разница.

Моим решением было использовать буфер, а затем сопоставлять каждый результат с наблюдаемым:

observable.buffer(10, TimeUnit.MILLISECONDS, 2)
          .map(new Func1<List<String>, Observable<String>>() {
                public Observable<String> call(List<String> l) {
                    return Observable.from(l);
                }
           })

Я сделал несколько тестов, чтобы увидеть разницу: https://gist.github.com/jcfandino/fd47277ada821f51a9d4

    observable.map(delayOn4).window(10, TimeUnit.MILLISECONDS, 2)
            .subscribe(new UpdateCountdowns("window"));
    assertTrue(itemsCount.await(100, TimeUnit.MILLISECONDS));
    assertTrue(batchesCount.await(100, TimeUnit.MILLISECONDS));

Отпечатки:

окно - 1968827463: 1
окно - 1968827463: 2
окно - 1968827463: 3
окно - 1267747034: 4
окно - 1267747034: 5

Обратите внимание, что он испускает две партии вместо трех, а первая содержит три элемента вместо двух.

С другой стороны, с помощью buffer:

    observable.map(delayOn4).buffer(10, TimeUnit.MILLISECONDS, 2)
            .map(new Func1<List<String>, Observable<String>>() {
                public Observable<String> call(List<String> l) {
                    return Observable.from(l);
                }
            }).subscribe(new UpdateCountdowns("buffer"));
    assertTrue(itemsCount.await(100, TimeUnit.MILLISECONDS));
    assertTrue(batchesCount.await(100, TimeUnit.MILLISECONDS));

Отпечатки:

буфер - 1257525795: 1
буфер - 1257525795: 2
буфер - 1849466438: 3
буфер - 104886386: 4
буфер - 104886386: 5

Три партии максимум с двумя элементами, так как "4" пришла поздно, она закончилась в следующей партии.

Я не знаю, является ли это ожидаемым поведением или там может быть ошибка.

person jcfandino    schedule 07.08.2014