Как распространить состояние JTA при использовании реактивного обмена сообщениями?

Я хотел бы распространить состояние JTA (= транзакцию) между транзакционной конечной точкой REST, которая отправляет сообщение на коннектор реактивного обмена сообщениями.

@Inject
@Channel("test")
Emitter<String> emitter;

@POST
@Transactional
public Response test() {
    emitter.send("test");
}

а также

@ApplicationScoped
@Connector("test")
public class TestConnector implements OutgoingConnectorFactory {

    @Inject
    TransactionManager tm;

    @Override
    public SubscriberBuilder<? extends Message<?>, Void> getSubscriberBuilder(Config config) {
        return ReactiveStreams.<Message<?>>builder()
            .flatMapCompletionStage(message -> {
                tm.getTransaction(); // = null
                return message.ack();
            })
            .ignore();
    }
}

Насколько я понимаю, распространение контекста отвечает за доступность транзакции (см. io.smallrye.context.jta.context.propagation.JtaContextProvider#currentContext). Проблема, похоже, в том, что currentContext создается при подписке, что происходит, когда точка инъекции (Emitter<String> emitter) получает свой экземпляр. Что еще слишком рано, чтобы правильно фиксировать транзакцию.

Что мне не хватает?

Кстати, у меня такая же проблема при использовании @Incoming / @Outgoing вместо эмиттера. Я решил привести вам этот пример, потому что его легко понять и воспроизвести.


person stphngrtz    schedule 18.02.2021    source источник
comment
У вас есть расширение для распространения контекста? Если да, создайте репродуктор и откройте проблему GitHub, чтобы мы могли посмотреть.   -  person Guillaume Smet    schedule 18.02.2021
comment
Да, распространение контекста находится в библиотеках. Проблема GitHub с репродуктором: github.com/smallrye/smallrye-reactive-messaging/issues / 1016   -  person stphngrtz    schedule 19.02.2021


Ответы (1)


На данный момент вам необходимо передать текущую транзакцию в метаданных сообщения. Таким образом, он будет распространен на ваши различные нижестоящие компоненты (а также на коннектор).

Обратите внимание, что транзакция имеет тенденцию быть привязанной к области запроса, а это означает, что в вашем соединителе уже может быть слишком поздно ее использовать. Итак, убедитесь, что ваша конечная точка является асинхронной и возвращается только после подтверждения отправленного сообщения.

Распространение контекста в этом случае не поможет, поскольку базовые потоки создаются во время запуска (во время сборки в Quarkus), поэтому контексты захвата отсутствуют.

person Clement    schedule 24.02.2021