Я написал следующий метод для пакетной обработки огромного CSV-файла. Идея состоит в том, чтобы прочитать часть строк из файла в память, а затем разделить эту часть строк на пакеты фиксированного размера. Как только мы получим разделы, отправьте эти разделы на сервер (синхронный или асинхронный), что может занять некоторое время.
private static void BatchProcess(string filePath, int chunkSize, int batchSize)
{
List<string> chunk = new List<string>(chunkSize);
foreach (var line in File.ReadLines(filePath))
{
if (chunk.Count == chunk.Capacity)
{
// Partition each chunk into smaller chunks grouped on column 1
var partitions = chunk.GroupBy(c => c.Split(',')[0], (key, g) => g);
// Further breakdown the chunks into batch size groups
var groups = partitions.Select(x => x.Select((i, index) =>
new { i, index }).GroupBy(g => g.index / batchSize, e => e.i));
// Get batches from groups
var batches = groups.SelectMany(x => x)
.Select(y => y.Select(z => z)).ToList();
// Process all batches asynchronously
batches.AsParallel().ForAll(async b =>
{
WebClient client = new WebClient();
byte[] bytes = System.Text.Encoding.ASCII
.GetBytes(b.SelectMany(x => x).ToString());
await client.UploadDataTaskAsync("myserver.com", bytes);
});
// clear the chunk
chunk.Clear();
}
chunk.Add(line);
}
}
Этот фрагмент кода не кажется очень эффективным по двум причинам.
Основной поток, читающий CSV-файл, блокируется до тех пор, пока не будут обработаны все разделы.
AsParallel блокируется до завершения всех задач. Поэтому, если в пуле потоков доступно больше потоков для выполнения работы, я не использую их, потому что ни одно задание не связано ни с одним из разделов.
Размер партии фиксирован, поэтому его нельзя изменить, но размер куска можно настроить для повышения производительности. Я могу выбрать достаточно большое значение chunkSize, чтобы ни один из созданных пакетов не был >> ни одним потоком, доступным в системе, но это по-прежнему означает, что метод Parallel.ForEach блокируется до тех пор, пока все задачи не будут выполнены.
Как я могу изменить код так, чтобы все доступные потоки в системе использовались для выполнения работы без простоя. Я думаю, что мог бы использовать BlockingCollection для хранения пакетов, но не уверен, какой размер емкости дать ему, поскольку ни один из пакетов не является динамическим в каждом фрагменте.
Любые идеи о том, как использовать TPL для максимального использования потока, чтобы большинство доступных потоков в системе всегда выполняли какие-то действия?
ОБНОВЛЕНИЕ: вот что я получил, используя поток данных TPL. Это правильно?
private static void UploadData(string filePath, int chunkSize, int batchSize)
{
var buffer1 = new BatchBlock<string>(chunkSize);
var buffer2 = new BufferBlock<IEnumerable<string>>();
var action1 = new ActionBlock<string[]>(t =>
{
Console.WriteLine("Got a chunk of lines " + t.Count());
// Partition each chunk into smaller chunks grouped on column 1
var partitions = t.GroupBy(c => c.Split(',')[0], (key, g) => g);
// Further breakdown the chunks into batch size groups
var groups = partitions.Select(x => x.Select((i, index) =>
new { i, index }).GroupBy(g => g.index / batchSize, e => e.i));
// Get batches from groups
var batches = groups.SelectMany(x => x).Select(y => y.Select(z => z));
foreach (var batch in batches)
{
buffer2.Post(batch);
}
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1 });
buffer1.LinkTo(action1, new DataflowLinkOptions
{ PropagateCompletion = true });
var action2 = new TransformBlock<IEnumerable<string>,
IEnumerable<string>>(async b =>
{
await ExecuteBatch(b);
return b;
}, new ExecutionDataflowBlockOptions
{ MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded });
buffer2.LinkTo(action2, new DataflowLinkOptions
{ PropagateCompletion = true });
var action3 = new ActionBlock<IEnumerable<string>>(b =>
{
Console.WriteLine("Finised executing a batch");
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1 });
action2.LinkTo(action3, new DataflowLinkOptions
{ PropagateCompletion = true });
Task produceTask = Task.Factory.StartNew(() =>
{
foreach (var line in File.ReadLines(filePath))
{
buffer1.Post(line);
}
//Once marked complete your entire data flow will signal a stop for
// all new items
Console.WriteLine("Finished producing");
buffer1.Complete();
});
Task.WaitAll(produceTask);
Console.WriteLine("Produced complete");
action1.Completion.Wait();//Will add all the items to buffer2
Console.WriteLine("Action1 complete");
buffer2.Complete();//will not get any new items
action2.Completion.Wait();//Process the batch of 5 and then complete
Task.Wait(action3.Completion);
Console.WriteLine("Process complete");
Console.ReadLine();
}