Оператор потока массивов в массив потоков

В Rx.js, как превратить поток массивов в массив потоков, например, у меня есть следующий поток: ['0a','0b'], ['1a','1b'],['2a', '2b','2c'], и я хочу получить следующие потоки:

0a---1a---2a--->
0b---1b---2b--->
          2c--->

Существуют ли какие-либо операторы для выполнения чего-то подобного или мне нужно написать их с нуля?


person Ilyas Malgazhdarov    schedule 12.12.2015    source источник


Ответы (2)


Что-то вроде этого должно работать

stream.
  flatMap(array =>
    Rx.Observable.from(
      array.map((obj, i) => {index: i, ...obj})
    )
  ).groupBy(x => x.index, ).
  subscribe(x =>
    x.map((x,i) => subscribe(x))
  )
person almeynman    schedule 12.12.2015

Вы можете добиться этого относительно легко с помощью существующих операторов.

То, чего вы хотите достичь, очень похоже на то, что описано здесь: поочередно объединять элементы потоков

Он предлагает два пути:

  1. используя оператор Rx.Observable.zip (принимает в качестве аргумента массив наблюдаемых и выдает поток массивов, элемент которого с индексом n является значением x, испускаемым n-м наблюдаемым)

    Однако это решение, примененное в вашем примере, остановится на 1a,1b, потому что результирующая наблюдаемая будет завершена, как только завершится одна из наблюдаемых.

  2. расширение ваших массивов, чтобы придать им одинаковую длину, путем заполнения фиктивными значениями и применения оператора Rx.Observable.zip

В этих обоих вариантах:

  • если вы удалите последнюю строку, .concatMap.... вы получите поток массива, такой как [0a,0b], [1a,1b], [2a,2b,2c], из которого вы можете легко отобразить по индексу (.map(function(array){return array[N];}) даст вам [Na,Nb...]), чтобы получить поток, который вы хотите.
  • ИЛИ вы можете оставить точно такой же код и добавить .filter(function(value,index){return index % N == I}), где N — количество потоков, а I — нужный вам поток, т. е. поток со значениями (Ia,Ib...)

Документация по оператору zip > http://reactivex.io/documentation/operators/zip.html https://github.com/Reactive-Extensions/RxJS/blob/master/doc/api/core/operators/zip.md,

person user3743222    schedule 12.12.2015