Политика повторных попыток в ITargetBlock ‹TInput›

Мне нужно ввести в рабочий процесс политику повтора. Допустим, есть 3 блока, которые связаны таким образом:

var executionOptions = new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 3 };
var buffer = new BufferBlock<int>();
var processing = new TransformBlock<int, int>(..., executionOptions);
var send = new ActionBlock<int>(...);

buffer.LinkTo(processing);
processing.LinkTo(send);

Таким образом, есть буфер, который накапливает данные, затем отправляет их в блок преобразования, который обрабатывает не более 3 элементов за один раз, а затем результат отправляется в блок действий.

Потенциально во время обработки возможны временные ошибки блока преобразования, и я хочу повторить блок, если ошибка временная несколько раз.

Я знаю, что блоки, как правило, не могут быть повторены (делегаты, которые переданы в блоки, могут быть сделаны повторными). И один из вариантов - обернуть переданный делегат для поддержки повторной попытки.

Я также знаю, что существует очень хорошая библиотека TransientFaultHandling.Core, которая предоставляет механизмы повтора для временных ошибок. Это отличная библиотека, но не в моем случае. Если я заключу делегата, который передается в блок преобразования, в метод RetryPolicy.ExecuteAsync, сообщение внутри блока преобразования будет заблокировано, и пока повторная попытка не завершится или не завершится ошибкой, блок преобразования не сможет чтобы получить новое сообщение. Представьте, что если все 3 сообщения введены в повторную попытку (скажем, следующая попытка повторной попытки будет через 2 минуты) и не удастся, блок преобразования будет зависать до тех пор, пока хотя бы одно сообщение не выйдет из блока преобразования.

Единственное решение, которое я вижу, - это расширить TranformBlock (на самом деле, ITargetBlock тоже будет достаточно) и повторить попытку вручную (например, из здесь):

do
 {
    try { return await transform(input); }
    catch
    { 
        if( numRetries <= 0 ) throw;
        else Task.Delay(timeout).ContinueWith(t => processing.Post(message));
    }
 } while( numRetries-- > 0 );

напр. чтобы снова поместить сообщение в блок преобразования с задержкой, но в этом случае контекст повторения (количество оставшихся повторных попыток и т. д.) также должен быть передан в этот блок. Звучит слишком сложно ...

Кто-нибудь видит более простой подход к реализации политики повторов для блока рабочего процесса?


person Alex    schedule 04.07.2013    source источник
comment
Об этом (например, здесь):, который я только что значительно улучшил, вы, возможно, захотите посмотреть.   -  person AgentFire    schedule 15.09.2013


Ответы (3)


Я думаю, что вам в значительной степени нужно это сделать, вам нужно отслеживать оставшееся количество повторных попыток для сообщения и вы должны каким-то образом запланировать повторную попытку.

Но вы можете сделать это лучше, инкапсулируя его в отдельный метод. Что-то вроде:

// it's a private class, so public fields are okay
private class RetryingMessage<T>
{
    public T Data;
    public int RetriesRemaining;
    public readonly List<Exception> Exceptions = new List<Exception>();
}

public static IPropagatorBlock<TInput, TOutput>
    CreateRetryingBlock<TInput, TOutput>(
    Func<TInput, Task<TOutput>> transform, int numberOfRetries,
    TimeSpan retryDelay, Action<IEnumerable<Exception>> failureHandler)
{
    var source = new TransformBlock<TInput, RetryingMessage<TInput>>(
        input => new RetryingMessage<TInput>
        { Data = input, RetriesRemaining = numberOfRetries });

    // TransformManyBlock, so that we can propagate zero results on failure
    TransformManyBlock<RetryingMessage<TInput>, TOutput> target = null;
    target = new TransformManyBlock<RetryingMessage<TInput>, TOutput>(
        async message =>
        {
            try
            {
                return new[] { await transform(message.Data) };
            }
            catch (Exception ex)
            {
                message.Exceptions.Add(ex);
                if (message.RetriesRemaining == 0)
                {
                    failureHandler(message.Exceptions);
                }
                else
                {
                    message.RetriesRemaining--;
                    Task.Delay(retryDelay)
                        .ContinueWith(_ => target.Post(message));
                }
                return null;
            }
        });

    source.LinkTo(
        target, new DataflowLinkOptions { PropagateCompletion = true });

    return DataflowBlock.Encapsulate(source, target);
}

Я добавил код для отслеживания исключений, потому что считаю, что сбои не следует игнорировать, они должны как минимум регистрироваться.

Кроме того, этот код не очень хорошо работает с завершением: если есть повторные попытки, ожидающие своей задержки, и вы Complete() блок, он будет немедленно завершен, и повторные попытки будут потеряны. Если для вас это проблема, вам придется отслеживать невыполненные повторные попытки и завершать target, когда source завершается и повторных попыток не ожидается.

person svick    schedule 04.07.2013
comment
Спасибо за отличный ответ (особенно замечание о завершении), я бы проголосовал дважды или даже трижды, если бы мог. Я реализовал решение для правильного (я надеюсь) завершения, используя что-то вроде этого: у меня есть набор повторных сообщений (с добавлением и удалением) и продолжение для завершения источника, которое выполняет опрос (например, await Task.Delay(100)) и проверяет есть ли в целевом блоке входные сообщения и пустой ли набор сообщений. Когда оба условия верны, я просто завершаю целевой блок. Можно ли было реализовать это без опроса? - person Alex; 05.07.2013
comment
@Alex Вам не нужен опрос, вам нужно своевременно проверять условия. Но я думаю, вам также нужно знать, что нет сообщений, ожидающих обработки (потому что некоторые из них могут выйти из строя), и я не уверен, как это сделать разумно. - person svick; 05.07.2013

Помимо отличного ответа Свика, есть еще несколько вариантов:

  1. Вы можете использовать TransientFaultHandling.Core - просто установите MaxDegreeOfParallelism в Unbounded, чтобы другие сообщения могли пройти.
  2. Вы можете изменить тип вывода блока, включив в него индикацию сбоя и счетчик повторных попыток, а также создать цикл потока данных, передав фильтр LinkTo, который проверяет, необходима ли еще одна повторная попытка. Этот подход более сложный; вам нужно будет добавить задержку к вашему блоку, если он выполняет повторную попытку, и добавить TransformBlock, чтобы удалить информацию об ошибке / повторной попытке для остальной части меша.
person Stephen Cleary    schedule 05.07.2013
comment
Если я установил MaxDegreeOfParallelism в Unbounded и, например, мой буфер получает 20K входных сообщений, и предположим, что половина из них потребует повторной попытки - я думаю, вся система зависнет. Я прав? - person Alex; 05.07.2013
comment
Еще один комментарий к предыдущему: я думаю, что это не очень хорошая идея сохранять блок, когда он требует просто ожидания и потенциально может обработать другое сообщение - пожалуйста, поправьте меня, если я ошибаюсь. - person Alex; 05.07.2013
comment
@Alex: MaxDegreeOfParallelism это максимум, голод не вызовет. Я не уверен, что вы имеете в виду, говоря о сохранении блока; вы обычно храните сетку потока данных до тех пор, пока данных может быть больше. - person Stephen Cleary; 05.07.2013
comment
@StephenCleary: как я понимаю, при использовании метода retryPolicy.ExecuteAsync из TransientFaultHandling.Core он останавливает выполнение внутри блока и повторяет попытку после интервала повтора. Это означает, что если блок может обрабатывать максимум 3 сообщения одновременно, и одно из них ожидает повторной попытки, блок не сможет получить новое сообщение вместо повторной попытки до тех пор, пока повторная попытка завершается или завершается. - person Alex; 05.07.2013
comment
@Alex: MaxDegreeOfParallelism - это параметр в блоке, поэтому, если вы установите его на 3 и затем получите одно сообщение, ожидающее повторной попытки, блок может развернуть другую задачу для обработки следующего сообщения. P.S. Я предполагаю, что ExecuteAsync будет вести себя разумно (т.е. что он использует Task.Delay, а не Thread.Sleep). Если это глупо (с использованием Thread.Sleep), решение все равно будет работать, но будет очень неэффективным. - person Stephen Cleary; 05.07.2013
comment
Я только что попробовал этот фрагмент кода, где MaxDegreeOfParallelism установлен в 1, и блок ничего не взял, пока повторная попытка сообщения, и весь рабочий процесс зависает. Это то, что я пытался показать чуть раньше (извините, что не слишком ясно). - person Alex; 05.07.2013
comment
@Alex: Конечно, он заблокирует рабочий процесс, если вы установите его на 1 (в любом случае это настройка по умолчанию). Это блокирует, потому что вы сказали делать только по одному. Попробуйте установить Unbounded. - person Stephen Cleary; 05.07.2013
comment
Представьте себе сценарий, когда я устанавливаю его на Unbounded, и буфер получает 20K сообщений, половина сообщений потенциально попадет на повторную попытку, и каждое сообщение обрабатывается с большой нагрузкой на ЦП - весь рабочий процесс зависнет? - person Alex; 05.07.2013
comment
Нет, не застрянет. - person Stephen Cleary; 05.07.2013

Вот два метода CreateRetryTransformBlock и CreateRetryActionBlock, которые работают при этих предположениях:

  1. Вызывающий хочет, чтобы все элементы были обработаны, даже если некоторые из них неоднократно выходили из строя.
  2. Вызывающему интересно знать обо всех произошедших исключениях, даже для элементов, которые в конечном итоге завершились успешно (не применимо для CreateRetryActionBlock).
  3. Вызывающий может захотеть установить верхний предел для общего числа попыток, после которого блок должен перейти в состояние сбоя.
  4. Вызывающий хочет иметь возможность установить все доступные параметры обычного блока, включая MaxDegreeOfParallelism, BoundedCapacity, CancellationToken и EnsureOrdered, поверх параметров, связанных с функцией повтора.

В приведенной ниже реализации для управления уровнем используется SemaphoreSlim параллелизма между операциями, которые выполняются в первый раз, и операциями, в которых произошел сбой, которые повторяются после того, как истечет их продолжительность задержки.

public class RetryExecutionDataflowBlockOptions : ExecutionDataflowBlockOptions
{
    /// <summary>The limit after which an item is returned as failed.</summary>
    public int MaxAttemptsPerItem { get; set; } = 1;
    /// <summary>The delay duration before retrying an item.</summary>
    public TimeSpan RetryDelay { get; set; } = TimeSpan.Zero;
    /// <summary>The limit after which the block transitions to a faulted
    /// state (unlimited is the default).</summary>
    public int MaxRetriesTotal { get; set; } = -1;
}

public readonly struct RetryResult<TInput, TOutput>
{
    public readonly TInput Input { get; }
    public readonly TOutput Output { get; }
    public readonly bool Success { get; }
    public readonly Exception[] Exceptions { get; }

    public bool Failed => !Success;
    public Exception FirstException => Exceptions != null ? Exceptions[0] : null;
    public int Attempts =>
        Exceptions != null ? Exceptions.Length + (Success ? 1 : 0) : 1;

    public RetryResult(TInput input, TOutput output, bool success,
        Exception[] exceptions)
    {
        Input = input;
        Output = output;
        Success = success;
        Exceptions = exceptions;
    }
}

public class RetryLimitException : Exception
{
    public RetryLimitException(string message, Exception innerException)
        : base(message, innerException) { }
}

public static IPropagatorBlock<TInput, RetryResult<TInput, TOutput>>
    CreateRetryTransformBlock<TInput, TOutput>(
    Func<TInput, Task<TOutput>> transform,
    RetryExecutionDataflowBlockOptions dataflowBlockOptions)
{
    if (transform == null) throw new ArgumentNullException(nameof(transform));
    if (dataflowBlockOptions == null)
        throw new ArgumentNullException(nameof(dataflowBlockOptions));
    int maxAttemptsPerItem = dataflowBlockOptions.MaxAttemptsPerItem;
    int maxRetriesTotal = dataflowBlockOptions.MaxRetriesTotal;
    TimeSpan retryDelay = dataflowBlockOptions.RetryDelay;
    if (maxAttemptsPerItem < 1) throw new ArgumentOutOfRangeException(
        nameof(dataflowBlockOptions.MaxAttemptsPerItem));
    if (maxRetriesTotal < -1) throw new ArgumentOutOfRangeException(
        nameof(dataflowBlockOptions.MaxRetriesTotal));
    if (retryDelay < TimeSpan.Zero) throw new ArgumentOutOfRangeException(
        nameof(dataflowBlockOptions.RetryDelay));
    var cancellationToken = dataflowBlockOptions.CancellationToken;

    var exceptionsCount = 0;
    var semaphore = new SemaphoreSlim(
        dataflowBlockOptions.MaxDegreeOfParallelism);

    async Task<(TOutput, Exception)> ProcessOnceAsync(TInput item)
    {
        await semaphore.WaitAsync(); // Preserve the SynchronizationContext
        try
        {
            var result = await transform(item).ConfigureAwait(false);
            return (result, null);
        }
        catch (Exception ex)
        {
            if (maxRetriesTotal != -1)
            {
                if (Interlocked.Increment(ref exceptionsCount) > maxRetriesTotal)
                {
                    throw new RetryLimitException($"The max retry limit " +
                        $"({maxRetriesTotal}) has been reached.", ex);
                }
            }
            return (default, ex);
        }
        finally
        {
            semaphore.Release();
        }
    }

    async Task<Task<RetryResult<TInput, TOutput>>> ProcessWithRetryAsync(
        TInput item)
    {
        // Creates a two-stages operation. Preserves the context on every await.
        var (result, firstException) = await ProcessOnceAsync(item);
        if (firstException == null) return Task.FromResult(
            new RetryResult<TInput, TOutput>(item, result, true, null));
        return RetryStageAsync();

        async Task<RetryResult<TInput, TOutput>> RetryStageAsync()
        {
            var exceptions = new List<Exception>();
            exceptions.Add(firstException);
            for (int i = 2; i <= maxAttemptsPerItem; i++)
            {
                await Task.Delay(retryDelay, cancellationToken);
                var (result, exception) = await ProcessOnceAsync(item);
                if (exception != null)
                    exceptions.Add(exception);
                else
                    return new RetryResult<TInput, TOutput>(item, result,
                        true, exceptions.ToArray());
            }
            return new RetryResult<TInput, TOutput>(item, default, false,
                exceptions.ToArray());
        };
    }

    // The input block awaits the first stage of each operation
    var input = new TransformBlock<TInput, Task<RetryResult<TInput, TOutput>>>(
        item => ProcessWithRetryAsync(item), dataflowBlockOptions);

    // The output block awaits the second (and final) stage of each operation
    var output = new TransformBlock<Task<RetryResult<TInput, TOutput>>,
        RetryResult<TInput, TOutput>>(t => t, dataflowBlockOptions);

    input.LinkTo(output, new DataflowLinkOptions { PropagateCompletion = true });

    // In case of failure ensure that the input block is faulted too,
    // so that its input/output queues are emptied, and any pending
    // SendAsync operations are aborted
    PropagateFailure(output, input);

    return DataflowBlock.Encapsulate(input, output);

    async void PropagateFailure(IDataflowBlock block1, IDataflowBlock block2)
    {
        try { await block1.Completion.ConfigureAwait(false); }
        catch (Exception ex) { block2.Fault(ex); }
    }
}

public static ITargetBlock<TInput> CreateRetryActionBlock<TInput>(
    Func<TInput, Task> action,
    RetryExecutionDataflowBlockOptions dataflowBlockOptions)
{
    if (action == null) throw new ArgumentNullException(nameof(action));
    var block = CreateRetryTransformBlock<TInput, object>(async input =>
    {
        await action(input).ConfigureAwait(false); return null;
    }, dataflowBlockOptions);
    var nullTarget = DataflowBlock.NullTarget<RetryResult<TInput, object>>();
    block.LinkTo(nullTarget);
    return block;
}
person Theodor Zoulias    schedule 24.05.2020
comment
Для реализации, работающей с элегантным и легким типом Стивена Клири Try вместо несколько громоздкого типа RetryResult, используемого здесь посмотрите на этот вопрос. - person Theodor Zoulias; 02.06.2020
comment
Обновление: меня больше не устраивает приведенная выше реализация из-за неестественного приоритета повторных попыток повторных сбоев. Мое текущее мнение о лучшей реализации RetryTransformBlock - создать конвейер связанных TransformBlock с длиной, равной количеству максимальных повторных попыток, и чтобы все сообщения проходили через конвейер. Все TransformBlocks будут иметь одинаковую логику: обработать сообщение, если оно не обработано, или иным образом распространить его вниз по течению вместе с его результатом без изменений. Такая конфигурация приводит к наиболее естественному поведению IMHO. - person Theodor Zoulias; 23.04.2021