RxJava: как выразить doOnFirst()?

Я использую RxJava, и у меня есть Observable с несколькими элементами внутри. Что я хотел бы сделать, так это запустить функцию A для первого элемента, функцию B для всех из них и функцию C, когда Observable завершено:

-----1-----2-----3-----|-->
     |     |     |     |
     run A |     |     |
     |     |     |     |
     run B run B run B |
                       |
                       run C

есть ли умный способ выразить это с помощью лямбда-функций? У меня уже есть следующее решение, но оно выглядит некрасиво, и я подозреваю, что есть лучший способ сделать это:

observable.subscribe(
        new Action1<Item>() {
            boolean first = true;

            @Override
            public void call(Item item) {
                if (first) {
                    runA(item);
                    first = false;
                }
                runB(fax1);
            }
        },
        throwable -> {},
        () -> runC());

person TBieniek    schedule 02.09.2015    source источник


Ответы (3)


Используйте Observable.defer для инкапсуляции состояния подписки (это логическое значение, указывающее, находимся ли мы в первой записи).

Вот исполняемый класс, который используется в демонстрациях:

import rx.Observable;
import rx.Observable.Transformer;
import rx.functions.Action1;

public class DoOnFirstMain {

    public static void main(String[] args) {

        Observable<Integer> o = 
            Observable.just(1, 2, 3)
                .compose(doOnFirst(System.out::println);
        // will print 1
        o.subscribe();
        // will print 1
        o.subscribe();
    }

    public static <T> Transformer<T, T> doOnFirst(Action1<? super T> action) {
        return o -> Observable.defer(() -> {
            final AtomicBoolean first = new AtomicBoolean(true);
            return o.doOnNext(t -> {
                if (first.compareAndSet(true, false)) {
                    action.call(t);
                }
            });
        });
    }

}

Несмотря на то, что OP спрашивал о RxJava1, здесь то же решение, что и выше, но для RxJava2:

import java.util.concurrent.atomic.AtomicBoolean;

import io.reactivex.Flowable;
import io.reactivex.FlowableTransformer;
import io.reactivex.functions.Consumer;

public class DoOnFirstMain {

    public static void main(String[] args) {

        Flowable<Integer> f =
                Flowable.just(1, 2, 3)
                        .compose(doOnFirst(System.out::println);
        // will print 1
        f.subscribe();
        // will print 1
        f.subscribe();
    }

    public static <T> FlowableTransformer<T, T> doOnFirst(Consumer<? super T> consumer) {
        return f -> Flowable.defer(() -> {
            final AtomicBoolean first = new AtomicBoolean(true);
            return f.doOnNext(t -> {
                if (first.compareAndSet(true, false)) {
                    consumer.accept(t);
                }
            });
        });
    }
}
person Dave Moten    schedule 03.09.2015
comment
@ tir38 Я не против добавления примера RxJava2, но эквивалентом Observable в RxJava2 является Flowable. Вы хотите внести поправку? - person Dave Moten; 07.02.2018

Думаю, я нашел простое решение для себя:

Observable<Integer> observable = Observable.just(1, 2, 3).share();
observable.take(1).subscribe(this::runA);
observable.subscribe(
    this::runB,
    throwable -> {},
    this::runC);

Это работает в однопоточном режиме и, кажется, работает в многопоточном, но я должен признать, что пока не слишком уверен в этом.

person TBieniek    schedule 03.09.2015
comment
Если ваш исходный наблюдаемый объект не имеет побочных эффектов подписки, это будет работать нормально. - person kjones; 05.09.2015
comment
@TBienek Именно так я бы это реализовал. Также в этом посте кажутся слишком сложные ответы. - person tmj; 05.09.2015
comment
Можете ли вы гарантировать, что runA будет выполняться до runB для 1? - person tir38; 25.08.2017
comment
Помещение второго .subscribe() внутри обработчика первого должно гарантировать, что runA произойдет первым. - person Robert Lewis; 21.08.2018

Уже встроены doOnNext и doOnTerminate (или аналогичные), поэтому кажется, что все, чего не хватает, — это выполнить действие только с первым элементом. Вот один из способов. Вы можете опубликовать свой поток, тогда в одном блоке поток идет как обычно, а в отдельной подписке мы слушаем только первое событие (с first) и выполняем действие при его получении. Вот пример:

observable.publish(new Func1<Observable<Item>, Observable<Item>>() {
    @Override
    public Observable<Item> call(Observable<Item> itemObservable) {
        itemObservable.first().subscribe((Item item) -> runA(item));
        return itemObservable;
    }
}).subscribe(/* some actions and subscription as usual ... */);

Если это выглядит слишком многословно, вы можете поместить это в утилиту Transformer и сохранить часть синтаксиса компоновщика. например.:

public static class Transformers<T> implements Observable.Transformer<T, T> {
    private final Action1<T> action1;

    private Transformers(final Action1<T> action1) {
        this.action1 = action1;
    }
    // cover for generics
    public static <T> Observable.Transformer<T, T> doOnFirst(final Action1<T> action1) {
        return new Transformers<T>(action1);
    }

    @Override
    public Observable<T> call(final Observable<T> observable) {
        return observable.publish(new Func1<Observable<T>, Observable<T>>() {
            @Override
            public Observable<T> call(Observable<T> observable) {
                observable.first().subscribe(action1);
                return observable;
            }
        });
    }
}

Затем вы можете назвать это так:

observable
    .compose(Transformers.doOnFirst((Item item) -> runA(item)))
    .subscribe(/* chain and subscribe as usual... */);

Очевидно, что все это выглядит намного лучше с синтаксисом Lambda, выше я предполагаю, как это будет выглядеть.

person Tim Meehan    schedule 02.09.2015
comment
использование публикации - очень тяжелое решение для этого. Попробую поставить что-нибудь попроще. - person Dave Moten; 03.09.2015
comment
@DaveMoten, не могли бы вы уточнить, почему вы считаете это слишком тяжелым? Вы имеете в виду накладные расходы на обработку, которые добавляет publish, или с точки зрения синтаксиса? - person Tim Meehan; 08.09.2015
comment
с точки зрения затрат на обработку. Мое решение также могло бы быть немного более эффективным, если бы оно было написано как оператор, чтобы сохранять изменчивое чтение при каждой эмиссии. - person Dave Moten; 09.09.2015