У меня возникли проблемы с цепочкой наблюдаемых с использованием поддержки модифицированного RxJava. Я, вероятно, неправильно понимаю, как его использовать, иначе это может быть ошибка в модификации. Надеюсь, кто-то здесь может помочь мне понять, что происходит. Редактировать: я использую MockRestAdapter для этих ответов — это может иметь значение, поскольку я вижу, что реализации RxSupport немного отличаются.
Это поддельное банковское приложение. Он пытается выполнить передачу, и после завершения передачи он должен выполнить запрос учетных записей для обновления значений учетных записей. По сути, это просто предлог для меня, чтобы попробовать flatMap. К сожалению, следующий код не работает, подписчики никогда не получают уведомления:
Случай 1: объединение двух наблюдаемых, созданных модернизацией
Служба передачи (примечание: возвращает наблюдаемую, произведенную модернизацией):
@FormUrlEncoded @POST("/user/transactions/")
public Observable<TransferResponse> transfer(@Field("session_id") String sessionId,
@Field("from_account_number") String fromAccountNumber,
@Field("to_account_number") String toAccountNumber,
@Field("amount") String amount);
Служба учетной записи (примечание: возвращает наблюдаемую, созданную модернизацией):
@FormUrlEncoded @POST("/user/accounts")
public Observable<List<Account>> getAccounts(@Field("session_id") String sessionId);
Объединяет вместе две наблюдаемые, произведенные модернизацией:
transfersService.transfer(session.getSessionId(), fromAccountNumber, toAccountNumber, amount)
.flatMap(new Func1<TransferResponse, Observable<? extends List<Account>>>() {
@Override public Observable<? extends List<Account>> call(TransferResponse transferResponse) {
return accountsService.getAccounts(session.getSessionId());
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread());
Случай 2: создание собственного наблюдаемого объекта и связывание его с модифицированным объектом
Если я игнорирую встроенную поддержку Rx в Retrofit для вызова с «плоским отображением», он работает отлично! Все подписчики получают уведомления. Смотри ниже:
Служба новых учетных записей (примечание: не создает наблюдаемую):
@FormUrlEncoded @POST("/user/accounts")
public List<Account> getAccountsBlocking(@Field("session_id") String sessionId);
Создайте свою собственную наблюдаемую и излучайте элементы самостоятельно:
transfersService.transfer(session.getSessionId(), fromAccountNumber, toAccountNumber, amount)
.flatMap(new Func1<TransferResponse, Observable<? extends List<Account>>>() {
@Override public Observable<? extends List<Account>> call(TransferResponse transferResponse) {
return Observable.create(new Observable.OnSubscribe<List<Account>>() {
@Override public void call(Subscriber<? super List<Account>> subscriber) {
List<Account> accounts = accountsService.getAccountsBlocking(session.getSessionId());
subscriber.onNext(accounts);
subscriber.onCompleted();
}
});
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread());
Любая помощь будет принята с благодарностью!