Регулирование асинхронных задач

Я хотел бы запустить кучу асинхронных задач с ограничением количества задач, ожидающих завершения в любой момент времени.

Допустим, у вас есть 1000 URL-адресов, и вы хотите, чтобы одновременно было открыто только 50 запросов; но как только один запрос завершается, вы открываете соединение со следующим URL-адресом в списке. Таким образом, одновременно всегда открыто ровно 50 подключений, пока список URL-адресов не будет исчерпан.

Я также хочу использовать заданное количество потоков, если это возможно.

Я придумал метод расширения ThrottleTasksAsync, который делает то, что я хочу. Есть ли уже более простое решение? Я предполагаю, что это обычный сценарий.

Использование:

class Program
{
    static void Main(string[] args)
    {
        Enumerable.Range(1, 10).ThrottleTasksAsync(5, 2, async i => { Console.WriteLine(i); return i; }).Wait();

        Console.WriteLine("Press a key to exit...");
        Console.ReadKey(true);
    }
}

Вот код:

static class IEnumerableExtensions
{
    public static async Task<Result_T[]> ThrottleTasksAsync<Enumerable_T, Result_T>(this IEnumerable<Enumerable_T> enumerable, int maxConcurrentTasks, int maxDegreeOfParallelism, Func<Enumerable_T, Task<Result_T>> taskToRun)
    {
        var blockingQueue = new BlockingCollection<Enumerable_T>(new ConcurrentBag<Enumerable_T>());

        var semaphore = new SemaphoreSlim(maxConcurrentTasks);

        // Run the throttler on a separate thread.
        var t = Task.Run(() =>
        {
            foreach (var item in enumerable)
            {
                // Wait for the semaphore
                semaphore.Wait();
                blockingQueue.Add(item);
            }

            blockingQueue.CompleteAdding();
        });

        var taskList = new List<Task<Result_T>>();

        Parallel.ForEach(IterateUntilTrue(() => blockingQueue.IsCompleted), new ParallelOptions { MaxDegreeOfParallelism = maxDegreeOfParallelism },
        _ =>
        {
            Enumerable_T item;

            if (blockingQueue.TryTake(out item, 100))
            {
                taskList.Add(
                    // Run the task
                    taskToRun(item)
                    .ContinueWith(tsk =>
                        {
                            // For effect
                            Thread.Sleep(2000);

                            // Release the semaphore
                            semaphore.Release();

                            return tsk.Result;
                        }
                    )
                );
            }
        });

        // Await all the tasks.
        return await Task.WhenAll(taskList);
    }

    static IEnumerable<bool> IterateUntilTrue(Func<bool> condition)
    {
        while (!condition()) yield return true;
    }
}

Метод использует BlockingCollection и SemaphoreSlim, чтобы заставить его работать. Дроссель запускается в одном потоке, а все асинхронные задачи выполняются в другом потоке. Чтобы добиться параллелизма, я добавил параметр maxDegreeOfParallelism, который передается в цикл Parallel.ForEach, преобразованный в цикл while.

Старая версия была:

foreach (var master = ...)
{
    var details = ...;
    Parallel.ForEach(details, detail => {
        // Process each detail record here
    }, new ParallelOptions { MaxDegreeOfParallelism = 15 });
    // Perform the final batch updates here
}

Но пул потоков быстро исчерпывается, и вы не можете сделать _9 _ / _ 10_.

Бонус: чтобы обойти проблему в BlockingCollection, где исключение выдается в Take() при вызове CompleteAdding(), я использую перегрузку TryTake с таймаутом. Если бы я не использовал тайм-аут в TryTake, это не помогло бы использовать BlockingCollection, поскольку TryTake не будет блокироваться. Есть ли способ лучше? В идеале был бы метод TakeAsync.


person Josh Wyant    schedule 18.03.2014    source источник
comment
Есть ли способ лучше? да, TPL Dataflow.   -  person Scott Chamberlain    schedule 19.03.2014
comment
В примере с URL-адресом вы можете поместить все URL-адреса в ConcurrentBag, запустить 50 потоков и в каждом потоке получить URL-адрес и выполнять запрос, пока пакет не станет пустым.   -  person Bogdan    schedule 19.03.2014
comment
В общем случае используйте ConcurrentBag делегатов :)   -  person Bogdan    schedule 19.03.2014
comment
@Bogdan Я буду выполнять тысячи запросов и хочу запускать их все в одном потоке, используя await. Parallel.ForEach обеспечивает эффект 2 или 4 параллельных while петель.   -  person Josh Wyant    schedule 19.03.2014
comment
@ Скотт Чемберлен Какое конкретное использование TPL Dataflow могло бы улучшить мою ситуацию?   -  person Josh Wyant    schedule 19.03.2014
comment
Я задал аналогичный вопрос (stackoverflow .com / questions / 21169923 /). Dataflow и Rx кажутся наиболее интересными кандидатами. Тем временем я протестировал Dataflow, и он отлично работает.   -  person usr    schedule 19.03.2014


Ответы (3)


Как было предложено, используйте поток данных TPL.

TransformBlock<TInput, TOutput> может быть тем, что ты ищешь.

Вы определяете MaxDegreeOfParallelism, чтобы ограничить количество строк, которые могут быть преобразованы (т. Е. Сколько URL-адресов могут быть загружены) параллельно. Затем вы отправляете URL-адреса в блок, и когда вы закончите, вы сообщаете блоку, что добавили элементы, и получаете ответы.

var downloader = new TransformBlock<string, HttpResponse>(
        url => Download(url),
        new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 50 }
    );

var buffer = new BufferBlock<HttpResponse>();
downloader.LinkTo(buffer);

foreach(var url in urls)
    downloader.Post(url);
    //or await downloader.SendAsync(url);

downloader.Complete();
await downloader.Completion;

IList<HttpResponse> responses;
if (buffer.TryReceiveAll(out responses))
{
    //process responses
}

Примечание: TransformBlock буферизует как ввод, так и вывод. Тогда зачем нам связывать его с BufferBlock?

Потому что TransformBlock не завершится, пока не будут израсходованы все элементы (HttpResponse), и await downloader.Completion зависнет. Вместо этого мы позволяем downloader пересылать весь свой вывод в выделенный буферный блок - затем мы ждем завершения downloader и проверяем этот буферный блок.

person dcastro    schedule 18.03.2014
comment
+1 Какое изящное решение. Меньше кода, больше функциональности. - person BlueM; 19.03.2014
comment
Надеюсь, мой код хотя бы проиллюстрировал эту мысль :) В моем случае параллелизм, вероятно, был бы низким, но многие асинхронные задачи были бы опубликованы. Как с помощью этой модели регулировать асинхронные задачи? - person Josh Wyant; 19.03.2014
comment
@JoshWyant Вы имеете в виду ограничение на количество одновременных загрузок URL? Использование MaxDegreeOfParallelism - person dcastro; 19.03.2014
comment
скажем, у вас 2 ядра. Вместо создания 50 потоков установите для MaxDegreeOfParallelism значение 2. На каждом ядре вы хотите открыть 25 асинхронных HTTP-запросов. Когда один запрос завершается, на одном из ядер остается 49 ожидающих запросов. await SendAsync() позволит вам опубликовать еще один Task<HttpResponse>, в результате чего количество ожидающих запросов снова увеличится до 50? - person Josh Wyant; 19.03.2014
comment
@JoshWyant С помощью приведенного выше кода вы просто публикуете столько URL-адресов, сколько хотите (используя SendAsync). Они будут буферизированы блоком. Блок будет брать URL-адреса из буфера и обрабатывать не более 50 за раз. Затем результаты будут помещены в другой буфер. TransformBlock буферизует как ввод, так и вывод. - person dcastro; 19.03.2014
comment
Я говорю, что хочу, чтобы моя степень параллелизма (количество одновременных операций) отличалось от количества задач, ожидающих завершения. Я могу установить степень параллелизма на 1 и по-прежнему создавать 1000 HTTP-запросов, используя async / await. В моем примере кода первым аргументом будет 1000, а вторым - 1. - person Josh Wyant; 19.03.2014
comment
@JoshWyant, кстати, вы не можете назначить поток ядру в C # (ну, по крайней мере, не простым способом). Кстати, это называется привязкой к потоку. Как правило, потоков требуется не больше, чем количество ядер. Таким образом, с 4 ядрами у вас будет не более 4 потоков. Но поскольку это не похоже на асинхронный ввод-вывод, вместо работы, связанной с процессором, я думаю, что можно использовать больше. Попробуйте, посмотрите, что работает лучше всего. - person dcastro; 19.03.2014
comment
@JoshWyant Я не уверен, что следил за вашим последним комментарием ... В вашем вопросе говорится, что вы хотите ограничить количество одновременных загрузок .. Есть ли у вас другие требования? - person dcastro; 19.03.2014
comment
@dcastro Вот что происходит. Мы обрабатываем миллионы объектов. Каждая единица работы - это несколько сотен объектов. Для каждого объекта есть вызов GET API, POST и вызов db, и каждый из них будет блокироваться. При Parallel.ForEach max degree = 15 (наиболее эффективный, использует 15 потоков) мы получаем пропускную способность 13,5 объектов в секунду. У меня такое ощущение, что, выполняя запросы с использованием Async / Await, мы можем достичь гораздо большей пропускной способности, поскольку ввод-вывод не будет блокироваться, и мы будем ограничивать количество потоков. Я просто хочу ограничить его требования к памяти и минимизировать таймауты. - person Josh Wyant; 19.03.2014
comment
@JoshWyant Если бы я правильно следил, я бы сделал следующее: у вас есть TransformBlock, который выполняет запросы GET и POST последовательно с лимитом, скажем, 15. Таким образом, у вас будет 15 запросов, выполняемых одновременно, самое большее (например, , 10 GET + 5 POST). Затем я бы связал это с другим ActionBlock, который возьмет предыдущий вывод и сохранит его в базе данных. Вам это кажется правильным? Кроме того, убедитесь, что вы используете правильные асинхронные API для достижения истинного асинхронного ввода-вывода. - person dcastro; 19.03.2014
comment
Также ознакомьтесь с Введение в поток данных TPL, отличный материал. - person dcastro; 19.03.2014
comment
@dcastro Итак, я остановился на решении Dataflow. Изначально я опасался, что MaxDegreeOfParallelism работает точно так же, как Parallel.ForEach, просто создавая произвольное количество потоков для достижения параллелизма. Я был неправ, и параметр очень хорошо работает с async. Tpl.Dataflow прекрасно работает. Спасибо! - person Josh Wyant; 20.03.2014
comment
@dcastro ссылка на Introduction to TPL Dataflow мертва :( - person MPavlak; 08.09.2016
comment
@MPavlak теперь резервное копирование. - person avs099; 10.07.2017
comment
Хотя я поддержал этот ответ, связывание TransformBlock с BufferBlock с последующим вызовом BufferBlock.TryReceiveAll - не лучший способ получить ответы, хранящиеся в TransformBlock. Такой подход неэффективен и чувствителен к условиям гонки. Лучше всего получать ответы напрямую, используя асинхронный цикл, как показано здесь. По сути, строку await downloader.Completion; и все, что следует за ней, можно заменить только на это: var responses = await downloader.ToListAsync(); - person Theodor Zoulias; 01.07.2021

Допустим, у вас есть 1000 URL-адресов, и вы хотите, чтобы одновременно было открыто только 50 запросов; но как только один запрос завершается, вы открываете соединение со следующим URL-адресом в списке. Таким образом, одновременно всегда открыто ровно 50 подключений, пока список URL-адресов не будет исчерпан.

Следующее простое решение неоднократно появлялось здесь, на SO. Он не использует код блокировки и не создает потоки явно, поэтому очень хорошо масштабируется:

const int MAX_DOWNLOADS = 50;

static async Task DownloadAsync(string[] urls)
{
    using (var semaphore = new SemaphoreSlim(MAX_DOWNLOADS))
    using (var httpClient = new HttpClient())
    {
        var tasks = urls.Select(async url => 
        {
            await semaphore.WaitAsync();
            try
            {
                var data = await httpClient.GetStringAsync(url);
                Console.WriteLine(data);
            }
            finally
            {
                semaphore.Release();
            }
        });

        await Task.WhenAll(tasks);
    }
}

Дело в том, что обработка загруженных данных должна выполняться в другом конвейере с другим уровнем параллелизма, особенно если это Обработка с привязкой к ЦП.

Например, вы, вероятно, захотите иметь 4 потока, одновременно выполняющих обработку данных (количество ядер ЦП), и до 50 ожидающих запросов для дополнительных данных (которые вообще не используют потоки). AFAICT, это не то, что сейчас делает ваш код.

Вот где TPL Dataflow или Rx могут пригодиться в качестве предпочтительного решения. Тем не менее, безусловно, можно реализовать что-то подобное с простым TPL. Обратите внимание, единственный блокирующий код здесь - это тот, который выполняет фактическую обработку данных внутри Task.Run:

const int MAX_DOWNLOADS = 50;
const int MAX_PROCESSORS = 4;

// process data
class Processing
{
    SemaphoreSlim _semaphore = new SemaphoreSlim(MAX_PROCESSORS);
    HashSet<Task> _pending = new HashSet<Task>();
    object _lock = new Object();

    async Task ProcessAsync(string data)
    {
        await _semaphore.WaitAsync();
        try
        {
            await Task.Run(() =>
            {
                // simuate work
                Thread.Sleep(1000);
                Console.WriteLine(data);
            });
        }
        finally
        {
            _semaphore.Release();
        }
    }

    public async void QueueItemAsync(string data)
    {
        var task = ProcessAsync(data);
        lock (_lock)
            _pending.Add(task);
        try
        {
            await task;
        }
        catch
        {
            if (!task.IsCanceled && !task.IsFaulted)
                throw; // not the task's exception, rethrow
            // don't remove faulted/cancelled tasks from the list
            return;
        }
        // remove successfully completed tasks from the list 
        lock (_lock)
            _pending.Remove(task);
    }

    public async Task WaitForCompleteAsync()
    {
        Task[] tasks;
        lock (_lock)
            tasks = _pending.ToArray();
        await Task.WhenAll(tasks);
    }
}

// download data
static async Task DownloadAsync(string[] urls)
{
    var processing = new Processing();

    using (var semaphore = new SemaphoreSlim(MAX_DOWNLOADS))
    using (var httpClient = new HttpClient())
    {
        var tasks = urls.Select(async (url) =>
        {
            await semaphore.WaitAsync();
            try
            {
                var data = await httpClient.GetStringAsync(url);
                // put the result on the processing pipeline
                processing.QueueItemAsync(data);
            }
            finally
            {
                semaphore.Release();
            }
        });

        await Task.WhenAll(tasks.ToArray());
        await processing.WaitForCompleteAsync();
    }
}
person noseratio    schedule 19.03.2014
comment
Это самый простой и ясный ответ. Это в значительной степени именно то, что я пытался сделать. Моя ошибка заключалась в попытке запустить семафор в отдельном потоке, но это делает его намного проще и устраняет BlockingCollection. Я просто не понимал, что могу использовать WaitAsync таким образом. Спасибо @Noseratio. - person Josh Wyant; 19.03.2014
comment
@JoshWyant, без проблем. Я считаю, что это в значительной степени то, что TPL Dataflow делал бы за кулисами, если бы его конвейер был правильно спроектирован и собран. Просто мне не хватает навыков TPL Dataflow, но я собираюсь потратить на это больше времени. - person noseratio; 19.03.2014
comment
Ты прав. Как только вы это поймете, TPL Dataflow работает прекрасно. Он решает проблему распределения async работы по нескольким ядрам, что было моей другой целью. Этот ответ касается моей первой цели, и Dataflow решает их обе. @Noseratio - person Josh Wyant; 20.03.2014
comment
Будьте осторожны, если вы проверяете это с новым HttpClient () по умолчанию в ядре .net при попадании в ту же конечную точку. По умолчанию он ограничивает количество подключений на сервер (видел это в скрипте, где он ограничивал его до 2), если вы не укажете новый HttpClient (новый HttpClientHandler {MaxConnectionsPerServer = ...}). Все в этом ответе работает так, как рекламируется, но вы все равно можете быть ограничены этим параметром. - person Tom; 03.10.2018
comment
Я не понимаю ... Вы рекомендуете или не поощряете использование простого семафорного решения? И если да, можно ли использовать Slim версию? В документации указано, что он подходит только для очень короткого ожидания. - person SerG; 24.10.2019
comment
@SerG, этот пост должен был показать, как это можно сделать на низком уровне. Я бы по-прежнему рекомендовал использовать TPL Dataflow, если это позволяют ограничения времени / зависимости вашего проекта. Какая часть документов говорит, что SemaphoreSlim подходит только для кратковременного ожидания? - person noseratio; 25.10.2019
comment
comment
В документации говорится: Класс SemaphoreSlim представляет собой легкий и быстрый семафор, который можно использовать для ожидания в рамках одного процесса, когда ожидается, что время ожидания будет очень коротким. Я бы сказал, это означает SemaphoreSlim лучше оптимизирован для коротких периодов ожидания, чем исходный Semaphore (который всегда является оболочкой для объекта семафора Win32). Это не означает, что SemaphoreSlim подходит только для короткого ожидания и его следует избегать при длительном ожидании, ИМО. - person noseratio; 27.10.2019

Как я и просил, вот код, который я выбрал.

Работа настроена в конфигурации «мастер-деталь», и каждый мастер обрабатывается как пакет. Каждая единица работы выстраивается в очередь следующим образом:

var success = true;

// Start processing all the master records.
Master master;
while (null != (master = await StoredProcedures.ClaimRecordsAsync(...)))
{
    await masterBuffer.SendAsync(master);
}

// Finished sending master records
masterBuffer.Complete();

// Now, wait for all the batches to complete.
await batchAction.Completion;

return success;

Мастера помещаются в буфер по одному, чтобы сохранить работу для других внешних процессов. Детали для каждого мастера отправляются на работу через masterTransform TransformManyBlock. BatchedJoinBlock также создается для сбора деталей в одной партии.

Фактическая работа выполняется в detailTransform TransformBlock асинхронно, по 150 за раз. BoundedCapacity установлен на 300, чтобы гарантировать, что слишком много мастеров не будут помещены в буфер в начале цепочки, а также оставляют место для постановки в очередь достаточного количества подробных записей, позволяющих обрабатывать 150 записей за один раз. Блок выводит object для своих целей, потому что он фильтруется по ссылкам в зависимости от того, Detail или Exception.

batchAction ActionBlock собирает выходные данные из всех пакетов и выполняет массовые обновления базы данных, ведение журнала ошибок и т. Д. Для каждого пакета.

Будет несколько BatchedJoinBlock, по одному на каждого мастера. Поскольку каждый ISourceBlock выводится последовательно и каждый пакет принимает только то количество подробных записей, которое связано с одним мастером, пакеты будут обрабатываться по порядку. Каждый блок выводит только одну группу и по завершении отключается. Только последний пакетный блок передает свое завершение на последний ActionBlock.

Сеть потока данных:

// The dataflow network
BufferBlock<Master> masterBuffer = null;
TransformManyBlock<Master, Detail> masterTransform = null;
TransformBlock<Detail, object> detailTransform = null;
ActionBlock<Tuple<IList<object>, IList<object>>> batchAction = null;

// Buffer master records to enable efficient throttling.
masterBuffer = new BufferBlock<Master>(new DataflowBlockOptions { BoundedCapacity = 1 });

// Sequentially transform master records into a stream of detail records.
masterTransform = new TransformManyBlock<Master, Detail>(async masterRecord =>
{
    var records = await StoredProcedures.GetObjectsAsync(masterRecord);

    // Filter the master records based on some criteria here
    var filteredRecords = records;

    // Only propagate completion to the last batch
    var propagateCompletion = masterBuffer.Completion.IsCompleted && masterTransform.InputCount == 0;

    // Create a batch join block to encapsulate the results of the master record.
    var batchjoinblock = new BatchedJoinBlock<object, object>(records.Count(), new GroupingDataflowBlockOptions { MaxNumberOfGroups = 1 });

    // Add the batch block to the detail transform pipeline's link queue, and link the batch block to the the batch action block.
    var detailLink1 = detailTransform.LinkTo(batchjoinblock.Target1, detailResult => detailResult is Detail);
    var detailLink2 = detailTransform.LinkTo(batchjoinblock.Target2, detailResult => detailResult is Exception);
    var batchLink = batchjoinblock.LinkTo(batchAction, new DataflowLinkOptions { PropagateCompletion = propagateCompletion });

    // Unlink batchjoinblock upon completion.
    // (the returned task does not need to be awaited, despite the warning.)
    batchjoinblock.Completion.ContinueWith(task =>
    {
        detailLink1.Dispose();
        detailLink2.Dispose();
        batchLink.Dispose();
    });

    return filteredRecords;
}, new ExecutionDataflowBlockOptions { BoundedCapacity = 1 });

// Process each detail record asynchronously, 150 at a time.
detailTransform = new TransformBlock<Detail, object>(async detail => {
    try
    {
        // Perform the action for each detail here asynchronously
        await DoSomethingAsync();

        return detail;
    }
    catch (Exception e)
    {
        success = false;
        return e;
    }

}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 150, BoundedCapacity = 300 });

// Perform the proper action for each batch
batchAction = new ActionBlock<Tuple<IList<object>, IList<object>>>(async batch =>
{
    var details = batch.Item1.Cast<Detail>();
    var errors = batch.Item2.Cast<Exception>();

    // Do something with the batch here
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 });

masterBuffer.LinkTo(masterTransform, new DataflowLinkOptions { PropagateCompletion = true });
masterTransform.LinkTo(detailTransform, new DataflowLinkOptions { PropagateCompletion = true });
person Community    schedule 28.03.2014