длительная блокировка процесса

Я создаю проект webservice+servicebus, где пользователь может сделать что-то вроде

public void ExecuteLongProcess(DateTime fromDate,string aggregateId){}

Этот метод сразу возвращает, но отправляет по шине запрос на выполнение операции.

Мои проблемы начинаются, когда несколько пользователей запрашивают длительный процесс с одним и тем же агрегатным идентификатором, когда другой уже запущен.

Решение, о котором я думаю, - это задача, которая выполняется непрерывно и ищет в Queue<LongProcessTask> операцию, которая должна быть выполнена, поэтому я запускаю только один процесс за раз, иначе будущая реализация будет состоять из нескольких процессов, если другой агрегатный идентификатор.

Таким образом, я не перекрываю длительный процесс с одним и тем же агрегатом.

Другие идеи?


person Mauro Destro    schedule 31.10.2011    source источник
comment
Конечно, довольно стандартный сценарий производитель/потребитель. Поточно-безопасная очередь и один поток выполняют свою работу.   -  person Hans Passant    schedule 31.10.2011


Ответы (2)


Я создал TaskRunner, который создает экземпляр некоторой непрерывно выполняющейся задачи (количество зависит от ядер процессора), которая просматривает параллельную очередь и запускает каждую ожидающую операцию. TaskRunner получает от Windsor обработчик для каждого типа операции, чтобы оставить в стороне обработку каждой операции в классе.

person Mauro Destro    schedule 03.11.2011

В своем ответе вы говорите, что несколько потоков будут брать задачи из параллельной очереди. В этом случае есть вероятность, что две задачи с одним и тем же агрегатным идентификатором могут выполняться одновременно. Я не знаю, является ли это проблемой для вас, если да, то вы должны использовать разные очереди для каждого агрегатного идентификатора.

Если порядок задач не является проблемой, я рекомендую использовать BlockingCollection. Потому что возникает вопрос: что вы планируете делать с несколькими потребительскими потоками, если в параллельной очереди нет задачи.

while(some_condition_to_keep_thread_alive)
{
  if(!queue.TryDequeue(...))
    continue;
  else 
  {
    //do the job
  }
}

Этот код сведет ваши ядра с ума, если queue будет пустым. Вам нужен блокирующий механизм. BlockingCollection сделает это за вас.

Вы настаиваете на использовании ConcurrentQueue? Хорошо, SemaphoreSlim — ваш друг.

person ali_bahoo    schedule 03.11.2011
comment
Да, моя очередь представляет собой ConcurrentDictionary‹OperationTask,OperationRequest›; внутри каждого потока я получаю только операцию агрегатного идентификатора, которая не выполняется где-либо еще, а также упорядочена по отметке времени в очереди. Единственное, чего мне не хватает, так это семафора. Я думаю, что могу сигнализировать потокам, когда новая операция ставится в очередь - person Mauro Destro; 04.11.2011
comment
Помните, что ConcurrentDictionary неупорядочен. Если порядок важен, вы можете использовать ConcurrentQueue<KeyValuePair<OperationTask,OperationRequest>>. Конечно, трудно что-то сказать, не зная деталей вашей реализации. - person ali_bahoo; 04.11.2011
comment
Да, я знаю об этом. У меня есть свойство QueuedIn для сортировки по времени вставки в очередь - person Mauro Destro; 04.11.2011