Объединение нескольких потоков

Я переношу свой проект с Spring на Ktor и решил заменить реализацию реактивных потоков, которая изначально была Reactor, на RxJava 2. Хотя я столкнулся с некоторой проблемой при попытке объединить несколько потоков в один в конце реактивного конвейера. Вот как это выглядит:

internal interface Aggregator {
    fun acquireSomethingFromSomewhere(keyword: String): Flowable<Some>
}

fun acquireSomething(keyword: String) = Flowable
    .fromIterable(aggregators)
    .map { it.acquireSomethingFromSomewhere(keyword) }
    .flatMap { ??? }

Дело в том, что каждый вызов acquireSomethingFromSomewhere возвращает Flowable<Some>, есть ли какой-нибудь оператор, который мог бы помочь мне объединить их в один поток в конце? В Reactor я только что использовал:

fun acquireSomething(keyword: String) = Flux
    .fromIterable(aggregators)
    .map { it.acquireSomethingFromSomewhere(keyword) }
    .flatMap { Flux.concat(it) }

Но в RxJava я не могу найти ни одного оператора, который мог бы решить мою проблему, так как каждый из них принимает Publisher в качестве аргумента, а Flowable его не реализует.


person sh1nen    schedule 26.01.2019    source источник
comment
Подходит ли вам .flatMap { it.acquireSomethingFromSomewhere(keyword) }?   -  person Caramiriel    schedule 26.01.2019


Ответы (1)


Прежде всего, если функция, которую вы предоставляете в map, возвращает Flowable, вы получите вложенные Flowables (также известные как Flowable<Flowable<T>>), что, вероятно, не то, что вам нужно. Это связано с тем, что функция map преобразует только элемент внутри контейнера (T) -> R (в данном случае это контейнер Flowable). В вашем случае вы хотите преобразовать элемент внутри первого контейнера, возвращая новый контейнер (T) -> Flowable<R>, эта функция называется flatMap. В случае Rx у вас больше функций (операторов) в зависимости от их поведения, например concatMap и switchMap , но подписи одинаковые.

Пример

fun acquireSomething(keyword: String) = Flowable
    .fromIterable(aggregators)
    .flatMap { it.acquireSomethingFromSomewhere(keyword) }

ПС

Если вы хотите узнать больше о лежащей в основе теории, вы можете следовать Arrow-kt документации Functor и Monad

person Omar Mainegra    schedule 27.01.2019