Использование реактивного (r2dbc) пакета с транзакцией

Привет, у меня очень важный вопрос. Я пытаюсь создать пакет с использованием реактивного r2dbc и использовать транзакцию для аннотации метода. Но похоже, что если я использую как транзакционный, так и пакетный код, код просто зависает и не работает. Ниже приведен код

    @Transactional /* **Transaction** */
    @GetMapping("/batchFetchData")
    public Flux<Object> batchFetch() {
        long startTime = System.currentTimeMillis();
        Mono.from(databaseConfiguration.connectionFactory().create())
                .flatMapMany(connection -> Flux.from(connection
                        .createBatch() /* **Creating batch***/
                        .add("SELECT * FROM xtable where xId = 232323")
                        .add("SELECT * FROM ytable where yId = 454545")
                        .add("SELECT * FROM ztable where zId = 676767")
                        //.execute()));  /* **Execution batch***/
                        .execute())).as(StepVerifier::create)
                .expectNextCount(3) /* **Expect count batch***/
                .verifyComplete();  /* **Verify batch***/

        LOGGER.info("Time taken to batchFetch "+(System.currentTimeMillis() - startTime));
    return null;
    }

person Monish Das    schedule 27.11.2020    source источник
comment
ничего не происходит, пока вы не подпишетесь. Вы игнорируете возврат от моно. Измените его на return Mono.from( .... ), вы нарушаете реактивную цепочку, не возвращаясь, поэтому клиент не может подписаться.   -  person Toerktumlare    schedule 28.11.2020
comment
Я не уверен, какое это имеет значение. Проблема в том, что когда я использую закомментированный (verifycomplete), транзакция просто зависает. В конце концов, это всего лишь партия, которая, как ожидается, ничего не вернет.   -  person Monish Das    schedule 30.11.2020


Ответы (2)


Вы разрываете реактивную цепочку.

В реактивном программировании ничего не происходит, пока вы не подпишетесь.

Что это значит? Я могу показать это на небольшом примере.

// If running this, nothing happens
Mono.just("Foobar");

в то время как:

Mono.just("Foobar").subscribe(s -> System.out.println(s));

Напечатаем:

Foobar

Это также применимо, если у вас есть функция

public void getString() {
    Mono.just("Foobar");
}

// Nothing happens, you have declared something 
// but it will never get run, no one is subscribing
getString();

Что тебе необходимо сделать:

public Mono<String> getString() {
    // This could be saving to a database or anything, this will now get run
    return Mono.just("Now this code will get run");
}

// The above got run, we can prove it by printing
getString().subscribe(s -> System.out.println(s));

Так что же происходит? Что ж, в реактивном программировании, как только кто-то подписывается на Mono или Flux, программа будет проходить вверх и строить цепочку обратных вызовов, пока не найдет производителя, который начинает генерировать значения (в моем случае это справедливый оператор). Этот этап называется этапом сборки. Когда эта фаза будет завершена, реактивная цепочка начнет производить ценности для всех, кто подписывается.

Если никто не подписывается, цепочка не будет построена.

Итак, кто подписчик? Обычно это конечный потребитель стоимости. Так, например, веб-страница, инициировавшая вызов, или мобильное приложение, но также может быть вашей службой Spring Boot, если она инициирует вызов (например, задание cron).

Итак, давайте посмотрим на ваш код:

@Transactional /* **Transaction** */
@GetMapping("/batchFetchData")
public Flux<Object> batchFetch() {
    long startTime = System.currentTimeMillis();

    // Here you declare a Mono but ignoring the return type so breaking the reactive chain
    Mono.from(databaseConfiguration.connectionFactory().create()) 
            .flatMapMany(connection -> Flux.from(connection
                    .createBatch() /* **Creating batch***/
                    .add("SELECT * FROM xtable where xId = 232323")
                    .add("SELECT * FROM ytable where yId = 454545")
                    .add("SELECT * FROM ztable where zId = 676767")
                    //.execute()));  /* **Execution batch***/
                    .execute())).as(StepVerifier::create)
            .expectNextCount(3) /* **Expect count batch***/
            .verifyComplete();  /* **Verify batch***/
            // Here at the end you have no subscriber

    LOGGER.info("Time taken to batchFetch "+(System.currentTimeMillis() - startTime));

    // Null is not allowed in reactive chains
    return null;
}

Так как же решить эту проблему?

Ну нужно не разрывать реактивную цепочку. Это базовое реактивное программирование.

@Transactional
@GetMapping("/batchFetchData")
public Mono<Void> batchFetch() {
    long startTime = System.currentTimeMillis();

    // we return here so that the calling client 
    // can subscribe and start the chain
    return Mono.from(databaseConfiguration.connectionFactory().create()) 
            .flatMapMany(connection -> Flux.from(connection
                    .createBatch()
                    .add("SELECT * FROM xtable where xId = 232323")
                    .add("SELECT * FROM ytable where yId = 454545")
                    .add("SELECT * FROM ztable where zId = 676767")
                    .execute()))
                    .then(); 
                    // then() statement throws away whatever the return 
                    // value is and just signals to the calling client 
                    // when everything is done.       
}

Я не хочу ничего возвращать

Вот для чего нужен оператор Mono#then. Вы видите, что когда каждая часть в цепочке завершена, она будет сигнализировать о ее завершении, а затем передавать значения из одной части в другую, а затем снова сигнализировать и передавать значение и т. Д. Когда мы дойдем до оператора then, он просто подаст сигнал COMPLETE и ничего не возвращает (или фактически возвращает Mono<Void>, потому что null не допускается в реактивных цепочках). Вы всегда должны возвращаться, чтобы каждый шаг мог передавать свой сигнал COMPLETE.

Также я удалил StepVerifier, который вы используете в своем коде, потому что он обычно используется для проверки шагов в модульных тестах и ​​не используется в производственном коде. Подробнее об этом можно прочитать здесь, StepVerifier.

Если вы хотите изучить реактивное программирование, что я предлагаю вам сделать, потому что оно потрясающее и мне оно нравится, я настоятельно рекомендую вам прочитать отличную документацию по реактору Введение в реактивное программирование, где они объяснят концепции ничего не происходит, пока вы не подпишетесь и т. д.

person Toerktumlare    schedule 30.11.2020

Ваша проблема:

return null;

Вы должны вернуть Mono / Flux в реактивном приложении, даже если в потоке нет элементов, вместо этого верните Mono.emtpy.

Посмотрите мой пример вставить несколько записей.

А в тестах используйте StepVerify, чтобы проверить результат.

Для поддержки транзакций в приложениях WebFlux вы должны прочитать соответствующую документацию, чтобы проверить, поддерживается ли она как обычная локальная транзакция или ограничения при ее использовании.

Есть два подхода к использованию транзакции, если она хорошо поддерживается.

  1. Вставьте TransactionalOperator (аналогичный традиционному TransactionTemplate), чтобы обернуть вашу бизнес-логику.
  2. Применяйте аннотацию @Transaction к классам или методам как к общим.
person Hantsy    schedule 02.12.2020