Реализуйте пакетный процесс с помощью производителя-потребителя

Партия

  1. читать текст из файла или SQL
  2. разобрать текст на слова
  3. загрузить слова в SQL

Сегодня
.NET 4.0
Шаг 1 выполняется очень быстро. Шаги 2 и 3 имеют примерно одинаковую длину (в среднем 0,1 секунды) для файла одинакового размера.
На шаге 3 вставьте с помощью BackGroundWorker и дождитесь завершения последнего.
Все остальное выполняется в основном потоке.

При большой нагрузке сделает это несколько миллионов раз.

Необходимо, чтобы шаг 3 был последовательным и в том же порядке, что и шаг 1.
Это нужно для того, чтобы предотвратить разбиение индекса PK таблицы SQL.
Пробовал выполнять шаг 3 параллельно, и разбиение индекса убило его.
Эти данные Feed отсортирован по PK.
Другие индексы удаляются в начале загрузки, а затем перестраиваются в конце загрузки.

Этот процесс неэффективен, когда размер текста меняется.
И размер текста от файла к файлу действительно резко меняется.
Я бы хотел поставить в очередь 1 и 2, чтобы 3 оставался занятым. насколько это возможно.

Нужен шаг 3, чтобы удалить файлы из очереди, чтобы они были поставлены в очередь в 1 (даже если он ждет).

Нужен максимальный размер очереди для управления памятью (например, 4-10).

Хотел бы иметь шаг 2 параллельно с до 4 одновременных.

Переход на .NET 4.5.

Просить общего руководства о том, как это реализовать?

Я узнаю, что это шаблон производителя-потребителя.
Если это не шаблон производителя-потребителя, сообщите мне, чтобы я мог изменить название.


person paparazzo    schedule 16.10.2012    source источник
comment
Вы пробовали просто использовать files.AsParallel().AsOrdered()?   -  person Dax Fohl    schedule 16.10.2012
comment
@DaxFohl Шаг 1 уже очень быстрый. И текст может быть из SQL или файла.   -  person paparazzo    schedule 16.10.2012


Ответы (1)


Я думаю, что TPL Dataflow был бы хорошим способом сделать это:

Для шага 2 вы должны использовать TransformBlock с MaxDegreeOfParallelism, установленным на 4, и BoundedCapacity, также установленным на 4, чтобы его очереди были пусты при работе. Он будет производить предметы в том же порядке, в котором они были получены, вам не нужно делать для этого ничего особенного. Для шага 3 используйте ActionBlock, а BoundedCapacity установите на свой предел. Затем свяжите их вместе и начните отправлять элементы в TransformBlock, в идеале используя что-то вроде await stepTwoBlock.SendAsync(…), чтобы асинхронно ожидать заполнения очереди.

В коде это будет выглядеть примерно так:

async Task ProcessData()
{
    var stepTwoBlock = new TransformBlock<OriginalText, ParsedText>(
        text => Parse(text),
        new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = 4,
            BoundedCapacity = 4
        });
    var stepThreeBlock = new ActionBlock<ParsedText>(
        text => LoadIntoDatabase(text),
        new ExecutionDataflowBlockOptions { BoundedCapacity = 10 });
    stepTwoBlock.LinkTo(
        stepThreeBlock, new DataflowLinkOptions { PropagateCompletion = true });

    // this is step one:
    foreach (var id in IdsToProcess)
    {
        OriginalText text = ReadText(id);
        await stepTwoBlock.SendAsync(text);
    }

    stepTwoBlock.Complete();
    await stepThreeBlock.Completion;
}
person svick    schedule 16.10.2012
comment
Спасибо, мне нужно время, чтобы проверить. StepTwoBlock принимает 4 за раз, а затем, когда все четыре будут завершены, он захватит следующие 4? Так это займет столько времени, сколько самый длинный из 4? Если это держит их в порядке, то оно того стоит. - person paparazzo; 16.10.2012
comment
Это не работает именно так. Если первый элемент (с наименьшим идентификатором) из тех, что в настоящее время находятся на шаге 2, завершится первым, то сразу же начнется обработка нового элемента (при условии, что в блоке шага 3 есть свободное место). Но то, что вы описываете, может случиться, если вам не повезет, потому что блок всегда будет хранить не более 4 элементов, даже если он пока ничего не может сделать с некоторыми из них. Если это окажется проблемой, вы можете попытаться увеличить емкость блока шага 2 (и, возможно, уменьшить емкость блока шага 3, чтобы сохранить использование памяти прежним). - person svick; 17.10.2012
comment
Спасибо. Извините, но не могу дать вам чек, пока я не проверю, и это то, что я хочу узнать, поэтому это займет у меня немного времени. - person paparazzo; 17.10.2012