Spring Integration - JdbcPollingChannelAdapter фиксирует вместо отката обработанных исключений

Я использую Spring 4.1.x APIs, Spring Integration 4.1.x APIs и Spring Integration Java DSL 1.0.x APIs для потока EIP, где мы получаем сообщения из таблицы базы данных Oracle, используя JdbcPollingChannelAdpater в качестве точки входа в поток.

Несмотря на то, что у нас есть ErrorHandler, настроенный на JdbcPollingChannelAdapter в Poller, мы видим, что сеанс Transaction все еще откатывается и не фиксируется, когда RuntimeException генерируется и правильно обрабатывается ErrorHandler.

После прочтения этого потока: Spring Transactions - Предотвращение отката после непроверенных исключений (RuntimeException ), у меня такое ощущение, что невозможно предотвратить откат и вместо этого принудительно выполнить фиксацию. Это правильно? И, если есть способ, какой самый чистый способ принудительно выполнить фиксацию вместо отката, когда ошибка безопасно обрабатывается?

Текущая конфигурация:

ИнтеграцияConfig.java:

@Bean
public MessageSource<Object> jdbcMessageSource() {

    JdbcPollingChannelAdapter adapter = new JdbcPollingChannelAdapter(
            dataSource,
            "select * from SERVICE_TABLE where rownum <= 10 for update skip locked");
    adapter.setUpdateSql("delete from SERVICE_TABLE where SERVICE_MESSAGE_ID in (:id)");
    adapter.setRowMapper(serviceMessageRowMapper);
    adapter.setMaxRowsPerPoll(1);
    adapter.setUpdatePerRow(true);
    return adapter;
}

@SuppressWarnings("unchecked")
@Bean
public IntegrationFlow inFlow() {

    return IntegrationFlows
            .from(jdbcMessageSource(),
                    c -> {
                        c.poller(Pollers.fixedRate(100)
                                .maxMessagesPerPoll(10)
                                .transactional(transactionManager)
                                .errorHandler(errorHandler));
                    })
                .channel(inProcessCh()).get();
}

ErrorHandler.java

@Component
public class ErrorHandler implements org.springframework.util.ErrorHandler {

    @Autowired
    private PlatformTransactionManager transactionManager;

    private static final Logger logger = LogManager.getLogger();

    @Override
    public void handleError(Throwable t) {

        logger.trace("handling error:{}", t.getMessage(), t);

        // handle error code here...

        // we want to force commit the transaction here?
        TransactionStatus txStatus = transactionManager.getTransaction(null);
        transactionManager.commit(txStatus);
    }
}

--- ИЗМЕНЕНО для включения компонента ExpressionEvaluatingRequestHandlerAdvice ---

@Bean
public Advice expressionEvaluatingRequestHandlerAdvice() {
    ExpressionEvaluatingRequestHandlerAdvice expressionEvaluatingRequestHandlerAdvice = new ExpressionEvaluatingRequestHandlerAdvice();
    expressionEvaluatingRequestHandlerAdvice.setTrapException(true);
    expressionEvaluatingRequestHandlerAdvice.setOnSuccessExpression("payload");
    expressionEvaluatingRequestHandlerAdvice
            .setOnFailureExpression("payload");
    expressionEvaluatingRequestHandlerAdvice.setFailureChannel(errorCh());
    return expressionEvaluatingRequestHandlerAdvice;
}

--- ОТРЕДАКТИРОВАНО, чтобы показать обработчик фиктивного тестового сообщения ---

    .handle(Message.class,
                    (m, h) -> {

                        boolean forceTestError = m.getHeaders().get("forceTestError");
                        if (forceTestError) {
                            logger.trace("simulated forced TestException");
                            TestException testException = new TestException(
                                    "forced test exception");
                            throw testException;
                        }

                        logger.trace("simulated successful process");

                        return m;
                    }, e-> e.advice(expressionEvaluatingRequestHandlerAdvice())

--- ИЗМЕНЕНО, чтобы показать метод ExecutorChannelInterceptor ---

@Bean
public IntegrationFlow inFlow() {

    return IntegrationFlows
            .from(jdbcMessageSource(), c -> {
                c.poller(Pollers.fixedRate(100).maxMessagesPerPoll(10)
                        .transactional(transactionManager));
            })
            .enrichHeaders(h -> h.header("errorChannel", errorCh(), true))
            .channel(
                    MessageChannels.executor("testSyncTaskExecutor",
                            syncTaskExecutor()).interceptor(
                            testExecutorChannelInterceptor()))
            .handle(Message.class, (m, h) -> {
                    boolean forceTestError = m.getHeaders().get("forceTestError");
                    if (forceTestError) {
                        logger.trace("simulated forced TestException");
                        TestException testException = new TestException(
                                "forced test exception");
                        throw testException;
                    }

                    logger.trace("simulated successful process");
            }).channel("nullChannel").get();
}

person Going Bananas    schedule 11.08.2015    source источник


Ответы (1)


Не получится только потому, что ваш ErrorHandler работает уже после окончания ТХ.

Вот пара строк исходного кода (AbstractPollingEndpoint.Poller):

@Override
public void run() {
    taskExecutor.execute(new Runnable() {
        @Override
        public void run() {

           .............
                try {
                        if (!pollingTask.call()) {
                            break;
                        }
                        count++;
                    }
                    catch (Exception e) {
            ....
                    }
                }
            }
        });
    }

Где:

  1. ErrorHandler применяется для taskExecutor (SyncTaskExecutor) по умолчанию.

  2. TransactionInterceptor в качестве Aspect применяется для прокси вокруг этого pollingTask.

Поэтому TX делается вокруг pollingTask.call() и выходит. И только после этого ваш ErrorHandler начинает работать внутри taskExecutor.execute().

Чтобы решить вашу проблему, вам нужно выяснить, какая часть нисходящего потока не так критична для отката TX, и сделать там несколько try...catch или использовать ExpressionEvaluatingRequestHandlerAdvice, чтобы «свернуть» эту RuntimeException.

Но, как вы заметили по моим рассуждениям, это должно быть сделано внутри Техаса.

person Artem Bilan    schedule 11.08.2015
comment
Мы обрабатываем ошибки, используя try...catch, но то, что мы делаем тогда (для всех наших потоков), заключаем ошибку в специфичный для бизнеса RuntimeException, который возвращается и перенаправляется в специфичный для бизнеса errorChannel. Я создал ExpressionEvaluatingRequestHandlerAdvice (см. bean в основном вопросе). Чтобы проверить, я создал фиктивный поток с простой службой, вокруг которой обернут указанный совет, но теперь ни успешный процесс, ни тест процесса с ошибкой не показывают, что происходит фиксация tx. Есть ли пример spring-integration-java-dsl, показывающий, что этот совет используется где-то? - person Going Bananas; 12.08.2015
comment
На самом деле, похоже, что Advice настроен правильно, и транзакции фиксируются во время тестов успешного и ошибочного процессов. (Извините, среда Eclipse IDE не показывала, что тест приостановлен на закулисной точке останова, поэтому фиксация не завершена) - person Going Bananas; 12.08.2015
comment
Однако еще один вопрос: в моем тесте я обернул определенный обработчик сообщений (службу) с помощью Advice. Есть ли способ установить этот совет глобально, чтобы он перехватывал все обработчики сообщений в потоках контекста Spring или, по крайней мере, во всех обработчиках сообщений в данном потоке синхронных транзакций? - person Going Bananas; 12.08.2015
comment
Да, это возможно. В стандартной среде Spring AOP имена компонентов для MessageHandlers или просто MessageHandler.handleMessage() шаблона точечного вырезания. - person Artem Bilan; 13.08.2015
comment
Первый раз работаю с АОП... поэтому для начала я создал аннотированный класс @Aspect и могу заставить его перехватывать вызовы обработчика сообщений, обрабатывающего компонент @Service, аннотируя метод примерно так: @After("execution(* abc.example.service.TestService.consume(..))") - это перехватывается нормально но я не могу перехватить сам вызов .handleMessage() владельца MessageHandler. Я пробовал: @After("execution(* org.springframework.messaging.MessageHandler.*(..))") но в этом случае перехвата не происходит. Что я делаю не так? - person Going Bananas; 14.08.2015
comment
Да, это правда. Поскольку все компоненты Spring Integration расширяют AbstractMessageHandler, handleMessage() которого имеет значение final. Это мешает проксировать метод. С другой стороны, пожалуйста, взгляните на вариант реализации ExecutorChannelInterceptor. - person Artem Bilan; 14.08.2015
comment
Я отредактировал поток для запуска с ExecutorChannelInterceptor (см. последнее EDIT). Реализация Interceptor не делает ничего, кроме реализации простейших методов интерфейса, и я установил его на SyncTaskExecutor по каналу сообщений. Это правильный способ использования перехватчика в этом случае? Используя метод перехватчика, откаты были заменены фиксациями (и это здорово), но теперь у меня проблемы с тем, чтобы этот метод работал над несколькими параллельными потоками. Разве это невозможно? - person Going Bananas; 18.08.2015
comment
Вы должны использовать там org.springframework.messaging.support.ExecutorSubscribableChannel. ExecutorChannelInterceptor поддерживается Spring Integration, начиная с версии 4.2, которая находится в RC1 с GA незадолго до SpringOne в сентябре этого года. С другой стороны, я ничего не могу сказать о вашей проблеме с параллелизмом, потому что я не вижу исходного кода для вашего testExecutorChannelInterceptor()... - person Artem Bilan; 18.08.2015
comment
О да. Добавление ExecutorChannelInterceptor к ExecutorSubscribableChannel вместо этого имело большое значение. И отсутствие полной поддержки до 4.2, вероятно, объясняет, почему раньше я видел только некоторые вызываемые методы перехватчика, а не такие, как beforeHandle() и afterMessageHandled() (которые я вижу сейчас). Пожалуйста, игнорируйте ранее заданный вопрос о многопоточности (больше не проблема). Завтра я проведу полную партию функциональных тестов на полных потоках, просто чтобы убедиться, что все в порядке, и тогда я приму и закрою поток. Ваше здоровье! - person Going Bananas; 18.08.2015
comment
После дальнейших тестов и отладки стало ясно, что транзакции по-прежнему не фиксируются (вместо отката) при вызове ExecutorChannelInterceptor. Например, я могу принудительно зафиксировать транзакцию в методе afterMessageHandled(), но если я это сделаю, TransactionInterceptor все равно попытается откатить транзакцию, и в этот момент возникнет исключение отката. Следовательно, фиксация происходит, но также происходит попытка отката. Как мы можем подавить попытку отката? - person Going Bananas; 20.08.2015
comment
ХОРОШО. Я вижу, что был не прав. В ExecutorSubscribableChannel есть код для повторного создания исключения после выполнения triggerAfterMessageHandled. Так что никакие манипуляции внутри не помогут. Я вижу только способ try...catch целевого сервиса для предотвращения отката или чего-то еще... В противном случае вам следует дождаться jira.spring.io/browse/INT-3770 или полагаться на ExpressionEvaluatingRequestHandlerAdvice для каждого <service-activator> - person Artem Bilan; 20.08.2015
comment
В этом случае мы воспользуемся вашим советом ExpressionEvaluatingRequestHandlerAdvice для каждого @ServiceActivator потока и будем следить за INT-3770, чтобы стать доступным в 4.3 и посмотреть, каковы тогда возможности. Что-то вроде вариантов setAcknowledgeMode()/setTransacted() JMS DMLC было бы идеальным для нас, если бы что-то подобное вообще было возможно при сценарии потока типа JDBC polling adapter. - person Going Bananas; 21.08.2015