Observable.forkJoin и аргумент массива

В документации Observables forkJoin говорится, что args может быть массивом, но в нем не приведен пример этого:

https://github.com/Reactive-Extensions/RxJS/blob/master/doc/api/core/operators/forkjoin.md

Я пробовал функцию, аналогичную той, что я перечислил (ниже), но обнаружил ошибку:

:3000/angular2/src/platform/browser/browser_adapter.js:76 

EXCEPTION: TypeError: Observable_1.Observable.forkJoin is not a function

Обрезанная версия моей функции ниже:

processStuff( inputObject ) {
  let _self = this;

  return new Observable(function(observer) {
    let observableBatch = [];

    inputObject.forEach(function(componentarray, key) {
      observableBatch.push(_self.http.get(key + '.json').map((res: Response) => res.json()));
    });

    Observable.forkJoin(
      observableBatch
    // );
    ).subscribe(() => {
      observer.next();
      observer.complete();
    });

  });
}

Корень моего вопроса связан с завершением цикла, прежде чем продолжить, как указано здесь: Angular2 Observable - как дождаться завершения всех вызовов функций в цикле, прежде чем продолжить?

Но я еще не полностью освоил правильное использование forkJoin с массивом и правильный синтаксис для этого.

Я очень благодарен за вашу помощь.

ПРИМЕЧАНИЕ: ПРИМЕР ТРЕТЬЕЙ ФУНКЦИИ, ВОЗВРАЩАЮЩЕЙ НАБЛЮДАЕМУЮ

thirdFunction() {
  let _self = this;

  return Observable.create((observer) => {
  // return new Observable(function(observer) {
    ...

    observer.next(responseargs);
    observer.complete();
  });
}

processStuff(inputObject) {
  let _self = this;
  let observableBatch = [];

  inputObject.forEach((componentarray, key) => {
    observableBatch.push(_self.thirdFunction().map((res: Response) => res.json()));
  });

  return Observable.forkJoin(observableBatch);
}

elsewhere() {
  this.processStuff(inputObject)
    .subscribe()
}

person Benjamin McFerren    schedule 27.02.2016    source источник
comment
Observable.forkJoin([http.get(), http.get(), http.get(), etc]).subscribe((res) => ...) См. Также stackoverflow.com/a/35247372/4933038   -  person Eric Martinez    schedule 28.02.2016
comment
Вы забыли import 'rxjs/add/observable/forkJoin' или import 'rxjs/Rx'?   -  person Sasxa    schedule 28.02.2016
comment
У меня есть только этот импорт {Observable} из rxjs / Observable;   -  person Benjamin McFerren    schedule 28.02.2016
comment
См. это, то же самое относится к другим операторам / генераторам ..   -  person Sasxa    schedule 28.02.2016


Ответы (2)


Вам нужно импортировать операторы, которые не загружаются по умолчанию. Это то, что обычно означает EXCEPTION Observable.xxxx is not a function. Вы можете импортировать все операторы, добавив полный rxjs в свой загрузочный файл, например:

import 'rxjs/Rx'

или путем импорта определенных операторов, в вашем случае:

import 'rxjs/add/observable/forkJoin'

Еще одно наблюдение / предложение по поводу вашего кода: попробуйте придерживаться одного синтаксиса. Вы смешиваете es5, es6, typescript ... и пока он работает, это только запутает вас в долгосрочной перспективе. Кроме того, если вы только начинаете работать с Observables, постарайтесь избегать new Observable() и вместо этого используйте операторы создания;

processStuff( inputObject ) {
  let observableBatch = [];

  inputObject.forEach(( componentarray, key ) => {
    observableBatch.push( this.http.get( key + '.json').map((res: Response) => res.json()) );
  });

  return Observable.forkJoin(observableBatch);
}

elsewhere() {
  this.processStuff( inputObject )
    .subscribe()
}

Наконец, обратитесь к соответствующей документации - Angular2 использует RxJS v5, а предоставленная вами ссылка предназначена для RxJS v4. Документы по v5 еще не завершены, но вы можете найти описания во многих исходных файлах.

person Sasxa    schedule 27.02.2016
comment
Привет, @Sasxa, у меня проблемы, потому что я использую третью функцию вместо this.http.get (... Не могли бы вы предложить пример, где эта третья функция возвращает новый Observable (функция (наблюдатель) {Я отредактировал свой исходный пост чтобы показать пример. Большое спасибо за вашу помощь. - person Benjamin McFerren; 04.03.2016
comment
разобрался - это необходимо третьей функции: return Observable.create ((Observable) = ›{... Observable.next (test); Observable.complete (); - person Benjamin McFerren; 05.03.2016
comment
Мне нравится это решение. Что я не могу понять, так это как обрабатывать ошибки в некоторых запросах, когда большинство из них успешны? - person Darryl Wagoner WA1GON; 03.03.2018
comment
В последней версии он должен возвращать forkJoin (observableBatch) вместо возврата Observable.forkJoin (observableBatch) - person Volodymyr; 09.10.2018
comment
Ответ @Volodymyr - это тот, который совместим с angular 7+, а импорт - import {forkJoin} from 'rxjs'; - person Biiz; 09.01.2020

Вот рабочая демонстрация с использованием Angular 12. Синтаксис немного отличается от принятого ответа (больше нет Observable):

https://stackblitz.com/edit/angular-map-to-forkjoin?file=src/app/app.component.ts

ПРИМЕЧАНИЕ. Откройте консоль Developer Tools, чтобы увидеть, что я делаю.

Во-первых, он принимает массив элементов JSON (в данном случае массив пользователей) и преобразует их в отдельные вызовы HTTP Patch с помощью метода map.

  private mapCalls(users: any[]): Observable<any>[] {
    return users.map(user => {
      // Each User will be patched to the endpoint
      // None of these calls will be made until forkJoin is used
      return this.http.patch(
        `https://jsonplaceholder.typicode.com/users/${user.id}`,
        user
      );
    });
  }

Теперь это массив наблюдаемых. forkJoin добавлен как оболочка для выполнения этих вызовов

return forkJoin(mappedCalls);

Затем можно подписаться на конечный результат, который будет выводиться в виде единого массива с тем же количеством элементов.

ПРИМЕЧАНИЕ. Если один вызов в forkJoin завершится неудачно, ВСЕ ОНИ БУДУТ ОТКАЗАТЬСЯ.

person austinthedeveloper    schedule 27.07.2021