Обработка тайм-аута подтверждения издателя в Spring AMQP-RabbitMQ

Я тестирую реализацию Spring RabbitMQ для AMQP и хочу использовать подтверждения издателя. Чего мне не хватает как в документации, так и в коде, так это того, как я должен обрабатывать неподтвержденные сообщения определенного возраста.

Чистая клиентская java-библиотека RabbitMQ предоставляет Channel.waitForConfirmsOrDie (timeout), который работает нормально, но тогда это заставит меня углубиться в абстракцию Spring, а также почему бы мне не продолжать публиковать и повторять неподтвержденные сообщения ? (и, кстати, было бы здорово, если бы для этого можно было использовать spring-retry, в настоящее время я должен его реализовать).

Я нашел RabbitTemplate.getUnconfirmed (long), но проблема, с которой я сталкиваюсь, заключается в том, что он, похоже, не является потокобезопасным, поскольку, когда мой издатель отправляет сообщения непрерывно, и я пытаюсь повторно отправить неподтвержденные сообщения старше 5 секунд, он выдает ошибку:

Exception in thread "publisher-A-500000to999999" java.util.ConcurrentModificationException
at java.util.TreeMap$PrivateEntryIterator.nextEntry(TreeMap.java:1207)
at java.util.TreeMap$EntryIterator.next(TreeMap.java:1243)
at java.util.TreeMap$EntryIterator.next(TreeMap.java:1238)
at org.springframework.amqp.rabbit.core.RabbitTemplate.getUnconfirmed(RabbitTemplate.java:503)
at com.mycompany.rabbitmq.tools.failover.Publisher.resendUnconfirmed(Publisher.java:65)
at com.mycompany.rabbitmq.tools.failover.Publisher.run(Publisher.java:52)
at java.lang.Thread.run(Thread.java:745)

Возможно, я делаю что-то совершенно не так, поскольку я использую CorrelationData в качестве держателя сообщения, поэтому его легче отправить повторно.

Я создал класс MessageConfirmData:

private static class MessageCorrelationData extends  CorrelationData {

    private final Message message;
    private final long messageIndex;
    private final int retryCount;

    public MessageCorrelationData(Message message, long messageIndex, int retryCount) {
        super(UUID.randomUUID().toString());
        this.message = message;
        this.messageIndex = messageIndex;
        this.retryCount = retryCount;
    }
}

И это моя логика повторной отправки, которую я вызываю после отправки каждых 100 сообщений:

private int resendUnconfirmed() {
    Collection<CorrelationData> unconfirmed = rabbitTemplate.getUnconfirmed(5000);
    int numUnconfirmed = 0;
    if (unconfirmed != null ) {
        numUnconfirmed = unconfirmed.size();

        for (CorrelationData correlationData : unconfirmed) {
            MessageCorrelationData messageCorrelationData = (MessageCorrelationData) correlationData;
            trySend(exchange, messageCorrelationData.message, messageCorrelationData.retryCount + 1, messageCorrelationData.messageIndex);
        }
    }
    return numUnconfirmed;
}

Мой обратный звонок для подтверждения:

rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
        MessageCorrelationData mcd = (MessageCorrelationData) correlationData;
        if (!ack) {
            LOG.error("NACK, cause: " + cause + " resending, retry: " + mcd.retryCount);
            trySend(exchange,mcd.message, mcd.retryCount + 1, mcd.messageIndex);
        }
    });

И, наконец, отправка:

    rabbitTemplate.convertAndSend(exchange, "", amqpMessage, new MessageCorrelationData(amqpMessage, messageIndex, retryCount));

person Balint Pato    schedule 26.06.2015    source источник


Ответы (1)


Вы нашли ошибку; Я поднял проблему JIRA, если вы хотите следить за ней.

Я не уверен, как здесь поможет Spring Retry; Если у вас есть идеи, не стесняйтесь открывать новую функцию или выпуск JIRA «Улучшение».

ИЗМЕНИТЬ

Отправлен запрос на извлечение.

Скоро он будет в выпуске.

person Gary Russell    schedule 26.06.2015
comment
Спасибо, Гарри! Тем временем в качестве обходного пути я синхронизировал извне вызовы convertAndSend и getUnconfirmed - person Balint Pato; 27.06.2015