Объединение нескольких операций RxJava2 с использованием Room и Firebase

Некоторая предыстория: я новичок в RxJava и пытаюсь сделать в приложении функцию, которая будет работать в автономном режиме и синхронизироваться при наличии сети. Я пытаюсь связать несколько операций, но я плохо разбираюсь в том, как связать вместе разные типы, такие как Completable, Maybe и Observable.

Вот список операций, которые необходимо выполнить в порядке после того, как пользователь добавил или обновил некоторые данные:

  1. Обновите данные в локальной базе данных, просто установите статус синхронизация, используя Room здесь.
  2. Загрузите файл в хранилище Firebase.
  3. Получите URL-адрес файла и обновите данные в базе данных Firebase.
  4. Обновите данные в локальной базе данных, установите статус синхронизировано.

Вот методы для каждой операции:

Обновить локальную базу данных:

private Completable setStatus(Entity entity, Entity.Status status){
    entity.setStatus(status);
    return Completable.fromAction(() -> localDataStore.updatePersonalPlace(personalPlaceEntity));
}

Загрузить файл в FirebaseStorage с помощью Rx2Firebase

RxFirebaseStorage.putBytes(storageRef, bytes); // returns a Maybe<TaskSnapshot>

Установить данные в базе данных firebase

RxFirebaseDatabase.setValue(dataRef, model); // returns a Completable

я пробовал

setStatus(...)
  .toObservable()
  .map(o -> uploadFile())
  .map(fileUrl -> updateFirebaseDatabase(fileUrl))
  .doOnNext(() -> setStatus(..) ) // set status to synced
  .subscribe(() -> Timber.d("Data updated", 
               t -> setStatus(..)); // set status back to what it was on error

Но это не работает, и я думаю, что не совсем понимаю основы того, как связать эти операции в цепочку. Ни одна из операций после toObservable не вызывается.

Я также пытался преобразовать возможно в завершаемый и связать их с помощью Completable.andThen, но я не уверен, как это сделать правильно, и мне нужно, чтобы fileUrl возвращался для обновления базы данных firebase.

Может ли кто-нибудь указать мне правильное направление относительно того, что я должен использовать здесь. Это довольно простая задача, которая сейчас кажется очень сложной, возможно, мой подход ужасно неверен.

Спасибо,


person Rahul Sainani    schedule 19.09.2017    source источник
comment
Можете ли вы показать содержимое subscribe? Вы обрабатываете ошибки? Возможно, какое-то действие должно быть в определенном потоке и вылетит.   -  person Kevin Robatel    schedule 19.09.2017
comment
Обновлено в вопросе. Просто зарегистрируйтесь в случае успеха и верните исходный статус в случае сбоя, чтобы сущность можно было синхронизировать позже.   -  person Rahul Sainani    schedule 19.09.2017


Ответы (3)


Я добавляю некоторые комментарии к вашему коду:

setStatus(...) // completable => (onError|onComplete)?
  .toObservable() // => will call (onError|onComplete)? (beacause of the nature of completable)
  .map(o -> uploadFile()) // never call because no item is emitted (completable...)
  .map(fileUrl -> updateFirebaseDatabase(fileUrl)) // never call too
  .doOnNext(() -> setStatus(..) ) // set status to synced // never call too
  .subscribe(..)

Вы должны изменить свой Completable на Single и вернуть что-то вроде true.

person Kevin Robatel    schedule 19.09.2017
comment
Я понимаю. Итак, использование Single решит эту проблему? Кроме того, является ли использование оператора карты лучшим подходом? В конце я получаю Single<Completable>, поскольку RxFirebaseDatabase.setValue(dataRef, model) возвращает Completable. - person Rahul Sainani; 19.09.2017
comment
Обновление: при использовании Single файл загружается, но вызов базы данных firebase никогда не выполняется. Вызов загрузки файла возвращает Maybe. - person Rahul Sainani; 19.09.2017
comment
setStatus должен возвращать что-то вроде Single‹Boolean› - person Kevin Robatel; 19.09.2017
comment
На данный момент он возвращает Single‹Integer›. - person Rahul Sainani; 19.09.2017
comment
Ах, извините за путаницу, это то же самое, что и в вопросе: RxFirebaseStorage.putBytes(storageRef, bytes); - person Rahul Sainani; 19.09.2017
comment
Я решил это, используя flatMap для uploadFile и flatMapCompletable для updateFirebaseDatabase. Благодаря этому докладу Дэна Лью я лучше понимаю, как работает flatMap. - person Rahul Sainani; 20.09.2017

Установить статус возвращает Completable, который будет вызывать только onComplete или onError. Ваши map и doOnNext никогда не вызываются, потому что они никогда не выдают никаких предметов. Что вы, вероятно, хотите, используйте doOnComplete или изучите использование concatArray, startWith или concatWith, которые могут связывать Completables.

person cwbowron    schedule 19.09.2017

Благодаря ответам от Kevinrob и cwbowron Я смог понять, что пошло не так.

setStatus теперь возвращает Single:

private Single<Integer> setStatus(Entity entity, Entity.Status status){
    entity.setStatus(status);
    return Single.fromCallable(() -> localDataStore.updatePersonalPlace(personalPlaceEntity));
}

Это возвращает завершаемый объект, который:

  1. Устанавливает статус объекта как синхронизирующийся в локальной базе данных.
  2. Преобразует растровое изображение в массив байтов.
  3. Загружает фото в хранилище Firebase.
  4. Получает URL-адрес фотографии.
  5. Обновляет данные в базе данных Firebase.
  6. Наконец, обновляет статус объекта как синхронизированный в локальной базе данных.

    return setPlaceStatusSingle(entity, Entity.Status.SYNCING)
                        .subscribeOn(Schedulers.io())
                        .toObservable()
                        .map(integer -> BitmapUtils.convertBitmapToByteArray(entity.getPhoto()))
                        .doOnNext(bytes -> Timber.d("Converted bitmap to bytes"))
                        .flatMap(bytes -> RxFirebaseStorage.putBytes(fileRef, bytes).toObservable())
                        // Using flatmap to pass on the observable downstream, using map was a mistake which created a Single<Observable<T>> 
                        .observeOn(Schedulers.io())
                        .doOnNext(taskSnapshot -> Timber.d("Uploaded to storage"))
                        .map(taskSnapshot -> taskSnapshot.getDownloadUrl().toString()) // Firebase stuff, getting the photo Url
                        .flatMapCompletable(photoUrl -> { 
                            Timber.d("Photo url %s", photoUrl);
                            model.setPhotoUrl(photoUrl);
                            return RxFirebaseDatabase.setValue(ref, model);
                        })
                        // Passes the Completable returned from setValue downstream
                        .observeOn(Schedulers.io())
                        .doOnComplete(() -> {
                            entity.setStatus(Entity.Status.SYNCED);
                            entity.setPhotoUrl(model.getPhotoUrl());
                            localDataStore.updateEntity(entity);
                        })
                        .doOnError(throwable -> onErrorUpdatingEntity(entity, throwable));
    
person Rahul Sainani    schedule 21.09.2017