Опрос очереди JMS из источника Spring XD для регулирования

Я пытаюсь использовать источник XD JMS для чтения данных из очереди activeMq и просто записывать их. Мое требование - читать очередь только с определенным интервалом, пытаясь реализовать регулирование. Мне нужен мой поток для обработки только 1 сообщения в секунду, в очереди могут быть сообщения с любой скоростью, например 20 сообщений в секунду и т. Д.

Стандартный источник JMS реализует message-driven-channel-adapter, который немедленно считывает данные из очереди. Итак, я создал собственный модуль (polledJms) следующим образом:

<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:int="http://www.springframework.org/schema/integration"
    xmlns:int-jms="http://www.springframework.org/schema/integration/jms"
    xmlns:context="http://www.springframework.org/schema/context"
    xsi:schemaLocation="http://www.springframework.org/schema/integration/jms http://www.springframework.org/schema/integration/jms/spring-integration-jms.xsd
            http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
            http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
            http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">
    <import resource="../../../common/jms-${provider}-infrastructure-context.xml"/>
    <int:channel id="output"/>
    <int-jms:inbound-channel-adapter id="jmsPolledSource"
            channel="output"
            destination-name="${destination}"
            connection-factory="connectionFactory">
    <int:poller fixed-rate="5000"/>
    </int-jms:inbound-channel-adapter>

Now when I post something on the queue its not picked up immmediately but after a delay. However that delay isnt consistent. I expect the delay to be always 5sec but sometimes its 10sec, 1minute etc. Not sure what I am doing wrong here.

Мое определение потока выглядит следующим образом:

"polledJms --destination=readQ | log"

Я даже попробовал выражение cron в xml вместо фиксированной скорости, чтобы читать каждые 10 секунд в опросчике и по-прежнему видеть такое же поведение.

Мой собственный модуль - это правильный способ реализовать регулирование очередей JMS, или XD предоставляет готовые возможности для этого, что я упустил. Пожалуйста помоги.


person Ashish C    schedule 19.02.2015    source источник


Ответы (1)


<poller> имеет параметр max-messages-per-poll, который по умолчанию равен Integer.MIN_VALUE, что означает «читать сообщения, пока они не будут в MessageSource».

С другой стороны, fixed-rate означает «начать новую задачу опроса по истечении этого времени сразу после начала предыдущего».

Чтобы сделать его "откладываемым", вам следует подумать об использовании fixed-delay. В этом случае новая задача опроса будет запущена в этот период только после завершения предыдущей задачи.

В остальном ваш настраиваемый модуль выглядит хорошо.

С другой стороны, вы даже можете настроить существующий jms источник с <delayer> перед outputChannel.

ОБНОВЛЕНИЕ

fixed-delay означает: start new polling task over the period after the finish of the previous one. Поскольку ваш опросчик читает сообщение и отправляет его на канал в том же Thread, вы должны добавить для polling task duration время обработки сообщения. Потому что поток опроса занят. Итак, да: ваше сообщение не может опрашиваться каждую секунду.

С другой стороны (используя fixed-rate) вы должны иметь в виду, что Spring Integration по умолчанию использует TaskScheduler с размером пула 10. Вы можете изменить его, используя META-INF/spring.integration.properties со свойством spring.integraton.taskScheduler.poolSize. В вашем случае 100 сообщений в очереди и 10 одновременных потоков для fixed-rate для опроса только одного сообщения, вы можете достичь thread pool starvation и получить неожиданные результаты времени опроса.

person Artem Bilan    schedule 20.02.2015
comment
Спасибо за ваше предложение. Я перешел на <int:poller max-messages-per-poll="1" fixed-delay="1000"> и отправил пакет из 100 сообщений на q. Я ожидаю прочитать 1 сообщение в секунду, но не вижу этого. Я не очень понимаю, как работает опросчик. Насколько я понимаю, если у меня <int:poller fixed-delay="1000" />, он будет получать одно сообщение каждые 1 секунду, но, похоже, этого не происходит. Не могли бы вы указать мне правильное направление. - person Ashish C; 21.02.2015
comment
К моему ответу добавлены дополнительные пояснения - person Artem Bilan; 24.02.2015