Цепочка сервисов модернизации с поддержкой RxJava

У меня возникли проблемы с цепочкой наблюдаемых с использованием поддержки модифицированного RxJava. Я, вероятно, неправильно понимаю, как его использовать, иначе это может быть ошибка в модификации. Надеюсь, кто-то здесь может помочь мне понять, что происходит. Редактировать: я использую MockRestAdapter для этих ответов — это может иметь значение, поскольку я вижу, что реализации RxSupport немного отличаются.

Это поддельное банковское приложение. Он пытается выполнить передачу, и после завершения передачи он должен выполнить запрос учетных записей для обновления значений учетных записей. По сути, это просто предлог для меня, чтобы попробовать flatMap. К сожалению, следующий код не работает, подписчики никогда не получают уведомления:

Случай 1: объединение двух наблюдаемых, созданных модернизацией

Служба передачи (примечание: возвращает наблюдаемую, произведенную модернизацией):

@FormUrlEncoded @POST("/user/transactions/")
public Observable<TransferResponse> transfer(@Field("session_id") String sessionId,
                                             @Field("from_account_number") String fromAccountNumber,
                                             @Field("to_account_number") String toAccountNumber,
                                             @Field("amount") String amount);

Служба учетной записи (примечание: возвращает наблюдаемую, созданную модернизацией):

@FormUrlEncoded @POST("/user/accounts")
public Observable<List<Account>> getAccounts(@Field("session_id") String sessionId);

Объединяет вместе две наблюдаемые, произведенные модернизацией:

transfersService.transfer(session.getSessionId(), fromAccountNumber, toAccountNumber, amount)
            .flatMap(new Func1<TransferResponse, Observable<? extends List<Account>>>() {
                @Override public Observable<? extends List<Account>> call(TransferResponse transferResponse) {
                    return accountsService.getAccounts(session.getSessionId());
                }
            })
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread());

Случай 2: создание собственного наблюдаемого объекта и связывание его с модифицированным объектом

Если я игнорирую встроенную поддержку Rx в Retrofit для вызова с «плоским отображением», он работает отлично! Все подписчики получают уведомления. Смотри ниже:

Служба новых учетных записей (примечание: не создает наблюдаемую):

@FormUrlEncoded @POST("/user/accounts")
public List<Account> getAccountsBlocking(@Field("session_id") String sessionId);

Создайте свою собственную наблюдаемую и излучайте элементы самостоятельно:

transfersService.transfer(session.getSessionId(), fromAccountNumber, toAccountNumber, amount)
            .flatMap(new Func1<TransferResponse, Observable<? extends List<Account>>>() {
                @Override public Observable<? extends List<Account>> call(TransferResponse transferResponse) {
                    return Observable.create(new Observable.OnSubscribe<List<Account>>() {
                        @Override public void call(Subscriber<? super List<Account>> subscriber) {
                            List<Account> accounts = accountsService.getAccountsBlocking(session.getSessionId());
                            subscriber.onNext(accounts);
                            subscriber.onCompleted();
                        }
                    });
                }
            })
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread());

Любая помощь будет принята с благодарностью!


person Bradley Campbell    schedule 15.07.2014    source источник
comment
Какую версию модификации и rxjava вы используете?   -  person Miguel    schedule 15.07.2014


Ответы (1)


Ответ: да, вы должны иметь возможность связывать наблюдаемые из Retrofit. Кажется, есть ошибка в частном классе MockRestAdapter$MockRxSupport:createMockObservable. То, как выполняется планирование в отношении подписки подписчика на наблюдаемое, кажется неправильным. Подписка на наблюдаемое происходит после запуска самого потока HttpExecutor. Я считаю, что исходный поток, исходящий из вашего потока Schedulers.io(), завершен и отписан до того, как mockHandler.invokeSync вернул Observable, на который можно подписаться. Надеюсь, это объяснение имеет какой-то смысл, если вы посмотрите на код в модуле retrofit-mock.

В качестве обходного пути на данный момент с текущим кодом при использовании модифицированного макета вы можете заменить внутренний Executor по умолчанию своей собственной реализацией ImmediateExecutor. Это позволило бы, по крайней мере, при тестировании макетов иметь один поток потока, который будет предоставлен вашим Schedulers.io.

// ImmediateExecutor.java
public class ImmediateExecutor implements Executor {
    @Override
    public void execute(Runnable command) {
        command.run();
    }
}

// Create your RestAdapter with your ImmdiateExecutor
RestAdapter adapter = new RestAdapter.Builder()
            .setEndpoint(endpoint)
            .setExecutors(new ImmediateExecutor(), null)
            .build();

Что касается устранения проблемы в исходном коде, вы также можете включить в свой проект проект retrofit-mock в качестве источника и изменить метод MockRestAdapter$MockRxSupport:createMockObservable, используя приведенный ниже код. Я протестировал ваш вариант использования, и он решает проблему.

--- MockRestAdapter.java$MockRxSupport ----

Observable createMockObservable(final MockHandler mockHandler, final RestMethodInfo methodInfo,
        final RequestInterceptor interceptor, final Object[] args) {
      return Observable.create(new Observable.OnSubscribe<Object>() {
        @Override public void call(final Subscriber<? super Object> subscriber) {
          try {
            if (subscriber.isUnsubscribed()) return;
            Observable observable =
                (Observable) mockHandler.invokeSync(methodInfo, interceptor, args);

            observable.subscribeOn(Schedulers.from(httpExecutor));

            //noinspection unchecked
            observable.subscribe(subscriber);

          } catch (RetrofitError e) {
            subscriber.onError(errorHandler.handleError(e));
          } catch (Throwable e) {
            subscriber.onError(e);
          }
        }
      });
    }

Создал проблему с проектом Retrofit здесь, посмотрим, примут ли они ее.

person Miguel    schedule 15.07.2014
comment
Это имеет смысл, я сам задавался этим вопросом, но, поскольку я одновременно изучаю и Retrofit, и Rx, я решил, что просто делаю это неправильно. Спасибо за регистрацию ошибки, я буду следить за ней, чтобы увидеть, что там происходит. - person Bradley Campbell; 16.07.2014
comment
Нет проблем, рад, что смог помочь. - person Miguel; 16.07.2014