Альтернатива ActionBlock Framework 4 rx

Меня интересует реализация ActionBlock для Framework 4.0, так как кажется, что TPL.Dataflow не поддерживается для Framework 4.0. В частности, меня интересует случай конструктора, который получает делегат Func‹TInput, Task› и случай MaxDegreeOfParallism = 1.

Я думал реализовать это с помощью реактивных расширений, но не знаю, как это сделать. Думал о создании Subject‹TInput› и вызове OnNext на Post, а также об использовании SelectMany и задачи ToObservable, но я не уверен, что делать с планировщиком. Вот набросок того, о чем я думал.

public class ActionBlock<TInput>
{
    private readonly TaskCompletionSource<object> mCompletion = new TaskCompletionSource<object>();
    private readonly Subject<TInput> mQueue = new Subject<TInput>();

    public ActionBlock(Func<TInput, Task> action)
    {
        var observable =
            from item in mQueue
            from _ in action(item).ToObservable()
            select _;

        observable.Subscribe(x => { },
            OnComplete);
    }

    private void OnComplete()
    {
        mCompletion.SetResult(null);
    }

    public void Post(TInput input)
    {
        mQueue.OnNext(input);
    }

    public Task Completion
    {
        get
        {
            return mCompletion.Task;
        }
    }

    public void Complete()
    {
        mQueue.OnCompleted();
    }
}

Я подумал, может быть, с помощью EventLoopScheduler, но я не уверен, что он подходит здесь, так как это асинхронно.

Любые идеи?


person darkl    schedule 24.02.2015    source источник


Ответы (1)


mQueue
    .Select(input => Observable.FromAsync(() => action(input))
    .Merge(maxDegreeOfParallelism)
    .Subscribe(...);

Если действительно maxDegreeOfParallelism всегда равно 1, то просто используйте Concat вместо Merge:

mQueue
    .Select(input => Observable.FromAsync(() => action(input))
    .Concat()
    .Subscribe(...);

Это работает, потому что FromAsync просто создает холодный наблюдаемый объект, который не будет запускать асинхронное действие, пока на него не будет подписано. Затем мы используем параметр maxConcurrency для Merge (или просто Concat), чтобы ограничить количество одновременных подписок (и, следовательно, количество выполняемых асинхронных действий).

Редактировать:

А поскольку ваша цель — просто получить Task, обозначающий завершение потока, вы можете использовать ToTask вместо прямой подписки. ToTask подпишется и вернет Task с окончательным значением. Поскольку ToTask вызовет исключение, если наблюдаемый объект не выдаст значение, мы будем использовать Count, чтобы гарантировать, что он выдает значение:

// task to mark completion
private readonly Task mCompletion;

// ...

this.mCompletion = mQueue
    .Select(input => Observable.FromAsync(() => action(input))
    .Concat()
    .Count()
    .ToTask();
person Brandon    schedule 25.02.2015