Как этого добиться в Reactive x (в идеале с примерами в RxJava или RxJs)?
a |-a-------------------a-----------a-----------a----
s1 |-x-x-x-x-x-x -| (subscribe)
s2 |-x-x-x-x-x-| (subscribe)
s2 |-x-x-x-x-x-| (subscribe)
...
sn
S |-x-x-x-x-x-x-x-------x-x-x-x-x-x-x-------------x-x-x-x-x-x- (subsribe)
a
— это бесконечный поток событий, который запускает конечный поток sn
событий, каждое из которых должно быть частью бесконечного потока S
, при этом имея возможность подписаться на каждый sn
поток (для выполнения операций суммирования), но в то же время сохраняя поток S
как бесконечный.
РЕДАКТИРОВАТЬ: Чтобы быть более конкретным, я предоставляю реализацию того, что я ищу в Котлине. Каждые 10 секунд генерируется событие, которое сопоставляется с общим конечным потоком из 4 событий. Метапоток преобразуется flatMap
в обычный бесконечный поток. Я использую doAfterNext
для дополнительной подписки на каждый конечный поток и распечатки результатов.
/** Creates a finite stream with events
* $ch-1 - $ch-4
*/
fun createFinite(ch: Char): Observable<String> =
Observable.interval(1, TimeUnit.SECONDS)
.take(4)
.map({ "$ch-$it" }).share()
fun main(args: Array<String>) {
var ch = 'A'
Observable.interval(10, TimeUnit.SECONDS).startWith(0)
.map { createFinite(ch++) }
.doAfterNext {
it
.count()
.subscribe({ c -> println("I am done. Total event count is $c") })
}
.flatMap { it }
.subscribe { println("Just received [$it] from the infinite stream ") }
// Let main thread wait forever
CountDownLatch(1).await()
}
Однако я не уверен, что это «чистый RX».
concatMap
, но из вопроса неясно, как вы сопоставите каждое событие с набором N внутренних источников. - person akarnokd   schedule 02.09.2017rx
. - person Stephan Dollberg   schedule 03.09.2017