Правильный способ фильтрации BlockBuffer.RecieveAsync

Добрый день.

У меня есть сетка потока данных TPL для вызовов rpc

Он имеет два несвязанных потока, которые в упрощенном виде выглядят так:

Выходной поток:

  • BlockBuffer для хранения вывода
  • ActionBlock для отправки вывода на сервер и создания отправленного идентификатора

И входной поток:

  • цикл while для получения данных
  • TransformBlock для анализа данных
  • BlockBuffer для сохранения ответа с помощью sentid

есть проблема: когда я звоню из отдельных потоков, я могу путаться с ответами, поэтому мне нужно его отфильтровать.

мой вызов rpc:

public async Task<RpcAnswer> PerformRpcCall(Call rpccall)
{
    ...
    _outputRpcCalls.Post(rpccall);
    long uniqueId = GetUniq(); // call unique id
    ...
    var sent = new Tuple<long, long>(uniqueId, 0);
    while (_sentRpcCalls.TryReceive(u => u.Item1 == uniqueId, out sent)) ; // get generated id from send function

    return await _inputAnswers.ReceiveAsync(TimeSpan.FromSeconds(30));
}

как видите, у меня есть уникальный идентификатор, который может помочь мне определить ответ на этот вызов, но как я могу отфильтровать его и дождаться его?

Это хороший способ иметь некоторый массив буферов (может быть, WriteOnceBlock?), Который будет создан в вызове rpc и LinkedTo с фильтром?


person xakpc    schedule 24.08.2013    source источник


Ответы (2)


Хорошо, я не нашел правильного способа, поэтому я сделал грязный обходной путь

while (true)
{
    answer = await _inputAnswers.ReceiveAsync(TimeSpan.FromSeconds(5)); 

    if (answer.Success)
    {
        if (answer.Answer.Combinator.ValueType.Equals(rpccall.Combinator.ValueType))
        {
            break;
        }
        else
        {
            // wrong answer - post it back
            _inputAnswers.Post(answer.Answer);
        }

    }
    else
    {
        // answer fail - return it
        break;
    }
}
person xakpc    schedule 25.08.2013

Один из способов сделать это — создать новый блок для каждого идентификатора и связать его с блоком ответов с помощью предиката, проверяющего идентификатор и MaxMessages, установленного в 1:

Task<Answer> ReceiveAnswerAsync(int uniqueId)
{
    var block = new BufferBlock<Answer>();

    _inputAnswers.LinkTo(
        block,
        new DataflowLinkOptions { MaxMessages = 1, PropagateCompletion = true },
        answer => answer.Id == uniqueId);

    return block.ReceiveAsync();
}
person svick    schedule 30.08.2013