Функция Azure: как лучше реализовать задержку повторной попытки сообщения очереди

Моя функция Azure должна прослушивать очередь сообщений, затем получать сообщение, пытаться вызвать внешнюю службу со значением внутри сообщения, если внешняя служба возвращает «ОК», тогда мы должны записать сообщение в другую очередь (для следующей функции Azure), если возвращается «Ошибка», мы должны вернуться в нашу текущую очередь и повторить попытку с помощью нашей функции Azure снова через 5 минут. Как это реализовать? Я сделал это с помощью Timer, но решение мне не нравится:

    [FunctionName("FunctionOffice365VerificateDomain_and_AddService_and_GexMxRecord")]
    public async static Task Run([TimerTrigger("0 */5 * * * *")]TimerInfo myTimer,
        [Queue("domain-verificate-Office365-add-services-get-mx-record", Connection = "StorageConnectionString")]CloudQueue listenQueue,
        [Queue("domain-add-mx-record-to-registrator", Connection = "StorageConnectionString")]CloudQueue outputQueue,
        ILogger log)
    {
        while (true)
        {
            // do "invisible" message for next 30 sec
            var message = await listenQueue.GetMessageAsync();
            if (message != null)
            {
                DomainForRegistration domainForRegistration = JsonConvert.DeserializeObject<DomainForRegistration>(message.AsString);
                try
                {
                    await _office365DomainService.VerifyDomainAsync(domainForRegistration.DomainName);
                    // remove message
                    await listenQueue.DeleteMessageAsync(message);

                    await _office365DomainService.UpdateIndicateSupportedServicesDomainAsync(domainForRegistration.DomainName);

                    var mxRecord = await _office365DomainService.GetMxRecordForDomainAsync(domainForRegistration.DomainName);
                }
                catch (DomainVerificationRecordNotFoundException)
                {
                     // thrown when VerifyDomainAsync failed
                }
            }
            else
                break;
        }
    }

Как это сделать более аккуратно, без этих while(true), но с таймаутом после неудачной проверки?


person Oleg Sh    schedule 21.01.2019    source источник
comment
Если это очередь, почему функция не запускается для каждого отдельного сообщения? Не уверен, зачем вам нужен цикл.   -  person DavidG    schedule 21.01.2019
comment
@DavidG, потому что после сбоя мне нужно добавить то же сообщение, и функция будет вызываться немедленно (без задержки) снова   -  person Oleg Sh    schedule 21.01.2019


Ответы (1)


Согласитесь с @DavidG, попробуйте использовать триггер очереди для достижения своей цели. W может полагаться на хост установка Очереди.

visibilityTimeout - временной интервал между попытками обработки сообщения при сбое. maxDequeueCount - количество попыток обработки сообщения перед перемещением его в опасную очередь.

{
    "version": "2.0",
    "extensions": {
        "queues": {
            "visibilityTimeout" : "00:05:00",
            "maxDequeueCount": 2,
        }
    }
}

Таким образом, функция должна выглядеть так:

public static async Task Run(
    [QueueTrigger("domain-verificate-Office365-add-services-get-mx-record")]string myQueueItem, ILogger log,
    [Queue("domain-add-mx-record-to-registrator", Connection = "StorageConnectionString")]IAsyncCollector<string> outputQueue
)
{
    // do stuff then output message
    await outputQueue.AddAsync(myQueueItem);
}

Если вы не хотите создавать исключение для хоста, мы можем обратиться к initialVisibilityDelay метода CloudQueue.

указание интервала времени с этого момента, в течение которого сообщение будет невидимым

    public static async Task Run(
        [QueueTrigger("domain-verificate-Office365-add-services-get-mx-record")]string myQueueItem, ILogger log,
        [Queue("domain-add-mx-record-to-registrator", Connection = "StorageConnectionString")]IAsyncCollector<string> outputQueue,
        [Queue("domain-verificate-Office365-add-services-get-mx-record", Connection = "StorageConnectionString")]CloudQueue listenQueue
    )
    {

        try 
        {
            // do stuff then output message
            await outputQueue.AddAsync(myQueueItem);
        }
        catch(DomainVerificationRecordNotFoundException)
        {
            // add the message in current queue and can only be visible after 5 minutes
            await listenQueue.AddMessageAsync(new CloudQueueMessage(myQueueItem), null, TimeSpan.FromMinutes(5), null, null);
        }
    }
person Jerry Liu    schedule 21.01.2019
comment
Спасибо! Я не знал о параметре задержки для добавления сообщения в очередь :) - person Oleg Sh; 21.01.2019