Как изменить наблюдаемый поток другим наблюдаемым в RXJS? Угловой

Я хочу изменить вложенный флаг в моем ships$, когда изменится флаг areShipsExpanded$. Как я могу это сделать. Вот что у меня есть на данный момент:

areShipsExpanded$: Observable<boolean>;
ships$: Observable<Ship>;

constructor() {
   this.ships$ = this.shipsDataSource.getData().pipe(shareReplay());
}

ngOnInit(): void {
    this.areShipsExpanded$.pipe(
        map(flag => {
            if (flag) {
                this.ships$.pipe(map(s => s.items.forEach(r => r.isExpanded = flag)));
            }
        })
    ).subscribe();
}

К сожалению, похоже, что sizes $ не изменен, или, может быть, мне нужно как-то обновить эту ships$ наблюдаемую, поскольку я использую ее в своем представлении таким образом: <ng-container *ngFor="let ship of (ships$ | async)?.items">


@РЕДАКТИРОВАТЬ

Я так поступил, это правильно или это плохая практика?

this.areShipsExpanded$.pipe(
    map(flag => {
        if (flag) {
            this.ships$ = this.ships$.pipe(map(s => {
                s.items.forEach(r => r.isExpanded = flag);
                return s;
            }));
        }
    })
).subscribe();

@ EDIT2

Я использовал combineLatest, теперь все должно быть в порядке?

shipsData$: Observable<Ship>;
ships$: Observable<Ship>;

constructor() {
   this.shipsData$ = this.shipsDataSource.getData().pipe(shareReplay());
}

this.ships$ = combineLatest(this.areShipsExpanded$, this.shipsData$).pipe
    map(([shipsExpanded, ships]) => {
        ships.items.forEach(r => r.isExpanded = shipsExpanded);
        return ships;
    }),
    share()
);

Но проблема combineLatest в том, что вначале функции будут запускаться 2 раза, поскольку оба наблюдаемых объекта испускают значения. Есть ли какой-либо альтернативный оператор, который ведет себя как combineLatest - поэтому, когда какой-либо наблюдаемый выдает значение, испускать последнее значение из каждого, но ТОЛЬКО КОГДА все наблюдаемые будут готовы. Чтобы не повторяться в начале.


person DiPix    schedule 27.07.2018    source источник
comment
В расширении вы хотели бы обновить свои корабли? Ваша модель, похоже, не справляется с этим. У вас должен быть один Observable и список кораблей. Здесь у вас есть две наблюдаемые. Это означает, что вы можете обновлять свои корабли только при изменении вашего списка кораблей.   -  person    schedule 27.07.2018
comment
Найдите объединение потоков в RxJS, вы можете объединить areShipsExpanded и ships и прослушать изменения в обоих, а затем сопоставить результаты для создания окончательного ship$. Какой комбинированный оператор использовать, зависит от ваших требований, ищите combineLatest, zip и т. Д.   -  person sabithpocker    schedule 27.07.2018
comment
@sabithpocker проверьте ИЗМЕНИТЬ, пожалуйста   -  person DiPix    schedule 27.07.2018
comment
имеет ли areShipsExpanded начальное значение?   -  person Fan Cheung    schedule 27.07.2018
comment
Что вы сделали при редактировании, так это то, что вы переопределяете свойство this.ships$, но подписчики на исходный this.ships$ останутся подписанными на исходный this.ships$, а не на новый.   -  person martin    schedule 27.07.2018
comment
@martin так почему это работает? : D   -  person DiPix    schedule 27.07.2018
comment
В любом случае проверьте мой следующий РЕДАКТИРОВАТЬ   -  person DiPix    schedule 28.07.2018
comment
возможно, this.areShipsExpanded $ .pipe (withLatestFrom (this.shipsData $), map (// ...), share ()) cloud поможет вам (поток будет запускаться только тогда, когда this.areShipsExpanded $ испускает и если this.shipsData $ передал значение)   -  person OXMO456    schedule 28.07.2018
comment
@ OXMO456 Но как я могу выдать this.areShipsExpanded$ первое значение при загрузке этого компонента? Потому что теперь значения выдаются кнопкой.   -  person DiPix    schedule 29.07.2018
comment
@DiPix посмотри на мой ответ, может быть, это тебе поможет   -  person OXMO456    schedule 31.07.2018


Ответы (1)


Может это может вам помочь:

const fetchData$ = this.shipsDataSource.getData().pipe(shareReplay());

this.ships$ = merge(
    this.areShipsExpanded$.pipe(
        withLatestFrom(fetchData$),
        map(([shipsExpanded, ships]) => 
            // we could try to favor immutability
            ships.items.map(r => {...r, isExpanded:shipsExpanded})), 
    ), // ships after update
    fetchData$ // initial ships
).pipe(share());
person OXMO456    schedule 31.07.2018