Избегайте дублирования сообщений в JMS/ActiveMQ

Есть ли способ подавить повторяющиеся сообщения в очереди, определенной на сервере ActiveMQ?

Я попытался вручную определить JMSMessageID (message.setJMSMessageID("uniqueid")), но сервер игнорирует эту модификацию и доставляет сообщение со встроенным сгенерированным JMSMessageID.

По спецификации я не нашел ссылку о том, как дедуплицировать сообщения.

В HornetQ, чтобы решить эту проблему, нам нужно объявить специфичное свойство HQ org.hornetq.core.message.impl.HDR_DUPLICATE_DETECTION_ID в определении сообщения.

i.e.:

Message jmsMessage = session.createMessage();
String myUniqueID = "This is my unique id"; // Could use a UUID for this
message.setStringProperty(HDR_DUPLICATE_DETECTION_ID.toString(), myUniqueID);

Кто-нибудь знает, есть ли подобное решение для ActiveMQ?


person Andre Pastore    schedule 08.02.2011    source источник


Ответы (5)


Вам следует взглянуть на Apache Camel, он предоставляет потребительский компонент Idempotent, который будет работать с любым провайдером JMS, см.: http://camel.apache.org/idempotent-consumer.html

Использование этого в сочетании с компонентом ActiveMQ значительно упрощает использование JMS, см.: http://camel.apache.org/activemq.html

person Tim Bish    schedule 09.02.2011
comment
Я сомневаюсь, что этот метод решит мою проблему. Мне нужно сохранить только один экземпляр сообщения с одним и тем же JMSMessageID только во время того, как этот экземпляр находится в очереди. Мне нужно, чтобы он работал как набор. Я хочу иметь возможность поместить другое сообщение с тем же JMSMessageID после того, как последний элемент idem был удален из очереди. Мне нужно это реализовать и протестировать. Но, основываясь на Idempotent, описанном в книге EAI, я думаю, что эта концепция не соответствует моей необходимости. НО, предложенное решение хорошее. Я изучу больше об этом и прокомментирую здесь свои результаты. Спасибо - person Andre Pastore; 11.02.2011

Я сомневаюсь, что ActiveMQ поддерживает его изначально, но реализовать идемпотентного потребителя должно быть легко. Способ сделать это состоит в том, чтобы добавить уникальный идентификатор к каждому сообщению на стороне производителя, теперь на стороне потребителя с использованием хранилища (базы данных, кэша и т. д.), можно выполнить проверку, чтобы увидеть, было ли сообщение получено ранее и продолжить обработку на основе этой проверки.

Я вижу предыдущий вопрос о переполнении стека в том же духе: дубликаты сообщений">Apache ActiveMQ 5.3 - Как настроить очередь для отклонения повторяющихся сообщений? , это также может помочь.

person Biju Kunjummen    schedule 09.02.2011
comment
Поскольку сам потребитель может быть многопоточным, чтобы определить, является ли его дубликат или нет, необходимо реализовать распределенную блокировку/блокировку в памяти. Верно? - person user1401472; 17.11.2019

Теперь поддерживается удаление повторяющихся сообщений, запеченных в транспортах ActiveMQ. См. значения конфигурации auditDepth и auditMaximumProducerNumber в Руководстве по настройке подключения.

person Chris Pitman    schedule 08.08.2013
comment
как вы на самом деле настраиваете эти параметры, чтобы избежать дублирования? - person Thomas; 26.08.2013
comment
@ Томас, я не уверен, о чем ты спрашиваешь. Как вообще применить настройку в ActiveMQ? Или какие значения использовать для этих конкретных полей? - person Chris Pitman; 27.08.2013
comment
Просто из описания параметров мне это звучит не так ясно. Например, auditDepth означает ли значение Nb сообщений или nb байтов, которые будут отфильтрованы на предмет дублирования? Что касается auditMaximumProducerNumber, значит ли это, что будет отобрано ограниченное количество продюсеров? Кстати, если сообщение с одинаковым содержанием публикуют 2 разных подписчика, сообщение случайно не считается дублированным? - person Thomas; 27.08.2013
comment
@Chris IIUC, эти параметры гарантируют обнаружение дубликатов для 2048 сообщений в каждом до 64 производителей. Но как ActiveMQ определяет, что является дубликатом? Если это JMSMessageID, то мы возвращаемся к исходной точке, потому что не можем установить это. - person Antares42; 22.01.2015

Существует способ заставить ActiveMQ фильтровать дубликаты на основе свойства JMS. это включает в себя написание Activemq Plugin. Базовый фильтр брокера, который отправляет повторяющиеся сообщения в очередь недоставленных сообщений, будет выглядеть так:

import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.activemq.broker.Broker;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.broker.BrokerFilter;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.broker.ProducerBrokerExchange;

public class DuplicateFilterBroker extends BrokerFilter {
    String messagePropertyName;
    boolean switchValue;

    public DuplicateFilterBroker(Broker next, String messagePropertyName) {
        super(next);
        this.messagePropertyName = messagePropertyName;
    }

    public boolean hasDuplicate(String propertyValue){
        switchValue = propertyValue;
        return switchValue;
    }

    public void send(ProducerBrokerExchange producerExchange, Message msg) throws Exception { 
        ActiveMQMessage amqmsg = (ActiveMQMessage)msg; 
        Object msgObj = msg.getMessage(); 
        if (msgObj instanceof javax.jms.Message) { 
            javax.jms.Message jmsMsg = (javax.jms.Message) msgObj; 
            if (!hasDuplicate(jmsMsg.getStringProperty(messagePropertyName))) {
                super.send(producerExchange, msg);
            }
            else {
               sendToDeadLetterQueue(producerExchange.getConnectionContext(), msg);
            } 
        }
    }  
}
person Monachus    schedule 27.08.2012
comment
Как этот плагин решает, какое свойство будет использоваться для дальнейшей фильтрации повторяющихся сообщений, объяснение варианта использования было бы очень полезно для интеграции такого плагина. Заранее спасибо за ваш ответ. - person Hayra; 27.02.2021

Кажется, способ, предложенный в вопросе, работает и для ActiveMQ (2016/12). См. руководство по activemq-artemis. Это требует, чтобы производитель установил определенное свойство в сообщении.

Message jmsMessage = session.createMessage();
String myUniqueID = "This is my unique id";   // Could use a UUID for this
message.setStringProperty(HDR_DUPLICATE_DETECTION_ID.toString(), myUniqueID);

Однако класс, содержащий это свойство, отличается: org.apache.activemq.artemis.core.message.impl.HDR_DUPLICATE_DETECTION_ID, а значение свойства равно _AMQ_DUPL_ID.

person Gab    schedule 13.12.2016