TPL DataFlow, связывать блоки с приоритетом?

Можно ли связать два или более источника с одним блоком ITargetBlock (например, ActionBlock) с помощью блоков TPL.DataFlow и установить приоритетность источников?

e.g.

BufferBlock<string> b1 = new ...
BufferBlock<string> b2 = new ...
ActionBlock<string> a = new ...

//somehow force messages in b1 to be processed before any message of b2, always
b1.LinkTo (a);
b2.LinkTo (a);

Пока есть сообщения в b1, я хочу, чтобы они передавались в "a", и как только b1 станет пустым, сообщения b2 будут помещены в "a"

Идеи?


person Roger Johansson    schedule 07.01.2014    source источник


Ответы (1)


В самом TPL Dataflow ничего подобного нет.

Самый простой способ, который я могу себе представить, - это создать структуру, которая инкапсулирует три блока: ввод с высоким приоритетом, ввод с низким приоритетом и вывод. Эти блоки будут простыми BufferBlocks вместе с методом пересылки сообщений от двух входов к выходу на основе приоритета, работающего в фоновом режиме.

Код может выглядеть так:

public class PriorityBlock<T>
{
    private readonly BufferBlock<T> highPriorityTarget;

    public ITargetBlock<T> HighPriorityTarget
    {
        get { return highPriorityTarget; }
    }

    private readonly BufferBlock<T> lowPriorityTarget;

    public ITargetBlock<T> LowPriorityTarget
    {
        get { return lowPriorityTarget; }
    }

    private readonly BufferBlock<T> source;

    public ISourceBlock<T> Source
    {
        get { return source; }
    }

    public PriorityBlock()
    {
        var options = new DataflowBlockOptions { BoundedCapacity = 1 };

        highPriorityTarget = new BufferBlock<T>(options);
        lowPriorityTarget = new BufferBlock<T>(options);
        source = new BufferBlock<T>(options);

        Task.Run(() => ForwardMessages());
    }

    private async Task ForwardMessages()
    {
        while (true)
        {
            await Task.WhenAny(
                highPriorityTarget.OutputAvailableAsync(),
                lowPriorityTarget.OutputAvailableAsync());

            T item;

            if (highPriorityTarget.TryReceive(out item))
            {
                await source.SendAsync(item);
            }
            else if (lowPriorityTarget.TryReceive(out item))
            {
                await source.SendAsync(item);
            }
            else
            {
                // both input blocks must be completed
                source.Complete();
                return;
            }
        }
    }
}

Использование будет выглядеть так:

b1.LinkTo(priorityBlock.HighPriorityTarget);
b2.LinkTo(priorityBlock.LowPriorityTarget);
priorityBlock.Source.LinkTo(a);

Чтобы это работало, a также должен иметь BoundingCapacity значение 1 (или, по крайней мере, очень маленькое число).

Предостережение, связанное с этим кодом, заключается в том, что он может вызвать задержку двух сообщений (одно ожидает в блоке вывода, другое - в SendAsync()). Итак, если у вас есть длинный список сообщений с низким приоритетом, и внезапно приходит сообщение с высоким приоритетом, оно будет обработано только после этих двух сообщений с низким приоритетом, которые уже ожидают.

Если это проблема для вас, ее можно решить. Но я считаю, что для этого потребуется более сложный код, который работает с менее общедоступными частями потока данных TPL, например OfferMessage().

person svick    schedule 07.01.2014
comment
Спасибо, но в моем случае я всегда должен обрабатывать сообщения с высоким приоритетом напрямую. оказывается, что DataFlow в любом случае слишком медленный для моего варианта использования, поэтому прямо сейчас у меня есть управляемое решение. - person Roger Johansson; 17.01.2014