Как массово обрабатывать сообщения из очереди служебной шины каждые X секунд

Моя очередь служебной шины ранее обрабатывала одно сообщение за раз. Я хотел изменить его, чтобы сообщения помещались в очередь, чтобы обрабатывать их массово каждые X секунд. Сначала я сделал следующее:

var messagingFactorySettings = new MessagingFactorySettings
{
    NetMessagingTransportSettings = { 
        BatchFlushInterval = TimeSpan.FromMilliseconds(10000) },
        TokenProvider = credentials
};

Теперь я хочу получать сообщения, но, похоже, мне нужно выполнить этот бесконечный цикл while, чтобы добраться до них:

while ((messages = myQueueClient.ReceiveBatch(1000).ToList()) != null)
{
    foreach (var message in messages)
    { ...

Насколько я понимаю, BatchFlushInterval позволит запросам накапливаться в течение времени X, так что я получаю сообщения не одно за другим, а целиком. Поэтому я не уверен, почему я не могу сделать то, что делал раньше:

myQueueClient.OnMessage((m) =>
{

Но массовая версия:

myQueueClient.OnBulkMessage((listOfMessages) =>
{

Я что-то упускаю или постоянно ищу единственный способ добиться этого? Также мой BatchFlushInterval, похоже, игнорируется. Я ожидал, что он будет проверять наличие новых сообщений только каждые 10 секунд, но он получает первый пакет немедленно, и любые новые сообщения, которые приходят, также немедленно обрабатываются.

Предполагая, что я хочу каждые X (например, 1) секунд извлекать из очереди до Y сообщений (например, 1000) и обрабатывать их сразу, как бы мне это сделать? Почему BatchFlushInterval не есть какое-то влияние?


person KingOfHypocrites    schedule 23.05.2015    source источник


Ответы (1)


Казалось бы, простой Thread.Sleep(x) в порядке.

Мне показалась интересной проблема приостановки до тех пор, пока не истечет какое-то общее время, поэтому я пошел и реализовал подкласс секундомера, который делает эту проблему решаемой более чистым и удобочитаемым способом.

while ((var messages = myQueueClient.ReceiveBatch(1000)) != null)
{
    var sw = WaitableStopwatch.StartNew();

    // ReceiveBatch() return IEnumerable<>. No need for .ToList().
    foreach (var message in messages)
    {
        ...
    }

    // If processing took less than 10 seconds, sleep
    // for the remainder of that time span before getting
    // the next batch.
    sw.Wait(Timespan.FromSeconds(10));
}



/// <summary>
/// Extends Stopwatch with the ability to wait until a specified
/// elapsed time has been reached.
/// </summary>
public class WaitableStopwatch : Stopwatch
{
    /// <summary>
    /// Initializes a new WaitableStopwatch instance, sets the elapsed
    /// time property to zero, and starts measuring elapsed time.
    /// </summary>
    /// <returns>A WaitableStopwatch that has just begun measuring elapsed time.</returns>
    public static new WaitableStopwatch StartNew()
    {
        WaitableStopwatch sw = new WaitableStopwatch();

        sw.Start();

        return sw;
    }

    /// <summary>
    /// Waits until the ElapsedMilliseconds property reaches <paramref name="elapsedMilliseconds"/>.
    /// </summary>
    /// <param name="elapsedMilliseconds"></param>
    public void Wait(int elapsedMilliseconds)
    {
        Wait(TimeSpan.FromMilliseconds(elapsedMilliseconds));
    }

    /// <summary>
    /// Waits until when the Elapsed property reaches <paramref name="elapsed"/>.
    /// </summary>
    /// <param name="elapsed"></param>
    public void Wait(TimeSpan elapsed)
    {
        TimeSpan diff;

        while ((diff = elapsed - this.Elapsed) > TimeSpan.Zero)
        {
            Thread.Sleep(diff);
        }
    }

    /// <summary>
    /// Waits until when the ElapsedMilliseconds property reaches <paramref name="elapsedMilliseconds"/>.
    /// </summary>
    /// <param name="elapsedMilliseconds"></param>
    public Task WaitAsync(int elapsedMilliseconds)
    {
        return WaitAsync(TimeSpan.FromMilliseconds(elapsedMilliseconds));
    }

    /// <summary>
    /// Waits until when the Elapsed property reaches <paramref name="elapsed"/>.
    /// </summary>
    /// <param name="elapsed"></param>
    public async Task WaitAsync(TimeSpan elapsed)
    {
        TimeSpan diff;

        while ((diff = elapsed - this.Elapsed) > TimeSpan.Zero)
        {
            await Task.Delay(diff);
        }
    }
}
person glenebob    schedule 23.05.2015
comment
Цикл вводится только каждые 30 секунд, и он обрабатывает захват нового сообщения автоматически, но это не происходит со скоростью, которую я установил как BatchFlushInterval. Даже если бы я собирался избавиться от цикла while и написать свой собственный таймер, чтобы просто вызывать ReceiveBatch, я бы подумал, что в этом нет необходимости. С этим кодом таймер будет поверх 30-секундной задержки. - person KingOfHypocrites; 23.05.2015
comment
В вопросе не совсем ясно. Вы говорите, что ReceiveBatch () возвращается за 30 секунд, но возвращает только одно сообщение, хотя их много? - person glenebob; 23.05.2015
comment
Я обновил свой вопрос. На самом деле он возвращается автоматически, если там есть новые данные. Я устанавливаю таймер, чтобы продолжать добавлять записи, и они продолжают обрабатываться немедленно, а не с учетом настройки в BatchFlushInterval. - person KingOfHypocrites; 23.05.2015
comment
В этом случае может показаться, что мой ответ должен помочь, очевидно, после изменения на десять секунд. Я не знаю о настройке BatchFlushInterval. - person glenebob; 23.05.2015
comment
Возможно, вы правы, говоря о том, что вам нужно использовать таймер. Я нашел ссылку в Интернете, где они делают нечто подобное. Я думаю, что BatchFlushInterval относится только к методам отправки и завершения, но я не уверен, как это сделать. Если я не могу найти встроенное решение, то отмечу как ответ. - person KingOfHypocrites; 24.05.2015
comment
У меня есть шанс плюс это сегодня. Работает действительно хорошо. Хорошая вещь. Спасибо, что прошли лишнюю милю. Я могу сказать, что вы любите программировать. :) - person KingOfHypocrites; 24.05.2015