rxjava: Могу ли я использовать retry (), но с задержкой?

Я использую rxjava в своем приложении для Android для асинхронной обработки сетевых запросов. Теперь я хотел бы повторить неудачный сетевой запрос только по прошествии определенного времени.

Есть ли способ использовать retry () для Observable, но повторить попытку только после определенной задержки?

Есть ли способ сообщить Observable, что в настоящее время выполняется повторная попытка (а не в первый раз)?

Я посмотрел на debounce () / throttleWithTimeout (), но они, похоже, делают что-то другое.

Изменить:

Я думаю, что нашел один способ сделать это, но мне было бы интересно либо подтверждение того, что это правильный способ сделать это, либо другие, более эффективные способы.

Я делаю следующее: в методе call () моего Observable.OnSubscribe, прежде чем я вызываю метод Subscribers onError (), я просто позволяю Thread спать на желаемое время. Итак, чтобы повторить попытку каждые 1000 миллисекунд, я делаю что-то вроде этого:

@Override
public void call(Subscriber<? super List<ProductNode>> subscriber) {
    try {
        Log.d(TAG, "trying to load all products with pid: " + pid);
        subscriber.onNext(productClient.getProductNodesForParentId(pid));
        subscriber.onCompleted();
    } catch (Exception e) {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e1) {
            e.printStackTrace();
        }
        subscriber.onError(e);
    }
}

Поскольку этот метод в любом случае выполняется в потоке ввода-вывода, он не блокирует пользовательский интерфейс. Единственная проблема, которую я вижу, заключается в том, что даже первая ошибка сообщается с задержкой, поэтому задержка есть, даже если нет retry (). Я бы хотел, чтобы задержка применялась не после ошибки, а перед повторной попыткой (но, очевидно, не до первой попытки).


person david.mihola    schedule 27.02.2014    source источник


Ответы (15)


Вы можете использовать оператор retryWhen(), чтобы добавить логику повтора к любому Observable.

Следующий класс содержит логику повтора:

RxJava 2.x

public class RetryWithDelay implements Function<Observable<? extends Throwable>, Observable<?>> {
    private final int maxRetries;
    private final int retryDelayMillis;
    private int retryCount;

    public RetryWithDelay(final int maxRetries, final int retryDelayMillis) {
        this.maxRetries = maxRetries;
        this.retryDelayMillis = retryDelayMillis;
        this.retryCount = 0;
    }

    @Override
    public Observable<?> apply(final Observable<? extends Throwable> attempts) {
        return attempts
                .flatMap(new Function<Throwable, Observable<?>>() {
                    @Override
                    public Observable<?> apply(final Throwable throwable) {
                        if (++retryCount < maxRetries) {
                            // When this Observable calls onNext, the original
                            // Observable will be retried (i.e. re-subscribed).
                            return Observable.timer(retryDelayMillis,
                                    TimeUnit.MILLISECONDS);
                        }

                        // Max retries hit. Just pass the error along.
                        return Observable.error(throwable);
                    }
                });
    }
}

RxJava 1.x

public class RetryWithDelay implements
        Func1<Observable<? extends Throwable>, Observable<?>> {

    private final int maxRetries;
    private final int retryDelayMillis;
    private int retryCount;

    public RetryWithDelay(final int maxRetries, final int retryDelayMillis) {
        this.maxRetries = maxRetries;
        this.retryDelayMillis = retryDelayMillis;
        this.retryCount = 0;
    }

    @Override
    public Observable<?> call(Observable<? extends Throwable> attempts) {
        return attempts
                .flatMap(new Func1<Throwable, Observable<?>>() {
                    @Override
                    public Observable<?> call(Throwable throwable) {
                        if (++retryCount < maxRetries) {
                            // When this Observable calls onNext, the original
                            // Observable will be retried (i.e. re-subscribed).
                            return Observable.timer(retryDelayMillis,
                                    TimeUnit.MILLISECONDS);
                        }

                        // Max retries hit. Just pass the error along.
                        return Observable.error(throwable);
                    }
                });
    }
}

Использование:

// Add retry logic to existing observable.
// Retry max of 3 times with a delay of 2 seconds.
observable
    .retryWhen(new RetryWithDelay(3, 2000));
person kjones    schedule 13.08.2014
comment
Приятно знать - большое вам спасибо! Теперь вопрос: принимаю ли я ваш ответ как правильный или я сохраняю ответ Alexis Contour, потому что он был правильным, когда я задал вопрос ... - person david.mihola; 12.09.2014
comment
@nima У меня была та же проблема, замените RetryWithDelay на это: pastebin.com/6SiZeKnC - person user1480019; 12.06.2015
comment
@GerritHoekstra Большое спасибо, попробую - person Nima G; 12.06.2015
comment
Похоже, что оператор RxJava retryWhen изменился с тех пор, как я это написал изначально. Я получу обновленный ответ. - person kjones; 13.06.2015
comment
Кроме того, retryDelayMillis должен иметь тип long, поскольку это то, что Observable.timer() принимает в качестве первого параметра. - person fast3r; 18.05.2016
comment
Вы должны обновить этот ответ, чтобы он соответствовал RxJava 2 - person Vishnu M.; 24.02.2017
comment
как версия rxjava 2 будет искать котлин? - person Gabriel Sanmartin; 21.02.2018
comment
Реализацию, не относящуюся конкретно к Observable, можно найти здесь (поддерживает все RxJava 2. x-совместимые наблюдаемые на основе интерфейса Publisher). - person Eido95; 06.11.2018
comment
Вау это здорово - person M Moersalin; 07.10.2019
comment
@kjones, можете ли вы помочь с этой проблемой rxjava: stackoverflow.com/questions/61352701/ - person user606669; 22.04.2020

На основе ответа Пола, и если вас не беспокоят retryWhen проблемы, указанные в Abhijit Sarkar, самый простой способ отложить повторную подписку с безусловным rxJava2:

source.retryWhen(throwables -> throwables.delay(1, TimeUnit.SECONDS))

Возможно, вы захотите увидеть больше примеров и объяснений на странице retryWhen and repeatWhen.

person McX    schedule 14.03.2017

Этот пример работает с jxjava 2.2.2:

Повторите попытку без промедления:

Single.just(somePaylodData)
   .map(data -> someConnection.send(data))
   .retry(5)
   .doOnSuccess(status -> log.info("Yay! {}", status);

Повторить с задержкой:

Single.just(somePaylodData)
   .map(data -> someConnection.send(data))
   .retryWhen((Flowable<Throwable> f) -> f.take(5).delay(300, TimeUnit.MILLISECONDS))
   .doOnSuccess(status -> log.info("Yay! {}", status)
   .doOnError((Throwable error) 
                -> log.error("I tried five times with a 300ms break" 
                             + " delay in between. But it was in vain."));

Наш исходный сингл не работает, если не удается someConnection.send (). Когда это происходит, наблюдаемые отказы внутри retryWhen выдают ошибку. Мы задерживаем это излучение на 300 мс и отправляем его обратно, чтобы сигнализировать о повторной попытке. take (5) гарантирует, что наша наблюдаемая сигнализация прекратит работу после получения пяти ошибок. retryWhen видит завершение и не повторяет попытку после пятой ошибки.

person Erunafailaro    schedule 09.10.2018

Это решение, основанное на увиденных мной фрагментах Бена Кристенсена, RetryWhen Example и RetryWhenTestsConditional (мне пришлось изменить n.getThrowable() на n, чтобы он работал). Я использовал evant / gradle-retrolambda, чтобы лямбда-нотация работала на Android, но вам не нужно использовать лямбда-выражения ( хотя настоятельно рекомендуется). Для задержки я реализовал экспоненциальный откат, но вы можете подключить туда любую логику отсрочки, которая вам нужна. Для полноты я добавил операторы subscribeOn и observeOn. Я использую ReactiveX / RxAndroid для AndroidSchedulers.mainThread().

int ATTEMPT_COUNT = 10;

public class Tuple<X, Y> {
    public final X x;
    public final Y y;

    public Tuple(X x, Y y) {
        this.x = x;
        this.y = y;
    }
}


observable
    .subscribeOn(Schedulers.io())
    .retryWhen(
            attempts -> {
                return attempts.zipWith(Observable.range(1, ATTEMPT_COUNT + 1), (n, i) -> new Tuple<Throwable, Integer>(n, i))
                .flatMap(
                        ni -> {
                            if (ni.y > ATTEMPT_COUNT)
                                return Observable.error(ni.x);
                            return Observable.timer((long) Math.pow(2, ni.y), TimeUnit.SECONDS);
                        });
            })
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(subscriber);
person david-hoze    schedule 23.11.2014
comment
это выглядит элегантно, но я не использую функции лямбы, как я могу писать без лямбов? @ amitai-hoze - person ericn; 22.07.2016
comment
также как мне написать это так, чтобы я мог повторно использовать эту функцию повтора для других Observable объектов? - person ericn; 22.07.2016
comment
неважно, я использовал kjones решение, и оно отлично подходит для меня, спасибо - person ericn; 22.07.2016

вместо MyRequestObservable.retry я использую функцию-оболочку retryObservable (MyRequestObservable, retrycount, seconds), которая возвращает новый Observable, который обрабатывает косвенное обращение для задержки, поэтому я могу сделать

retryObservable(restApi.getObservableStuff(), 3, 30)
    .subscribe(new Action1<BonusIndividualList>(){
        @Override
        public void call(BonusIndividualList arg0) 
        {
            //success!
        }
    }, 
    new Action1<Throwable>(){
        @Override
        public void call(Throwable arg0) { 
           // failed after the 3 retries !
        }}); 


// wrapper code
private static <T> Observable<T> retryObservable(
        final Observable<T> requestObservable, final int nbRetry,
        final long seconds) {

    return Observable.create(new Observable.OnSubscribe<T>() {

        @Override
        public void call(final Subscriber<? super T> subscriber) {
            requestObservable.subscribe(new Action1<T>() {

                @Override
                public void call(T arg0) {
                    subscriber.onNext(arg0);
                    subscriber.onCompleted();
                }
            },

            new Action1<Throwable>() {
                @Override
                public void call(Throwable error) {

                    if (nbRetry > 0) {
                        Observable.just(requestObservable)
                                .delay(seconds, TimeUnit.SECONDS)
                                .observeOn(mainThread())
                                .subscribe(new Action1<Observable<T>>(){
                                    @Override
                                    public void call(Observable<T> observable){
                                        retryObservable(observable,
                                                nbRetry - 1, seconds)
                                                .subscribe(subscriber);
                                    }
                                });
                    } else {
                        // still fail after retries
                        subscriber.onError(error);
                    }

                }
            });

        }

    });

}
person Alexis Contour    schedule 18.04.2014
comment
Мне ужасно жаль, что я не ответил раньше - почему-то я пропустил уведомление от SO, что был ответ на мой вопрос ... Я поддержал ваш ответ, потому что мне нравится идея, но я не уверен, что - в соответствии с принципами SO - Я должен принять ответ, поскольку это скорее обходной путь, чем прямой ответ. Но я думаю, поскольку вы даете обходной путь, ответ на мой первоначальный вопрос - нет, вы не можете ... - person david.mihola; 05.05.2014

retryWhen - сложный, возможно, даже глючный оператор. Официальный документ и хотя бы один ответ здесь используют оператор range, который завершится ошибкой, если не будет предпринято никаких повторных попыток. См. Мое обсуждение с участником ReactiveX Дэвидом Карноком.

Я улучшил ответ kjones, изменив flatMap на concatMap и добавив класс RetryDelayStrategy. flatMap не сохраняет порядок эмиссии, в то время как concatMap сохраняет, что важно для задержек с откатом. RetryDelayStrategy, как видно из названия, позволяет пользователю выбирать из различных режимов генерации задержек повтора, включая откат. Код доступен на моем GitHub со следующими тестовыми примерами:

  1. Успешно с 1-й попытки (без повторных попыток)
  2. Ошибка после 1 попытки
  3. Попытки повторить попытку 3 раза, но успешны на втором, поэтому не повторяют третий раз
  4. Успешно при третьей попытке

См. setRandomJokes метод.

person Abhijit Sarkar    schedule 18.07.2016

На основании ответа kjones, здесь представлена ​​версия RxJava 2.x для Котлина с задержкой в ​​качестве расширения. Замените Observable, чтобы создать такое же расширение для Flowable.

fun <T> Observable<T>.retryWithDelay(maxRetries: Int, retryDelayMillis: Int): Observable<T> {
    var retryCount = 0

    return retryWhen { thObservable ->
        thObservable.flatMap { throwable ->
            if (++retryCount < maxRetries) {
                Observable.timer(retryDelayMillis.toLong(), TimeUnit.MILLISECONDS)
            } else {
                Observable.error(throwable)
            }
        }
    }
}

Затем просто используйте его на наблюдаемом observable.retryWithDelay(3, 1000)

person JuliusScript    schedule 30.08.2019
comment
Можно ли также заменить это на Single? - person Papps; 18.09.2019
comment
@Papps Да, это должно сработать, просто обратите внимание, что flatMap там придется использовать Flowable.timer и Flowable.error , даже если функция Single<T>.retryWithDelay. - person JuliusScript; 18.09.2019

Теперь с RxJava версии 1.0+ вы можете использовать zipWith для повторной попытки с задержкой.

Добавление изменений в ответ kjones.

Изменено

public class RetryWithDelay implements 
                            Func1<Observable<? extends Throwable>, Observable<?>> {

    private final int MAX_RETRIES;
    private final int DELAY_DURATION;
    private final int START_RETRY;

    /**
     * Provide number of retries and seconds to be delayed between retry.
     *
     * @param maxRetries             Number of retries.
     * @param delayDurationInSeconds Seconds to be delays in each retry.
     */
    public RetryWithDelay(int maxRetries, int delayDurationInSeconds) {
        MAX_RETRIES = maxRetries;
        DELAY_DURATION = delayDurationInSeconds;
        START_RETRY = 1;
    }

    @Override
    public Observable<?> call(Observable<? extends Throwable> observable) {
        return observable
                .delay(DELAY_DURATION, TimeUnit.SECONDS)
                .zipWith(Observable.range(START_RETRY, MAX_RETRIES), 
                         new Func2<Throwable, Integer, Integer>() {
                             @Override
                             public Integer call(Throwable throwable, Integer attempt) {
                                  return attempt;
                             }
                         });
    }
}
person Omkar    schedule 13.09.2016

Тот же ответ, что и для kjones, но обновлен до последней версии. Для версии RxJava 2.x: ('io.reactivex.rxjava2: rxjava : 2.1.3 ')

public class RetryWithDelay implements Function<Flowable<Throwable>, Publisher<?>> {

    private final int maxRetries;
    private final long retryDelayMillis;
    private int retryCount;

    public RetryWithDelay(final int maxRetries, final int retryDelayMillis) {
        this.maxRetries = maxRetries;
        this.retryDelayMillis = retryDelayMillis;
        this.retryCount = 0;
    }

    @Override
    public Publisher<?> apply(Flowable<Throwable> throwableFlowable) throws Exception {
        return throwableFlowable.flatMap(new Function<Throwable, Publisher<?>>() {
            @Override
            public Publisher<?> apply(Throwable throwable) throws Exception {
                if (++retryCount < maxRetries) {
                    // When this Observable calls onNext, the original
                    // Observable will be retried (i.e. re-subscribed).
                    return Flowable.timer(retryDelayMillis,
                            TimeUnit.MILLISECONDS);
                }

                // Max retries hit. Just pass the error along.
                return Flowable.error(throwable);
            }
        });
    }
}

Использование:

// Добавляем логику повтора к существующему наблюдаемому. // Повторная попытка не более 3 раз с задержкой в ​​2 секунды.

observable
    .retryWhen(new RetryWithDelay(3, 2000));
person Mihuilk    schedule 27.09.2017

Вы можете добавить задержку в Observable, возвращаемом в retryWhen Operator

          /**
 * Here we can see how onErrorResumeNext works and emit an item in case that an error occur in the pipeline and an exception is propagated
 */
@Test
public void observableOnErrorResumeNext() {
    Subscription subscription = Observable.just(null)
                                          .map(Object::toString)
                                          .doOnError(failure -> System.out.println("Error:" + failure.getCause()))
                                          .retryWhen(errors -> errors.doOnNext(o -> count++)
                                                                     .flatMap(t -> count > 3 ? Observable.error(t) : Observable.just(null).delay(100, TimeUnit.MILLISECONDS)),
                                                     Schedulers.newThread())
                                          .onErrorResumeNext(t -> {
                                              System.out.println("Error after all retries:" + t.getCause());
                                              return Observable.just("I save the world for extinction!");
                                          })
                                          .subscribe(s -> System.out.println(s));
    new TestSubscriber((Observer) subscription).awaitTerminalEvent(500, TimeUnit.MILLISECONDS);
}

Вы можете увидеть больше примеров здесь. https://github.com/politrons/reactive

person paul    schedule 18.07.2016

Просто сделайте это так:

                  Observable.just("")
                            .delay(2, TimeUnit.SECONDS) //delay
                            .flatMap(new Func1<String, Observable<File>>() {
                                @Override
                                public Observable<File> call(String s) {
                                    L.from(TAG).d("postAvatar=");

                                    File file = PhotoPickUtil.getTempFile();
                                    if (file.length() <= 0) {
                                        throw new NullPointerException();
                                    }
                                    return Observable.just(file);
                                }
                            })
                            .retry(6)
                            .subscribe(new Action1<File>() {
                                @Override
                                public void call(File file) {
                                    postAvatar(file);
                                }
                            }, new Action1<Throwable>() {
                                @Override
                                public void call(Throwable throwable) {

                                }
                            });
person Allen Vork    schedule 04.07.2016

Для версии Kotlin и RxJava1

class RetryWithDelay(private val MAX_RETRIES: Int, private val DELAY_DURATION_IN_SECONDS: Long)
    : Function1<Observable<out Throwable>, Observable<*>> {

    private val START_RETRY: Int = 1

    override fun invoke(observable: Observable<out Throwable>): Observable<*> {
        return observable.delay(DELAY_DURATION_IN_SECONDS, TimeUnit.SECONDS)
            .zipWith(Observable.range(START_RETRY, MAX_RETRIES),
                object : Function2<Throwable, Int, Int> {
                    override fun invoke(throwable: Throwable, attempt: Int): Int {
                        return attempt
                    }
                })
    }
}
person Cody    schedule 17.04.2018

(Kotlin) Я немного улучшил код с экспоненциальным откатом и применил защиту от Observable.range ():

    fun testOnRetryWithDelayExponentialBackoff() {
    val interval = 1
    val maxCount = 3
    val ai = AtomicInteger(1);
    val source = Observable.create<Unit> { emitter ->
        val attempt = ai.getAndIncrement()
        println("Subscribe ${attempt}")
        if (attempt >= maxCount) {
            emitter.onNext(Unit)
            emitter.onComplete()
        }
        emitter.onError(RuntimeException("Test $attempt"))
    }

    // Below implementation of "retryWhen" function, remove all "println()" for real code.
    val sourceWithRetry: Observable<Unit> = source.retryWhen { throwableRx ->
        throwableRx.doOnNext({ println("Error: $it") })
                .zipWith(Observable.range(1, maxCount)
                        .concatMap { Observable.just(it).delay(0, TimeUnit.MILLISECONDS) },
                        BiFunction { t1: Throwable, t2: Int -> t1 to t2 }
                )
                .flatMap { pair ->
                    if (pair.second >= maxCount) {
                        Observable.error(pair.first)
                    } else {
                        val delay = interval * 2F.pow(pair.second)
                        println("retry delay: $delay")
                        Observable.timer(delay.toLong(), TimeUnit.SECONDS)
                    }
                }
    }

    //Code to print the result in terminal.
    sourceWithRetry
            .doOnComplete { println("Complete") }
            .doOnError({ println("Final Error: $it") })
            .blockingForEach { println("$it") }
}
person ultraon    schedule 07.05.2018

в случае, когда вам нужно распечатать счетчик повторов, вы можете использовать пример, представленный на вики-странице Rxjava https://github.com/ReactiveX/RxJava/wiki/Error-Handling-Operators

observable.retryWhen(errors ->
    // Count and increment the number of errors.
    errors.map(error -> 1).scan((i, j) -> i + j)  
       .doOnNext(errorCount -> System.out.println(" -> query errors #: " + errorCount))
       // Limit the maximum number of retries.
       .takeWhile(errorCount -> errorCount < retryCounts)   
       // Signal resubscribe event after some delay.
       .flatMapSingle(errorCount -> Single.timer(errorCount, TimeUnit.SECONDS));
person Angel Koh    schedule 27.04.2019

Используйте retryWhen

     /**
     * Retry Handler Support
     * @param errors
     * @param predicate filter error 
     * @param maxTry
     * @param periodStrategy
     * @param timeUnit
     * @return 
     */
    private  Flowable<?> retrySupport(Flowable<Throwable> errors, Predicate<? super Throwable> predicate , Integer maxTry , Function<Long, Long> periodStrategy , TimeUnit timeUnit )
    {
        LongAdder errorCount = new LongAdder();
        return errors
                .doOnNext(e -> {
                    errorCount.increment();
                    long currentCount = errorCount.longValue();
                    boolean tryContinue = predicate.test(e) && currentCount < maxTry;
                    Logger.i("No. of errors: %d , %s",  currentCount,
                            tryContinue ? String.format("please wait %d %s.", periodStrategy.apply(currentCount), timeUnit.name()) : "skip and throw");
                    if(!tryContinue)
                        throw  e;
                } )
                .flatMapSingle(e -> Single.timer( periodStrategy.apply(errorCount.longValue()), timeUnit));
    }

Образец

    private Single<DeviceInfo> getErrorToken( String device)
    {
        return Single.error(  new IOException( "network is disconnect!" ) );
    }

//only retry when emit IOExpcetion
//delay 1s,2s,4s,8s,16s

this.getErrorToken( this.deviceCode )
     .retryWhen( error -> retrySupport( error, 
                 e-> e instanceof IOException,
                 5 , 
                 count-> (long)Math.pow(2,count-1),TimeUnit.SECONDS ) )
     .subscribe( deviceInfo1 -> Logger.i( "----Get Device Info---" ) ,
                 e -> Logger.e( e, "On Error" ) ,
                 () -> Logger.i("<<<<<no more>>>>>"));

person soapgu    schedule 20.04.2021