UndeliverableException при вызове onError ObservableEmitter в RXjava2

У меня есть метод, который создает эмиттер, как показано ниже, есть проблема (возможно, это нормальное поведение) с вызовом onError в обратном вызове модификации. Я получил UndeliverableException при попытке вызвать onError.

Я могу решить эту проблему, проверив subscriber.isDiposed(), интересно, как я могу вызвать onError, потому что мне нужно уведомить мой уровень пользовательского интерфейса.

  • Дополнение 1
--> RxJava2CallAdapterFactoryalready implemented   
private static Retrofit.Builder builderSwift = new Retrofit.Builder()
           .baseUrl(URL_SWIFT)
           .addCallAdapterFactory(RxJava2CallAdapterFactory.create())
           .addConverterFactory(GsonConverterFactory.create())
           .addConverterFactory(new ToStringConverterFactory());

--> When i added below code to application class app won't crash
--> but i get java.lang.exception instead of my custom exception  

RxJavaPlugins.setErrorHandler(Functions<Throwable>emptyConsumer());

@Override
    public void onFileUploadError(Throwable e) {
        Log.d(TAG, "onFileUploadError: " + e.getMessage());
    }
public Observable<UploadResponseBean> upload(final UploadRequestBean uploadRequestBean, final File file) {
    return Observable.create(new ObservableOnSubscribe<UploadResponseBean>() {
        @Override
        public void subscribe(@NonNull final ObservableEmitter<UploadResponseBean> subscriber) throws Exception {

                 // ---> There are no problem with subscriber while calling onError        

                // ---> Retrofit2 service request 
                ftsService.upload(token, uploadRequestBean, body).enqueue(new Callback<UploadResponseBean>() {
                    @Override
                    public void onResponse(Call<UploadResponseBean> call, Response<UploadResponseBean> response) {

                        if (response.code() == 200){
                            // --->  calling onNext works properly
                            subscriber.onNext(new UploadResponseBean(response.body().getUrl()));
                        }
                        else{
                            // --->  calling onError throws UndeliverableException
                            subscriber.onError(new NetworkConnectionException(response.message()));                
                        }
                    }

                    @Override
                    public void onFailure(Call call, Throwable t) {
                        subscriber.onError(new NetworkConnectionException(t.getMessage()));
                    }
                });
        }
    });
}

person Talha    schedule 10.05.2017    source источник


Ответы (3)


Начиная с версии 2.1.1 tryOnError доступно:

API эмиттера (например, FlowableEmitter, SingleEmitter и т. д.) теперь содержит новый метод tryOnError, который пытается сгенерировать Throwable, если последовательность не отменена/удалена. В отличие от обычного onError, если нисходящий поток больше не желает принимать события, метод возвращает false и не сигнализирует UndeliverableException.

https://github.com/ReactiveX/RxJava/blob/2.x/CHANGES.md

person Yazazzello    schedule 07.10.2017
comment
Но это по-прежнему не гарантирует, что RxJavaPlugins.onError() не будет вызвано, см. github.com/ReactiveX/ RxJava/issues/5477 - person arekolek; 15.06.2018
comment
Это считается экспериментальным и нестабильным! - person IgorGanapolsky; 09.08.2018
comment
@arekolek они закрыли эту проблему, не исправив - person Yazazzello; 30.11.2018
comment
Не думаю, что они смогут это исправить. Когда у меня возникла эта проблема, я установил свой собственный обработчик ошибок, который подавлял одну конкретную ошибку, которую я хотел подавить. - person arekolek; 30.11.2018

Проблема в том, что вы говорите, что вам нужно проверить, не удалено ли уже Subscriber, потому что RxJava2 более строг в отношении ошибок, которые были выброшены после того, как Subscriber уже удалено. трассировка и вызовы обработчика необработанных исключений потока. вы можете прочитать полное объяснение здесь.

Теперь то, что здесь происходит, заключается в том, что вы, вероятно, отменили подписку (распоряжаетесь) от этого Observable до того, как запрос был выполнен и доставлена ​​ошибка, и поэтому вы получаете UndeliverableException.

Интересно, как я могу вызвать onError, потому что мне нужно уведомить мой уровень пользовательского интерфейса.

поскольку это произошло после того, как ваш пользовательский интерфейс был отписан, пользовательскому интерфейсу все равно. в обычном потоке эта ошибка должна доставляться правильно.

Некоторые общие моменты, касающиеся вашей реализации:

  • та же проблема произойдет в onError, если вы ранее отписались.
  • здесь нет логики отмены (вот что вызывает эту проблему), поэтому запрос продолжается, даже если Subscriber отписался.
  • даже если вы реализуете эту логику (используя ObservableEmitter.setCancellable() / setDisposable()), вы все равно столкнетесь с этой проблемой, если вы отмените подписку до того, как запрос будет выполнен - ​​это приведет к отмене, и ваша логика onFailure вызовет onError(), и произойдет та же проблема.
  • поскольку вы выполняете асинхронный вызов с помощью Retrofit, указанная подписка Scheduler не будет выполнять фактический запрос в потоке Scheduler, а только в подписке. вы можете использовать Observable.fromCallable и блокирующий вызов Retrofit execute, чтобы получить больший контроль над фактическим вызовом потока.

Подводя итог, можно сказать, что защита вызовов onError() с помощью ObservableEmitter.isDiposed() в этом случае является хорошей практикой.
Но я думаю, что лучше всего использовать адаптер вызовов Retrofit RxJava, так что вы будете обернуты Observable тем, что выполняете вызов Retrofit и уже есть все эти соображения.

person yosriz    schedule 10.05.2017
comment
спасибо за ваш ответ, я добавил дополнительную информацию, можете ли вы взглянуть на это, я также проверяю эту проблему, но я не смог найти решение github.com/ReactiveX/RxJava/issues/4863 - person Talha; 10.05.2017
comment
Я нашел проблему. В ответе на модификацию есть метод контроля ошибок, и я понимаю, что дважды запускаю onError, потому что я получил это исключение для второго триггера. Как вы сказали, я избавился от подписчика раньше. - person Talha; 10.05.2017

Я обнаружил, что эта проблема была вызвана использованием неправильного контекста при получении модели представления во фрагменте:

ViewModelProviders.of(requireActivity(), myViewModelFactory).get(MyViewModel.class);

Из-за этого модель представления жила в контексте активности, а не фрагмента. Изменение его на следующий код устранило проблему.

ViewModelProviders.of(this, myViewModelFactory).get(MyViewModel.class);
person arenaq    schedule 17.10.2019