У меня есть настройка блока потока данных производителя / потребителя с использованием BufferBlock и ActionBlock, и он отлично работает внутри консольного приложения;
После добавления всех элементов в BurfferBlock и связывания BufferBlock с другими элементами действий; он работает хорошо.
теперь я хочу использовать эту внутреннюю службу, в которой этот конвейер блока потока данных всегда будет работать, и когда сообщения будут доступны через внешние события, они войдут в буферный блок и начнут обработку. Как я могу этого добиться?
Пока что я сделал ниже:
public void SetupPipeline()
{
FirstBlock = new ActionBlock<WorkItem>(new Action<WorkItem>(ProcessIncomingMessage),
new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded
});
BufferBlock = new BufferBlock<WorkItem>();
GroupingDataflowBlockOptions GroupingDataflowBlockOptions = new GroupingDataflowBlockOptions();
GroupingDataflowBlockOptions.Greedy = true;
GroupingDataflowBlockOptions.BoundedCapacity = GroupingDataflowBlockOptions.Unbounded;
CancellationTokenSource = new CancellationTokenSource();
CancellationToken = CancellationTokenSource.Token;
GroupingDataflowBlockOptions.CancellationToken = CancellationToken;
BatchBlock = new BatchBlock<WorkItem>(BoundingCapacity, GroupingDataflowBlockOptions);
ProcessItems = new ActionBlock<WorkItem[]>(WorkItems =>
ProcessWorkItems(WorkItems.ToList<WorkItem>()),
new ExecutionDataflowBlockOptions
{
CancellationToken = CancellationToken
});
Timer = new Timer(_ =>
BatchBlock.TriggerBatch()
);
TimingBlock = new TransformBlock<WorkItem, WorkItem>(WorkItem =>
{
Timer.Change(TimerInterval, Timeout.Infinite);
logger.Debug("Inside TimingBlock : " + WorkItem.ToString());
return WorkItem;
}, new ExecutionDataflowBlockOptions
{
CancellationToken = CancellationToken
});
BatchBlock.LinkTo(ProcessItems);
TimingBlock.LinkTo(BatchBlock);
BufferBlock.LinkTo(TimingBlock);
}