Повторный запрос и обновление подписчиков с помощью RxJava2

У меня есть уровень данных в моем приложении, который поддерживается службой модернизации. (Пока есть только постоянство по сети. Когда я продвинусь дальше, я добавлю автономное локальное хранилище)

Retrofit возвращает мне Observable<List<Item>>, когда я звоню на сервер. Это хорошо работает. В моем подписчике я получаю список при подписке, а затем могу заполнить свой пользовательский интерфейс файлом Items.

Проблема, с которой я сталкиваюсь, заключается в следующем: если список изменен (каким-то внешним механизмом), как я могу сделать наблюдаемый повторный запрос к службе модернизации и создать новый список элементов. Я буду знать, что данные устарели, но не знаю, как инициировать повторный запрос.

Вот урезанная версия моего DataManager

class DataManager {

    // Retrofit
    RetrofitItemsService itemsService;

    // The observalble provided by retrofit
    Observable<List<Item>> itemsObservable;

    //ctor
    public DataManager(RetrofitItemsService itemsService) {
        this.itemsService = itemsService;
    }

    /* Creates and stores an observable if one has not been created yet.
     * Returns the observable so that it can be subscribed to by the function caller
     */
    public Observable<List<Item>> getItems(){
        if(itemsObservable == null){
            itemsObservable = itemsService.getItems();
        }

        return itemsObservable;
    }

    /* Adds a new Item to the list.
     */
    public Completable addItem(Item item){
        Completable call = itemsService.addItem(item);

        call.subscribe(()->{
            /*
             < < < Here > > >
             If someone has previously called getItems before this item was added, they now have stale data.

             How can I call something like:

             itemsObservable.refreshAllSubscribers()
            */
        });

        return call;
    }
}

person Stephen    schedule 19.12.2016    source источник


Ответы (2)


Проблема, с которой вы здесь боретесь, заключается в разнице между наблюдаемыми горячими и холодными. Есть много отличных статей, которые вы можете найти в Google, в которых подробно описаны различия, поэтому позвольте мне описать только основы.

Cold observable создает нового производителя для каждого подписчика. Это означает, что когда два разных подписчика подписываются на один и тот же холодный наблюдаемый объект, каждый из них получает разные экземпляры этих выбросов. Они могут (!) быть равными, но тем не менее это разные объекты. Применительно к вашему случаю здесь каждый подписчик получает своего собственного производителя, который запрашивает у сервера данные и передает их в поток. Каждому подписчику подаются данные от его собственного производителя.

Горячий наблюдаемый разделяет производителя со всеми его наблюдателями. Если производитель, например, выполняет итерацию по коллекции объектов, переход со вторым подписчиком в середине выбросов означает, что он получит только элементы, выпущенные впоследствии (если они не изменены с помощью таких операторов, как replay ). Каждый объект, полученный любым подписчиком, также является одним и тем же экземпляром для всех наблюдателей, поскольку он исходит от одного производителя.

Итак, судя по всему, вам нужно иметь горячую наблюдаемую для распространения ваших данных, чтобы, когда вы знаете, что она больше не действительна, вы просто пропускали ее один раз через эту горячую наблюдаемую, и каждый наблюдатель был бы доволен обновлением.

К счастью, превратить холодную наблюдаемую в горячую обычно не составляет большого труда. Вы можете создать своего собственного производителя, который имитирует это поведение, использовать один из популярных операторов, таких как share, или вы можете просто преобразовать поток, чтобы он вел себя как один.

Я бы посоветовал использовать PublishSubject для обновления данных и объединения их с исходным холодным наблюдаемым, например:

class DataManager {

    .....

    PublishSubject<Boolean> refreshSubject = PublishSubject.create();

    // The observable for retrieving always fresh data
    Observable<List<Item>> itemsObservable;

    //ctor
    public DataManager(RetrofitItemsService itemsService) {
        this.itemsService = itemsService;
        itemsObservable = itemsService.getItems()
                              .mergeWith(refreshSubject.flatMap(refresh -> itemsService.getItems()))
    }


    public Observable<List<Item>> getItems(){
        return itemsObservable;
    }

    /* Adds a new Item to the list.
     */
    public Completable addItem(Item item){
        Completable call = itemsService.addItem(item);

        call.subscribe(()->{
            refreshSubject.onNext(true);
        });

        return call;
    }
}
person koperko    schedule 20.12.2016

Я предполагаю, что itemsService.getItems() возвращает один элемент Observable, поэтому потребители в любом случае должны повторно подписаться, чтобы получить свежие данные, и они получат их, поскольку Retrofit Observables также откладываются/ленивы.

У вас может быть отдельный «длинный» Observable с помощью PublishSubject, который вы можете запускать при изменении данных:

final Subject<Object> onItemsChanged = PublishSubject.create().toSerialized();

public Observable<Object> itemsChanged() {
    return onItemsChanged;
}

public Completable addItem(Item item){
    Completable call = itemsService.addItem(item);

    // prevent triggering the addItem multiple times
    // Needs RxJava 2 Extensions library for now
    // as there is no Completable.cache() or equivalent in 2.0.3
    CompletableSubject cs = CompletableSubject.create();

    call.doOnComplete(() -> onItemsChanged.onNext("changed"))
    .subscribe(cs);

    return cs;
}
person akarnokd    schedule 20.12.2016