RxJava и кэшированные данные

Я все еще новичок в RxJava и использую его в приложении для Android. Я прочитал тонну по этой теме, но все еще чувствую, что что-то упускаю.

У меня такой сценарий:

У меня есть данные, хранящиеся в системе, доступ к которым осуществляется через различные сервисные соединения (AIDL), и мне нужно получить данные из этой системы (может произойти 1-n количество асинхронных вызовов). Rx очень помог мне в упрощении этого кода. Однако весь этот процесс обычно занимает несколько секунд (более 5 секунд +), поэтому мне нужно кэшировать эти данные, чтобы ускорить работу собственного приложения.

Требования на этом этапе:

  1. Первоначальная подписка, кеш будет пустым, поэтому нам нужно подождать необходимое время для загрузки. Ничего страшного. После этого данные следует закэшировать.

  2. Последующие загрузки должны извлекать данные из кеша, но затем данные должны быть перезагружены, и кеш диска должен оставаться за кулисами.

Проблема: у меня есть два Observable - A и B. A содержит вложенные Observables, которые извлекают данные из локальных сервисов (здесь много чего происходит). B намного проще. B просто содержит код для извлечения данных из дискового кеша.

Необходимо решить: а) Вернуть кэшированный элемент (если он кэширован) и продолжить перезагрузку дискового кеша. б) Кэш пуст, загрузите данные из системы, закешируйте их и верните. Последующие вызовы возвращаются к «а».

Некоторые люди рекомендовали несколько операций, таких как плоская карта, слияние и даже объекты, но по какой-то причине у меня возникли проблемы со связью точек.

Как я могу это сделать?


person Donn Felker    schedule 14.11.2014    source источник
comment
У меня нет времени дать полный ответ, но похоже, что вам нужен один общедоступный Observable, который извлекает данные из кеша. Затем внутри этого Observable он может либо получить исходные данные, кеш и вернуть - либо он может вернуть + запустить отдельный процесс.   -  person Dan Lew    schedule 15.11.2014
comment
Что вы имеете в виду, но тогда данные должны быть перезагружены и продолжать перезагружать кеш диска. Не могли бы вы прояснить, что происходит с дисковым кешем?   -  person a.bertucci    schedule 15.11.2014


Ответы (3)


Вот несколько вариантов, как это сделать. Я постараюсь объяснить их как можно лучше по ходу дела. Это код салфетки, и я использую лямбда-синтаксис в стиле Java8, потому что я ленив и он красивее. :)

  1. Тема, такая как AsyncSubject, была бы идеальной, если бы вы могли хранить их как состояния экземпляра в памяти, хотя похоже, что вам нужно сохранить их на диск. Однако я думаю, что об этом подходе стоит упомянуть на всякий случай. Кроме того, это просто отличная техника, которую нужно знать. AsyncSubject - это Observable, который испускает только ПОСЛЕДНЕЕ значение, опубликованное для него (Subject является одновременно Observer и Observable), и начинает излучать только после того, как onCompleted был вызван. Таким образом, все, что подписывается после этого завершения, получит следующее значение.

    В этом случае у вас может быть (в классе приложения или другом экземпляре синглтона на уровне приложения):

    public class MyApplication extends Application {    
        private final AsyncSubject<Foo> foo = AsyncSubject.create();
    
        /** Asynchronously gets foo and stores it in the subject. */
        public void fetchFooAsync() {
            // Gets the observable that does all the heavy lifting.
            // It should emit one item and then complete.
            FooHelper.getTheFooObservable().subscribe(foo);
        }
    
        /** Provides the foo for any consumers who need a foo. */
        public Observable<Foo> getFoo() {
            return foo;
        }
    
    }
    
  2. Откладывая наблюдаемое. Observable.defer позволяет вам подождать создания Observable до тех пор, пока на него не будет подписана подписка. Вы можете использовать это, чтобы разрешить выборку из кэша диска в фоновом режиме, а затем вернуть кешированную версию или, если не в кеше, совершить реальную сделку.

    Эта версия предполагает, что ваш код получения, как выборка из кеша, так и создание без перехвата, блокируют вызовы, а не наблюдаемые, и отсрочка действительно работает в фоновом режиме. Например:

    public Observable<Foo> getFoo() {
        Observable.defer(() -> {
            if (FooHelper.isFooCached()) {
                return Observable.just(FooHelper.getFooFromCacheBlocking());
            }
            return Observable.just(FooHelper.createNewFooBlocking());
        }).subscribeOn(Schedulers.io());
    }
    
  3. Используйте concatWith и take. Здесь мы предполагаем, что наш метод получения Foo из дискового кеша либо испускает один элемент и завершается, либо просто завершается без отправки, если он пуст.

    public Observable<Foo> getFoo() {
        return FooHelper.getCachedFooObservable()
                .concatWith(FooHelper.getRealFooObservable())
                .take(1);
    }
    

    Этот метод должен пытаться получить реальную сделку только в том случае, если кешируемое наблюдаемое завершилось пустым.

  4. Используйте amb или ambWith. Это, наверное, одно из самых безумных решений, но на него стоит обратить внимание. amb в основном берет пару (или больше с перегрузками) наблюдаемых и ждет, пока один из них не выдаст элемент, затем полностью отбрасывает другой наблюдаемый и просто берет тот, который выиграл гонку. Единственный способ, которым это было бы полезно, - это если этап вычисления создания нового Foo может быть быстрее, чем его выборка с диска. В этом случае вы можете сделать что-то вроде этого:

    public Observable<Foo> getFoo() {
        return Observable.amb(
                FooHelper.getCachedFooObservable(),
                FooHelper.getRealFooObservable());
    }
    

Я предпочитаю вариант 3. Что касается фактического кэширования, у вас может быть что-то вроде этого в одной из точек входа (желательно до того, как нам понадобится Foo, поскольку, как вы сказали, это длительная операция). должен получить кешированную версию, пока она закончила запись. Использование AsyncSubject здесь также может помочь, чтобы убедиться, что мы не запускаем работу несколько раз, ожидая ее написания. Потребители получат только законченный результат, но, опять же, это работает, только если его можно разумно сохранить в памяти.

if (!FooHelper.isFooCached()) {
    getFoo()
        .subscribeOn(Schedulers.io())
        .subscribe((foo) -> FooHelper.cacheTheFoo(foo));
}

Обратите внимание, что вы должны либо использовать однопоточный планировщик, предназначенный для записи (и чтения) на диск, и использовать .observeOn(foo) после .subscribeOn(...), либо иным образом синхронизировать доступ к кешу диска, чтобы предотвратить проблемы параллелизма.

person lopar    schedule 19.01.2015

Недавно я опубликовал на Github для Android и Java библиотеку под названием RxCache, которая отвечает вашим потребностям в кешировании. данные с использованием наблюдаемых.

RxCache реализует два уровня кэширования - память и диск, и учитывает несколько аннотаций для настройки поведения каждого провайдера.

Настоятельно рекомендуется использовать с Retrofit для данных, полученных из HTTP-вызовов. Используя лямбда-выражение, вы можете сформулировать выражение следующим образом:

    rxCache.getUser(retrofit.getUser(id), () -> true).flatmap(user -> user);

Надеюсь, вам будет интересно :)

person Víctor Albertos    schedule 20.01.2016

Взгляните на проект ниже. Это мой личный взгляд на вещи, и я использовал этот шаблон в ряде приложений.

https://github.com/zsiegel/rxandroid-architecture-sample

Взгляните на PersistenceService. Вместо того, чтобы обращаться к базе данных (или MockService в примере проекта), вы можете просто иметь локальный список пользователей, которые обновляются с помощью метода save (), и просто возвращать его в get ().

Дайте знать, если у вас появятся вопросы.

person Zac Siegel    schedule 21.11.2014