Как автоматически подписаться на внутренний Flux/Mono?

У меня есть поток (ограниченный) Flux, который я хочу преобразовать в поток Long, где Long — это размер внутреннего потока:

   Flux.just( Flux.just(1, 2, 3),  Flux.just(1, 2)  )
       .map(Flux::count)
       .log()
       .subscribe();

Журнал выполнения следующий:

onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription)
request(unbounded)
onNext({ "operator" : "Count" })
onNext({ "operator" : "Count" })
onComplete()

Flux::count возвращает Mono, а не Long. Есть ли операторы для автоматической распаковки этого внутреннего моно при подписке на основной поток?


person Nicolas Barbé    schedule 04.09.2017    source источник


Ответы (1)


flatMap() к вашим услугам:

Преобразуйте элементы, испускаемые этим Flux асинхронно, в Publisher, затем объедините эти внутренние издатели в один Flux посредством слияния, что позволит им чередоваться.

https://projectreactor.io/docs/core/snapshot/api/reactor/core/publisher/Flux.html#flatMap

person Artem Bilan    schedule 04.09.2017
comment
Спасибо за ответ ! проблема, которую я пытаюсь решить, на самом деле немного отличается: какой оператор я должен использовать, если внутренний моно не находится непосредственно в родительском потоке: от Flux<Tuple<String, Mono<Long>>> до Flux<Tuple<String, Long>> Можем ли мы все еще использовать плоскую карту в этом случае? - person Nicolas Barbé; 05.09.2017
comment
Хм. Нет, я думаю, что это не сработает. То есть ралли вложенное значение Tuple и я как-то не уверен, что есть чем обрабатывать его часть по требованию. - person Artem Bilan; 05.09.2017
comment
Наконец-то нашел решение без подписки на внутренний поток/моно. После groupBy я связываю следующий оператор и создаю кортеж после подсчета элементов группы: .flatMap( group -> group.count().map( count -> Tuples.of( group.key(), count ))); - person Nicolas Barbé; 06.09.2017
comment
Большой! Код в студию пожалуйста! - person Artem Bilan; 06.09.2017