Преобразование бесконечного потока конечных потоков в бесконечный поток — Reactive X

Как этого добиться в Reactive x (в идеале с примерами в RxJava или RxJs)?

a |-a-------------------a-----------a-----------a----
s1 |-x-x-x-x-x-x -| (subscribe)
s2                       |-x-x-x-x-x-| (subscribe)
s2                                               |-x-x-x-x-x-| (subscribe)
...
sn
S |-x-x-x-x-x-x-x-------x-x-x-x-x-x-x-------------x-x-x-x-x-x- (subsribe)

a — это бесконечный поток событий, который запускает конечный поток sn событий, каждое из которых должно быть частью бесконечного потока S, при этом имея возможность подписаться на каждый sn поток (для выполнения операций суммирования), но в то же время сохраняя поток S как бесконечный.

РЕДАКТИРОВАТЬ: Чтобы быть более конкретным, я предоставляю реализацию того, что я ищу в Котлине. Каждые 10 секунд генерируется событие, которое сопоставляется с общим конечным потоком из 4 событий. Метапоток преобразуется flatMap в обычный бесконечный поток. Я использую doAfterNext для дополнительной подписки на каждый конечный поток и распечатки результатов.

/** Creates a finite stream with events
 * $ch-1 - $ch-4
 */
fun createFinite(ch: Char): Observable<String> =
        Observable.interval(1, TimeUnit.SECONDS)
                .take(4)
                .map({ "$ch-$it" }).share()

fun main(args: Array<String>) {

    var ch = 'A'

    Observable.interval(10, TimeUnit.SECONDS).startWith(0)
            .map { createFinite(ch++) }
            .doAfterNext {
                it
                        .count()
                        .subscribe({ c -> println("I am done. Total event count is $c") })
            }
            .flatMap { it }
            .subscribe { println("Just received [$it] from the infinite stream ") }

    // Let main thread wait forever
    CountDownLatch(1).await()
}

Однако я не уверен, что это «чистый RX».


person ps-aux    schedule 02.09.2017    source источник
comment
Это похоже на concatMap, но из вопроса неясно, как вы сопоставите каждое событие с набором N внутренних источников.   -  person akarnokd    schedule 02.09.2017
comment
Возможно, добавьте пример того, что вы пробовали до сих пор, это даст нам лучшее представление о том, чего вы пытаетесь достичь.   -  person paulpdaniels    schedule 03.09.2017
comment
i0.kym-cdn.com/photos/ images/original/000/173/576/Wat8.jpg — Я читаю заголовок   -  person Stephan Dollberg    schedule 03.09.2017
comment
@inf Я знаю, что это не идеально, так как я просто промокну ноги с rx. Вы можете редактировать его.   -  person ps-aux    schedule 03.09.2017
comment
@dev-null На самом деле это была шутка, я понятия не имею, что такое rx.   -  person Stephan Dollberg    schedule 03.09.2017


Ответы (1)


Вы не ясно даете понять, как вы хотите сделать подсчет. Если вы делаете общий подсчет, то внутреннюю подписку делать не нужно:

AtomicLong counter = new AtomicLong()
Observable.interval(10, TimeUnit.SECONDS).startWith(0)
        .map { createFinite(ch++) }
        .flatMap { it }
        .doOnNext( counter.incrementAndget() )
        .subscribe { println("Just received [$it] from the infinite stream ") }

С другой стороны, если вам нужно предоставить счетчик для каждого промежуточного наблюдаемого, вы можете переместить счет внутрь flatMap() и распечатать счетчик и сбросить его по завершении:

AtomicLong counter = new AtomicLong()
Observable.interval(10, TimeUnit.SECONDS).startWith(0)
        .map { createFinite(ch++) }
        .flatMap { it
                     .doOnNext( counter.incrementAndget()
                     .doOnCompleted( { long ctr = counter.getAndSet(0)
                                        println("I am done. Total event count is $ctr")
                                     } )
        .subscribe { println("Just received [$it] from the infinite stream ") }

Это не очень функционально, но такой вид отчетов имеет тенденцию нарушать нормальные потоки.

person Bob Dalgleish    schedule 04.09.2017