BlockingCollection Take() блокирует навсегда

У меня есть простая установка производителя-потребителя, использующая блокирующую коллекцию. Потребитель сидит в цикле в течение всего времени работы нашего приложения, ожидая, пока потребитель поместит элементы в коллекцию, затем извлекает элемент и записывает его в последовательный порт. По какой-то причине collection.Take() навсегда блокируется, когда в коллекции есть элементы. Для этого приложения у нас может быть активен один или несколько из этих ProducerConsumers одновременно. Они ведут себя одинаково независимо.

public class ProducerConsumer 
{
    private Task _backgroundWorker;
    private CancellationTokenSource _cancellationTokenSource;
    private BlockingCollection<Data> _dataQueue;

    public ProducerConsumer() 
    {
        _dataQueue = new BlockingCollection<Data>();
        _cancellationTokenSource = new CancellationTokenSource();
        _backgroundWorker = new Task(() => DoWork(_cancellationTokenSource.Token), TaskCreationOptions.LongRunning);
        _backgroundWorker.Start();
    }

    public void AddData(Data data) 
    {
        _dataQueue.Add(data);
        System.Diagnostics.Debug.WriteLine(_dataQueue.Count);
    }

    private void DoWork(CancellationToken cancellationToken)
    {
        while(!cancellationToken.IsCancellationRequested)
        {
            try
            {
                _dataQueue.Take(cancellationToken); //This is blocking forever

                //DoWork
            }
            catch(OperationCanceledException) { }
            catch(Exception e)
            {
                System.Diagnostics.Debug.WriteLine(e.ToString());
                throw;
            }
        }
    }  
}

При выполнении этого оператора печати увеличивается, поэтому у нас определенно есть данные в коллекции, но по какой-то причине Take() продолжает блокироваться.

Он также не генерирует исключение.

Отмена запрашивается с помощью Dispose(), но я этого не добавлял. Это не называется быть вызванным рано.

Я пытался использовать .GetConsumingEnumerable(), и это также блокирует навсегда.

Я неправильно запускаю задачу? Может у меня закончились темы?

Я рассматривал возможность использования BackgroundWorker вместо Task, но согласно MSFT Предпочтение отдается задаче.

Заранее спасибо.


person mAniC    schedule 02.11.2018    source источник
comment
Задачи создаются и запускаются с помощью Task.Run. Если вам нужна длительная задача, используйте TaskFactory.StartNew. Нет нет причин использовать холодные задачи, как это делаете вы. Есть много других странных вещей, например, DoWork возвращает Task, хотя это синхронный метод. Наконец, вы ничего не показываете, вызывая AddData()   -  person Panagiotis Kanavos    schedule 02.11.2018
comment
Спасибо что нашли время ответить. AddData вызывается во многих местах, и было бы излишним добавлять строку, показывающую, что он вызывается. Что вы подразумеваете под холодными задачами? Do work изначально было недействительным, я изменил его, чтобы посмотреть, имеет ли это значение. Я обновлю вопрос, чтобы изменить его обратно на void   -  person mAniC    schedule 02.11.2018
comment
Холодная задача — это задача, созданная с помощью new, для запуска которой требуется Start(). Это никогда не используется   -  person Panagiotis Kanavos    schedule 02.11.2018
comment
Вы проверили примеры документации, например Как добавлять и брать элементы из BlockingCollection по отдельности? Код там намного проще, чем то, что вы разместили. Я бы ни в коем случае не использовал BlockingCollection, потому что он требует блокировки   -  person Panagiotis Kanavos    schedule 02.11.2018
comment
Take и GetConsumingEnumerable будут ждать, пока либо не будет добавлен новый элемент, либо вы не уведомите коллекцию о завершении добавления чего-либо, вызвав CompleteAdding.   -  person ckuri    schedule 02.11.2018
comment
@ckuri Новые элементы добавляются, но по какой-то причине работа не продолжается.   -  person mAniC    schedule 02.11.2018
comment
@PanagiotisKanavos Я изменил task.Start на Task.Factory.StartNew(() => DoWork(), TaskCreationOptions.LongRunning); но я все еще получаю такое же поведение. Пробуем ActionBlock сейчас   -  person mAniC    schedule 02.11.2018


Ответы (1)


Прежде всего, я бы не стал создавать свою собственную реализацию производителя/потребителя, особенно такую, которая блокирует. С простым сценарием производителя/потребителя можно легко справиться с помощью ActionBlock. ActionBlock имеет внутреннюю очередь, в которую могут отправлять сообщения несколько одновременно работающих производителей. ActionbBlock будет обрабатывать сообщения в очереди в фоновом режиме, используя рабочий метод, переданный его конструктору:

class SerialWorker
{
    ActionBlock<Data>  _serialBlock;

    public SerialWorker()
    {    
        _serialBlock=new ActionBlock<Data>(data=>DoWork(data));
    }

    //The worker action can be synchronous 
    private void DoWork(Data data)
    {
    }
    //or asynchronous
    private async Task DoWorkAsync(Data data)
    {
    }


    //Producer Code
    //While the application runs :
    public void PostData(Data data)
    {
        _serialBlock.Post(someData);
    }

//When the application finishes 
//Tell the block to shut down and wait for it to process any leftover requests
    public async Task Shutdown()
    {
        _serialBlock.Complete();    
        await _serialBlock.Completion;
    }

Рабочий метод может быть асинхронным, например, new ActionBlock<Data>(data=>DoWorkAsync(data)) будет работать нормально. Это позволяет использовать асинхронные методы без блокировки внутри самого воркера.

Новые сообщения отправляются с ActionBlock.Post. Когда пришло время завершить работу, приложение должно вызвать Complete(), чтобы уведомить блок действий и дождаться его завершения. ActionBlock перестанет получать больше сообщений и обработает все, что еще осталось в его буфере, прежде чем завершить работу.

person Panagiotis Kanavos    schedule 02.11.2018
comment
Ух ты. Я не знал, что такое существует. Это выглядит намного лучше, чем то, что мы делаем сейчас. Я посмотрю и дам вам знать. Спасибо за совет. - person mAniC; 02.11.2018
comment
@mAniC также проверьте остальные документы. Вы можете создавать конвейеры связанных блоков, устанавливать ограничения на то, сколько вещей может быть буферизовано за раз и т. д. - person Panagiotis Kanavos; 02.11.2018