Встроенный или рекомендуемый способ повторить цепочку асинхронных операций в RxJava.

У меня есть график взаимозависимых асинхронных операций, смоделированный в RxJava. Для некоторых ошибок весь график должен быть перезапущен. Операторы retry(..) не поддерживают это напрямую, потому что любая ошибка предоставляется всем подписчикам. Поскольку операторы retry(..) просто переподписываются, они всегда получают ошибку, вычисленную только один раз, из конечной наблюдаемой. т. е. работа не выполняется снова при повторной подписке.

Я попытался создать специальный наблюдаемый объект, который вызывает метод создания наблюдаемых объектов для каждой подписки. В этом случае операторы повтора работают в основном так, как нужно, и после некоторой дополнительной операции с кэшем работают именно так, как нужно.

Однако это кажется настолько распространенным, что я подозреваю, что повторяю работу, уже предоставленную где-то в RxJava. Меня также беспокоит надежность моего решения, учитывая, что я пытаюсь сделать что-то на низком уровне, возможно, без достаточных знаний RxJava для этого. Другая проблема — компонуемость: для поддержки всех трех форм retry(..) мне понадобились бы три версии метода-оболочки.

Демонстрация ниже объясняет, что я пытаюсь сделать, и успех до сих пор.

Есть ли более простой или более идиоматический (или оба) способ сделать такую ​​​​повторную попытку в RxJava?

package demo;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

import rx.Observable;
import rx.Observable.OnSubscribe;
import rx.Subscriber;
import rx.functions.Func0;
import rx.util.async.Async;

/**
 ** <p>
 * Demonstrate attempts to get RxJava retry for asynchronous work chain. The use
 * case that exposed this problem is reading and writing data with versioning
 * for optimistic concurrency. The work is a series of async I/O operations that
 * must be re-assembled from scratch if a stale version is detected on write.
 * </p>
 *
 * <p>
 * Four cases are demonstrated in this class:
 * </p>
 * <ul>
 * <li>Case 1: perform the work and naiively apply a retry operator to the
 * asynchronous work. This fails because the work itself is not retried on
 * re-subscribe.</li>
 * <li>Case 2: wrap the work in an observer that performs it on every
 * subscription. A retry operator applied to the wrapper correctly re-attempts
 * the work on failure. However, every subsequent subscriber to the result
 * causes the work to be performed again.</li>
 * <li>Case 3: Apply the cache operator to the result of the retry operator.
 * This performs as desired.</li>
 * <li>Case 4: Generalize the approach of case 3 and encapsulate it in an
 * observable generator method. This shows that it is difficult to generalize
 * this behavior because each retry operator form (number, predicate, perpetual)
 * will require its own generator method.</li>
 * </ul>
 *
 * <p>
 * NOTE: this code does not work if compiled by the Eclipse (Keppler) compiler
 * for Java 8. I have to compile with javac for it to work. There is some
 * problem with Lambda class naming in the code generated by Eclipse.
 * </p>
 *
 *
 */
public class AsyncRetryDemo {

    public static void main(final String[] args) throws Exception {

        new AsyncRetryDemo().case1();
        new AsyncRetryDemo().case2();
        new AsyncRetryDemo().case3();
        new AsyncRetryDemo().case4();

        // output is:
        //
        // case 1, sub 1: fail (max retries, called=1)
        // case 1, sub 2: fail (max retries, called=1)
        // case 2, sub 1: pass (called=2)
        // case 2, sub 2: fail (called=3)
        // case 3, sub 1: pass (called=2)
        // case 3, sub 2: pass (called=2)
        // case 4, sub 1: pass (called=2)
        // case 4, sub 2: pass (called=2)

    }

    private final AtomicInteger called = new AtomicInteger();

    private final CountDownLatch done = new CountDownLatch(2);

    /**
     * This represents a sequence of interdependent asynchronous operations that
     * might fail in a way that prescribes a retry (but in this case, all we are
     * doing is squaring an integer asynchronously and failing the first time).
     *
     * @param input
     *            to the process.
     *
     * @return promise to perform the work and produce either a result or a
     *         suggestion to retry (e.g. a stale version error).
     */
    private Observable<Integer> canBeRetried(final int a) {

        final Observable<Integer> rval;
        if (this.called.getAndIncrement() == 0) {
            rval = Observable.error(new RuntimeException(
                    "we always fail the first time"));
        } else {
            rval = Async.start(() -> a * a);
        }

        return rval;

    }

    private void case1() throws InterruptedException {

        /*
         * In this case, we invoke the observable-creator to get the async
         * promise. Of course, if it fails, any retry will fail as well because
         * the failed result is computed one time and pushed to all subscribers
         * forever.
         *
         * Thus this case fails because the first invocation of canBeRetried(..)
         * always fails.
         */
        final Observable<Integer> o = canBeRetried(2)

                .retry(2);

        check("case 1", o);

        this.done.await();

    }

    private void case2() throws InterruptedException {

        /*
         * In this case, we wrap canBeRetried(..) inside an observer that
         * invokes it on every subscription. So, we get past the retry problem.
         * But every new subscriber after the retry succeeds causes the work to
         * restart.
         */
        final Observable<Integer> o = Observable.create(
                new OnSubscribe<Integer>() {

                    @Override
                    public void call(final Subscriber<? super Integer> child) {
                        canBeRetried(2).subscribe(child);
                    }
                })

                .retry(2);

        check("case 2", o);

        this.done.await();

    }

    private void case3() throws InterruptedException {

        /*
         * In this case, we wrap canBeRetried(..) inside an observer that
         * invokes it on every subscription. So, we get past the retry problem.
         * We cache the result of the retry to solve the extra work problem.
         */
        final Observable<Integer> o = Observable.create(
                new OnSubscribe<Integer>() {

                    @Override
                    public void call(final Subscriber<? super Integer> child) {
                        canBeRetried(2).subscribe(child);
                    }
                })
                .retry(2)

                .cache();

        check("case 3", o);

        this.done.await();

    }

    private void case4() throws InterruptedException {

        /*
         * Same as case 3 but we use the retryAndCache(..) to do the work for
         * us.
         */
        final Observable<Integer> o = retryAndCache(() -> canBeRetried(2), 2);

        check("case 4", o);

        this.done.await();

    }

    private void check(final String label, final Observable<Integer> promise) {

        // does the work get retried on failure?
        promise.subscribe(
                v -> {
                    System.out.println(label + ", sub 1: "
                            + (this.called.get() == 2 ? "pass" : "fail")
                            + " (called=" + this.called.get() + ")");
                },
                x -> {
                    System.out.println(label
                            + ", sub 1: fail (max retries, called="
                            + this.called.get() + ")");
                    this.done.countDown();
                }, () -> {
                    this.done.countDown();
                });

        // do subsequent subscribers avoid invoking the work again?
        promise.subscribe(
                v -> {
                    System.out.println(label + ", sub 2: "
                            + (this.called.get() == 2 ? "pass" : "fail")
                            + " (called=" + this.called.get() + ")");
                },
                x -> {
                    System.out.println(label
                            + ", sub 2: fail (max retries, called="
                            + this.called.get() + ")");
                    this.done.countDown();
                }, () -> {
                    this.done.countDown();
                });

    }

    /**
     * Generalized retry and cache for case 4.
     *
     * @param binder
     *            user-provided supplier that assembles and starts the
     *            asynchronous work.
     *
     * @param retries
     *            number of times to retry on error.
     *
     * @return promise to perform the work and retry up to retry times on error.
     */
    private static <R> Observable<R> retryAndCache(
            final Func0<Observable<R>> binder, final int retries) {

        return Observable.create(new OnSubscribe<R>() {

            @Override
            public void call(final Subscriber<? super R> child) {
                binder.call().subscribe(child);
            }
        })

        .retry(retries)

        .cache();
    }

}

person aztecrex    schedule 14.07.2014    source источник
comment
Я немного не понимаю вашу постановку проблемы. Вы хотите повторить всю работу с различными наблюдаемыми при ошибке или только часть работы?   -  person lopar    schedule 27.01.2015
comment
В этом случае #canBeRetried(..) инкапсулирует произвольную работу, которую необходимо перезапустить с нуля при повторной попытке. Эта работа могла бы быть частью более крупной композиции. Причиной этой проблемы было оптимистичное обновление базы данных, где работа заключается в следующем: чтение -> вычисление изменений -> запись. Если запись не удалась из-за устаревшей версии, повторная попытка должна начаться снова с чтения.   -  person aztecrex    schedule 10.03.2015


Ответы (1)


На самом деле у вас есть несколько вариантов сделать это лучше.

Первый вариант — использовать defer вместо create :

private void case5() throws InterruptedException {
    // Same as case 3 but using defer
    final Observable<Integer> o = Observable.defer(() -> canBeRetried(2)).retry(2).cache();

    check("case 5", o);

    this.done.await();
}

Однако настоящая проблема связана с методом canBeRetired; он должен вызываться при каждой повторной попытке. Лучший подход — создать Observable, который повторно выполняет логику для каждой подписки. Метод может выглядеть так:

 private Observable<Integer> canBeRetriedBetter(final int a) {
    return Observable.defer(() -> canBeRetried(a));
}

И код:

private void case6() throws InterruptedException {

    final Observable<Integer> o = canBeRetriedBetter(2).retry(2).cache();

    check("case 6", o);

    this.done.await();
}

Дальнейшие улучшения можно получить, используя compose и пользовательское преобразование. Используя их, вы можете применить набор часто используемых операторов к любой цепочке согласованным, многоразовым образом.

Например, мы можем определить оператор, который вызывает кэш и повторяет попытку в потоке:

   public static class RetryAndCache<T> implements Observable.Transformer<T, T>{
    private final int count;
    public RetryAndCache(int count) {
        this.count = count;
    }

    @Override
    public Observable<T> call(Observable<T> o) {
        return o.retry(count).cache();
    }
}

И, наконец, код:

private void case7() throws InterruptedException {

    final Observable<Integer> o = canBeRetriedBetter(2).compose(new RetryAndCache(2));

    check("case 7", o);

    this.done.await();
}
person Marek Hawrylczak    schedule 25.03.2015
comment
Ницца. Особенно нравится ваше понимание, приводящее к отсрочке с поставщиком. - person aztecrex; 27.04.2015