параллельная работа с ограничением количества потоков в бесконечном цикле

Я пишу бесконечный цикл для извлечения из очереди (RabbitMQ) и обработки каждого извлеченного элемента в параллельных потоках с ограниченным количеством запущенных потоков. Теперь мне нужно решение для ограничения количества выполненных потоков. См. пример моего цикла:

public class ThreadWorker<T>
{
    public List<T> _lst;
    private int _threadCount;
    private int _maxThreadCount;
    public ThreadWorker(List<T> lst, int maxThreadCount)
    {
        _lst = lst;
        _maxThreadCount = maxThreadCount;
    }

    public void Start()
    {
        var i = 0;
        while (i < _lst.Count)
        {
            i++;
            var pull = _lst[i];

            Process(pull);
        }
    }

    public void Process(T item)
    {
        if (_threadCount > _maxThreadCount)
        {
            //wait any opration be done 
            // How to wait for one thread?

            Interlocked.Decrement(ref _threadCount);
        }

        var t = new Thread(() => Opration(item));

        t.Start();

        Interlocked.Increment(ref _threadCount);
    }

    public void Opration(T item)
    {
        Console.WriteLine(item.ToString());
    }
}

Обратите внимание, что когда я использую семафор для ограничения, метод Start() не ждет всех запущенных потоков. мой цикл должен после запуска потоков с _maxThreadCount дождаться освобождения потока, а затем нажать новый поток для параллельной обработки.


person Arash Karami    schedule 04.08.2015    source источник
comment
Моя проблема связана с бесконечным параллелизмом. Для ParallelOption.maxdegreeofparallelism, но я не хочу его использовать. Я хочу управлять потоками в своем коде.   -  person Arash Karami    schedule 04.08.2015


Ответы (1)


Я бы использовал Semaphore таким образом, чтобы управлять количеством потоков:

public class ThreadWorker<T>
{
    SemaphoreSlim _sem = null;
    List<T> _lst;

    public ThreadWorker(List<T> lst, int maxThreadCount)
    {
        _lst = lst;
        _sem = new SemaphoreSlim(maxThreadCount);
    }

    public void Start()
    {
        var i = 0;
        while (i < _lst.Count)
        {
            i++;
            var pull = _lst[i];
            _sem.Wait(); /*****/
            Process(pull);
        }
    }

    public void Process(T item)
    {
        var t = new Thread(() => Opration(item));
        t.Start();
    }

    public void Opration(T item)
    {
        Console.WriteLine(item.ToString());
        _sem.Release(); /*****/
    }
}
person Eser    schedule 04.08.2015
comment
Спасибо, но когда мы используем semaphore или semaphoreSlim, метод Start() не ждет завершения всех запущенных потоков. У вас есть идеи по этой проблеме? - person Arash Karami; 04.08.2015
comment
@Араш, тогда добавь for (int i = 0; i < maxThreadCount; i++) _sem.Wait(); в конец Start - person Eser; 04.08.2015