Я использую spring integeration redis, сообщение опроса от redis, например:
@Bean
public PseudoTransactionManager transactionManager() {
final PseudoTransactionManager pseudoTransactionManager = new PseudoTransactionManager();
return pseudoTransactionManager;
}
@Bean
public TransactionSynchronizationFactory transactionSynchronizationFactory() {
ExpressionEvaluatingTransactionSynchronizationProcessor transactionSynchronizationProcessor = new ExpressionEvaluatingTransactionSynchronizationProcessor();
transactionSynchronizationProcessor.setAfterCommitExpression(this.PARSER.parseExpression("#store.rename('commit')"));
transactionSynchronizationProcessor.setAfterRollbackExpression(this.PARSER.parseExpression("#store.rename('roll')"));
DefaultTransactionSynchronizationFactory transactionSynchronizationFactory = new DefaultTransactionSynchronizationFactory(transactionSynchronizationProcessor);
return transactionSynchronizationFactory;
}
@Bean
public SourcePollingChannelAdapterFactoryBean sourcePollingChannelAdapter(RedisStoreMessageSource redisStoreMessageSource, TransactionSynchronizationFactory transactionSynchronizationFactory) {
SourcePollingChannelAdapterFactoryBean sourcePollingChannelAdapterFactoryBean = new SourcePollingChannelAdapterFactoryBean();
sourcePollingChannelAdapterFactoryBean.setAutoStartup(true);
sourcePollingChannelAdapterFactoryBean.setOutputChannelName("mail-delivery-status-route-channel");
sourcePollingChannelAdapterFactoryBean.setSource(redisStoreMessageSource);
PollerMetadata pollerMetadata = new PollerMetadata();
pollerMetadata.setMaxMessagesPerPoll(10);
pollerMetadata.setTransactionSynchronizationFactory(transactionSynchronizationFactory);
PeriodicTrigger periodicTrigger = new PeriodicTrigger(2000);
pollerMetadata.setTrigger(periodicTrigger);
sourcePollingChannelAdapterFactoryBean.setPollerMetadata(pollerMetadata);
return sourcePollingChannelAdapterFactoryBean;
}
@Bean
public TestHandler testHandler() {
return new TestHandler();
}
@Bean
public IntegrationFlow trans() {
return flow -> flow.channel("mail-delivery-status-route-channel").handle(testHandler());
}
Обычно после завершения процесса будет выполнена операция afterCommit #store.rename('commit')
, но она не выполняет ее сейчас и продолжит опрос, я отлаживаю, обнаруживаю, что: AbstractPollingEndpoint#bindResourceHolderIfNecessary
TransactionSynchronizationManager.isActualTransactionActive()
всегда ложно. Как мне улучшить программу.