ACTIVEMQ + NMS не может синхронно получать

Я пытаюсь использовать activeMQ с потребителем NMS (C#), чтобы получать сообщения, выполнять некоторую обработку, а затем отправлять содержимое в веб-сервис через HttpClient.PostAsync(), все работает в службе Windows (через Topshelf).

Нисходящая система, с которой я общаюсь, чрезвычайно обидчива, и я использую индивидуальное подтверждение, чтобы я мог проверить ответ и действовать соответствующим образом, подтверждая или запуская пользовательскую повторную попытку (т.е. не session.recover).

Поскольку нисходящая система ненадежна, я пробовал несколько разных способов уменьшить пропускную способность моего потребителя. Я думал, что смогу добиться этого, преобразовав его в синхронный и используя предварительную выборку, но, похоже, это не сработало.

Насколько я понимаю, с асинхронным потребителем «предел» предварительной выборки никогда не будет достигнут, но при использовании синхронного метода очередь предварительной выборки будет съедена только по мере подтверждения сообщений, а это означает, что я могу настроить свой прослушиватель для передачи сообщений со скоростью, которая ниже по течению Компонент может справиться.

С очередью, загруженной 100 сообщениями, и запуском моего кода с помощью прослушивателя (т.е. асинхронно), я могу успешно зарегистрировать, что 100 сообщений прошли. Когда я меняю его на использование Consumer.Receive() (или ReceiveNoWait), я никогда не получаю сообщения.

Вот фрагмент того, что я пытаюсь сделать для синхронного потребителя, с включенной асинхронной опцией, но закомментированной:

    public Worker(LogWriter logger, ServiceConfiguration config, IConnectionFactory connectionFactory, IEndpointClient endpointClient)
    {
        log = logger;
        configuration = config;
        this.endpointClient = endpointClient;

        connection = connectionFactory.CreateConnection();
        connection.RedeliveryPolicy = GetRedeliveryPolicy();
        connection.ExceptionListener += new ExceptionListener(OnException);
        session = connection.CreateSession(AcknowledgementMode.IndividualAcknowledge);
        queue = session.GetQueue(configuration.JmsConfig.SourceQueueName);
        consumer = session.CreateConsumer(queue);

        // Asynchronous
        //consumer.Listener += new MessageListener(OnMessage);

        // Synchronous
        var message = consumer.Receive(TimeSpan.FromSeconds(5));
        while (true)
        {
            if (!Equals(message, null))
            {
                OnMessage(message);
            }
        }
    }

    public void OnMessage(IMessage message)
    {
        log.DebugFormat("Message {count} Received. Attempt:{attempt}", message.Properties.GetInt("count"), message.Properties.GetInt("NMSXDeliveryCount"));
        message.Acknowledge();
    }

person user2158984    schedule 22.09.2020    source источник
comment
Вы не можете обработать Http, пока не получите закрывающий тег. Если вы используете потоковый режим HTTP 1.0, все идет в одном фрагменте. HTTP 1.1 — это режим фрагментов, в котором сообщение разбивается на фрагменты. Теперь в С# синхронный режим будет блокироваться до тех пор, пока не будут получены все фрагменты. Асинхронный режим даст один ответ, если вы находитесь в 1.0, но даст каждый фрагмент в 1.1.   -  person jdweng    schedule 22.09.2020
comment
@jdweng, как ваш комментарий связан с потребителем NMS, подключающимся к брокеру сообщений ActiveMQ для получения сообщений?   -  person Justin Bertram    schedule 22.09.2020


Ответы (1)


Я считаю, что вам нужно позвонить Start() на свой connection, например:

connection.Start();

Вызов Start() означает, что вы хотите, чтобы сообщения поступали.

Также стоит отметить, что нет другого способа выйти из цикла while(true), кроме создания исключения из OnMessage.

person Justin Bertram    schedule 22.09.2020