Тема ActiveMQ — сообщения не сохраняются в базе данных

Я включил постоянство в брокере ActiveMQ. Он сохраняет сообщения в таблице БД для сообщений в очереди. Проблема ActiveMQ не сохраняет сообщения в базе данных для ТЕМ. Конфигурация прикреплена ниже. Я ищу сообщения в таблице activemq_msgs.

Если я проверю эту таблицу на наличие очереди, она будет заполнена. Но сообщения в темах не сохраняются.

<broker xmlns="http://activemq.apache.org/schema/core" brokerName="Central-Dev-Broker" persistent="true"  offlineDurableSubscriberTimeout="8" offlineDurableSubscriberTaskSchedule="3000" useJmx="true">
        <destinationPolicy>
            <policyMap>
              <policyEntries>
                <policyEntry topic=">" >
                  <pendingMessageLimitStrategy>
                    <constantPendingMessageLimitStrategy limit="1000"/>
                  </pendingMessageLimitStrategy>
                </policyEntry>

            <policyEntry queue=">">
              <deadLetterStrategy>
                <individualDeadLetterStrategy queuePrefix="DLQ." useQueueForQueueMessages="true"/>
              </deadLetterStrategy>
            </policyEntry>

          </policyEntries>
        </policyMap>
    </destinationPolicy>
    <managementContext>
        <managementContext createConnector="false"/>
    </managementContext>
    <persistenceAdapter>
        <!--kahaDB directory="${activemq.data}/kahadb"/-->
        <jdbcPersistenceAdapter dataSource="#mysql-ds"/>
    </persistenceAdapter>
      <systemUsage>
        <systemUsage>
            <memoryUsage>
                <memoryUsage percentOfJvmHeap="70" />
            </memoryUsage>
            <storeUsage>
                <storeUsage limit="100 gb"/>
            </storeUsage>
            <tempUsage>
                <tempUsage limit="50 gb"/>
            </tempUsage>
        </systemUsage>
    </systemUsage>
    <transportConnectors>
        <transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
        <transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
    </transportConnectors>
    <plugins/>
    <shutdownHooks>
        <bean xmlns="http://www.springframework.org/schema/beans" class="org.apache.activemq.hooks.SpringContextHook" />
    </shutdownHooks>
</broker>


 <bean id="mysql-ds" class="org.apache.commons.dbcp2.BasicDataSource" destroy-method="close">
    <property name="driverClassName" value="com.mysql.jdbc.Driver"/>
    <property name="url" value="jdbc:mysql://localhost:3306/db_activemq?relaxAutoCommit=true"/>
    <property name="username" value="root"/>
    <property name="password" value="root"/>
    <property name="poolPreparedStatements" value="true"/>
  </bean>
    <import resource="jetty.xml"/>
</beans>

person learner    schedule 04.02.2018    source источник


Ответы (1)


сообщения для темы сохраняются, только если есть онлайн-подписчики или автономные постоянные подписчики. если в теме нет подписчиков, сообщения теряются.

кроме того, у вас есть эта конфигурация ниже, которая удаляет неактивных постоянных подписчиков через 3 секунды! поэтому все сообщения, сохраненные для постоянных подписчиков, удаляются. http://activemq.apache.org/manage-durable-subscribers.html

  offlineDurableSubscriberTimeout="8" offlineDurableSubscriberTaskSchedule="3000"
person Hassen Bennour    schedule 05.02.2018
comment
Спасибо. Я отключил offlineDurableSubscriberTimeout, установив значение -1. Но все же сообщения не сохраняются в таблице базы данных activemq_msgs - person learner; 05.02.2018
comment
Есть ли устойчивый подписчик в офлайне?? - person Hassen Bennour; 05.02.2018
comment
Да, есть. Я тестирую сценарий, в котором постоянный подписчик находится как в сети, так и в автономном режиме, а activemq перезапускается. И сообщения теряются. Я проверил это, отправив 10000 сообщений, и прежде чем все они были израсходованы, я остановил и перезапустил activemq. - person learner; 05.02.2018
comment
Итак, протестируйте этот сценарий: создайте долговременного подписчика —> остановите его —> отправьте сообщения в тему —> проверьте сообщения в таблице mysql —> подключите надежного подписчика для использования сообщений —> убедитесь, что таблица mysql пуста. - person Hassen Bennour; 05.02.2018
comment
после этого вы можете выполнить этот сценарий: создать надежного подписчика -> остановить его - > отправить сообщения в тему - > проверить сообщения в таблице mysql - > перезапустить брокера - > подключить надежного подписчика для использования сообщений - > проверить, что таблица mysql пустой - person Hassen Bennour; 05.02.2018
comment
Я также проверил это. Абонент получает сообщения, как только он перезапускается. Но сообщения никогда не приходят в таблицу дБ, что делает проблему - person learner; 05.02.2018
comment
База данных не показывает ни одной строки в таблице MSG, когда подписчик отключается или появляется снова. - person learner; 05.02.2018
comment
jmsTemp.send(dest, new MessageCreator() { @Override public Message createMessage(Session session) throws JMSException { Message message = session.createTextMessage(какое-то сообщение); message.setJMSExpiration((subType.getMsgTimeToLiveMins())*60000); сообщение .setJMSDestination(новый ActiveMQTopic(test.nnm.data.auditer)); message.setJMSDeliveryMode(DeliveryMode.PERSISTENT); возвращаемое сообщение; }}); - person learner; 05.02.2018
comment
производитель кажется в порядке, я никогда не использовал jdbcPersistenceAdapter, попробуйте удалить ниже policyEntry и перезапустить брокера и проверить - person Hassen Bennour; 05.02.2018
comment
Это не сработало :(. Я попытался вернуться к сохранению KahaDB и получил тот же результат, то есть потерю сообщения после перезапуска. - person learner; 06.02.2018
comment
можете ли вы обновить свой вопрос с помощью надежного кода подписчика, снимка экрана ActiveMQ WebConsole, журналов запуска - person Hassen Bennour; 06.02.2018