Я пытаюсь использовать activeMQ с потребителем NMS (C#), чтобы получать сообщения, выполнять некоторую обработку, а затем отправлять содержимое в веб-сервис через HttpClient.PostAsync(), все работает в службе Windows (через Topshelf).
Нисходящая система, с которой я общаюсь, чрезвычайно обидчива, и я использую индивидуальное подтверждение, чтобы я мог проверить ответ и действовать соответствующим образом, подтверждая или запуская пользовательскую повторную попытку (т.е. не session.recover).
Поскольку нисходящая система ненадежна, я пробовал несколько разных способов уменьшить пропускную способность моего потребителя. Я думал, что смогу добиться этого, преобразовав его в синхронный и используя предварительную выборку, но, похоже, это не сработало.
Насколько я понимаю, с асинхронным потребителем «предел» предварительной выборки никогда не будет достигнут, но при использовании синхронного метода очередь предварительной выборки будет съедена только по мере подтверждения сообщений, а это означает, что я могу настроить свой прослушиватель для передачи сообщений со скоростью, которая ниже по течению Компонент может справиться.
С очередью, загруженной 100 сообщениями, и запуском моего кода с помощью прослушивателя (т.е. асинхронно), я могу успешно зарегистрировать, что 100 сообщений прошли. Когда я меняю его на использование Consumer.Receive() (или ReceiveNoWait), я никогда не получаю сообщения.
Вот фрагмент того, что я пытаюсь сделать для синхронного потребителя, с включенной асинхронной опцией, но закомментированной:
public Worker(LogWriter logger, ServiceConfiguration config, IConnectionFactory connectionFactory, IEndpointClient endpointClient)
{
log = logger;
configuration = config;
this.endpointClient = endpointClient;
connection = connectionFactory.CreateConnection();
connection.RedeliveryPolicy = GetRedeliveryPolicy();
connection.ExceptionListener += new ExceptionListener(OnException);
session = connection.CreateSession(AcknowledgementMode.IndividualAcknowledge);
queue = session.GetQueue(configuration.JmsConfig.SourceQueueName);
consumer = session.CreateConsumer(queue);
// Asynchronous
//consumer.Listener += new MessageListener(OnMessage);
// Synchronous
var message = consumer.Receive(TimeSpan.FromSeconds(5));
while (true)
{
if (!Equals(message, null))
{
OnMessage(message);
}
}
}
public void OnMessage(IMessage message)
{
log.DebugFormat("Message {count} Received. Attempt:{attempt}", message.Properties.GetInt("count"), message.Properties.GetInt("NMSXDeliveryCount"));
message.Acknowledge();
}