Очистка очереди служебной шины Azure за один раз

В нашем проекте мы используем очередь служебной шины. Нам нужна функция для удаления всех сообщений из очереди, когда администратор решит очистить очередь. Я искал в сети, но не смог найти ни одной функции, которая делает это внутри класса QueueClient.

Должен ли я выталкивать все сообщения одно за другим, а затем помечать их как завершенные, чтобы очистить очередь, или есть лучший способ?

QueueClient queueClient = _messagingFactory.CreateQueueClient(
                              queueName, ReceiveMode.PeekLock);

BrokeredMessage brokeredMessage = queueClient.Receive();

while (brokeredMessage != null )
{
    brokeredMessage.Complete();
    brokeredMessage = queueClient.Receive();
}

person bhavesh lad    schedule 29.03.2012    source источник
comment
Недавно я столкнулся с той же проблемой, и удаление и повторное создание не было вариантом, зацикливание работает, но медленно, однако, используя PrefetchCount член QueueClient, вы можете легко собрать несколько тысяч сообщений за одно обращение туда и обратно, ускоряя процесс. чрезвычайно: msdn.microsoft.com/en- мы/библиотека/   -  person Necrolis    schedule 21.12.2013


Ответы (8)


Использование метода Receive() в цикле while, как и у вас, приведет к тому, что ваш код будет выполняться бесконечно, как только очередь станет пустой, поскольку метод Receive() будет ожидать появления в очереди другого сообщения.

Если вы хотите, чтобы это выполнялось автоматически, попробуйте использовать метод Peek().

Например:

while (queueClient.Peek() != null)
{
    var brokeredMessage = queueClient.Receive();
    brokeredMessage.Complete();
}

Вы можете снова сделать это проще с помощью ReceiveMode.ReceiveAndDelete, как было упомянуто hocho.

person Scott Brady    schedule 11.02.2014
comment
Несколько проблем: (1) Peek будет запланирован, будущие сообщения, которые Receive не будут, что приведет к зависанию вашего приложения в ожидании запланированных сообщений. (2) Каждый Peek переходит к следующему сообщению независимо от его статуса, поэтому, если вы Откажетесь от первого сообщения вместо Complete, 2-й Peek получит 2-е сообщение, а 2-й Receive повторно обработает 1-е. Это приводит к смещению Peek от Receive на одно сообщение и оставляет одно сообщение необработанным в конце вашей очереди для каждого брошенного сообщения. - person Justin J Stark; 18.10.2016
comment
+1 @JustinJStark При тестировании этого кода (как есть, без обработки, просто получение и завершение) в очереди, которая не получала новых сообщений, всегда не удавалось полностью очистить очередь. Было очищено только 50-75% сообщений. Я бы не стал использовать этот подход для очистки очереди. - person Justin; 05.08.2017
comment
Это не QueueClient из Microsoft.Azure.ServiceBus, не так ли? Мне трудно найти Microsoft.ServiceBus.Messaging, в котором есть QueueClient с методом CreateReceiver... - person pbordeaux; 19.06.2019

С использованием :

  • Оба подхода (от @ScottBrady и @participant)
  • И абстракция MessageReceiver

вы можете написать метод, который очищает очередь служебной шины или тему/подписку:

MessageReceiver messageReceiver = ...
while (messageReceiver.Peek() != null)
{
    // Batch the receive operation
    var brokeredMessages = messageReceiver.ReceiveBatch(300);

    // Complete the messages
    var completeTasks = brokeredMessages.Select(m => Task.Run(() => m.Complete())).ToArray();

    // Wait for the tasks to complete. 
    Task.WaitAll(completeTasks);
}
person Thomas    schedule 22.02.2016

Я получаю хорошие результаты, используя комбинацию ReceiveAndDelete, PrefetchCount, ReceiveBatchAsync и простой цикл проверки правды вместо использования Peek. Пример с MessagingFactory ниже:

var receiverFactory = MessagingFactory.CreateFromConnectionString("ConnString");
var receiver = receiverFactory.CreateMessageReceiver("QName", ReceiveMode.ReceiveAndDelete);
receiver.PrefetchCount = 300;

bool loop = true;
while (loop)
{
    var messages = await receiver.ReceiveBatchAsync(300, TimeSpan.FromSeconds(1));
    loop = messages.Any();
}

Требуется только пакет Nuget WindowsAzure.ServiceBus.

person ElliotSchmelliot    schedule 08.02.2018

Для Azure-ServiceBus-Queues существует ReceiveBatch-method, который позволяет чтобы получить пакет из n-сообщений одновременно. В сочетании с ReceiveMode.ReceiveAndDelete вы можете более эффективно очищать очередь.

Предостережение Может быть возвращено количество n сообщений, но это не гарантируется. Также существует ограничение на размер пакета сообщений 256K. .

person participant    schedule 13.01.2015

Самый быстрый способ очистить очередь Azure ServiceBus — установить очень короткую DefaultMessageTimeToLive, подождать несколько секунд, попробовать получить из очереди принудительное обновление, а затем восстановить исходную DefaultMessageTimeToLive.

Вы можете сделать это с портала или из кода:

var namespaceManager = NamespaceManager.CreateFromConnectionString(connectionString);
var queueDescription = _namespaceManager.GetQueue(queueName);
var queueClient = QueueClient.CreateFromConnectionString(connectionString, queueName, ReceiveMode.ReceiveAndDelete);

var dl = queueDescription.EnableDeadLetteringOnMessageExpiration;
var ttl = queueDescription.DefaultMessageTimeToLive;

queueDescription.EnableDeadLetteringOnMessageExpiration = false;
queueDescription.DefaultMessageTimeToLive = TimeSpan.FromSeconds(1);

Thread.Sleep(5000);
var dumy = queueClient.ReceiveBatch(200, TimeSpan.FromSeconds(1)).ToArray();

queueDescription.EnableDeadLetteringOnMessageExpiration = dl;
queueDescription.DefaultMessageTimeToLive = ttl;
person Florent Quienne    schedule 07.07.2016
comment
Привет, Флоран. Не уверен, изменился ли API, но я только что попробовал это в своей очереди, и изменений не было. Одни и те же сообщения в очереди до и после. - person David Gerding; 17.08.2016

Существует простой метод Clear() для очистки всей очереди, если вы используете библиотеку WindowsAzure.Storage из nuget. Я использую класс Microsoft.Windows.Azure.Queue из этой библиотеки для управления очередью. В противном случае вы можете получить доступ через их API в соответствии с их документация. Я не знаю, как долго этот метод находится в библиотеке Azure, и, вероятно, его не существовало, когда вопрос был изначально задан, но REST API восходит как минимум к 2014 году согласно этот отзыв об Azure

Полный код .NET для очистки очереди с помощью библиотеки Azure:

string connectionString = "YourAzureConnectionStringHere";
string queueName = "YourWebJobQueueName";
CloudStorageAccount storageAccount = CloudStorageAccount.Parse(connectionString);

// Create the queue client, then get a reference to queue
CloudQueueClient queueClient = storageAccount.CreateCloudQueueClient();
queue = queueClient.GetQueueReference(GetQueueName(queueName));

// Clear the entire queue
queue.Clear();
person jozolo    schedule 11.01.2018
comment
Первоначальный вопрос касался очистки очереди служебной шины Azure, а не обычной очереди Azure. - person OlavT; 05.04.2018
comment
По данным Microsoft, и Azure Service Bus Queue, и Azure Queue поддерживают функцию .clear: feedback.azure.com/forums/217298-storage/suggestions/ - person jozolo; 10.05.2018
comment
Не в версии клиента .NET Core (т. е. Microsoft.Azure.ServiceBus). Это не содержит метод Clear(). - person Marcel Toth; 12.09.2018

Вы можете просто сделать это из лазурного портала.

Если вас не беспокоит сообщение в очереди, хранящееся за последние 14 дней (по умолчанию), и вы хотите немедленно очистить очередь, тогда -

измените время сообщения на живое значение на 5 или 10 секунд и подождите 5-10 секунд. все сообщения очереди будут удалены. вы можете снова сбросить исходное значение. по умолчанию 14 дней.

введите здесь описание изображения

person SSD    schedule 17.03.2021

Первое, о чем я подумал, когда прочитал вопрос, — почему бы не удалить и не создать очередь заново? Это может быть подход. Но если вы не хотите этого делать, вы можете получать и удалять каждое сообщение, пока не останется ни одного сообщения.

Вы можете использовать новейшую библиотеку .NET служебной шины Azure Azure.Messaging.ServiceBus для обоих подходов.

  1. Удалить и заново создать очередь с помощью AdministrationClient
using Azure.Messaging.ServiceBus.Administration;

ServiceBusAdministrationClient adminClient = new ServiceBusAdministrationClient("<connectionstring>");
await adminClient.DeleteQueueAsync("myqueue");          
await adminClient.CreateQueueAsync("myqueue");          

Примечание. ServiceBusAdministrationClient предназначен для операций CRUD в уже существующем пространстве имен служебной шины. Если вам также нужна возможность создать пространство имен, используйте Microsoft.Azure.Management.ServiceBus.

  1. Получать и удалять, пока не останется сообщения
using Azure.Messaging.ServiceBus;

await using var client = new ServiceBusClient("<connectionstring>");
                
ServiceBusReceiver receiver = client.CreateReceiver("myqueue", 
    new ServiceBusReceiverOptions { ReceiveMode = ServiceBusReceiveMode.ReceiveAndDelete });

while ((await receiver.PeekMessageAsync()) != null)
{
    // receive in batches of 100 messages.
    await receiver.ReceiveMessagesAsync(100);
}

ReceiveMode = ServiceBusReceiveMode.ReceiveAndDelete говорит само за себя. Режим приема по умолчанию — PeekLock, который на самом деле не удаляет сообщения из очереди — он сообщает служебной шине, что принимающий клиент хочет разрешить полученные сообщения явно. Часть блокировки означает, что конкурирующие получатели не могут получить доступ к сообщению на время блокировки.

person lily_m    schedule 07.05.2021