В поисках лучших способов компоновки RxJava Observables

Я разрабатываю инструмент, который собирает данные из нескольких источников и последовательно применяет несколько преобразований. В настоящее время я конвертирую эту функциональность из потоков Java 8 для использования ReactiveX/RxJava.

Ниже вы можете увидеть модульный тест, демонстрирующий текущую реализацию RxJava.

Пока это работает, я недостаточно доволен результатом и ищу рекомендации, как его улучшить!


У меня два вопроса:

<сильный>1. Каждый источник возвращает список результатов (List‹List‹String>>). Поскольку преобразования необходимо выполнять для всего набора данных, мне нужно объединить несколько списков в один.

Прямо сейчас код выглядит так:

Observable<List<List<String>>> stage = Observable.merge(src1, src2, src3, src4);

final List<List<String>> collector = new ArrayList<>();
Single<List<List<String>>> combinedData = stage.reduce(collector, (list, items) -> {
    list.addAll(items);
    return list;
});

Есть ли способ избавиться от List<List<String>> collector, который живет вне наблюдаемого потока?


2. Чтобы применить преобразования по порядку, я использую цикл for; Я пробовал несколько вариантов (например, flatMap, zipWith), однако в конечном итоге преобразования применяются не по порядку; как я могу смоделировать это без цикла for?

for (Transform t : transforms) {
    stage = stage.flatMap(t::applyAsync);
}

По сути, мне нужен способ применить Observable<List<List<String>>> applyAsync(List<List<String>> input) к входу List<List<String>> и рекурсивно продолжать делать это при каждом преобразовании (Observable<List<List<String>>>).

Это похоже на Observable.reduce, но функция аккумулятора должна меняться на каждой итерации.


Вот полный код модульного теста, который я написал:

import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;
import org.mockito.ArgumentCaptor;
import org.testng.annotations.Test;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.*;
import static org.mockito.Mockito.*;

public class ObservableTest {
    @Test
    public void testObservable() throws Exception {
        // ARRANGE
        final CountDownLatch DONE = new CountDownLatch(1);

        // init source objects
        Observable<List<List<String>>> src1 = Observable.just(makeMatrix(Arrays.asList("src11")));
        Observable<List<List<String>>> src2 = Observable.just(makeMatrix(Arrays.asList("src21", "src22")));
        Observable<List<List<String>>> src3 = Observable.just(makeMatrix(Arrays.asList("src31", "src32", "src33")));
        Observable<List<List<String>>> src4 = Observable.just(makeMatrix(Arrays.asList("src41"), Arrays.asList("src51")));

        // prepare transformations and processor
        List<Transform> transforms = Arrays.asList(new Transform(1, 100), new Transform(2, 0));
        Processor processor = spy(new Processor());


        // ACT

        // Concat sources
        Observable<List<List<String>>> stage = Observable.merge(src1, src2, src3, src4);

        // Merge individual into matrix

        // (#1) Can the reduce operation be written without the accumulator?
        final List<List<String>> collector = new ArrayList<>();
        Single<List<List<String>>> combinedData = stage.reduce(collector, (list, items) -> {
            list.addAll(items);
            return list;
        });


        // Transform
        stage = combinedData.toObservable();
        for (Transform t : transforms) {
            // (#2) Can a series of transforms be applied sequentially to a Single (List<List<String>>), without the use of a for-loop?
            stage = stage.flatMap(t::applyAsync);
        }

        // Process
        stage.doOnComplete(DONE::countDown)
                .subscribeOn(Schedulers.computation())
                .subscribe(o -> System.out.println(processor.printList(o)));

        // wait for processing to complete
        DONE.await();


        // ASSERT

        // The sources should be combined in a single matrix
        @SuppressWarnings("unchecked")
        ArgumentCaptor<List<List<String>>> resultCaptor = ArgumentCaptor.forClass(List.class);

        verify(processor, times(1)).printList(resultCaptor.capture());
        List<List<String>> resultMatrix = resultCaptor.getValue();

        // result matrix should not be null and all transformations should be applied in order (T1, T2, etc.)
        assertThat(resultMatrix, notNullValue());
        assertThat(resultMatrix.stream().flatMap(Collection::stream).collect(Collectors.toList()), everyItem(containsString("T1-T2")));
        assertThat(resultMatrix, not(hasItem(hasItem(containsString("T2-T1")))));
   }


    private List<List<String>> makeMatrix(List<String> items) {
        return Collections.singletonList(items);
    }

    private List<List<String>> makeMatrix(List<String> items, List<String> moreItems) {
        return Arrays.asList(items, moreItems);
    }

    static class Processor {
        String printList(List<List<String>> input) {
            return input.stream().map(rows -> rows.stream().collect(Collectors.joining(" | ")))
                    .collect(Collectors.joining(System.lineSeparator()));
        }
    }

    static class Transform {
        final int n;
        private final int delay;

        Transform(int n, int delay) {
            this.n = n;
            this.delay = delay;
        }

        private Observable<List<List<String>>> applyAsync(List<List<String>> input) {
            return Observable.just(input).map(this::apply).delay(delay, TimeUnit.MILLISECONDS);
        }

        private List<List<String>> apply(List<List<String>> input) {
            return input.stream()
                    .map(row -> row.stream()
                            .map(this::transform)
                            .collect(Collectors.toList())
                    )
                    .collect(Collectors.toList());
        }

        private String transform(String input) {
            return input + "-T" + n;
        }
    }
}

Импортируйте следующие зависимости Maven, если вы хотите его запустить:

<dependency>
    <groupId>io.reactivex.rxjava2</groupId>
    <artifactId>rxjava</artifactId>
    <version>2.1.0</version>
</dependency>
<dependency>
    <groupId>org.hamcrest</groupId>
    <artifactId>hamcrest-all</artifactId>
    <version>1.3</version>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>org.mockito</groupId>
    <artifactId>mockito-core</artifactId>
    <version>2.4.3</version>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>org.testng</groupId>
    <artifactId>testng</artifactId>
    <version>6.10</version>
    <scope>test</scope>
</dependency>

person Mihai Bojin    schedule 20.06.2017    source источник
comment
flatMap не (обязательно) сохраняет порядок, если используется с параллелизмом.   -  person Andrew Gallasch    schedule 18.08.2018
comment
Если вам нужно, чтобы ваша функция изменилась, почему бы просто не заставить функцию-аккумулятор вызывать какую-то функцию, которую вы возвращаете. Если вам нужно вернуть функцию и вы хотите, чтобы было испущено что-то еще, вы можете испустить Pair, а затем следующий оператор может быть картой, и вы можете просто сопоставить его с фактическим излучением, которое вы хотите (а не Function)   -  person Andrew Gallasch    schedule 18.08.2018


Ответы (1)


ОТКАЗ ОТ ОТВЕТСТВЕННОСТИ: у меня нет опыта работы с RxJava, только RxJS, Rx.NET и RxSwift.

1

Вы должны иметь возможность передать новый экземпляр ArrayList напрямую:

Single<List<List<String>>> combinedData = stage.reduce(new ArrayList<>(), (list, items) -> {
    list.addAll(items);
    return list;
});

Он будет использоваться только как начальное семя из аккумулятора; после первого обращения к лямбде это не актуально.

2

Я думаю, вам нужна рекурсия для этой проблемы. Вам может подойти следующее решение:

// Put this somewhere
public IObservable<List<List<String>> handleTransforms(
    Observable<List<List<String>>> currentStage
    List<Transform> ts){

    return currentStage.flatMap(t[0]::applyAsync)
        .flatMap(newStage -> handleTransforms(newStage, ts.stream().skip(1).toList()))
}

// And then use it like this
stage = handleTransforms(stage, transforms);
person Jon G Stødle    schedule 19.07.2017