Блоки потока данных TPL внутри дуплекса WCF

Я новичок в SO, пожалуйста, потерпите меня.

У меня есть служба WCF с контрактом на дуплексную службу. Этот сервисный контракт имеет операционный контакт, который предполагает длительную обработку данных. Я вынужден ограничить количество одновременных обработок данных, скажем, максимум 3. Моя проблема в том, что после обработки данных мне нужно вернуться к тому же контексту экземпляра службы, поэтому я перезваниваю своей конечной точке инициатора, передавая результат обработки данных. Я должен упомянуть, что по разным причинам я ограничен потоками данных TPL и дуплексом WCF.

Вот демонстрация того, что я написал до сих пор

В консольной библиотеке я имитирую вызовы WCF

class Program
{
    static void Main(string[] args)
    {
        // simulate service calls

        Enumerable.Range(0, 5).ToList().ForEach(x =>
        {
            new System.Threading.Thread(new ThreadStart(async () =>
            {
                var service = new Service();
                await service.Inc(x);
            })).Start();
        });
    }
}

Вот что должно быть службой WCF

// service contract
public class Service
{
    static TransformBlock<Message<int>, Message<int>> transformBlock;

    static Service()
    {
        transformBlock = new TransformBlock<Message<int>, Message<int>>(x => Inc(x), new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = 3
        });
    }

    static Message<int> Inc(Message<int> input)
    {
        System.Threading.Thread.Sleep(100);

        return new Message<int> { Token = input.Token, Data = input.Data + 1 };
    }

    // operation contract
    public async Task Inc(int id)
    {
        var token = Guid.NewGuid().ToString();

        transformBlock.Post(new Message<int> { Token = token, Data = id });

        while (await transformBlock.OutputAvailableAsync())
        {
            Message<int> message;
            if (transformBlock.TryReceive(m => m.Token == token, out message))
            {
                // do further processing using initiator service instance members
                // something like Callback.IncResult(m.Data);
                break;
            }
        }
    }
}

public class Message<T>
{
    public string Token { get; set; }

    public T Data { get; set; }
}

Контракт операции на самом деле не обязательно должен быть асинхронным, но мне нужно уведомление OutputAvailableAsync.

Это хороший подход или есть лучшее решение для моего сценария?

Заранее спасибо.


person uni3324    schedule 21.03.2013    source источник
comment
«Я ограничен потоками данных TPL». Что вы имеете в виду? Что вы должны использовать поток данных? Почему? Это не имеет особого смысла.   -  person svick    schedule 22.03.2013
comment
Мое требование к обработке данных — обеспечить параллелизм, но ограничить параллелизм. Блоки потока данных TPL кажутся хорошим выбором, плюс они навязываются техническими требованиями, а не PLinq или чем-то еще.   -  person uni3324    schedule 22.03.2013
comment
Если это действительно все, для чего вы будете использовать поток данных, то я думаю, что это излишество. Вы можете добиться того же эффекта с помощью более простого кода (см. мой ответ). Кроме того, я не уверен, что дуплексный сервис является правильным выбором и здесь. И клиент, и сервер могут быть асинхронными и без того.   -  person svick    schedule 22.03.2013
comment
Мне сказали, что поток данных может превратиться в цепочку блоков, и в этом случае я должен распространять TCS, я полагаю. Тем не менее, семафор выглядит элегантно. Насчет дуплекса полностью согласен. Спасибо   -  person uni3324    schedule 22.03.2013


Ответы (1)


Во-первых, я думаю, что вы не должны использовать токен так, как вы это делаете. Уникальные идентификаторы полезны при обмене данными между процессами. Но когда вы находитесь внутри одного процесса, просто используйте ссылочное равенство.

Чтобы на самом деле ответить на ваш вопрос, я думаю, что (своего рода) занятая петля - не очень хорошая идея.

Более простым решением для асинхронного регулирования будет использование SemaphoreSlim. . Что-то типа:

static readonly SemaphoreSlim Semaphore = new SemaphoreSlim(3);

// operation contract
public async Task Inc(int id)
{
    await Semaphore.WaitAsync();

    try
    {
        Thread.Sleep(100);
        var result = id + 1;
        // do further processing using initiator service instance members
        // something like Callback.IncResult(result);
    }
    finally
    {
        Semaphore.Release();
    }
}

Если вы действительно хотите (или должны?) использовать поток данных, вы можете использовать TaskCompletionSource< /a> для синхронизации между операцией и блоком. Метод операции будет ожидать Task из TaskCompletionSource, и блок установит его, когда закончит вычисления для этого сообщения:

private static readonly ActionBlock<Message<int>> Block =
    new ActionBlock<Message<int>>(
        x => Inc(x),
        new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = 3
        });

static void Inc(Message<int> input)
{
    Thread.Sleep(100);

    input.TCS.SetResult(input.Data + 1);
}

// operation contract
public async Task Inc(int id)
{
    var tcs = new TaskCompletionSource<int>();

    Block.Post(new Message<int> { TCS = tcs, Data = id });

    int result = await tcs.Task;
    // do further processing using initiator service instance members
    // something like Callback.IncResult(result);
}
person svick    schedule 21.03.2013