BlockingCollection слишком долго просыпается

У меня проблема, когда BlockingCollection<MyItem> слишком долго просыпается от .Take() звонка.

Сценарий таков: у меня есть поток, который очень быстро отправляет данные в BlockingCollection (на самом деле в XUnit я выполнил цикл for. У меня есть 3 Task, которые просто сидят в .Take() вызове и ждут добавления элементов. Из вывода я могу видеть что почти 200 элементов (до 1 секунды или более) были добавлены в коллекцию перед первым Task пробуждением и фактическим сбором данных из BlockingCollection.

У меня есть несколько "буферов" на основе BlockingCollection, организованных конвейерным способом, и все они страдают от слишком длительного пробуждения при выполнении .Take() действия.

Я пробовал .TryTake() и .GetConsumingEnumerable() с теми же результатами.

Идея о том, что в конце этого конвейера у меня есть однопоточная медленная функция, которая обрабатывает элементы один за другим, и для обработки одного элемента может потребоваться неизвестное время. Мне просто нужно убедиться, что "элемент" перешел из "буфера" в "буфер" очень быстро (как только он будет вставлен в первый "буфер")

Мне просто нужно убедиться, что время запуска (в .Take() или .TryTake() и т. Д.) Будет близко к тому времени, когда элемент добавлен в коллекцию


person Jasper    schedule 25.05.2015    source источник
comment
Опубликовать полный исполняемый код воспроизведения. Я подозреваю, что вы не можете и найдете ошибку в процессе.   -  person usr    schedule 25.05.2015
comment
@usr eee .... Где искать баг? в .Take() реализации? Я не сказал, что приложение вылетает или что-то в этом роде, и я не сказал, что это not working '- оно действительно работает, и все прошло, обработано и т.д ... ок. Я прошу шаблон проектирования для быстрого производителя - медленного потребителя   -  person Jasper    schedule 25.05.2015
comment
Сколько активных потоков у вас (пытаетесь) иметь?   -  person Henk Holterman    schedule 25.05.2015
comment
@ Джаспер, я хочу сказать, что вы не делаете очевидной ошибки. Чтобы найти ошибку, необходимо посмотреть код.   -  person usr    schedule 25.05.2015
comment
@HenkHolterman Хммм .. хоть вопрос. В настоящее время я вижу около 65 потоков в vs.executionengine.x86 (приложение для запуска тестов xunit), так что я думаю, что почти все они мои, и я думаю, мне нужна добрая половина из них вживую. Не знаю, как ответить ....   -  person Jasper    schedule 25.05.2015
comment
Ваше требование странное ... Почему вы вообще заботитесь о от буфера к буферу очень быстро? Эта часть не имеет значения. А при слишком большом количестве потоков на слишком малом количестве ядер им просто придется ждать своей очереди.   -  person Henk Holterman    schedule 25.05.2015
comment
@HenkHolterman Извините, это вводило в заблуждение - ›Я исправляю вопрос   -  person Jasper    schedule 25.05.2015
comment
Что вы имеете в виду под start-up временем?   -  person VMAtm    schedule 25.05.2015
comment
65 потоков?!?! Если у вас гораздо больше живых потоков, чем, скажем, (количество ядер) * (2 ~ 4), то у вас серьезная проблема с дизайном!   -  person AK_    schedule 25.05.2015
comment
@VMAtm - Как мне грустно в вопросе. BlockingCollection требуется слишком много времени, чтобы проснуться и действительно понять, что у него есть элементы в коллекции и на самом деле .Take() они. Я собираю до 200 предметов в коллекцию непосредственно перед самой первой .Take() разблокировкой и сбором   -  person Jasper    schedule 26.05.2015
comment
@AK_ Я не знаю, сколько потоков у меня и каков их статус :( Это всего лишь окончательное число, включающее все отовсюду. Я не знаю, как это понять :(   -  person Jasper    schedule 26.05.2015
comment
Что ж, я предполагаю, что ваши проблемы связаны с конфликтом потоков и переключением контекста. stackoverflow.com/questions/1970345/what-is-thread-contention < / а>   -  person AK_    schedule 26.05.2015
comment
@AK_ Думаю, ты прав! Пожалуйста, опубликуйте его как ответ на вопрос, чтобы я мог «отметить его как ответ»   -  person Jasper    schedule 26.05.2015
comment
Возможно, многопоточность - неправильное решение в этой ситуации. Что-нибудь асинхронное, основанное на Task ‹T›, могло бы быть лучшим подходом. Или, если у вас есть последовательность данных, на которую вы хотите немедленно отреагировать, почему бы не рассмотреть возможность использования Reactive Extensions для .NET (Rx)?   -  person Tim Long    schedule 26.05.2015
comment
@TimLong Вы правы. Мое решение все больше и больше имитирует Rx. Я даже заимствую от IObservable<> и IObserver<>, потому что это именно тот поток данных, который мне нужен, но я все еще пытаюсь понять Rx, и на самом деле здесь я столкнулся с проблемой создания потоков ... Может быть, у вас есть ссылка на проект GitHub что с использованием Rx, чтобы я мог узнать об этом немного больше?   -  person Jasper    schedule 27.05.2015