Вы разрываете реактивную цепочку.
В реактивном программировании ничего не происходит, пока вы не подпишетесь.
Что это значит? Я могу показать это на небольшом примере.
// If running this, nothing happens
Mono.just("Foobar");
в то время как:
Mono.just("Foobar").subscribe(s -> System.out.println(s));
Напечатаем:
Foobar
Это также применимо, если у вас есть функция
public void getString() {
Mono.just("Foobar");
}
// Nothing happens, you have declared something
// but it will never get run, no one is subscribing
getString();
Что тебе необходимо сделать:
public Mono<String> getString() {
// This could be saving to a database or anything, this will now get run
return Mono.just("Now this code will get run");
}
// The above got run, we can prove it by printing
getString().subscribe(s -> System.out.println(s));
Так что же происходит? Что ж, в реактивном программировании, как только кто-то подписывается на Mono или Flux, программа будет проходить вверх и строить цепочку обратных вызовов, пока не найдет производителя, который начинает генерировать значения (в моем случае это справедливый оператор). Этот этап называется этапом сборки. Когда эта фаза будет завершена, реактивная цепочка начнет производить ценности для всех, кто подписывается.
Если никто не подписывается, цепочка не будет построена.
Итак, кто подписчик? Обычно это конечный потребитель стоимости. Так, например, веб-страница, инициировавшая вызов, или мобильное приложение, но также может быть вашей службой Spring Boot, если она инициирует вызов (например, задание cron).
Итак, давайте посмотрим на ваш код:
@Transactional /* **Transaction** */
@GetMapping("/batchFetchData")
public Flux<Object> batchFetch() {
long startTime = System.currentTimeMillis();
// Here you declare a Mono but ignoring the return type so breaking the reactive chain
Mono.from(databaseConfiguration.connectionFactory().create())
.flatMapMany(connection -> Flux.from(connection
.createBatch() /* **Creating batch***/
.add("SELECT * FROM xtable where xId = 232323")
.add("SELECT * FROM ytable where yId = 454545")
.add("SELECT * FROM ztable where zId = 676767")
//.execute())); /* **Execution batch***/
.execute())).as(StepVerifier::create)
.expectNextCount(3) /* **Expect count batch***/
.verifyComplete(); /* **Verify batch***/
// Here at the end you have no subscriber
LOGGER.info("Time taken to batchFetch "+(System.currentTimeMillis() - startTime));
// Null is not allowed in reactive chains
return null;
}
Так как же решить эту проблему?
Ну нужно не разрывать реактивную цепочку. Это базовое реактивное программирование.
@Transactional
@GetMapping("/batchFetchData")
public Mono<Void> batchFetch() {
long startTime = System.currentTimeMillis();
// we return here so that the calling client
// can subscribe and start the chain
return Mono.from(databaseConfiguration.connectionFactory().create())
.flatMapMany(connection -> Flux.from(connection
.createBatch()
.add("SELECT * FROM xtable where xId = 232323")
.add("SELECT * FROM ytable where yId = 454545")
.add("SELECT * FROM ztable where zId = 676767")
.execute()))
.then();
// then() statement throws away whatever the return
// value is and just signals to the calling client
// when everything is done.
}
Я не хочу ничего возвращать
Вот для чего нужен оператор Mono#then
. Вы видите, что когда каждая часть в цепочке завершена, она будет сигнализировать о ее завершении, а затем передавать значения из одной части в другую, а затем снова сигнализировать и передавать значение и т. Д. Когда мы дойдем до оператора then
, он просто подаст сигнал COMPLETE и ничего не возвращает (или фактически возвращает Mono<Void>
, потому что null не допускается в реактивных цепочках). Вы всегда должны возвращаться, чтобы каждый шаг мог передавать свой сигнал COMPLETE.
Также я удалил StepVerifier, который вы используете в своем коде, потому что он обычно используется для проверки шагов в модульных тестах и не используется в производственном коде. Подробнее об этом можно прочитать здесь, StepVerifier.
Если вы хотите изучить реактивное программирование, что я предлагаю вам сделать, потому что оно потрясающее и мне оно нравится, я настоятельно рекомендую вам прочитать отличную документацию по реактору Введение в реактивное программирование, где они объяснят концепции ничего не происходит, пока вы не подпишетесь и т. д.
person
Toerktumlare
schedule
30.11.2020
return Mono.from( .... )
, вы нарушаете реактивную цепочку, не возвращаясь, поэтому клиент не может подписаться. - person Toerktumlare   schedule 28.11.2020