Spring Integration - ограничения емкости очереди производителя

Spring Integration - ограничения емкости очереди производителя

Мы используем удаленное разделение с MessageChannelPartitionHandler для отправки сообщений о разделе в очередь (ActiveMQ) для выбора и обработки рабочих. Задание имеет огромные данные для обработки, многие сообщения раздела публикуются в очереди, а агрегатор ответов от replyChannnel дает сбой из-за тайм-аута сообщений, поскольку все сообщения не могут быть обработаны в заданное время. Мы также попытались ограничить сообщения, публикуемые в очереди, с помощью емкости очереди ‹integration:queue capacity=200/›, что привело к сбою сервера с созданием дампа кучи из-за проблем с памятью при хранении всех сообщений этих разделов во внутренней памяти.

Мы хотели контролировать создание самого разделения StepExecution, чтобы не возникало проблем с памятью. Примерный случай: около 4k сообщений раздела публикуются в очереди, и вся работа занимает около 3 часов.

Можем ли мы контролировать публикацию сообщений в QueueChannel?

<bean id="senExtractMemberMasterPartitionHandler"   class="org.springframework.batch.integration.partition.MessageChannelPartitionHandler">
        <property name="messagingOperations" ref="senExtractMemberMasterPartitionMsgTemplate" />
        <property name="replyChannel" ref="senExtractProcessingMasterAggregatedChannel" />
        <property name="stepName" value="senExtractGeneratePrintRequestWorkerStep" />
        <property name="gridSize" value="500" />
</bean>
<bean id="senExtractMemberMasterPartitionMsgTemplate" class="org.springframework.integration.core.MessagingTemplate">
    <property name="defaultChannel" ref="senExtractProcessingMasterRequestChannel" />
    <property name="receiveTimeout" value="18000000" />
</bean>

<integration:channel id="senExtractProcessingMasterAggregatedChannel" >
    <integration:queue />
    <integration:interceptors>
        <integration:wire-tap channel="masterLoggingChannel" />
    </integration:interceptors>
</integration:channel>


<int-jms:outbound-gateway
    id="senExtractMasterOutGateway" 
    connection-factory="masterJMSConnectionFactory"
    correlation-key="JMSCorrelationID"
    request-channel="senExtractProcessingMasterRequestChannel"
    request-destination-name="senExtractRequestQueue" 
    reply-channel="senExtractProcessingMasterReplyChannel"
    reply-destination-name="senExtractReplyQueue" 
    async="true"
    auto-startup="true"
    reply-timeout="18000000" 
    receive-timeout="6000">
    <integration:poller ref="masterPoller"/>
    <int-jms:reply-listener />  
</int-jms:outbound-gateway>

person Krishna Kishore    schedule 08.06.2021    source источник


Ответы (1)


Задание имеет огромные данные для обработки, многие сообщения раздела публикуются в очереди, а агрегатор ответов от replyChannnel дает сбой из-за тайм-аута сообщений, поскольку все сообщения не могут быть обработаны в заданное время.

Вам нужно увеличить время ожидания или добавить больше рабочих. Javadoc для MessageChannelPartitionHandler:

The receive timeout needs to be set realistically in the MessagingTemplate
and the aggregator, so that there is a good chance of all work being done.

Мы хотели контролировать создание самого разделения StepExecution.

Spring Batch предоставляет StepExecutionSplitter для этого. Если вариант по умолчанию (SimpleStepExecutionSplitter) не соответствует вашим потребностям, вы можете предоставить собственную реализацию для своего шага с разделами.

person Mahmoud Ben Hassine    schedule 08.06.2021