Как отключить обработку очереди хранилища веб-заданий Azure для определенных элементов очереди?

У меня есть функция веб-задания Azure, запускаемая из очереди:

public void ProcessQueueMessage([QueueTrigger("%MyQueue%")] Item item, TextWriter logger)
    {
       // Do normal processing...
    }

Я хочу выборочно игнорировать определенные элементы очереди в зависимости от условия. Например, если у меня закончился виджет X, и я не хочу продолжать обрабатывать поставки виджетов, пока у меня не будет запаса. Поэтому я хочу временно игнорировать / пропускать все элементы очереди Widget X. Так что-то вроде:

public void ProcessQueueMessage([QueueTrigger("%MyQueue%")] Item item, TextWriter logger)
{
    if (!HaveStock(WidgetX))
       // Ignore queue item - treat it as if it was not in the queue at all

    // Do normal processing...
}

Создание исключения для сбоя обработки не кажется мне вариантом, потому что это отравит сообщение.

Я мог бы установить для VisibilityTimeout какое-то произвольно высокое значение, а затем выдать исключение, но тогда, когда я хотел бы выполнить обработку (а не просто ждать тайм-аута), мне пришлось бы иметь отдельное сканирование процесса для элементов очереди с более высоким VisibilityTimeout и установите для тайм-аута меньшее значение (даже не уверен, возможно ли это). Но создание процесса для сканирования этих элементов очереди - это то, чего я пытаюсь избежать в первую очередь, поскольку я хотел бы сохранить логику в функции, запускаемой очередью, если это возможно.

Какие у меня есть варианты?

Обновление: я понимаю, что то, что я пытаюсь выполнить, несовместимо с целью функций, запускаемых очередью, в первую очередь, учитывая, что причина событий, запускаемых очередью, заключается в том, чтобы что-то делать при появлении сообщения, и что, если я хочу выборочно обработка Я мог бы сам реализовать процесс опроса очереди и обработки сообщений. Но я пытаюсь воспользоваться преимуществами более высокого уровня абстракции, обеспечиваемого триггерной обработкой.


person Howiecamp    schedule 07.10.2016    source источник


Ответы (1)


Я хотел бы сказать; Пусть Queue будет Queue, а FIFO будет FIFO.

Если вы хотите обрабатывать условную обработку сообщений с помощью очереди, я предлагаю два варианта.

1. Используйте темы служебной шины Azure

Для выборочного фильтра FIFO подходящим вариантом может быть замена очереди Azure на тему служебной шины Azure.

См. Руководство, чтобы получить подсказку: https://azure.microsoft.com/en-us/documentation/articles/websites-dotnet-webjobs-sdk-service-bus/#topics

2. Использовать дополнительную очередь

Как и в случае с отравленной очередью, отправьте проигнорированный объект во вторичную очередь и обработайте их позже с помощью другой логики. Я рекомендую это, так как мы легко понимаем потоки.

CloudQueueClient queueClient = storageAccount.CreateCloudQueueClient();
_secondaryQueue= queueClient.GetQueueReference(storageQueueName);
_secondaryQueue.CreateIfNotExists();

public static void ProcessQueueMessage([QueueTrigger("%MyQueue%")] Item item, TextWriter logger)
{
    // send to secondary queue.
    await _secondaryQueue.AddMessageAsync(new CloudQueueMessage(JsonConvert.SerializeObject(item)));
}
person Youngjae    schedule 07.10.2016
comment
Вы помогли мне понять, что я действительно нарушаю семантику функции, запускаемой из очереди. Если бы я начинал все сначала, я бы принял ваше предложение №1 (или даже если бы я все еще использовал очереди, а не служебную шину, я бы все равно провел свой собственный опрос). Мне нравится твоя идея №2. Но если я использую привязку вывода собственной очереди, это означает, что каждый элемент входной очереди будет сохранен в очереди вывода (должен быть установлен out parm ). Поэтому вместо использования привязки очереди вывода я просто сохраняю сообщения, соответствующие моим критериям, во вторую очередь. - person Howiecamp; 08.10.2016
comment
@Howiecamp // Вы правы. Я отредактировал второй ответ. - person Youngjae; 08.10.2016