Я хотел бы распространить состояние JTA (= транзакцию) между транзакционной конечной точкой REST, которая отправляет сообщение на коннектор реактивного обмена сообщениями.
@Inject
@Channel("test")
Emitter<String> emitter;
@POST
@Transactional
public Response test() {
emitter.send("test");
}
а также
@ApplicationScoped
@Connector("test")
public class TestConnector implements OutgoingConnectorFactory {
@Inject
TransactionManager tm;
@Override
public SubscriberBuilder<? extends Message<?>, Void> getSubscriberBuilder(Config config) {
return ReactiveStreams.<Message<?>>builder()
.flatMapCompletionStage(message -> {
tm.getTransaction(); // = null
return message.ack();
})
.ignore();
}
}
Насколько я понимаю, распространение контекста отвечает за доступность транзакции (см. io.smallrye.context.jta.context.propagation.JtaContextProvider#currentContext
). Проблема, похоже, в том, что currentContext
создается при подписке, что происходит, когда точка инъекции (Emitter<String> emitter
) получает свой экземпляр. Что еще слишком рано, чтобы правильно фиксировать транзакцию.
Что мне не хватает?
Кстати, у меня такая же проблема при использовании @Incoming
/ @Outgoing
вместо эмиттера. Я решил привести вам этот пример, потому что его легко понять и воспроизвести.