В самом TPL Dataflow ничего подобного нет.
Самый простой способ, который я могу себе представить, - это создать структуру, которая инкапсулирует три блока: ввод с высоким приоритетом, ввод с низким приоритетом и вывод. Эти блоки будут простыми BufferBlock
s вместе с методом пересылки сообщений от двух входов к выходу на основе приоритета, работающего в фоновом режиме.
Код может выглядеть так:
public class PriorityBlock<T>
{
private readonly BufferBlock<T> highPriorityTarget;
public ITargetBlock<T> HighPriorityTarget
{
get { return highPriorityTarget; }
}
private readonly BufferBlock<T> lowPriorityTarget;
public ITargetBlock<T> LowPriorityTarget
{
get { return lowPriorityTarget; }
}
private readonly BufferBlock<T> source;
public ISourceBlock<T> Source
{
get { return source; }
}
public PriorityBlock()
{
var options = new DataflowBlockOptions { BoundedCapacity = 1 };
highPriorityTarget = new BufferBlock<T>(options);
lowPriorityTarget = new BufferBlock<T>(options);
source = new BufferBlock<T>(options);
Task.Run(() => ForwardMessages());
}
private async Task ForwardMessages()
{
while (true)
{
await Task.WhenAny(
highPriorityTarget.OutputAvailableAsync(),
lowPriorityTarget.OutputAvailableAsync());
T item;
if (highPriorityTarget.TryReceive(out item))
{
await source.SendAsync(item);
}
else if (lowPriorityTarget.TryReceive(out item))
{
await source.SendAsync(item);
}
else
{
// both input blocks must be completed
source.Complete();
return;
}
}
}
}
Использование будет выглядеть так:
b1.LinkTo(priorityBlock.HighPriorityTarget);
b2.LinkTo(priorityBlock.LowPriorityTarget);
priorityBlock.Source.LinkTo(a);
Чтобы это работало, a
также должен иметь BoundingCapacity
значение 1 (или, по крайней мере, очень маленькое число).
Предостережение, связанное с этим кодом, заключается в том, что он может вызвать задержку двух сообщений (одно ожидает в блоке вывода, другое - в SendAsync()
). Итак, если у вас есть длинный список сообщений с низким приоритетом, и внезапно приходит сообщение с высоким приоритетом, оно будет обработано только после этих двух сообщений с низким приоритетом, которые уже ожидают.
Если это проблема для вас, ее можно решить. Но я считаю, что для этого потребуется более сложный код, который работает с менее общедоступными частями потока данных TPL, например OfferMessage()
.
person
svick
schedule
07.01.2014