Я хочу создать сервис, который объединяет результаты из двух реактивных источников. Один производит Mono, а другой - Flux. Для слияния мне нужно одно и то же значение моно для каждого испускаемого потока.
А пока у меня есть что-то вроде этого
Flux.zip(
service1.getConfig(), //produces flux
service2.getContext() //produces mono
.cache().repeat()
)
Это дает мне то, что мне нужно,
- service2 вызывается только один раз
- контекст предоставляется для каждой конфигурации
- результирующий флюс имеет столько же элементов, сколько и конфигураций
Но я заметил, что repeat () генерирует огромное количество элементов после кеширования контекста. Это проблема?
Могу ли я что-то сделать, чтобы ограничить количество повторов количеством полученных конфигураций, но все же выполнять оба запроса одновременно? Или это не проблема, и я могу спокойно игнорировать эти дополнительные испускаемые элементы?
Я пытался использовать combineLatest
, но в зависимости от времени некоторые элементы конфигурации могут теряться и не обрабатываться.
ИЗМЕНИТЬ
Глядя на предложения @Ricard Kollcaku, я создал образец теста, который показывает, почему это не то, что я ищу.
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Stream;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import reactor.test.StepVerifier;
public class SampleTest
{
Logger LOG = LoggerFactory.getLogger(SampleTest.class);
AtomicLong counter = new AtomicLong(0);
Flux<String> getFlux()
{
return Flux.fromStream(() -> {
LOG.info("flux started");
sleep(1000);
return Stream.of("a", "b", "c");
}).subscribeOn(Schedulers.parallel());
}
Mono<String> getMono()
{
return Mono.defer(() -> {
counter.incrementAndGet();
LOG.info("mono started");
sleep(1000);
return Mono.just("mono");
}).subscribeOn(Schedulers.parallel());
}
private void sleep(final long milis)
{
try
{
Thread.sleep(milis);
}
catch (final InterruptedException e)
{
e.printStackTrace();
}
}
@Test
void test0()
{
final Flux<String> result = Flux.zip(
getFlux(),
getMono().cache().repeat()
.doOnNext(n -> LOG.warn("signal on mono", n)),
(s1, s2) -> s1 + " " + s2
);
assertResults(result);
}
@Test
void test1()
{
final Flux<String> result =
getFlux().flatMap(s -> Mono.zip(Mono.just(s), getMono(),
(s1, s2) -> s1 + " " + s2));
assertResults(result);
}
@Test
void test2()
{
final Flux<String> result = getFlux().flatMap(s -> getMono().map((s1 -> s + " " + s1)));
assertResults(result);
}
void assertResults(final Flux<String> result)
{
final Flux<String> flux = result;
StepVerifier.create(flux)
.expectNext("a mono")
.expectNext("b mono")
.expectNext("c mono")
.verifyComplete();
Assertions.assertEquals(1L, counter.get());
}
Просмотр результатов тестов для test1 и test2
2020-01-20 12:55:22.542 INFO [] [] [ parallel-3] SampleTest : flux started
2020-01-20 12:55:24.547 INFO [] [] [ parallel-4] SampleTest : mono started
2020-01-20 12:55:24.547 INFO [] [] [ parallel-5] SampleTest : mono started
2020-01-20 12:55:24.548 INFO [] [] [ parallel-6] SampleTest : mono started
expected: <1> but was: <3>
Мне нужно отклонить ваше предложение. В обоих случаях getMono - вызывается столько раз, сколько элементов в потоке - вызывается после прибытия первого элемента потока. И это взаимодействия, которых я хочу избежать. Мои службы делают HTTP-запросы под капотом, и это может занять много времени.
В моем текущем решении этой проблемы нет, но если я добавлю регистратор в свой zip-архив, я получу это
2020-01-20 12:55:20.505 INFO [] [] [ parallel-1] SampleTest : flux started
2020-01-20 12:55:20.508 INFO [] [] [ parallel-2] SampleTest : mono started
2020-01-20 12:55:21.523 WARN [] [] [ parallel-2] SampleTest : signal on mono
2020-01-20 12:55:21.528 WARN [] [] [ parallel-2] SampleTest : signal on mono
2020-01-20 12:55:21.529 WARN [] [] [ parallel-2] SampleTest : signal on mono
2020-01-20 12:55:21.529 WARN [] [] [ parallel-2] SampleTest : signal on mono
2020-01-20 12:55:21.529 WARN [] [] [ parallel-2] SampleTest : signal on mono
2020-01-20 12:55:21.529 WARN [] [] [ parallel-2] SampleTest : signal on mono
2020-01-20 12:55:21.530 WARN [] [] [ parallel-2] SampleTest : signal on mono
2020-01-20 12:55:21.530 WARN [] [] [ parallel-2] SampleTest : signal on mono
2020-01-20 12:55:21.530 WARN [] [] [ parallel-2] SampleTest : signal on mono
2020-01-20 12:55:21.530 WARN [] [] [ parallel-2] SampleTest : signal on mono
2020-01-20 12:55:21.531 WARN [] [] [ parallel-2] SampleTest : signal on mono
2020-01-20 12:55:21.531 WARN [] [] [ parallel-2] SampleTest : signal on mono
2020-01-20 12:55:21.531 WARN [] [] [ parallel-2] SampleTest : signal on mono
2020-01-20 12:55:21.531 WARN [] [] [ parallel-2] SampleTest : signal on mono
2020-01-20 12:55:21.531 WARN [] [] [ parallel-2] SampleTest : signal on mono
2020-01-20 12:55:21.532 WARN [] [] [ parallel-2] SampleTest : signal on mono
2020-01-20 12:55:21.532 WARN [] [] [ parallel-2] SampleTest : signal on mono
2020-01-20 12:55:21.532 WARN [] [] [ parallel-2] SampleTest : signal on mono
2020-01-20 12:55:21.532 WARN [] [] [ parallel-2] SampleTest : signal on mono
2020-01-20 12:55:21.533 WARN [] [] [ parallel-2] SampleTest : signal on mono
2020-01-20 12:55:21.533 WARN [] [] [ parallel-2] SampleTest : signal on mono
2020-01-20 12:55:21.533 WARN [] [] [ parallel-2] SampleTest : signal on mono
2020-01-20 12:55:21.533 WARN [] [] [ parallel-2] SampleTest : signal on mono
2020-01-20 12:55:21.533 WARN [] [] [ parallel-2] SampleTest : signal on mono
2020-01-20 12:55:21.533 WARN [] [] [ parallel-2] SampleTest : signal on mono
2020-01-20 12:55:21.533 WARN [] [] [ parallel-2] SampleTest : signal on mono
2020-01-20 12:55:21.534 WARN [] [] [ parallel-2] SampleTest : signal on mono
2020-01-20 12:55:21.534 WARN [] [] [ parallel-2] SampleTest : signal on mono
2020-01-20 12:55:21.534 WARN [] [] [ parallel-2] SampleTest : signal on mono
2020-01-20 12:55:21.534 WARN [] [] [ parallel-2] SampleTest : signal on mono
2020-01-20 12:55:21.534 WARN [] [] [ parallel-2] SampleTest : signal on mono
2020-01-20 12:55:21.535 WARN [] [] [ parallel-2] SampleTest : signal on mono
Как видите, существует множество элементов, генерируемых объединением cache().repeat()
вместе, и я хочу знать, является ли это проблемой, и если да, то как ее избежать (но сохраняйте однократный вызов моно и параллельного вызова).
Mono.just(1).flux().flatMap(v1 -> Flux.just(2, 3).map(v2 -> v1 + v2));
- person efan   schedule 17.01.2020Flux.just(2, 3)
только тогда, когдаMono
испускает значение. И я хотел бы запустить оба запроса одновременно и объединить результаты, когда они будут доступны. @RicardKollcaku - не совсем так. Я могу обрабатывать1,a 2,a 3,a ...
, но на пути к этому я получаю абсурдное количествоa
из-за того, как работаетcache().repeat()
. - person Robert Ozga   schedule 20.01.2020