Я разрабатываю сервис, который обрабатывает файлы, используя объект Queue<string>
для управления элементами.
public partial class BasicQueueService : ServiceBase
{
private readonly EventWaitHandle completeHandle =
new EventWaitHandle(false, EventResetMode.ManualReset, "ThreadCompleters");
public BasicQueueService()
{
QueueManager = new Queue<string>();
}
public bool Stopping { get; set; }
private Queue<string> QueueManager { get; }
protected override void OnStart(string[] args)
{
Stopping = false;
ProcessFiles();
}
protected override void OnStop()
{
Stopping = true;
}
private void ProcessFiles()
{
while (!Stopping)
{
var count = QueueManager.Count;
for (var i = 0; i < count; i++)
{
//Check the Stopping Variable again.
if (Stopping) break;
var fileName = QueueManager.Dequeue();
if (string.IsNullOrWhiteSpace(fileName) || !File.Exists(fileName))
continue;
Console.WriteLine($"Processing {fileName}");
Task.Run(() =>
{
DoWork(fileName);
})
.ContinueWith(ThreadComplete);
}
if (Stopping) continue;
Console.WriteLine("Waiting for thread to finish, or 1 minute.");
completeHandle.WaitOne(new TimeSpan(0, 0, 15));
completeHandle.Reset();
}
}
partial void DoWork(string fileName);
private void ThreadComplete(Task task)
{
completeHandle.Set();
}
public void AddToQueue(string file)
{
//Called by FileWatcher/Manual classes, not included for brevity.
lock (QueueManager)
{
if (QueueManager.Contains(file)) return;
QueueManager.Enqueue(file);
}
}
}
Исследуя, как ограничить количество потоков в этом (я пробовал ручной класс с увеличивающимся int
, но есть проблема, когда он не уменьшается должным образом в моем коде), я наткнулся на TPL DataFlow, что кажется более подходящим для того, чего я пытаюсь достичь, в частности, это позволяет мне позволить фреймворку обрабатывать потоки/очереди и т. д.
Теперь это мой сервис:
public partial class BasicDataFlowService : ServiceBase
{
private readonly ActionBlock<string> workerBlock;
public BasicDataFlowService()
{
workerBlock = new ActionBlock<string>(file => DoWork(file), new ExecutionDataflowBlockOptions()
{
MaxDegreeOfParallelism = 32
});
}
public bool Stopping { get; set; }
protected override void OnStart(string[] args)
{
Stopping = false;
}
protected override void OnStop()
{
Stopping = true;
}
partial void DoWork(string fileName);
private void AddToDataFlow(string file)
{
workerBlock.Post(file);
}
}
Это хорошо работает. Однако я хочу убедиться, что файл добавляется в TPL DataFlow
только один раз. С Queue
я могу проверить это с помощью .Contains()
. Есть ли механизм, который я могу использовать для TPL DataFlow
?