Spring Integration - ограничения емкости очереди производителя
Мы используем удаленное разделение с MessageChannelPartitionHandler для отправки сообщений о разделе в очередь (ActiveMQ) для выбора и обработки рабочих. Задание имеет огромные данные для обработки, многие сообщения раздела публикуются в очереди, а агрегатор ответов от replyChannnel дает сбой из-за тайм-аута сообщений, поскольку все сообщения не могут быть обработаны в заданное время. Мы также попытались ограничить сообщения, публикуемые в очереди, с помощью емкости очереди ‹integration:queue capacity=200/›, что привело к сбою сервера с созданием дампа кучи из-за проблем с памятью при хранении всех сообщений этих разделов во внутренней памяти.
Мы хотели контролировать создание самого разделения StepExecution, чтобы не возникало проблем с памятью. Примерный случай: около 4k сообщений раздела публикуются в очереди, и вся работа занимает около 3 часов.
Можем ли мы контролировать публикацию сообщений в QueueChannel?
<bean id="senExtractMemberMasterPartitionHandler" class="org.springframework.batch.integration.partition.MessageChannelPartitionHandler">
<property name="messagingOperations" ref="senExtractMemberMasterPartitionMsgTemplate" />
<property name="replyChannel" ref="senExtractProcessingMasterAggregatedChannel" />
<property name="stepName" value="senExtractGeneratePrintRequestWorkerStep" />
<property name="gridSize" value="500" />
</bean>
<bean id="senExtractMemberMasterPartitionMsgTemplate" class="org.springframework.integration.core.MessagingTemplate">
<property name="defaultChannel" ref="senExtractProcessingMasterRequestChannel" />
<property name="receiveTimeout" value="18000000" />
</bean>
<integration:channel id="senExtractProcessingMasterAggregatedChannel" >
<integration:queue />
<integration:interceptors>
<integration:wire-tap channel="masterLoggingChannel" />
</integration:interceptors>
</integration:channel>
<int-jms:outbound-gateway
id="senExtractMasterOutGateway"
connection-factory="masterJMSConnectionFactory"
correlation-key="JMSCorrelationID"
request-channel="senExtractProcessingMasterRequestChannel"
request-destination-name="senExtractRequestQueue"
reply-channel="senExtractProcessingMasterReplyChannel"
reply-destination-name="senExtractReplyQueue"
async="true"
auto-startup="true"
reply-timeout="18000000"
receive-timeout="6000">
<integration:poller ref="masterPoller"/>
<int-jms:reply-listener />
</int-jms:outbound-gateway>