Как выполнять асинхронные операции в потоке данных TPL для лучшей производительности?

Я написал следующий метод для пакетной обработки огромного CSV-файла. Идея состоит в том, чтобы прочитать часть строк из файла в память, а затем разделить эту часть строк на пакеты фиксированного размера. Как только мы получим разделы, отправьте эти разделы на сервер (синхронный или асинхронный), что может занять некоторое время.

private static void BatchProcess(string filePath, int chunkSize, int batchSize)
{
    List<string> chunk = new List<string>(chunkSize);

    foreach (var line in File.ReadLines(filePath))
    {
        if (chunk.Count == chunk.Capacity)
        {
            // Partition each chunk into smaller chunks grouped on column 1
            var partitions = chunk.GroupBy(c => c.Split(',')[0], (key, g) => g);

            // Further breakdown the chunks into batch size groups
            var groups = partitions.Select(x => x.Select((i, index) =>
                new { i, index }).GroupBy(g => g.index / batchSize, e => e.i));

            // Get batches from groups
            var batches = groups.SelectMany(x => x)
                .Select(y => y.Select(z => z)).ToList();

            // Process all batches asynchronously
            batches.AsParallel().ForAll(async b =>
            {
                WebClient client = new WebClient();
                byte[] bytes = System.Text.Encoding.ASCII
                    .GetBytes(b.SelectMany(x => x).ToString());
                await client.UploadDataTaskAsync("myserver.com", bytes);
            });

            // clear the chunk
            chunk.Clear();
        }

        chunk.Add(line);
    }
}

Этот фрагмент кода не кажется очень эффективным по двум причинам.

  1. Основной поток, читающий CSV-файл, блокируется до тех пор, пока не будут обработаны все разделы.

  2. AsParallel блокируется до завершения всех задач. Поэтому, если в пуле потоков доступно больше потоков для выполнения работы, я не использую их, потому что ни одно задание не связано ни с одним из разделов.

Размер партии фиксирован, поэтому его нельзя изменить, но размер куска можно настроить для повышения производительности. Я могу выбрать достаточно большое значение chunkSize, чтобы ни один из созданных пакетов не был >> ни одним потоком, доступным в системе, но это по-прежнему означает, что метод Parallel.ForEach блокируется до тех пор, пока все задачи не будут выполнены.

Как я могу изменить код так, чтобы все доступные потоки в системе использовались для выполнения работы без простоя. Я думаю, что мог бы использовать BlockingCollection для хранения пакетов, но не уверен, какой размер емкости дать ему, поскольку ни один из пакетов не является динамическим в каждом фрагменте.

Любые идеи о том, как использовать TPL для максимального использования потока, чтобы большинство доступных потоков в системе всегда выполняли какие-то действия?

ОБНОВЛЕНИЕ: вот что я получил, используя поток данных TPL. Это правильно?

private static void UploadData(string filePath, int chunkSize, int batchSize)
{
    var buffer1 = new BatchBlock<string>(chunkSize);
    var buffer2 = new BufferBlock<IEnumerable<string>>();

    var action1 = new ActionBlock<string[]>(t =>
    {
        Console.WriteLine("Got a chunk of lines " + t.Count());

        // Partition each chunk into smaller chunks grouped on column 1
        var partitions = t.GroupBy(c => c.Split(',')[0], (key, g) => g);

        // Further breakdown the chunks into batch size groups
        var groups = partitions.Select(x => x.Select((i, index) =>
            new { i, index }).GroupBy(g => g.index / batchSize, e => e.i));

        // Get batches from groups
        var batches = groups.SelectMany(x => x).Select(y => y.Select(z => z));

        foreach (var batch in batches)
        {
            buffer2.Post(batch);
        }

    }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1 });

    buffer1.LinkTo(action1, new DataflowLinkOptions
        { PropagateCompletion = true });

    var action2 = new TransformBlock<IEnumerable<string>,
        IEnumerable<string>>(async b =>
    {
        await ExecuteBatch(b);
        return b;

    }, new ExecutionDataflowBlockOptions
        { MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded });

    buffer2.LinkTo(action2, new DataflowLinkOptions
        { PropagateCompletion = true });

    var action3 = new ActionBlock<IEnumerable<string>>(b =>
    {
        Console.WriteLine("Finised executing a batch");
    }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1 });

    action2.LinkTo(action3, new DataflowLinkOptions
        { PropagateCompletion = true });

    Task produceTask = Task.Factory.StartNew(() =>
    {
        foreach (var line in File.ReadLines(filePath))
        {
            buffer1.Post(line);
        }

        //Once marked complete your entire data flow will signal a stop for
        // all new items
        Console.WriteLine("Finished producing");
        buffer1.Complete();
    });

    Task.WaitAll(produceTask);
    Console.WriteLine("Produced complete");

    action1.Completion.Wait();//Will add all the items to buffer2
    Console.WriteLine("Action1 complete");

    buffer2.Complete();//will not get any new items
    action2.Completion.Wait();//Process the batch of 5 and then complete

    Task.Wait(action3.Completion);

    Console.WriteLine("Process complete");
    Console.ReadLine();
}

person user330612    schedule 16.08.2015    source источник
comment
Как вы сказали, вы должны использовать поток данных TPL. У вас есть конкретный вопрос по этому поводу?   -  person i3arnon    schedule 16.08.2015
comment
AsParallel блокируется до тех пор, пока все задачи не будут завершены. Это не так, потому что ваша лямбда возвращается немедленно, потому что она асинхронна. Это ошибка.   -  person usr    schedule 16.08.2015
comment
Ok. Итак, как это исправить?   -  person user330612    schedule 16.08.2015
comment
@ i3arnon Я не знаком с потоком данных. Глядя на документацию, не очень понятно, какие блоки использовать в потоке данных. Я предполагаю, что мне нужен пакетный блок chunkSize для хранения куска строк. А затем блок преобразования для преобразования фрагментов в пакеты. Я не понимаю, как моделируется первая часть потока, т. е. чтение CSv-файла по частям... и работает последний шаг ожидания завершения всех асинхронных задач.   -  person user330612    schedule 16.08.2015


Ответы (1)


Вы были рядом, в TPL данные перетекают из одного блока в другой, и вы должны стараться придерживаться этой парадигмы. Так, например, action1 должен быть TransformManyBlock, потому что ActionBlock является ITargetBlock (т. е. завершающим блоком).

Когда вы указываете завершение распространения для ссылки, событие Complete автоматически направляется через блок, поэтому вам нужно только выполнить один вызов wait() в последнем блоке.

Думайте об этом как о цепочке домино, вы называете завершенным первый блок, и он будет распространяться по цепочке до последнего блока.

Вы также должны учитывать, что и почему вы используете многопоточность; ваш пример сильно связан с вводом-выводом, и я не думаю, что привязка кучи потоков для ожидания завершения ввода-вывода является правильным решением.

Наконец, обратите внимание на то, что блокирует или нет. В вашем примере buffer1.Post(...) не является блокирующим вызовом, у вас нет причин использовать его в задаче.

Я написал следующий пример кода, который использует TPL DataFlow:

static void Main(string[] args)
{
    var filePath = "C:\\test.csv";
    var chunkSize = 1024;
    var batchSize = 128;

    var linkCompletion = new DataflowLinkOptions
    {
        PropagateCompletion = true
    };

    var uploadData = new ActionBlock<IEnumerable<string>>(
        async (data) =>
        {
            WebClient client = new WebClient();
            var payload = data.SelectMany(x => x).ToArray();
            byte[] bytes = System.Text.Encoding.ASCII.GetBytes(payload);
            //await client.UploadDataTaskAsync("myserver.com", bytes);
            await Task.Delay(2000);
        }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded /* Prefer to limit that to some reasonable value */ });

    var lineBuffer = new BatchBlock<string>(chunkSize);

    var splitData = new TransformManyBlock<IEnumerable<string>, IEnumerable<string>>(
        (data) =>
        {
            // Partition each chunk into smaller chunks grouped on column 1
            var partitions = data.GroupBy(c => c.Split(',')[0]);

            // Further beakdown the chunks into batch size groups
            var groups = partitions.Select(x => x.Select((i, index) => new { i, index }).GroupBy(g => g.index / batchSize, e => e.i));

            // Get batches from groups
            var batches = groups.SelectMany(x => x).Select(y => y.Select(z => z));

            // Don't forget to enumerate before returning
            return batches.ToList();
        }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded });
    lineBuffer.LinkTo(splitData, linkCompletion);
    splitData.LinkTo(uploadData, linkCompletion);

    foreach (var line in File.ReadLines(filePath))
    {
        lineBuffer.Post(line);
    }
    lineBuffer.Complete();

    // Wait for uploads to finish
    uploadData.Completion.Wait();
}
person Julien Lebot    schedule 23.08.2015