Я разрабатываю инструмент, который собирает данные из нескольких источников и последовательно применяет несколько преобразований. В настоящее время я конвертирую эту функциональность из потоков Java 8 для использования ReactiveX/RxJava.
Ниже вы можете увидеть модульный тест, демонстрирующий текущую реализацию RxJava.
Пока это работает, я недостаточно доволен результатом и ищу рекомендации, как его улучшить!
У меня два вопроса:
<сильный>1. Каждый источник возвращает список результатов (List‹List‹String>>). Поскольку преобразования необходимо выполнять для всего набора данных, мне нужно объединить несколько списков в один.
Прямо сейчас код выглядит так:
Observable<List<List<String>>> stage = Observable.merge(src1, src2, src3, src4);
final List<List<String>> collector = new ArrayList<>();
Single<List<List<String>>> combinedData = stage.reduce(collector, (list, items) -> {
list.addAll(items);
return list;
});
Есть ли способ избавиться от List<List<String>> collector
, который живет вне наблюдаемого потока?
2. Чтобы применить преобразования по порядку, я использую цикл for; Я пробовал несколько вариантов (например, flatMap, zipWith), однако в конечном итоге преобразования применяются не по порядку; как я могу смоделировать это без цикла for?
for (Transform t : transforms) {
stage = stage.flatMap(t::applyAsync);
}
По сути, мне нужен способ применить Observable<List<List<String>>> applyAsync(List<List<String>> input)
к входу List<List<String>>
и рекурсивно продолжать делать это при каждом преобразовании (Observable<List<List<String>>>
).
Это похоже на Observable.reduce
, но функция аккумулятора должна меняться на каждой итерации.
Вот полный код модульного теста, который я написал:
import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;
import org.mockito.ArgumentCaptor;
import org.testng.annotations.Test;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.*;
import static org.mockito.Mockito.*;
public class ObservableTest {
@Test
public void testObservable() throws Exception {
// ARRANGE
final CountDownLatch DONE = new CountDownLatch(1);
// init source objects
Observable<List<List<String>>> src1 = Observable.just(makeMatrix(Arrays.asList("src11")));
Observable<List<List<String>>> src2 = Observable.just(makeMatrix(Arrays.asList("src21", "src22")));
Observable<List<List<String>>> src3 = Observable.just(makeMatrix(Arrays.asList("src31", "src32", "src33")));
Observable<List<List<String>>> src4 = Observable.just(makeMatrix(Arrays.asList("src41"), Arrays.asList("src51")));
// prepare transformations and processor
List<Transform> transforms = Arrays.asList(new Transform(1, 100), new Transform(2, 0));
Processor processor = spy(new Processor());
// ACT
// Concat sources
Observable<List<List<String>>> stage = Observable.merge(src1, src2, src3, src4);
// Merge individual into matrix
// (#1) Can the reduce operation be written without the accumulator?
final List<List<String>> collector = new ArrayList<>();
Single<List<List<String>>> combinedData = stage.reduce(collector, (list, items) -> {
list.addAll(items);
return list;
});
// Transform
stage = combinedData.toObservable();
for (Transform t : transforms) {
// (#2) Can a series of transforms be applied sequentially to a Single (List<List<String>>), without the use of a for-loop?
stage = stage.flatMap(t::applyAsync);
}
// Process
stage.doOnComplete(DONE::countDown)
.subscribeOn(Schedulers.computation())
.subscribe(o -> System.out.println(processor.printList(o)));
// wait for processing to complete
DONE.await();
// ASSERT
// The sources should be combined in a single matrix
@SuppressWarnings("unchecked")
ArgumentCaptor<List<List<String>>> resultCaptor = ArgumentCaptor.forClass(List.class);
verify(processor, times(1)).printList(resultCaptor.capture());
List<List<String>> resultMatrix = resultCaptor.getValue();
// result matrix should not be null and all transformations should be applied in order (T1, T2, etc.)
assertThat(resultMatrix, notNullValue());
assertThat(resultMatrix.stream().flatMap(Collection::stream).collect(Collectors.toList()), everyItem(containsString("T1-T2")));
assertThat(resultMatrix, not(hasItem(hasItem(containsString("T2-T1")))));
}
private List<List<String>> makeMatrix(List<String> items) {
return Collections.singletonList(items);
}
private List<List<String>> makeMatrix(List<String> items, List<String> moreItems) {
return Arrays.asList(items, moreItems);
}
static class Processor {
String printList(List<List<String>> input) {
return input.stream().map(rows -> rows.stream().collect(Collectors.joining(" | ")))
.collect(Collectors.joining(System.lineSeparator()));
}
}
static class Transform {
final int n;
private final int delay;
Transform(int n, int delay) {
this.n = n;
this.delay = delay;
}
private Observable<List<List<String>>> applyAsync(List<List<String>> input) {
return Observable.just(input).map(this::apply).delay(delay, TimeUnit.MILLISECONDS);
}
private List<List<String>> apply(List<List<String>> input) {
return input.stream()
.map(row -> row.stream()
.map(this::transform)
.collect(Collectors.toList())
)
.collect(Collectors.toList());
}
private String transform(String input) {
return input + "-T" + n;
}
}
}
Импортируйте следующие зависимости Maven, если вы хотите его запустить:
<dependency>
<groupId>io.reactivex.rxjava2</groupId>
<artifactId>rxjava</artifactId>
<version>2.1.0</version>
</dependency>
<dependency>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest-all</artifactId>
<version>1.3</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<version>2.4.3</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testng</groupId>
<artifactId>testng</artifactId>
<version>6.10</version>
<scope>test</scope>
</dependency>
Pair
, а затем следующий оператор может быть картой, и вы можете просто сопоставить его с фактическим излучением, которое вы хотите (а неFunction
) - person Andrew Gallasch   schedule 18.08.2018