Поделитесь подпиской, если это не сделано

У меня есть угловое приложение, которое должно синхронизировать некоторые данные с сервером при некоторых условиях (некоторые триггеры в программном обеспечении или по запросу пользователя). Итак, у меня есть такая функция:

    ...

    public createSyncObservable(): Observable<any> {
        return this.retriveDataFromStorage().pipe(
            switchMap(
                (data) => forkJoin(this.api.sendData1(data.data1),this.api.sendData2(data.data2),this.api.sendData3(data.data3))
            ),
            switchMap(
                (data) => this.api.getDataFromServer()
            ),
            switchMap(
                (data) => this.updateLocal(data)
            )
        )
    }

Поведение, которое я хочу, это:

  • Если пользователь (или какой-либо триггер) запрашивает синхронизацию, и она уже происходит, я не должен делать это снова, просто дождитесь окончания текущей синхронизации и верните тот же наблюдаемый (общий).
  • Если последняя синхронизация уже завершена, она должна начаться снова (создать новую наблюдаемую).

Мое лучшее решение на данный момент - сделать что-то вроде этого (непроверенный код):

    ...
    public syncData(): Observable<any> {
        if (this.observable_complete) {
            this.observable_complete = false;
            this.syncObservable$ = this.createSyncObservable().pipe(share())
            this.syncObservable$.subscribe(
                (data) => {this.observable_complete = true}
            ) 
        }
        return this.syncObservable$;
    }

Это путь? Может быть, мне не хватает какого-то оператора RxJS, который помог бы мне в этом случае? Это решение кажется немного хакерским...


person DSLima90    schedule 11.09.2020    source источник


Ответы (2)


Если вызов this.createSyncObservable() не выполняет никакой фактической работы, а только подписывается на наблюдаемое, которое он возвращает, вам нужно вызвать функцию только один раз. Затем вы можете просто сделать:

public syncData$ = this.createSyncObservable().pipe(share());

share отпишется от своего источника, если не останется подписчиков (т. е. когда this.createSyncObservable() завершится). Таким образом, подписчик на this.syncData$ инициирует подписку на наблюдаемую, возвращенную из this.createSyncObservable(), если она завершена.

// The first subscribe will trigger a subscribe to createSyncObservable()
syncData$.subscribe()
// A second subscribe while the first hasn't completed won't trigger a subscribe
// to createSyncObservable() but instead just wait for its response
syncData$.subscribe()

// After some time ...
// Another subscribe after createSyncObservable() completed will trigger another 
// subscribe to createSyncObservable()
syncData$.subscribe()

https://stackblitz.com/edit/rxjs-44qzaj?file=index.ts

person frido    schedule 11.09.2020
comment
Это действительно лучший путь, на мой взгляд. Просто убедитесь, что createSyncObservable завершено, и все готово. - person Guilhermevrs; 11.09.2020
comment
Вы действительно правы. Я неправильно понял, как работает канал общего доступа, но нет необходимости проверять, вернулся ли он, и создавать еще один наблюдаемый объект, поскольку тот же самый будет срабатывать при новой подписке. @Guilhermevrs, спасибо, я позабочусь о том, чтобы syncObservable всегда завершался, может быть, добавить канал тайм-аута, чтобы быть уверенным ... - person DSLima90; 11.09.2020

Вы должны попробовать с оператором takeWhile/skipWhile:

Это всего лишь пример, но я надеюсь, что вы найдете его полезным.

private isResourceFree = true;

public createSyncObservable(): Observable<any> {

        return of(isResourceFree).pipe(
            takeWhile(val => val),
            tap(_ => this.isResourceFree = false),
            switchMap(_ => this.retriveDataFromStorage()),
            switchMap(
                (data) => forkJoin(this.api.sendData1(data.data1),this.api.sendData2(data.data2),this.api.sendData3(data.data3))
            ),
            switchMap(
                (data) => this.api.getDataFromServer()
            ),
            switchMap(
                (data) => this.updateLocal(data)
            ),
            tap(_ => this.isResourceFree = true)
        )
    }
person Edoardo Pacciani    schedule 11.09.2020