ForkJoin 2 BehaviorSubjects

У меня есть два потока темы поведения, которые я безуспешно пытаюсь разветвить. Как я и предполагал, он возвращает два последних значения. Возможно ли это как-то реализовать?

Он не называется после подлежащего.

let stream1 = new BehaviorSubject(2);
let stream2 = new BehaviorSubject('two');

Observable.forkJoin(stream1, stream2)
    .subscribe(r => {
         console.log(r);
    });

person Lajos    schedule 27.09.2016    source источник


Ответы (3)


Обратите внимание, что на самом деле делает forkJoin() из документации:

Подождите, пока Observable завершится, а затем объедините последние значения, которые они испустили.

Это означает, что forkJoin() выдает значение, когда все входные наблюдаемые завершены. При использовании BehaviorSubject это означает явный вызов complete() для них обоих:

import { Observable, BehaviorSubject, forkJoin } from 'rxjs';

const stream1 = new BehaviorSubject(2);
const stream2 = new BehaviorSubject('two');

forkJoin(stream1, stream2)
  .subscribe(r => {
    console.log(r);
  });

stream1.complete();
stream2.complete();

Смотрите живую демонстрацию: https://stackblitz.com/edit/rxjs-9nqtx6

Март 2019 г.: обновлено для RxJS 6.

person martin    schedule 27.09.2016
comment
Спасибо за ваш ответ! Вы проверили свой код, потому что журнал не вызывается на моем компьютере? - person Lajos; 27.09.2016

Если вы не хотите (или не знаете, когда) вызывать complete(), вы можете использовать combineLatest вместо forkJoin.

С combineLatest, когда любой наблюдаемый источник (в вашем случае ваше поведение subject) выдает значение, combineLatest вызовет:

const stream1 = new BehaviorSubject(2);
const stream2 = new BehaviorSubject('two');

combineLatest(stream1, stream2)
    .subscribe(r => {
         console.log(r);
    });

stream1.next(3);
stream2.next('three');

Журнал консоли:

(2) [2, два] // начальное состояние

(2) [3, two] // следующий запуск в потоке 1

(2) [3, three] // следующий запуск в потоке 2

Живая демонстрация: https://stackblitz.com/edit/rxjs-qzxo3n

person Alisson    schedule 27.02.2020

Вы можете использовать метод take(1) pipe или complete(), упомянутый выше.

private subjectStream1 = new BehaviorSubject(null);
stream1$: Observable = this.subjectStream1.asObservable();

private subjectStream2 = new BehaviorSubject(null);
stream2$: Observable = this.subjectStream2.asObservable();

forkJoin({
  stream1: this.stream1$.pipe(take(1)),
  stream2: this.stream2$.pipe(take(1))
})
.pipe(takeUntil(this._destroyed$))
.subscribe(values) => console.log(values));
person sjcoder    schedule 13.05.2021