C # Events: как обрабатывать событие параллельно

У меня есть событие, которое я хотел бы обрабатывать параллельно. Моя идея состоит в том, чтобы добавить каждый обратный вызов в ThreadPool, эффективно имея каждый метод, который зарегистрировал событие, обрабатываемое ThreadPool.

Мой пробный код выглядит примерно так:

Delegate[] delegates = myEvent.GetInvocationList();
IAsyncResult[] results = new IAsyncResult[ delegates.Count<Delegate>() ];

for ( int i = 0; i < delegates.Count<Delegate>(); i++ )
{
    IAsyncResult result = ( ( TestDelegate )delegates[ i ] ).BeginInvoke( "BeginInvoke/EndInvoke", null, null );
    results[ i ] = result;
}

for ( int i = 0; i < delegates.Length; i++ )
{
    ( ( TestDelegate )delegates[ i ] ).EndInvoke( results[ i ] );
}

Это просто для игры, потому что мне любопытно, как это сделать. Я уверен, что есть способ лучше. Мне не нравится Func, который создает WaitCallback, содержащий лямбду. Кроме того, DynamicInvoke работает довольно медленно по сравнению с прямым вызовом делегата. Я сомневаюсь, что такой способ обработки события быстрее, чем просто выполнение его последовательно.

У меня вопрос: как я могу обработать событие параллельно, желательно с помощью ThreadPool?

Поскольку я обычно работаю с Mono, .NET 4.0 или параллельная библиотека задач не подходят.

Спасибо!

РЕДАКТИРОВАТЬ: - Исправленный пример благодаря ответу Earwickers. - Обновлен пробный код


person galaktor    schedule 04.10.2009    source источник
comment
Вы должны обязательно вызывать EndInvoke также для каждого BeginInvoke, чтобы избежать утечки ресурсов. См. Также msdn.microsoft.com/en-us/magazine/cc164036. aspx # S3   -  person Lucero    schedule 04.10.2009
comment
А об использовании свойства AsyncWaitHandle: msdn .microsoft.com / en-us / library / Дескриптор ожидания не закрывается автоматически, когда вы вызываете EndInvoke для делегата, который использовался для вызова асинхронного метода. Если вы освободите все ссылки на дескриптор ожидания, системные ресурсы будут освобождены, когда сборщик мусора освободит дескриптор ожидания. Чтобы освободить системные ресурсы, как только вы закончите использовать дескриптор ожидания, вызовите метод WaitHandle.Close. Ой.   -  person Lucero    schedule 04.10.2009


Ответы (4)


Я бы выбрал подход с использованием DynamicMethod (LCG) и объекта состояния, который несет аргументы и отслеживает вызовы (чтобы вы могли дождаться их завершения).

Код: Должно быть что-то вроде этого (хотя еще не тщательно протестировано, поэтому в некоторых ситуациях может вызывать неприятные исключения):

/// <summary>
/// Class for dynamic parallel invoking of a MulticastDelegate.
/// (C) 2009 Arsène von Wyss, [email protected]
/// No warranties of any kind, use at your own risk. Copyright notice must be kept in the source when re-used.
/// </summary>
public static class ParallelInvoke {
    private class ParallelInvokeContext<TDelegate> where TDelegate: class {
        private static readonly DynamicMethod invoker;
        private static readonly Type[] parameterTypes;

        static ParallelInvokeContext() {
            if (!typeof(Delegate).IsAssignableFrom(typeof(TDelegate))) {
                throw new InvalidOperationException("The TDelegate type must be a delegate");
            }
            Debug.Assert(monitor_enter != null, "Could not find the method Monitor.Enter()");
            Debug.Assert(monitor_pulse != null, "Could not find the method Monitor.Pulse()");
            Debug.Assert(monitor_exit != null, "Could not find the method Monitor.Exit()");
            FieldInfo parallelInvokeContext_activeCalls = typeof(ParallelInvokeContext<TDelegate>).GetField("activeCalls", BindingFlags.Instance|BindingFlags.NonPublic);
            Debug.Assert(parallelInvokeContext_activeCalls != null, "Could not find the private field ParallelInvokeContext.activeCalls");
            FieldInfo parallelInvokeContext_arguments = typeof(ParallelInvokeContext<TDelegate>).GetField("arguments", BindingFlags.Instance|BindingFlags.NonPublic);
            Debug.Assert(parallelInvokeContext_arguments != null, "Could not find the private field ParallelInvokeContext.arguments");
            MethodInfo delegate_invoke = typeof(TDelegate).GetMethod("Invoke", BindingFlags.Instance|BindingFlags.Public);
            Debug.Assert(delegate_invoke != null, string.Format("Could not find the method {0}.Invoke()", typeof(TDelegate).FullName));
            if (delegate_invoke.ReturnType != typeof(void)) {
                throw new InvalidOperationException("The TDelegate delegate must not have a return value");
            }
            ParameterInfo[] parameters = delegate_invoke.GetParameters();
            parameterTypes = new Type[parameters.Length];
            invoker = new DynamicMethod(string.Format("Invoker<{0}>", typeof(TDelegate).FullName), typeof(void), new[] {typeof(ParallelInvokeContext<TDelegate>), typeof(object)},
                                        typeof(ParallelInvokeContext<TDelegate>), true);
            ILGenerator il = invoker.GetILGenerator();
            LocalBuilder args = (parameters.Length > 2) ? il.DeclareLocal(typeof(object[])) : null;
            bool skipLoad = false;
            il.BeginExceptionBlock();
            il.Emit(OpCodes.Ldarg_1); // the delegate we are going to invoke
            if (args != null) {
                Debug.Assert(args.LocalIndex == 0);
                il.Emit(OpCodes.Ldarg_0);
                il.Emit(OpCodes.Ldfld, parallelInvokeContext_arguments);
                il.Emit(OpCodes.Dup);
                il.Emit(OpCodes.Stloc_0);
                skipLoad = true;
            }
            foreach (ParameterInfo parameter in parameters) {
                if (parameter.ParameterType.IsByRef) {
                    throw new InvalidOperationException("The TDelegate delegate must note have out or ref parameters");
                }
                parameterTypes[parameter.Position] = parameter.ParameterType;
                if (args == null) {
                    il.Emit(OpCodes.Ldarg_0);
                    il.Emit(OpCodes.Ldfld, parallelInvokeContext_arguments);
                } else if (skipLoad) {
                    skipLoad = false;
                } else {
                    il.Emit(OpCodes.Ldloc_0);
                }
                il.Emit(OpCodes.Ldc_I4, parameter.Position);
                il.Emit(OpCodes.Ldelem_Ref);
                if (parameter.ParameterType.IsValueType) {
                    il.Emit(OpCodes.Unbox_Any, parameter.ParameterType);
                }
            }
            il.Emit(OpCodes.Callvirt, delegate_invoke);
            il.BeginFinallyBlock();
            il.Emit(OpCodes.Ldarg_0);
            il.Emit(OpCodes.Call, monitor_enter);
            il.Emit(OpCodes.Ldarg_0);
            il.Emit(OpCodes.Dup);
            il.Emit(OpCodes.Ldfld, parallelInvokeContext_activeCalls);
            il.Emit(OpCodes.Ldc_I4_1);
            il.Emit(OpCodes.Sub);
            il.Emit(OpCodes.Dup);
            Label noPulse = il.DefineLabel();
            il.Emit(OpCodes.Brtrue, noPulse);
            il.Emit(OpCodes.Stfld, parallelInvokeContext_activeCalls);
            il.Emit(OpCodes.Ldarg_0);
            il.Emit(OpCodes.Call, monitor_pulse);
            Label exit = il.DefineLabel();
            il.Emit(OpCodes.Br, exit);
            il.MarkLabel(noPulse);
            il.Emit(OpCodes.Stfld, parallelInvokeContext_activeCalls);
            il.MarkLabel(exit);
            il.Emit(OpCodes.Ldarg_0);
            il.Emit(OpCodes.Call, monitor_exit);
            il.EndExceptionBlock();
            il.Emit(OpCodes.Ret);
        }

        [Conditional("DEBUG")]
        private static void VerifyArgumentsDebug(object[] args) {
            for (int i = 0; i < parameterTypes.Length; i++) {
                if (args[i] == null) {
                    if (parameterTypes[i].IsValueType) {
                        throw new ArgumentException(string.Format("The parameter {0} cannot be null, because it is a value type", i));
                    }
                } else if (!parameterTypes[i].IsAssignableFrom(args[i].GetType())) {
                    throw new ArgumentException(string.Format("The parameter {0} is not compatible", i));
                }
            }
        }

        private readonly object[] arguments;
        private readonly WaitCallback invokeCallback;
        private int activeCalls;

        public ParallelInvokeContext(object[] args) {
            if (parameterTypes.Length > 0) {
                if (args == null) {
                    throw new ArgumentNullException("args");
                }
                if (args.Length != parameterTypes.Length) {
                    throw new ArgumentException("The parameter count does not match");
                }
                VerifyArgumentsDebug(args);
                arguments = args;
            } else if ((args != null) && (args.Length > 0)) {
                throw new ArgumentException("This delegate does not expect any parameters");
            }
            invokeCallback = (WaitCallback)invoker.CreateDelegate(typeof(WaitCallback), this);
        }

        public void QueueInvoke(Delegate @delegate) {
            Debug.Assert(@delegate is TDelegate);
            activeCalls++;
            ThreadPool.QueueUserWorkItem(invokeCallback, @delegate);
        }
    }

    private static readonly MethodInfo monitor_enter;
    private static readonly MethodInfo monitor_exit;
    private static readonly MethodInfo monitor_pulse;

    static ParallelInvoke() {
        monitor_enter = typeof(Monitor).GetMethod("Enter", BindingFlags.Static|BindingFlags.Public, null, new[] {typeof(object)}, null);
        monitor_pulse = typeof(Monitor).GetMethod("Pulse", BindingFlags.Static|BindingFlags.Public, null, new[] {typeof(object)}, null);
        monitor_exit = typeof(Monitor).GetMethod("Exit", BindingFlags.Static|BindingFlags.Public, null, new[] {typeof(object)}, null);
    }

    public static void Invoke<TDelegate>(TDelegate @delegate) where TDelegate: class {
        Invoke(@delegate, null);
    }

    public static void Invoke<TDelegate>(TDelegate @delegate, params object[] args) where TDelegate: class {
        if (@delegate == null) {
            throw new ArgumentNullException("delegate");
        }
        ParallelInvokeContext<TDelegate> context = new ParallelInvokeContext<TDelegate>(args);
        lock (context) {
            foreach (Delegate invocationDelegate in ((Delegate)(object)@delegate).GetInvocationList()) {
                context.QueueInvoke(invocationDelegate);
            }
            Monitor.Wait(context);
        }
    }
}

Использование:

ParallelInvoke.Invoke(yourDelegate, arguments);

Примечания:

  • Исключения в обработчиках событий не обрабатываются (но код IL должен, наконец, уменьшить счетчик, чтобы метод завершился правильно), и это может вызвать проблемы. Можно было бы также перехватывать и передавать исключения в коде IL.

  • Неявные преобразования, отличные от наследования (например, int в double), не выполняются и вызовут исключение.

  • Используемый метод синхронизации не выделяет дескрипторы ожидания ОС, что обычно хорошо для производительности. Описание работы монитора можно найти на странице Джозефа Альбахари.

  • После некоторого тестирования производительности кажется, что этот подход масштабируется намного лучше, чем любой подход, использующий «родные» вызовы BeginInvoke / EndInvoke для делегатов (по крайней мере, в MS CLR).

person Lucero    schedule 04.10.2009
comment
Это будет очень интересно, с нетерпением жду вашего образца. - person galaktor; 04.10.2009
comment
Отправил код, но я сделал только быстрый тест. Хотя вроде работает. - person Lucero; 04.10.2009
comment
Спасибо за код! Кажется, он работает, и, к счастью, его намного проще использовать, чем читать ;-) Я провел небольшой тест производительности, сравнив ваш код с прямым вызовом события, а затем с методом, который я опубликовал в вопросе. Результаты были ... ParallelInvoke: 86 мс, прямой вызов: 21 мс, моя попытка: 36 мс. После нескольких прогонов пропорции, кажется, остались прежними. Будет ли что-то явно неправильное в том, чтобы сохранить мою версию? Я могу предоставить свой тестовый код, если кому-то интересно. - person galaktor; 04.10.2009
comment
Мой код нуждается в разогреве (два статических конструктора и первый вызов динамически сгенерированного метода), чтобы работать хорошо, и это полностью универсальное решение, работающее с любым делегатом и любым количеством аргументов, как значений, так и ссылочные типы (без типов byref, потому что это действительно не имеет смысла). Я не могу придумать никакой реальной причины, по которой он работает медленнее, чем любое решение DynamicInvoke, но это также может зависеть от используемой структуры (Mono или MS CLR). Так что, если вы можете показать мне часть своего кода, мне было бы интересно исследовать причину низкой производительности. - person Lucero; 04.10.2009
comment
Хорошо, я провела несколько тестов самостоятельно. Даже прямой вызов BeginInvoke / EndInvoke делегатов в списке вызовов использует вдвое больше времени, чем мой код (не считая одного вызова разминки), и это без какой-либо дальнейшей оптимизации моего кода, а также без DynamicInvoke или дополнительных методов прокси. (ваш Func ‹,›). Я отправлю еще один ответ с этим кодом, чтобы вы могли сравнить его сами. - person Lucero; 04.10.2009
comment
Спасибо. Я запущу ваш тестовый код, когда у меня будет возможность. Мой тест не разогрел ваш код, поэтому я тоже попробую еще раз. Я вернусь к вам позже :-) - person galaktor; 05.10.2009
comment
Я опубликовал новую версию, которая должна быть значительно быстрее (особенно при сборке выпуска - сборка отладки выполняет больше проверок, которые не являются абсолютно необходимыми). - person Lucero; 06.10.2009

Если тип делегата известен, вы можете напрямую вызвать его BeginInvoke и сохранить IAsyncResults в массиве для ожидания и завершения вызовов. Обратите внимание, что вы должны вызвать EndInvoke, чтобы избежать потенциальных утечек ресурсов. Код основан на том факте, что EndInvoke ожидает завершения вызова, поэтому WaitAll не требуется (и, заметьте, У WaitAll есть несколько проблем, поэтому я бы избегал его использования).

Вот пример кода, который в то же время является упрощенным тестом для различных подходов:

public static class MainClass {
    private delegate void TestDelegate(string x);

    private static void A(string x) {}

    private static void Invoke(TestDelegate test, string s) {
        Delegate[] delegates = test.GetInvocationList();
        IAsyncResult[] results = new IAsyncResult[delegates.Length];
        for (int i = 0; i < delegates.Length; i++) {
            results[i] = ((TestDelegate)delegates[i]).BeginInvoke("string", null, null);
        }
        for (int i = 0; i < delegates.Length; i++) {
            ((TestDelegate)delegates[i]).EndInvoke(results[i]);
        }
    }

    public static void Main(string[] args) {
        Console.WriteLine("Warm-up call");
        TestDelegate test = A;
        test += A;
        test += A;
        test += A;
        test += A;
        test += A;
        test += A;
        test += A;
        test += A;
        test += A; // 10 times in the invocation list
        ParallelInvoke.Invoke(test, "string"); // warm-up
        Stopwatch sw = new Stopwatch();
        GC.Collect();
        GC.WaitForPendingFinalizers();
        Console.WriteLine("Profiling calls");
        sw.Start();
        for (int i = 0; i < 100000; i++) {
            // ParallelInvoke.Invoke(test, "string"); // profiling ParallelInvoke
            Invoke(test, "string"); // profiling native BeginInvoke/EndInvoke
        }
        sw.Stop();
        Console.WriteLine("Done in {0} ms", sw.ElapsedMilliseconds);
        Console.ReadKey(true);
    }
}

На моем очень старом ноутбуке это занимает 95553 мс с BeginInvoke / EndInvoke по сравнению с 9038 мс с моим подходом ParallelInvoke (MS .NET 3.5). Таким образом, этот подход не хорошо масштабируется по сравнению с решением ParallelInvoke.

person Lucero    schedule 04.10.2009
comment
Для объяснения проблемы производительности с BeginInvoke / EndInvoke см .: stackoverflow.com/questions/532791/ - person Lucero; 04.10.2009
comment
Спасибо, что указали на это - я бы в любом случае отклонил подход WaitAll из-за его ограничений! Как я уже сказал, я попробую ваш тестовый код, как только у меня будет возможность. - person galaktor; 05.10.2009
comment
Я запустил свой код с разогревом и более длительным тестовым запуском для лучшего сравнения. При 100000 запусков каждый ParallelInvoke занимал ~ 16 секунд (15948 мс), а Begin / EndInvoke выполнялся в течение ~ 19 секунд (19149 мс). Итак, ваше решение кажется намного быстрее, поэтому я отмечаю ваш код ParallelInvoke как правильный ответ. Еще раз спасибо. - person galaktor; 06.10.2009
comment
Пожалуйста. В какой среде выполнения вы запускали свои тесты, это был Mono? Потому что в моем случае Begin / EndInvoke работал намного хуже. Также обратите внимание, что я немного отредактировал ParallelInvoke и ввел поле invokeCallback, которое улучшает его работу в случае нескольких событий в списке вызовов. Поэтому, если вы ранжируете тест со старым кодом, возможно, внесите это изменение, чтобы получить еще лучшую производительность. - person Lucero; 06.10.2009
comment
Нет, я запускал его на MS.NET. Я заметил, что мои модульные тесты для этого кода, похоже, выполняются намного быстрее в Mono по сравнению с MS.NET. - person galaktor; 06.10.2009
comment
Сколько делегатов у вас было в списке вызовов? В моих тестах разница становилась намного больше, когда в список вызовов добавлялось больше элементов. - person Lucero; 06.10.2009
comment
Я добавил 100000 делегатов в список событий, а затем вызвал событие тремя разными способами: ParallelInvoke, Begin / EndInvoke и прямым вызовом события стандартным способом. - person galaktor; 06.10.2009
comment
Ах, интересно, но узким местом (или той частью, которая использует большую часть времени метки) в этом случае был пул потоков, а не сами вызовы. Я предлагаю вам также попробовать с 10-20 событиями в списке и выполнить кучу вызовов (100000 - это нормально), так как это не будет сильно нагружать пул потоков, а скорее сами фреймворки асинхронных вызовов. - person Lucero; 06.10.2009
comment
Теперь у меня было 100000 звонков на мероприятие, на котором присутствовало 20 делегатов. Результаты не сильно различаются ... ParallelInvoke: 33478 мс, Begin / EndInvoke: 43797 мс, Прямой вызов: 95 мс. Ваш код все еще явно быстрее, по крайней мере, на моем ноутбуке Core2Duo. - person galaktor; 06.10.2009
comment
Спасибо за информацию, так что большая разница на моем компьютере, возможно, связана с более старым процессором (размер кеша или что-то еще), я тестировал его с Pentium M 1,5 ГГц (из старых добрых времен еще в 2003 году). - person Lucero; 06.10.2009
comment
Я провел тот же тест в Mono для Windows 6 раз, вот интересные средние результаты ... ParallelInvoke: 66309,2 мс, прямой вызов: 162,167 мс, Begin / EndInvoke: 24741,7 мс. Очевидно, код Begin / EndInvoke реализован в Mono намного эффективнее, чем в MS.NET, однако ParallelInvoke дает гораздо худшие результаты. - person galaktor; 06.10.2009
comment
Поэтому я предполагаю, что Mono не полагается на архитектуру удаленного взаимодействия для Begin / EndInvoke. С другой стороны, возможно, DynamicMethods не так эффективны в Mono (или, может быть, просто инструкция Unbox_Any IL), я не вижу других мест, где можно было бы потерять так много времени. Тем не менее, я считаю, что мой код все еще демонстрирует, как создавать методы прокси вызова, которые принимают объект [] так же, как DynamicInvoke, но вызывают метод с использованием скомпилированного кода IL, что делает его более эффективным. ;) - person Lucero; 06.10.2009
comment
@galaktor, если вам нравится, вы можете отправить мне письмо для дальнейшего обсуждения. Почтовый адрес указан в источнике. - person Lucero; 06.10.2009

Кажется, вы дважды выполняете асинхронный запуск в своем фрагменте кода.

Сначала вы вызываете BeginInvoke для делегата - это ставит в очередь рабочий элемент, чтобы пул потоков выполнял делегата.

Затем внутри этого делегата вы используете QueueUserWorkItem, чтобы ... поставить в очередь другой рабочий элемент, чтобы пул потоков выполнял настоящего делегата.

Это означает, что когда вы возвращаете IAsyncResult (и, следовательно, дескриптор ожидания) от внешнего делегата, он будет сигнализировать о завершении, когда второй рабочий элемент будет поставлен в очередь, а не когда он завершит выполнение.

person Daniel Earwicker    schedule 04.10.2009
comment
Вы правы, у меня сначала не было BeginInvoke, он закрадывался при игре. Поправлю пример! - person galaktor; 04.10.2009

Вы делаете это для выступления?

Это будет работать только в том случае, если это позволит вам заставить несколько аппаратных средств работать параллельно, и это будет стоить вам накладных расходов на переключение процессов.

person Mike Dunlavey    schedule 27.10.2009
comment
Это был просто эксперимент, который меня заинтересовал. Тесты производительности показали, что оба представленных здесь параллельных подхода требуют на несколько сотен больше времени, чем стандартный последовательный подход. - person galaktor; 28.10.2009