Порядок последовательности RxJava2, вызываемый с помощью дополняемого оператора andThen

Я пытаюсь перейти с RxJava1 на RxJava2. Я заменяю все части кода, где раньше было Observable<Void>, на Compleatable. Однако я столкнулся с одной проблемой с порядком потоковых вызовов. Когда я раньше имел дело с Observables и использовал карты и плоские карты, код работал «как ожидалось». Однако оператор andthen(), похоже, работает немного иначе. Вот пример кода для упрощения самой проблемы.

public Single<String> getString() {
    Log.d("Starting flow..")
    return getCompletable().andThen(getSingle());
}

public Completable getCompletable() {
    Log.d("calling getCompletable");
    return Completable.create(e -> {
                Log.d("doing actuall completable work");
                e.onComplete();
            }
    );
}

public Single<String> getSingle() {
    Log.d("calling getSingle");
    if(conditionBasedOnActualCompletableWork) {
        return getSingleA();
    }else{
        return getSingleB();
    }
}

В конце концов я вижу в журналах:

  1-> Log.d("Starting flow..")
  2-> Log.d("calling getCompletable");
  3-> Log.d("calling getSingle");
  4-> Log.d("doing actuall completable work");

И, как вы, вероятно, догадались, я ожидал, что строка 4 будет вызываться перед строкой 3 (после этого имя оператора andthen() предполагает, что код будет вызываться «после того, как» Completable завершит свою работу). Раньше я создавал Observable<Void> с помощью оператора Async.toAsync(), и метод, который теперь называется getSingle, находился в потоке flatMap - он работал так, как я ожидал, поэтому журнал 4 появится раньше 3. Теперь я попытался изменить способ создания Compleatable - как при использовании fromAction или fromCallable, но ведет себя так же. Я также не смог найти другого оператора для замены andthen(). Чтобы подчеркнуть - метод должен быть Completable, поскольку он не имеет никакого значения для возврата - он изменяет настройки приложения и другие настройки (и используется таким образом глобально, в основном работая «как ожидалось»), и эти изменения потребуются позже. в потоке. Я также попытался обернуть метод getSingle(), чтобы каким-то образом создать Single и переместить оператор if внутри блока create, но я не знаю, как использовать там методы getSingleA / B (). И мне нужно их использовать, так как они имеют свою сложность и дублировать код не имеет смысла. Кто-нибудь знает, как изменить это в RxJava2, чтобы он вел себя так же? Есть несколько мест, где я полагаюсь на завершение задания Compleatable, прежде чем двигаться вперед с потоком (например, обновление токена сеанса, обновление базы данных, настроек и т. Д. - нет проблем в RxJava1 с использованием flatMap).


person Adam J    schedule 01.04.2017    source источник
comment
Вы понимаете, что вызов getSingle() выполняется немедленно, а не после завершения Completable, возвращенного getCompletable?   -  person akarnokd    schedule 01.04.2017


Ответы (1)


Вы можете использовать defer:

getCompletable().andThen(Single.defer(() -> getSingle()))

Таким образом, содержимое getSingle() выполняется не сразу, а только тогда, когда Completablecompletes и andThen переключаются на Single.

person akarnokd    schedule 01.04.2017
comment
Спасибо за подсказку, я пометил ответ как полезный - он решает проблему. Хотя я должен сказать, что это действительно похоже на временное решение. Я все еще очень озадачен, почему поведение изменилось по сравнению с версией RxJava1, где я использовал flatMap, а обратные вызовы были в том порядке, в котором я ожидал. Не нашел никакой информации об этом в разделе «Что нового» документации RxJava2. - person Adam J; 02.04.2017