Я тестирую реализацию Spring RabbitMQ для AMQP и хочу использовать подтверждения издателя. Чего мне не хватает как в документации, так и в коде, так это того, как я должен обрабатывать неподтвержденные сообщения определенного возраста.
Чистая клиентская java-библиотека RabbitMQ предоставляет Channel.waitForConfirmsOrDie (timeout), который работает нормально, но тогда это заставит меня углубиться в абстракцию Spring, а также почему бы мне не продолжать публиковать и повторять неподтвержденные сообщения ? (и, кстати, было бы здорово, если бы для этого можно было использовать spring-retry, в настоящее время я должен его реализовать).
Я нашел RabbitTemplate.getUnconfirmed (long), но проблема, с которой я сталкиваюсь, заключается в том, что он, похоже, не является потокобезопасным, поскольку, когда мой издатель отправляет сообщения непрерывно, и я пытаюсь повторно отправить неподтвержденные сообщения старше 5 секунд, он выдает ошибку:
Exception in thread "publisher-A-500000to999999" java.util.ConcurrentModificationException
at java.util.TreeMap$PrivateEntryIterator.nextEntry(TreeMap.java:1207)
at java.util.TreeMap$EntryIterator.next(TreeMap.java:1243)
at java.util.TreeMap$EntryIterator.next(TreeMap.java:1238)
at org.springframework.amqp.rabbit.core.RabbitTemplate.getUnconfirmed(RabbitTemplate.java:503)
at com.mycompany.rabbitmq.tools.failover.Publisher.resendUnconfirmed(Publisher.java:65)
at com.mycompany.rabbitmq.tools.failover.Publisher.run(Publisher.java:52)
at java.lang.Thread.run(Thread.java:745)
Возможно, я делаю что-то совершенно не так, поскольку я использую CorrelationData в качестве держателя сообщения, поэтому его легче отправить повторно.
Я создал класс MessageConfirmData:
private static class MessageCorrelationData extends CorrelationData {
private final Message message;
private final long messageIndex;
private final int retryCount;
public MessageCorrelationData(Message message, long messageIndex, int retryCount) {
super(UUID.randomUUID().toString());
this.message = message;
this.messageIndex = messageIndex;
this.retryCount = retryCount;
}
}
И это моя логика повторной отправки, которую я вызываю после отправки каждых 100 сообщений:
private int resendUnconfirmed() {
Collection<CorrelationData> unconfirmed = rabbitTemplate.getUnconfirmed(5000);
int numUnconfirmed = 0;
if (unconfirmed != null ) {
numUnconfirmed = unconfirmed.size();
for (CorrelationData correlationData : unconfirmed) {
MessageCorrelationData messageCorrelationData = (MessageCorrelationData) correlationData;
trySend(exchange, messageCorrelationData.message, messageCorrelationData.retryCount + 1, messageCorrelationData.messageIndex);
}
}
return numUnconfirmed;
}
Мой обратный звонок для подтверждения:
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
MessageCorrelationData mcd = (MessageCorrelationData) correlationData;
if (!ack) {
LOG.error("NACK, cause: " + cause + " resending, retry: " + mcd.retryCount);
trySend(exchange,mcd.message, mcd.retryCount + 1, mcd.messageIndex);
}
});
И, наконец, отправка:
rabbitTemplate.convertAndSend(exchange, "", amqpMessage, new MessageCorrelationData(amqpMessage, messageIndex, retryCount));