Мне нужно ввести в рабочий процесс политику повтора. Допустим, есть 3 блока, которые связаны таким образом:
var executionOptions = new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 3 };
var buffer = new BufferBlock<int>();
var processing = new TransformBlock<int, int>(..., executionOptions);
var send = new ActionBlock<int>(...);
buffer.LinkTo(processing);
processing.LinkTo(send);
Таким образом, есть буфер, который накапливает данные, затем отправляет их в блок преобразования, который обрабатывает не более 3 элементов за один раз, а затем результат отправляется в блок действий.
Потенциально во время обработки возможны временные ошибки блока преобразования, и я хочу повторить блок, если ошибка временная несколько раз.
Я знаю, что блоки, как правило, не могут быть повторены (делегаты, которые переданы в блоки, могут быть сделаны повторными). И один из вариантов - обернуть переданный делегат для поддержки повторной попытки.
Я также знаю, что существует очень хорошая библиотека TransientFaultHandling.Core
, которая предоставляет механизмы повтора для временных ошибок. Это отличная библиотека, но не в моем случае. Если я заключу делегата, который передается в блок преобразования, в метод RetryPolicy.ExecuteAsync
, сообщение внутри блока преобразования будет заблокировано, и пока повторная попытка не завершится или не завершится ошибкой, блок преобразования не сможет чтобы получить новое сообщение. Представьте, что если все 3 сообщения введены в повторную попытку (скажем, следующая попытка повторной попытки будет через 2 минуты) и не удастся, блок преобразования будет зависать до тех пор, пока хотя бы одно сообщение не выйдет из блока преобразования.
Единственное решение, которое я вижу, - это расширить TranformBlock
(на самом деле, ITargetBlock
тоже будет достаточно) и повторить попытку вручную (например, из здесь):
do
{
try { return await transform(input); }
catch
{
if( numRetries <= 0 ) throw;
else Task.Delay(timeout).ContinueWith(t => processing.Post(message));
}
} while( numRetries-- > 0 );
напр. чтобы снова поместить сообщение в блок преобразования с задержкой, но в этом случае контекст повторения (количество оставшихся повторных попыток и т. д.) также должен быть передан в этот блок. Звучит слишком сложно ...
Кто-нибудь видит более простой подход к реализации политики повторов для блока рабочего процесса?