Как реализовать в TPL непрерывно работающие блоки потока данных?

У меня есть настройка блока потока данных производителя / потребителя с использованием BufferBlock и ActionBlock, и он отлично работает внутри консольного приложения;

После добавления всех элементов в BurfferBlock и связывания BufferBlock с другими элементами действий; он работает хорошо.

теперь я хочу использовать эту внутреннюю службу, в которой этот конвейер блока потока данных всегда будет работать, и когда сообщения будут доступны через внешние события, они войдут в буферный блок и начнут обработку. Как я могу этого добиться?

Пока что я сделал ниже:

public void SetupPipeline()
{
    FirstBlock = new ActionBlock<WorkItem>(new Action<WorkItem>(ProcessIncomingMessage),
    new ExecutionDataflowBlockOptions
    {
        MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded
    });

    BufferBlock = new BufferBlock<WorkItem>();

    GroupingDataflowBlockOptions GroupingDataflowBlockOptions = new GroupingDataflowBlockOptions();
    GroupingDataflowBlockOptions.Greedy = true;
    GroupingDataflowBlockOptions.BoundedCapacity = GroupingDataflowBlockOptions.Unbounded;
    CancellationTokenSource = new CancellationTokenSource();
    CancellationToken = CancellationTokenSource.Token;
    GroupingDataflowBlockOptions.CancellationToken = CancellationToken;
    BatchBlock = new BatchBlock<WorkItem>(BoundingCapacity, GroupingDataflowBlockOptions);

    ProcessItems = new ActionBlock<WorkItem[]>(WorkItems =>
        ProcessWorkItems(WorkItems.ToList<WorkItem>()),
        new ExecutionDataflowBlockOptions
      {
          CancellationToken = CancellationToken
      });

    Timer = new Timer(_ =>
            BatchBlock.TriggerBatch()
        );

    TimingBlock = new TransformBlock<WorkItem, WorkItem>(WorkItem =>
    {
        Timer.Change(TimerInterval, Timeout.Infinite);
        logger.Debug("Inside TimingBlock : " + WorkItem.ToString());
        return WorkItem;
    }, new ExecutionDataflowBlockOptions
    {
        CancellationToken = CancellationToken
    });

    BatchBlock.LinkTo(ProcessItems);
    TimingBlock.LinkTo(BatchBlock);
    BufferBlock.LinkTo(TimingBlock);
}

person user2757350    schedule 02.12.2013    source источник
comment
Почему бы тебе просто не сделать это? Что вы пробовали и как это не удалось?   -  person svick    schedule 03.12.2013
comment
@svick Я добавил то, что реализовал до сих пор   -  person user2757350    schedule 04.12.2013
comment
Так в чем проблема? Этот код работает так, как вы ожидаете? Что мешает вам публиковать какие-либо события в этом конвейере?   -  person svick    schedule 04.12.2013
comment
Я могу отправить сообщение; как я могу добиться этого без таймера. Я не хочу вызывать Pipeline.Complete (), иначе мне придется заново инициализировать Pipeline; чего я не хочу (потому что я стараюсь, чтобы этот конвейер всегда был открыт)   -  person user2757350    schedule 05.12.2013


Ответы (2)


Размер вашего пакета определяется переменной BoundingCapacity в конструкторе пакетного блока. Пакет будет отправлен, когда:

  • Получено количество сообщений, равное размеру пакета (указано в конструкторе)
  • Пакетный блок отмечен на завершение
  • Метод triggerbatch называется

Похоже, вы хотите, чтобы партия публиковалась, когда размер ванны соблюден или истечет время ожидания. Если это так, и если размер пакета не критичен, я бы просто добавил повторяющийся интервал к таймеру, который у вас есть, и заставил объект, расположенный ниже по потоку от пакетного блока, игнорировать пустые сообщения.

Что вам на самом деле может понадобиться, и что наиболее соответствует философии программирования потока данных, - это создать новый пакетный блок, когда вы начинаете публиковать серию элементов, а затем завершить его, когда это будет сделано или когда произойдет тайм-аут. Новые сообщения создадут новый пакетный блок, если он еще не существует.

Проблема с попыткой реализовать таймер тайм-аута вокруг пакетного блока, который срабатывает только на основе первого триггера, заключается в том, что вам нужно будет либо подсчитывать и проверять сообщения в буферном блоке, либо вам нужно будет просматривать сообщения из буферного блока. Оба этих сценария создадут много уродства и / или нарушат инкапсуляцию блоков.

person VoteCoffee    schedule 16.12.2013
comment
Спасибо за ваши предложения; Я хочу избежать создания пакетного блока каждый раз. Как вы видите, моя программа в основном преобразует болтливые сообщения в блоки с помощью буфера. Buffer отлично работает с BoundingCapacity; в моем случае я установил его на 100. Но я не хочу ждать, пока будут получены все 100 сообщений. Мне нужен двойной контроль над BufferBlock; например если есть 100 сообщений или 5 секунд (оба могут быть настроены). Решения работают в соответствии с моими потребностями, но я хотел посмотреть, есть ли у кого-нибудь лучшие решения. Ключевым моментом здесь является то, что я хочу, чтобы BufferBlock работал в двойном режиме; при попадании в BoundinCapacity и тайм-ауте. - person user2757350; 18.12.2013
comment
Как только вы передаете элемент в блок потока данных, вам нужно перестать знать об этом элементе. Блоки потока данных должны управляться данными. Внешний контроль не одобряется. Я бы честно изменил то, что у вас есть, чтобы «триггер тайм-аута» и пакетный блок были заключены в один IPropagatingBlock. - person VoteCoffee; 18.12.2013
comment
@VeteCoffee - в этом есть смысл; это разделит блоки, и я могу заменить их чем-то другим в будущем ... Спасибо - person user2757350; 18.12.2013
comment
Голосовать против - это нормально, но, пожалуйста, оставьте отзыв, чтобы помочь мне пересмотреть свой ответ. - person VoteCoffee; 08.01.2015

В качестве грубого упрощения DataFlow - это способ обработки группы объектов с помощью набора методов. Он не предоставляет и не ожидает какого-либо конкретного способа создания этих объектов.

Если вы хотите, чтобы конвейер оставался живым, просто не закрывайте приложение. Если вы не хотите использовать консольное приложение, создайте службу, которая строит конвейер и отправляет ему объекты, пока он не закроется.

Сообщения - это просто объекты, которые вы создадите, читая данные, в ответ на события (что бы это ни значило) или любым другим способом.

Что касается внешних событий, что вы имеете в виду? Что кто-то будет отправлять данные в ваше приложение? Это может произойти разными способами:

  • Если данные поступают из другого консольного приложения, вы можете передать результаты одного приложения другому, проанализировать данные, поступающие из входного потока вашего приложения командной строки, создать сообщения и передать их конвейеру.
  • Если вы хотите, чтобы служба прослушивала запросы, вы можете разместить службу .NET Pipe, WCF или веб-API для прослушивания вызовов и передачи опубликованных данных в конвейер.
  • Если данные поступают из базы данных, вы можете запросить изменения и отправить любые измененные данные в конвейер.

Дело в том, что Dataflow - это обработка данных, а не прослушивание событий. Это не полноценная система распределенных агентов, если это то, что вы искали.

person Panagiotis Kanavos    schedule 03.12.2013
comment
Внешние события, т.е. у меня есть обновления уведомлений с другого канала; который я публикую на BufferBlock. У меня есть круглосуточная служба, которая прослушивает эти входящие обновления положения и на основе этого выполняет некоторую обработку в нисходящем потоке ... - person user2757350; 18.12.2013