Репозиторий Reactor Spring mongodb объединяет несколько результатов вместе

Я как бы новичок в реактивном программировании и в настоящее время работаю над приложением на основе Spring webflux. Я застрял между несколькими вопросами.

public class FooServiceImpl {

@Autowired
private FooDao fooDao;

@Autowired
private AService aService;

@Autowired
private BService bService;

public long calculateSomething(long fooId) {
    Foo foo = fooDao.findById(fooId); // Blocking call one

    if (foo == null) {
        foo = new Foo();
    }

    Long bCount = bService.getCountBByFooId(fooId); // Blocking call two
    AEntity aEntity = aService.getAByFooId(fooId);  // Blocking call three

    // Do some calculation using foo, bCount and aEntity
    // ...
    // ...

    return someResult;
}
}

Таким образом мы пишем блокирующий код, который использует три результата вызова внешнего API (рассмотрим как вызовы БД). Я изо всех сил пытаюсь преобразовать это в реактивный код. Если все три станут моно, и если я подпишусь на все три, будет ли внешний подписчик заблокирован?

public Mono<Long> calculateSomething(long fooId) {
    return Mono.create(sink -> {
        Mono<Foo> monoFoo = fooDao.findById(fooId); // Reactive call one
        monoFoo.subscribe(foo -> {
            if (foo == null) {
                foo = new Foo();
            }

            Mono<Long> monoCount = bService.getCountBByFooId(fooId);  // Reactive call two

            monoCount.subscribe(aLong -> {
                Mono<AEntity> monoA = aService.getAByFooId(fooId);  // Reactive call three
                monoA.subscribe(aEntity -> {
                    //...
                    //...
                    sink.success(someResult);
                });
            });
        });
    };
  }

Я видел, что есть функция под названием zip, но она работает только с двумя результатами. Есть ли способ применить ее здесь?

Также что произойдет, если мы подпишемся на что-то внутри метода create. Будет ли он блокировать поток?

Был бы очень благодарен, если бы вы мне помогли.


person Yasitha Thilakaratne    schedule 10.03.2020    source источник
comment
вы не должны подписываться, подписчик - это вызывающий клиент, который инициировал вызов, ваша служба - издатель, клиент - подписчик. Если вы хотите получить значение и преобразовать его, вам следует вместо этого использовать flatMap, а если вы хотите выполнять блокирующие вызовы, вы должны поместить их в отдельный планировщик, как описано здесь projectreactor.io/docs/core/release/reference/   -  person Toerktumlare    schedule 10.03.2020
comment
каждый subscribe, который вы делаете, блокирует, поэтому никогда не подписывайтесь (почти, есть некоторые крайние случаи)   -  person Toerktumlare    schedule 10.03.2020
comment
пожалуйста, посмотрите учебник на YouTube или что-нибудь еще, чтобы понять, как написать базовое приложение webflux.   -  person Toerktumlare    schedule 10.03.2020
comment
Большое спасибо @ThomasAndolf. Я займусь этим.   -  person Yasitha Thilakaratne    schedule 11.03.2020


Ответы (1)


Если бы вы дали мне расчет, который вы хотите сделать с этими значениями, мне было бы легче показать способ, которым это делает реактор. Но давайте предположим, что вы хотите прочитать значение из базы данных, а затем использовать это значение для другого. Используйте плоские карты и создайте уникальный Flux, уменьшающий количество строк кода и сложность, без необходимости использовать subscribe (), как говорят другие люди. Пример:

return fooDao.findById(fooId)
.flatmap(foo -> bService.getCountBByFooId(foo))
.flatmap(bCount -> aService.getAByFooId(fooId).getCount()+bCount);
person Filly    schedule 11.03.2020
comment
Большое спасибо за то, что показали это. Итак, если мы хотим объединить несколько и преобразовать map() и flatmap(), правильный ли путь? - person Yasitha Thilakaratne; 16.03.2020
comment
Также, чтобы запустить событие обратного вызова в другом потоке, мы должны использовать Schedulers, верно? Это означает, что если мы не вызываем с Scheduler, вызывающий поток будет заблокирован в точке subscribe до тех пор, пока не будет вызван метод обратного вызова onNext, не так ли? Пожалуйста, поправьте меня, если я ошибаюсь. - person Yasitha Thilakaratne; 16.03.2020
comment
Прочитав больше, выяснилось, что Reactor предоставляет метод под названием publishOn для обработки в другом потоке для блокировки API. - person Yasitha Thilakaratne; 16.03.2020