BlockingCollection — проблема высокой синхронизации

Каков наилучший способ получать сообщения из многих потоков в очередь и иметь отдельные потоки, обрабатывающие элементы этой очереди по одному?

Я часто использую этот шаблон, когда пытаюсь отключить действия от многих потоков.

Я использую для этого BlockingCollection, как показано в фрагменте кода ниже:

// start this task in a static constructor
Task.Factory.StartNew(() => ProcessMultiUseQueueEntries(), TaskCreationOptions.LongRunning);


private static BlockingCollection<Tuple<XClientMsgExt, BOInfo, string, BOStatus>> _q = new BlockingCollection<Tuple<XClientMsgExt, BOInfo, string, BOStatus>>();

    /// <summary>
    /// queued - Simple mechanism that will log the fact that this user is sending an xMsg (FROM a user)
    /// </summary>
    public static void LogXMsgFromUser(XClientMsgExt xMsg)
    {
        _q.Add(new Tuple<XClientMsgExt, BOInfo, string, BOStatus>(xMsg, null, "", BOStatus.Ignore));
    }

    /// <summary>
    /// queued - Simple mechanism that will log the data being executed by this user
    /// </summary>
    public static void LogBOToUser(BOInfo boInfo)
    {
        _q.Add(new Tuple<XClientMsgExt, BOInfo, string, BOStatus>(null, boInfo, "", BOStatus.Ignore));
    }

    /// <summary>
    /// queued - Simple mechanism that will log the status of the BO being executed by this user (causes the red square to flash)
    /// </summary>
    public static void LogBOStatus(string UserID, BOStatus status)
    {
        _q.Add(new Tuple<XClientMsgExt, BOInfo, string, BOStatus>(null, null, UserID, status));
    }

    /// <summary>
    /// An endless thread that will keep checking the Queue for new entrants.
    /// NOTE - no error handling since this can't fail... :) lol etc 
    /// </summary>
    private static void ProcessMultiUseQueueEntries()
    {
        while (true)        //  eternal loop
        {
            Tuple<XClientMsgExt, BOInfo, string, BOStatus> tuple = _q.Take();

            // Do stuff

        }
    }

Это работает нормально, как я и думал, пока Мастер производительности в VS2010 не начал выделять строку _q.Take() как строку с наибольшим количеством конфликтов в моем коде!

Примечание. Я также использовал стандартную ConcurrentQueue с комбинацией ManualResetEvent, и каждый раз, когда я вставляю элемент в очередь, я сигнализирую о сбросе события, позволяя рабочему потоку исследовать и обрабатывать очередь, но это также имело тот же чистый эффект выделения в . Метод WaitOne()...

Существуют ли другие способы решения этой распространенной схемы наличия множества потоков, добавляющих объекты в параллельную очередь, и иметь один поток, прокладывающий себе путь через элементы по одному и в свое время...

Спасибо!!


person Marcel    schedule 18.03.2011    source источник
comment
Я согласен с существующими ответами, что вы должны ожидать, что это заблокируется при исключении из очереди, когда оно пусто. Или поставить в очередь при заполнении; если вы хотите попробовать другой импл: stackoverflow.com/questions/530211/   -  person Marc Gravell    schedule 18.03.2011


Ответы (3)


Высшая линия соперничества? Да, потому что это блокирующая коллекция! Этот вызов будет блокироваться (например, он может ожидать WaitHandle) до тех пор, пока в коллекцию не будет добавлен другой элемент.

Вы уверены, что это проблема? Это звучит именно так, как я и ожидал.

Если непонятно, что я имею в виду, рассмотрите этот код:

var blocker = new BlockingCollection<int>();
int nextItem = blocker.Take();

Как долго вы ожидаете, что вызов Take будет выполняться выше? Я ожидаю, что он будет ждать вечно, потому что в blocker ничего не добавляется. Таким образом, если бы я профилировал «производительность» приведенного выше кода, я бы увидел Take в самом верху списка долго выполняющихся методов. Но это не будет свидетельствовать о проблеме; еще раз: вы хотите этот вызов заблокировать.


В качестве совершенно отдельной заметки, могу ли я порекомендовать заменить Tuple<XClientMsgExt, BOInfo, string, BOStatus> типом, свойства которого имеют описательные имена? (Конечно, это предложение не имеет ничего общего с вашим вопросом; это всего лишь общий совет.)

person Dan Tao    schedule 18.03.2011
comment
@Jim: Ха, очевидно, ты оставил этот комментарий до обновления, которое я только что сделал;) - person Dan Tao; 18.03.2011
comment
Не поймите меня неправильно — код ведет себя ТОЧНО так, как я планировал. Тем не менее, я страдаю от того, что эта программа становится вялой, когда у меня больше пользователей и, следовательно, поступает гораздо больше сигналов. Возможно, стоит отметить, что (один) поток, который собирает элементы очереди, обновляет элементы UserInfo, привязанные к WPF. к пользовательскому интерфейсу. - person Marcel; 18.03.2011
comment
Извините, снова нажмите Enter... И с увеличением количества пользователей пользовательский интерфейс становится вялым. Я пытаюсь понять, почему так получилось, что анализатор производительности сталкивается с разногласиями в этой строке .Take(), которую я изо всех сил пытаюсь понять. - person Marcel; 18.03.2011
comment
@Marcel: Я имею в виду, что если коллекция пуста (или заполнена), вызов Take должен блокироваться, что, вероятно, и происходит, и, вероятно, поэтому ваш анализатор производительности сообщает о конфликте. Это настольное приложение? И если да, то вызываете ли вы Take из потока пользовательского интерфейса? Вы определенно не должны этого делать; Я могу сказать вам, что много. В противном случае вам придется включить некоторые дополнительные сведения о вашем приложении, чтобы люди могли вам помочь. - person Dan Tao; 18.03.2011
comment
Я сталкиваюсь с аналогичной проблемой. блокировка здесь не проблема, так как у меня есть много производителей, загружающих элементы через HTTPClient и ставящих их в очередь в блокирующей коллекции, сохраняя ее более или менее полной, это пропускная способность на стороне потребителя для удаления элементов из очереди, которая кажется потенциальным узким местом. Я провел простой тест на постановку в очередь 5 млн элементов на нескольких производителях, и у меня есть один потребитель (тот, который имеет сходство с потоком) для удаления элементов, и производительность не идеальна. Я рассчитал только загрузку элементов (со стороны производителя), и она работает молниеносно. Есть ли другие высокопроизводительные варианты? - person Abhijeet Patel; 04.09.2016

То, что _q.Take() является высшей линией разногласий, само по себе бессмысленно. Будет конкуренция, если многие потоки ожидают элементов. Большой вопрос заключается в том, стоит ли вам это соперничество с точки зрения производительности. Несколько вопросов, на которые нужно найти ответы:

  1. Способны ли вы обрабатывать элементы достаточно быстро, чтобы предотвратить неограниченный рост очереди?
  2. Стоит ли вам спор с точки зрения использования ЦП?
  3. Если вы перестанете добавлять элементы в коллекцию, мастер производительности по-прежнему будет сообщать о конфликтах?
  4. Если вы перестанете добавлять вещи в коллекцию, будет ли высокая загрузка ЦП?

Если вы можете предотвратить рост очереди и ресурсы ЦП не тратятся на получение предметов, то проблем нет. За исключением, возможно, того, что у вас больше потоков, чем необходимо для чтения из коллекции.

person Jim Mischel    schedule 18.03.2011
comment
Что касается вашего третьего элемента списка, я ожидаю, что количество зарегистрированных конфликтов будет максимальным, когда элементы не добавляются в коллекцию. Он будет бесконечно ждать какой-либо формы мьютекса; Я не уверен, что профилировщик мог бы отличить это от плохой конкуренции. - person Dan Tao; 18.03.2011
comment
@Dan: В этом и был смысл. Если он сообщает о конфликте, когда элементов нет, то конфликт, вероятно, не является проблемой. - person Jim Mischel; 18.03.2011
comment
У меня есть только один поток, который обрабатывает элементы вне очереди, но многие потоки помещают в него элементы. Есть ли какой-либо стандартный подход для этого шаблона кодирования? - person Marcel; 18.03.2011
comment
Хорошо, теперь я понимаю, что вы имели в виду. Я интерпретировал смысл вашего вопроса задом наперёд. @Marcel: Подход, который вы используете, является правильным. Я почти гарантирую, что Take не ваш виновник. В зависимости от того, как вы его используете, может возникнуть проблема — например, если вы вызываете его из того же потока, который отвечает за обновление пользовательского интерфейса. - person Dan Tao; 18.03.2011

Рассматривали ли вы возможность использования реального MSMQ ? Может показаться излишним, но он обеспечивает то, что вам нужно. Нельзя сказать, что ваше приложение не может быть одновременно и писателем, и иметь выделенный поток для чтения и обработки сообщений.

person TheZenker    schedule 18.03.2011
comment
Знаете, почему это кажется излишеством? Потому что это: Обработка (насколько нам известно) проблем с синхронизацией в процессе путем вызова из процесса. Сам по себе этот факт, не говоря уже об использовании MSMQ, добавляет множество источников ошибок, не говоря уже о задержке. - person Evgeniy Berezovsky; 09.06.2015