Spring интеграция с Redis Poller с транзакцией

Я использую spring integeration redis, сообщение опроса от redis, например:

@Bean
public PseudoTransactionManager transactionManager() {
    final PseudoTransactionManager pseudoTransactionManager = new PseudoTransactionManager();
    return pseudoTransactionManager;
}

@Bean
public TransactionSynchronizationFactory transactionSynchronizationFactory() {
    ExpressionEvaluatingTransactionSynchronizationProcessor transactionSynchronizationProcessor = new ExpressionEvaluatingTransactionSynchronizationProcessor();
    transactionSynchronizationProcessor.setAfterCommitExpression(this.PARSER.parseExpression("#store.rename('commit')"));
    transactionSynchronizationProcessor.setAfterRollbackExpression(this.PARSER.parseExpression("#store.rename('roll')"));
    DefaultTransactionSynchronizationFactory transactionSynchronizationFactory = new DefaultTransactionSynchronizationFactory(transactionSynchronizationProcessor);
    return transactionSynchronizationFactory;
}


@Bean
public SourcePollingChannelAdapterFactoryBean sourcePollingChannelAdapter(RedisStoreMessageSource redisStoreMessageSource, TransactionSynchronizationFactory transactionSynchronizationFactory) {

    SourcePollingChannelAdapterFactoryBean sourcePollingChannelAdapterFactoryBean = new SourcePollingChannelAdapterFactoryBean();
    sourcePollingChannelAdapterFactoryBean.setAutoStartup(true);
    sourcePollingChannelAdapterFactoryBean.setOutputChannelName("mail-delivery-status-route-channel");
    sourcePollingChannelAdapterFactoryBean.setSource(redisStoreMessageSource);
    PollerMetadata pollerMetadata = new PollerMetadata();
    pollerMetadata.setMaxMessagesPerPoll(10);
    pollerMetadata.setTransactionSynchronizationFactory(transactionSynchronizationFactory);
    PeriodicTrigger periodicTrigger = new PeriodicTrigger(2000);
    pollerMetadata.setTrigger(periodicTrigger);
    sourcePollingChannelAdapterFactoryBean.setPollerMetadata(pollerMetadata);


    return sourcePollingChannelAdapterFactoryBean;
}


@Bean
public TestHandler testHandler() {

    return new TestHandler();
}

@Bean
public IntegrationFlow trans() {
    return flow -> flow.channel("mail-delivery-status-route-channel").handle(testHandler());
}

Обычно после завершения процесса будет выполнена операция afterCommit #store.rename('commit'), но она не выполняет ее сейчас и продолжит опрос, я отлаживаю, обнаруживаю, что: AbstractPollingEndpoint#bindResourceHolderIfNecessary TransactionSynchronizationManager.isActualTransactionActive() всегда ложно. Как мне улучшить программу.


person steven    schedule 12.07.2018    source источник


Ответы (1)


pollerMetadata.setTransactionSynchronizationFactory(transactionSynchronizationFactory); недостаточно. Вам не удалось добавить adviceChain в PollerMetadata, где одним из них должно быть TransactionInterceptor. См. TransactionInterceptorBuilder для удобства.

Хотя совершенно неясно, зачем использовать SourcePollingChannelAdapterFactoryBean вручную, если в проекте уже есть Java DSL и IntegrationFlow может обработать весь шаблонный код за вас. Я имею в виду, что вам нужно изучить:

/**
 * Populate the provided {@link MessageSource} object to the {@link IntegrationFlowBuilder} chain.
 * The {@link org.springframework.integration.dsl.IntegrationFlow} {@code startMessageSource}.
 * In addition use {@link SourcePollingChannelAdapterSpec} to provide options for the underlying
 * {@link org.springframework.integration.endpoint.SourcePollingChannelAdapter} endpoint.
 * @param messageSource the {@link MessageSource} to populate.
 * @param endpointConfigurer the {@link Consumer} to provide more options for the
 * {@link org.springframework.integration.config.SourcePollingChannelAdapterFactoryBean}.
 * @return new {@link IntegrationFlowBuilder}.
 * @see MessageSource
 * @see SourcePollingChannelAdapterSpec
 */
public static IntegrationFlowBuilder from(MessageSource<?> messageSource,
        Consumer<SourcePollingChannelAdapterSpec> endpointConfigurer) {

И настройте там .transactional() и transactionSynchronizationFactory() на PollerSpec.

person Artem Bilan    schedule 12.07.2018
comment
спасибо , после добавления adviceChain , код в порядке , и может ссылаться на 'https://stackoverflow.com/questions/19423160/sourcepollingchanneladapter-with-transaction ' - person steven; 13.07.2018