Я хотел бы запустить кучу асинхронных задач с ограничением количества задач, ожидающих завершения в любой момент времени.
Допустим, у вас есть 1000 URL-адресов, и вы хотите, чтобы одновременно было открыто только 50 запросов; но как только один запрос завершается, вы открываете соединение со следующим URL-адресом в списке. Таким образом, одновременно всегда открыто ровно 50 подключений, пока список URL-адресов не будет исчерпан.
Я также хочу использовать заданное количество потоков, если это возможно.
Я придумал метод расширения ThrottleTasksAsync
, который делает то, что я хочу. Есть ли уже более простое решение? Я предполагаю, что это обычный сценарий.
Использование:
class Program
{
static void Main(string[] args)
{
Enumerable.Range(1, 10).ThrottleTasksAsync(5, 2, async i => { Console.WriteLine(i); return i; }).Wait();
Console.WriteLine("Press a key to exit...");
Console.ReadKey(true);
}
}
Вот код:
static class IEnumerableExtensions
{
public static async Task<Result_T[]> ThrottleTasksAsync<Enumerable_T, Result_T>(this IEnumerable<Enumerable_T> enumerable, int maxConcurrentTasks, int maxDegreeOfParallelism, Func<Enumerable_T, Task<Result_T>> taskToRun)
{
var blockingQueue = new BlockingCollection<Enumerable_T>(new ConcurrentBag<Enumerable_T>());
var semaphore = new SemaphoreSlim(maxConcurrentTasks);
// Run the throttler on a separate thread.
var t = Task.Run(() =>
{
foreach (var item in enumerable)
{
// Wait for the semaphore
semaphore.Wait();
blockingQueue.Add(item);
}
blockingQueue.CompleteAdding();
});
var taskList = new List<Task<Result_T>>();
Parallel.ForEach(IterateUntilTrue(() => blockingQueue.IsCompleted), new ParallelOptions { MaxDegreeOfParallelism = maxDegreeOfParallelism },
_ =>
{
Enumerable_T item;
if (blockingQueue.TryTake(out item, 100))
{
taskList.Add(
// Run the task
taskToRun(item)
.ContinueWith(tsk =>
{
// For effect
Thread.Sleep(2000);
// Release the semaphore
semaphore.Release();
return tsk.Result;
}
)
);
}
});
// Await all the tasks.
return await Task.WhenAll(taskList);
}
static IEnumerable<bool> IterateUntilTrue(Func<bool> condition)
{
while (!condition()) yield return true;
}
}
Метод использует BlockingCollection
и SemaphoreSlim
, чтобы заставить его работать. Дроссель запускается в одном потоке, а все асинхронные задачи выполняются в другом потоке. Чтобы добиться параллелизма, я добавил параметр maxDegreeOfParallelism, который передается в цикл Parallel.ForEach
, преобразованный в цикл while
.
Старая версия была:
foreach (var master = ...)
{
var details = ...;
Parallel.ForEach(details, detail => {
// Process each detail record here
}, new ParallelOptions { MaxDegreeOfParallelism = 15 });
// Perform the final batch updates here
}
Но пул потоков быстро исчерпывается, и вы не можете сделать _9 _ / _ 10_.
Бонус: чтобы обойти проблему в BlockingCollection
, где исключение выдается в Take()
при вызове CompleteAdding()
, я использую перегрузку TryTake
с таймаутом. Если бы я не использовал тайм-аут в TryTake
, это не помогло бы использовать BlockingCollection
, поскольку TryTake
не будет блокироваться. Есть ли способ лучше? В идеале был бы метод TakeAsync
.
await
.Parallel.ForEach
обеспечивает эффект 2 или 4 параллельныхwhile
петель. - person Josh Wyant   schedule 19.03.2014