Почему и как цикл foreach в BlockingCollection работает бесконечно?

Я работаю над простым регистратором файлов, у меня были проблемы с потокобезопасностью, и я смотрел, как это делают другие. Я столкнулся с подходом к использованию BlockingCollection в качестве очереди и цикла foreach для обработки этой очереди:

var queue = new BlockingCollection<string>(1024);
var t = Task.Factory.StartNew(() => {
                                        foreach (var message in queue.GetConsumingEnumerable()) {
                                            WriteMessageToFile(message);
                                        }
                                    }, TaskCreationOptions.LongRunning);

Dim queue = New BlockingCollection(Of String)(1024)
Dim t = Task.Factory.StartNew(Sub()
                                  For Each message In queue.GetConsumingEnumerable()
                                      WriteMessageToFile(message)
                                  Next
                              End Sub, TaskCreationOptions.LongRunning)

Этот цикл For Each на самом деле работает бесконечно. Он обрабатывает данные, которые я добавляю в queue, пока я не вызову .CompleteAdding в BlockingCollection.

Интересно, почему это так работает и как, и если это хороший подход. Что делает поток, пока коллекция пуста, проверяет ли он каждый тик? Разве это не ресурсоемко?


person Fox    schedule 10.06.2020    source источник
comment
Если производитель явно не указывает, что данных больше не будет, как вы ожидаете, что потребляющий перечисляемый объект сообщит о разнице между тем, что коллекция в настоящее время пуста, но может измениться (и, следовательно, продолжить перечисление), и коллекция в настоящее время пуста и что никогда не изменится (и, следовательно, откажется от перечисления)? Это (по крайней мере) два отдельных потока/задачи, поэтому одному нужно сообщить об этом другому каким-то образом.   -  person Lance U. Matthews    schedule 10.06.2020
comment
Да, это хороший подход. Нет, он не проверяет каждый тик; он блокирует поток, но операционная система не будет планировать выполнение этого потока, когда коллекция пуста.   -  person John Wu    schedule 10.06.2020
comment
@БЕКОН Что? Разве это не мой вопрос? Откуда он знает? foreach обычно ничего не делает с пустой коллекцией, почему этого не происходит здесь?   -  person Fox    schedule 10.06.2020
comment
@JohnWu Вау, это здорово. Как ОС узнает, что коллекция пуста? Эта логика реализована в BlockingCollection или она просто так работает?   -  person Fox    schedule 10.06.2020
comment
Он знает, как вы говорите, звоните .CompleteAdding, так что нет, ваш вопрос, похоже, звучит так: «Интересно, почему это так работает». Я говорю, что если бы не было требования вызывать CompleteAdding(), как еще он определил бы, что данных действительно больше нет? Что, если вы запустите задачи производителя и потребителя одновременно, и потребители попадут в коллекцию раньше, чем она будет содержать данные, или данные будут создаваться медленнее, чем потребляться? Если бы они навсегда отказались от перечисления коллекции, потому что в этот момент она оказалась пустой, это было бы не очень полезно, не так ли?   -  person Lance U. Matthews    schedule 10.06.2020
comment
@Fox это логика, реализованная в коллекции блокировки с точки зрения примитивов потоковой передачи Windows. Я бы предположил (не проверяя источник), что он говорит Windows ждать события ядра (не то же самое, что событие .NET), которое запускается подпрограммой добавления.   -  person Craig    schedule 10.06.2020
comment
@BACON Хорошо, но почему он вообще ждет? Обычно, когда вы используете foreach для чего-то пустого, он просто ничего не делает. Изменяет ли это использование ConsumingEnumerator?   -  person Fox    schedule 10.06.2020
comment
@Craig Тогда это работает с Linux? Использование .net-core здесь   -  person Fox    schedule 10.06.2020
comment
Опять же, подумайте о данных, которые потребляются быстрее, чем производятся. По мере того, как потребители догоняют производителя, коллекция может стать пустой. Тот факт, что больше нет данных для обработки сейчас, не означает, что их не будет в будущем, поэтому счетчик(и) необходимо поддерживать в рабочем состоянии, если и когда это произойдет. GetConsumingEnumerator() включает использование foreach, помимо прочего, завершая перечисление только после того, как BlockingCollection<>.IsCompleted становится true (это означает, что Count равно 0 и CompleteAdding() вызывается). Помните, что он поддерживает несколько производителей или потребителей.   -  person Lance U. Matthews    schedule 10.06.2020
comment
@Fox для Linux, я не уверен, какие примитивы он будет использовать, но я ожидаю, что он будет использовать приблизительный эквивалент примитивов Windows.   -  person Craig    schedule 10.06.2020
comment
Что касается того, что происходит с For Each, за кулисами происходит вызов метода MoveNext перечислителя (для продвижения вперед и проверки достижения конца) и чтение свойства Current. В случае ConsumingIterator я ожидаю, что MoveNext просто блокируется до тех пор, пока не будет что-то (или ничего) для возврата.   -  person Craig    schedule 10.06.2020
comment
@ Крейг Я даже не знал, что foreach так работает, теперь вижу!   -  person Fox    schedule 11.06.2020


Ответы (1)


Он использует SemiphoreSlim для ожидания GetConsumingEnumerable, а Release вызывается при добавлении элементов (очень упрощенно).

SemaphoreSlim — это облегченная альтернатива классу Semaphore, которая не использует семафоры ядра Windows.

https://docs.microsoft.com/en-us/dotnet/api/system.threading.semaphoreslim?view=netcore-3.1

Вы можете прочитать весь код для BlockingCollection здесь: https://github.com/dotnet/runtime/blob/4f9ae42d861fcb4be2fcd5d3d55d5f227d30e723/src/libraries/System.Collections.Concurrent/src/System/Collections/Concurrent/BlockingCollection.cs

person Michael    schedule 10.06.2020
comment
Спасибо за источник! Вместе с информацией от @Craig о том, как на самом деле работает foreach, я понял это сейчас. - person Fox; 11.06.2020