Параллельный C #: стоит ли использовать много AutoResetEvent?

Предположим, что есть много потоков, вызывающих Do(), и только один рабочий поток обрабатывает фактическую работу.

void Do(Job job)
{
    concurrentQueue.Enqueue(job);
    // wait for job done
}

void workerThread()
{
    while (true)
    {
        Job job;
        if (concurrentQueue.TryDequeue(out job))
        {
            // do job
        }
    }
}

Do() должен дождаться завершения работы перед возвратом. Поэтому я написал следующий код:

class Task 
{
    public Job job;
    public AutoResetEvent ev;
}

void Do(Job job)
{
    using (var ev = new AutoResetEvent(false))
    {
        concurrentQueue.Enqueue(new Task { job = job, ev = ev }));
        ev.WaitOne();
    }
}

void workerThread()
{
    while (true)
    {
        Task task;
        if (concurrentQueue.TryDequeue(out task))
        {
            // do job
            task.ev.Set();
        }
    }
}

После некоторых тестов я обнаружил, что он работает так, как ожидалось. Однако я не уверен, что это хороший способ выделить много AutoResetEvents или есть лучший способ сделать это?


person Cauly    schedule 23.04.2019    source источник
comment
При этом вы создаете несколько объектов синхронизации на уровне ОС, что, вероятно, не очень эффективно. В качестве альтернативы вы можете использовать TaskCompletionSource для отображения объектов TPL Task вместо ваших пользовательских Task, что даст возможность дождаться завершения задания в качестве дополнительного преимущества.   -  person Dmytro Mukalov    schedule 23.04.2019
comment
Если Do должен ждать завершения задания, то зачем запускать его в отдельном рабочем потоке?   -  person Sean    schedule 23.04.2019
comment
У вас есть только один рабочий поток, поэтому задание все равно будет ждать...   -  person Johnny    schedule 23.04.2019
comment
Поскольку вы используете WaitOne только один раз перед удалением примитива, не имеет значения, используете ли вы AutoResetEvent или ManualResetEvent. Так почему бы не использовать более легкий вариант ManualResetEventSlim. и эффективный?   -  person Theodor Zoulias    schedule 23.04.2019
comment
@TheodorZoulias, спасибо за совет. После ilspy в ManualResetEventSlim я обнаружил, что, как указано в msdoc. Однако, если о событии не сообщается в течение определенного периода времени, ManualResetEventSlim прибегает к обычному ожиданию дескриптора события. определенный период времени составляет всего дюжину фор-урожая, что для меня слишком мало. Итак, на самом деле дело доходит до того, как ManualResetEvent сравнивается с TaskCompletionSource?   -  person Cauly    schedule 23.04.2019
comment
@Cauly, ты прав. В вашем случае использование ManualResetEventSlim не будет иметь большого значения, поскольку ожидается, что время ожидания будет долгим. Кажется, что ваш дизайн в любом случае не сосредоточен на производительности. У вас может быть какая-то конкретная причина (сходство с потоком?), чтобы заставить все задания выполняться определенным потоком.   -  person Theodor Zoulias    schedule 23.04.2019
comment
@TheodorZoulias да, работа связана с компонентом com, который должен управляться только одним потоком.   -  person Cauly    schedule 23.04.2019
comment
Теперь это имеет смысл. Я опубликую ответ, используя TaskCompletionSource.   -  person Theodor Zoulias    schedule 23.04.2019


Ответы (3)


Поскольку все клиенты должны ожидать выполнения задания в одном потоке, нет реальной необходимости в использовании очереди. Поэтому я предлагаю вместо этого использовать класс Monitor, и, в частности, функции Wait/Pulse. Хотя это немного низкоуровнево и многословно.

class Worker<TResult> : IDisposable
{
    private readonly object _outerLock = new object();
    private readonly object _innerLock = new object();
    private Func<TResult> _currentJob;
    private TResult _currentResult;
    private Exception _currentException;
    private bool _disposed;

    public Worker()
    {
        var thread = new Thread(MainLoop);
        thread.IsBackground = true;
        thread.Start();
    }

    private void MainLoop()
    {
        lock (_innerLock)
        {
            while (true)
            {
                Monitor.Wait(_innerLock); // Wait for client requests
                if (_disposed) break;
                try
                {
                    _currentResult = _currentJob.Invoke();
                    _currentException = null;
                }
                catch (Exception ex)
                {
                    _currentException = ex;
                    _currentResult = default;
                }
                Monitor.Pulse(_innerLock); // Notify the waiting client that the job is done
            }
        } // We are done
    }

    public TResult DoWork(Func<TResult> job)
    {
        TResult result;
        Exception exception;
        lock (_outerLock) // Accept only one client at a time
        {
            lock (_innerLock) // Acquire inner lock
            {
                if (_disposed) throw new InvalidOperationException();
                _currentJob = job;
                Monitor.Pulse(_innerLock); // Notify worker thread about the new job
                Monitor.Wait(_innerLock); // Wait for worker thread to process the job
                result = _currentResult;
                exception = _currentException;
                // Clean up
                _currentJob = null;
                _currentResult = default;
                _currentException = null;
            }
        }
        // Throw the exception, if occurred, preserving the stack trace
        if (exception != null) ExceptionDispatchInfo.Capture(exception).Throw();
        return result;
    }

    public void Dispose()
    {
        lock (_outerLock)
        {
            lock (_innerLock)
            {
                _disposed = true;
                Monitor.Pulse(_innerLock); // Notify worker thread to exit loop
            }
        }
    }
}

Пример использования:

var worker = new Worker<int>();
int result = worker.DoWork(() => 1); // Accepts a function as argument
Console.WriteLine($"Result: {result}");
worker.Dispose();

Выход:

Result: 1

Обновление. Предыдущее решение не поддерживает ожидание, поэтому вот решение, которое допускает надлежащее ожидание. Он использует TaskCompletionSource для каждого задания, хранящегося в BlockingCollection.

class Worker<TResult> : IDisposable
{
    private BlockingCollection<TaskCompletionSource<TResult>> _blockingCollection
        = new BlockingCollection<TaskCompletionSource<TResult>>();

    public Worker()
    {
        var thread = new Thread(MainLoop);
        thread.IsBackground = true;
        thread.Start();
    }

    private void MainLoop()
    {
        foreach (var tcs in _blockingCollection.GetConsumingEnumerable())
        {
            var job = (Func<TResult>)tcs.Task.AsyncState;
            try
            {
                var result = job.Invoke();
                tcs.SetResult(result);
            }
            catch (Exception ex)
            {
                tcs.TrySetException(ex);
            }
        }
    }

    public Task<TResult> DoWorkAsync(Func<TResult> job)
    {
        var tcs = new TaskCompletionSource<TResult>(job,
            TaskCreationOptions.RunContinuationsAsynchronously);
        _blockingCollection.Add(tcs);
        return tcs.Task;
    }

    public TResult DoWork(Func<TResult> job) // Synchronous call
    {
        var task = DoWorkAsync(job);
        try { task.Wait(); } catch { } // Swallow the AggregateException
        // Throw the original exception, if occurred, preserving the stack trace
        if (task.IsFaulted) ExceptionDispatchInfo.Capture(task.Exception.InnerException).Throw();
        return task.Result;
    }

    public void Dispose()
    {
        _blockingCollection.CompleteAdding();
    }
}

Пример использования

var worker = new Worker<int>();
int result = await worker.DoWorkAsync(() => 1); // Accepts a function as argument
Console.WriteLine($"Result: {result}");
worker.Dispose();

Выход:

Result: 1
person Theodor Zoulias    schedule 23.04.2019

С точки зрения синхронизации это работает нормально.

Но кажется бесполезным делать это таким образом. Если вы хотите выполнять задания одно за другим, вы можете просто использовать блокировку:

lock (lockObject) {
  RunJob();
}

Каковы ваши намерения с этим кодом?

Также возникает вопрос эффективности, потому что каждая задача создает событие ОС и ожидает его. Если вы используете более современный TaskCompletionSource, он будет использовать то же самое под капотом, если вы синхронно ожидаете этой задачи. Вы можете использовать асинхронное ожидание (await myTCS.Task;), чтобы немного повысить эффективность. Конечно, это заражает весь стек вызовов с помощью async/await. Если это операция с довольно небольшим объемом, вы не выиграете много.

person usr    schedule 23.04.2019
comment
Я не могу использовать блокировку, потому что RunJob() имеет дело с компонентом com, который должен управляться только одним потоком. - person Cauly; 23.04.2019
comment
@Cauly, это веская причина, чтобы сделать это таким образом. У меня не было бы серьезной критики вашего кода. Может быть, проще использовать собственный TaskScheduler. Таким образом, каждый рабочий элемент представлен в виде задачи, которая хорошо составлена. Вы можете ждать и ждать его. - person usr; 24.04.2019

В целом, я думаю, это сработает, хотя, когда вы говорите, что "многие" потоки вызывают Do(), это может плохо масштабироваться... приостановленные потоки используют ресурсы.

Другая проблема с этим кодом заключается в том, что во время простоя у вас будет «жесткий цикл» в «workerThread», который заставит ваше приложение возвращать время высокой загрузки ЦП. Вы можете добавить этот код в "workerThread":

if (concurrentQueue.IsEmpty) Thread.Sleep(1);

Вы также можете ввести тайм-аут для вызова WaitOne, чтобы избежать застревания в журнале.

person James Harcourt    schedule 23.04.2019
comment
Спасибо за совет, рассматриваемый код упрощен для демонстрации ^_^ - person Cauly; 23.04.2019