Комбинируйте моно с флюсом

Я хочу создать сервис, который объединяет результаты из двух реактивных источников. Один производит Mono, а другой - Flux. Для слияния мне нужно одно и то же значение моно для каждого испускаемого потока.

А пока у меня есть что-то вроде этого

Flux.zip(
   service1.getConfig(), //produces flux
   service2.getContext() //produces mono
           .cache().repeat()
)

Это дает мне то, что мне нужно,

  • service2 вызывается только один раз
  • контекст предоставляется для каждой конфигурации
  • результирующий флюс имеет столько же элементов, сколько и конфигураций

Но я заметил, что repeat () генерирует огромное количество элементов после кеширования контекста. Это проблема?

Могу ли я что-то сделать, чтобы ограничить количество повторов количеством полученных конфигураций, но все же выполнять оба запроса одновременно? Или это не проблема, и я могу спокойно игнорировать эти дополнительные испускаемые элементы?

Я пытался использовать combineLatest, но в зависимости от времени некоторые элементы конфигурации могут теряться и не обрабатываться.

ИЗМЕНИТЬ

Глядя на предложения @Ricard Kollcaku, я создал образец теста, который показывает, почему это не то, что я ищу.

import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Stream;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import reactor.test.StepVerifier;

public class SampleTest
{
    Logger LOG = LoggerFactory.getLogger(SampleTest.class);
    AtomicLong counter = new AtomicLong(0);

    Flux<String> getFlux()
    {
        return Flux.fromStream(() -> {
            LOG.info("flux started");
            sleep(1000);
            return Stream.of("a", "b", "c");
        }).subscribeOn(Schedulers.parallel());
    }

    Mono<String> getMono()
    {
        return Mono.defer(() -> {
            counter.incrementAndGet();
            LOG.info("mono started");
            sleep(1000);
            return Mono.just("mono");
        }).subscribeOn(Schedulers.parallel());
    }

    private void sleep(final long milis)
    {
        try
        {
            Thread.sleep(milis);
        }
        catch (final InterruptedException e)
        {
            e.printStackTrace();
        }
    }

    @Test
    void test0()
    {
        final Flux<String> result = Flux.zip(
                getFlux(),
                getMono().cache().repeat()
                         .doOnNext(n -> LOG.warn("signal on mono", n)),
                (s1, s2) -> s1 + " " + s2
        );

        assertResults(result);
    }

    @Test
    void test1()
    {
        final Flux<String> result =
                getFlux().flatMap(s -> Mono.zip(Mono.just(s), getMono(),
                        (s1, s2) -> s1 + " " + s2));
        assertResults(result);
    }

    @Test
    void test2()
    {
        final Flux<String> result = getFlux().flatMap(s -> getMono().map((s1 -> s + " " + s1)));
        assertResults(result);
    }

    void assertResults(final Flux<String> result)
    {
        final Flux<String> flux = result;

        StepVerifier.create(flux)
                    .expectNext("a mono")
                    .expectNext("b mono")
                    .expectNext("c mono")
                    .verifyComplete();

        Assertions.assertEquals(1L, counter.get());
    }

Просмотр результатов тестов для test1 и test2

2020-01-20 12:55:22.542 INFO  [] [] [     parallel-3]  SampleTest  : flux started  
2020-01-20 12:55:24.547 INFO  [] [] [     parallel-4]  SampleTest  : mono started  
2020-01-20 12:55:24.547 INFO  [] [] [     parallel-5]  SampleTest  : mono started  
2020-01-20 12:55:24.548 INFO  [] [] [     parallel-6]  SampleTest  : mono started  

expected: <1> but was: <3>

Мне нужно отклонить ваше предложение. В обоих случаях getMono - вызывается столько раз, сколько элементов в потоке - вызывается после прибытия первого элемента потока. И это взаимодействия, которых я хочу избежать. Мои службы делают HTTP-запросы под капотом, и это может занять много времени.

В моем текущем решении этой проблемы нет, но если я добавлю регистратор в свой zip-архив, я получу это

2020-01-20 12:55:20.505 INFO  [] [] [     parallel-1]  SampleTest  : flux started  
2020-01-20 12:55:20.508 INFO  [] [] [     parallel-2]  SampleTest  : mono started  
2020-01-20 12:55:21.523 WARN  [] [] [     parallel-2]  SampleTest  : signal on mono  
2020-01-20 12:55:21.528 WARN  [] [] [     parallel-2]  SampleTest  : signal on mono  
2020-01-20 12:55:21.529 WARN  [] [] [     parallel-2]  SampleTest  : signal on mono  
2020-01-20 12:55:21.529 WARN  [] [] [     parallel-2]  SampleTest  : signal on mono  
2020-01-20 12:55:21.529 WARN  [] [] [     parallel-2]  SampleTest  : signal on mono  
2020-01-20 12:55:21.529 WARN  [] [] [     parallel-2]  SampleTest  : signal on mono  
2020-01-20 12:55:21.530 WARN  [] [] [     parallel-2]  SampleTest  : signal on mono  
2020-01-20 12:55:21.530 WARN  [] [] [     parallel-2]  SampleTest  : signal on mono  
2020-01-20 12:55:21.530 WARN  [] [] [     parallel-2]  SampleTest  : signal on mono  
2020-01-20 12:55:21.530 WARN  [] [] [     parallel-2]  SampleTest  : signal on mono  
2020-01-20 12:55:21.531 WARN  [] [] [     parallel-2]  SampleTest  : signal on mono  
2020-01-20 12:55:21.531 WARN  [] [] [     parallel-2]  SampleTest  : signal on mono  
2020-01-20 12:55:21.531 WARN  [] [] [     parallel-2]  SampleTest  : signal on mono  
2020-01-20 12:55:21.531 WARN  [] [] [     parallel-2]  SampleTest  : signal on mono  
2020-01-20 12:55:21.531 WARN  [] [] [     parallel-2]  SampleTest  : signal on mono  
2020-01-20 12:55:21.532 WARN  [] [] [     parallel-2]  SampleTest  : signal on mono  
2020-01-20 12:55:21.532 WARN  [] [] [     parallel-2]  SampleTest  : signal on mono  
2020-01-20 12:55:21.532 WARN  [] [] [     parallel-2]  SampleTest  : signal on mono  
2020-01-20 12:55:21.532 WARN  [] [] [     parallel-2]  SampleTest  : signal on mono  
2020-01-20 12:55:21.533 WARN  [] [] [     parallel-2]  SampleTest  : signal on mono  
2020-01-20 12:55:21.533 WARN  [] [] [     parallel-2]  SampleTest  : signal on mono  
2020-01-20 12:55:21.533 WARN  [] [] [     parallel-2]  SampleTest  : signal on mono  
2020-01-20 12:55:21.533 WARN  [] [] [     parallel-2]  SampleTest  : signal on mono  
2020-01-20 12:55:21.533 WARN  [] [] [     parallel-2]  SampleTest  : signal on mono  
2020-01-20 12:55:21.533 WARN  [] [] [     parallel-2]  SampleTest  : signal on mono  
2020-01-20 12:55:21.533 WARN  [] [] [     parallel-2]  SampleTest  : signal on mono  
2020-01-20 12:55:21.534 WARN  [] [] [     parallel-2]  SampleTest  : signal on mono  
2020-01-20 12:55:21.534 WARN  [] [] [     parallel-2]  SampleTest  : signal on mono  
2020-01-20 12:55:21.534 WARN  [] [] [     parallel-2]  SampleTest  : signal on mono  
2020-01-20 12:55:21.534 WARN  [] [] [     parallel-2]  SampleTest  : signal on mono  
2020-01-20 12:55:21.534 WARN  [] [] [     parallel-2]  SampleTest  : signal on mono  
2020-01-20 12:55:21.535 WARN  [] [] [     parallel-2]  SampleTest  : signal on mono  

Как видите, существует множество элементов, генерируемых объединением cache().repeat() вместе, и я хочу знать, является ли это проблемой, и если да, то как ее избежать (но сохраняйте однократный вызов моно и параллельного вызова).


person Robert Ozga    schedule 17.01.2020    source источник
comment
Почему бы не использовать плоское отображение через Mono, например что-то вроде этого Mono.just(1).flux().flatMap(v1 -> Flux.just(2, 3).map(v2 -> v1 + v2));   -  person efan    schedule 17.01.2020
comment
Ваша проблема в этом? поток 1, испускающий 1,2,3,4 поток 2, испускающий только a. И вы можете обработать только 1, а, когда вы хотите обработать 1, а - 2, а - 3, а - 4, а?   -  person Ricard Kollcaku    schedule 17.01.2020
comment
@efan - это сработает, но (из моего понимания реакции) я буду делать запрос к Flux.just(2, 3) только тогда, когда Mono испускает значение. И я хотел бы запустить оба запроса одновременно и объединить результаты, когда они будут доступны. @RicardKollcaku - не совсем так. Я могу обрабатывать 1,a 2,a 3,a ..., но на пути к этому я получаю абсурдное количество a из-за того, как работает cache().repeat().   -  person Robert Ozga    schedule 20.01.2020
comment
@RobertOzga с вашим редактированием, я лучше объясню, почему вы не хотите, чтобы моно создавалось много раз. Я отредактировал ответ, поэтому Mono будет создан только 1 раз   -  person Ricard Kollcaku    schedule 20.01.2020


Ответы (3)


Я думаю, что то, что вы пытаетесь достичь, можно сделать с Flux.join

Вот пример кода:

Flux<Integer> flux = Flux.concat(Mono.just(1).delayElement(Duration.ofMillis(100)),
        Mono.just(2).delayElement(Duration.ofMillis(500))).log();

Mono<String> mono = Mono.just("a").delayElement(Duration.ofMillis(50)).log();

List<String> list = flux.join(mono, (v1) -> Flux.never(), (v2) -> Flux.never(), (x, y) -> {
    return x + y;
}).collectList().block();

System.out.println(list);
person efan    schedule 20.01.2020

Такие библиотеки, как Project Reactor и RxJava, стараются предоставить как можно больше комбинаций своих возможностей, но не предоставляют доступа к инструментам объединения возможностей. И в результате всегда есть угловые случаи, которые не закрыты.

Моя собственная DF4J, насколько мне известно, единственная асинхронная библиотека, которая предоставляет средства для объединения возможностей. Например, вот как пользователь может заархивировать Flux и Mono: (конечно, этот класс не является частью самого DF4J):

import org.df4j.core.dataflow.Actor;
import org.df4j.core.port.InpFlow;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

abstract class ZipActor<T1, T2> extends Actor {
    InpFlow<T1> inpFlow = new InpFlow<>(this);
    InpFlow<T2> inpScalar = new InpFlow<>(this);

    ZipActor(Flux<T1> flux, Mono<T2> mono) {
        flux.subscribe(inpFlow);
        mono.subscribe(inpScalar);
    }

    @Override
    protected void runAction() throws Throwable {
        if (inpFlow.isCompleted()) {
            stop();
            return;
        }
        T1 element1 = inpFlow.removeAndRequest();
        T2 element2 = inpScalar.current();
        runAction(element1, element2);
    }

    protected abstract void runAction(T1 element1, T2 element2);
}

и вот как это можно использовать:

@Test
public void ZipActorTest() {
    Flux<Integer> flux = Flux.just(1,2,3);
    Mono<Integer> mono = Mono.just(5);
    ZipActor<Integer, Integer> actor = new ZipActor<Integer, Integer>(flux, mono){
        @Override
        protected void runAction(Integer element1, Integer element2) {
            System.out.println("got:"+element1+" and:"+element2);
        }
    };
    actor.start();
    actor.join();
}

Вывод консоли следующий:

got:1 and:5
got:2 and:5
got:3 and:5
person Alexei Kaigorodov    schedule 18.01.2020
comment
Спасибо за предложение, это, вероятно, решит мою проблему. К сожалению, мне пришлось бы бороться с корпоративной политикой моей компании, чтобы иметь возможность включить вашу библиотеку в наш проект (например, вопросы поддержки / исправлений и т. Д.) :( Также включение внешней библиотеки для решения одной проблемы может быть не лучшим подходом, если есть другие (не такие хорошие, но все же) жизнеспособные решения. Тем не менее, спасибо за публикацию, я буду помнить вашу библиотеку для будущих вариантов использования :) - person Robert Ozga; 20.01.2020
comment
Я не уверен, что понимаю, что вы отсюда идете - реактор действительно предоставляет возможности для объединения потоков с помощью методов Flux.zip и т. Д. Для наглядности следует также отметить, что вы являетесь автором DF4J. - person Michael Berry; 20.01.2020

Вы можете сделать это с помощью простого изменения

    getFlux()
    .flatMap(s -> Mono.zip(Mono.just(s),getMono(), (s1, s2) -> s1+" "+s2))
    .subscribe(System.out::println);

Flux<String> getFlux(){
    return Flux.just("a","b","c");
}
Mono<String> getMono(){
    return  Mono.just("mono");
}

если вы хотите использовать zip, но вы можете добиться тех же результатов, используя плоскую карту

      getFlux()
            .flatMap(s -> getMono()
                    .map((s1 -> s + " " + s1)))
            .subscribe(System.out::println);
}

Flux<String> getFlux() {
    return Flux.just("a", "b", "c");
}

Mono<String> getMono() {
    return Mono.just("mono");
}

в обоих случаях результат: моно b моно c моно

ИЗМЕНИТЬ Хорошо, теперь я понимаю это лучше. Вы можете попробовать это решение.

   getMono().
            flatMapMany(s -> getFlux().map(s1 -> s1 + " " + s))
            .subscribe(System.out::println);


Flux<String> getFlux() {
    return Flux.defer(() -> {
        System.out.println("init flux");
        return Flux.just("a", "b", "c");
    });
}

Mono<String> getMono() {
    return Mono.defer(() -> {
        System.out.println("init Mono");
        return Mono.just("sss");
    });
}
person Ricard Kollcaku    schedule 20.01.2020
comment
Спасибо за ответ, но это не сработает. Я отредактировал свой вопрос, чтобы лучше объяснить мои ожидания и почему ваше предложение не соответствует моим потребностям. - person Robert Ozga; 20.01.2020
comment
Это работает лучше - одиночный вызов в моно. Но код все еще ждет, пока моно выдаст результат, прежде чем вызывать запрос потока. Я бы предпочел запускать оба одновременно (zip-логика с отменой других запросов, если один из них не работает, здесь тоже хорошо), но, похоже, это невозможно - person Robert Ozga; 20.01.2020