Обнаружение повторяющихся элементов в DataFlow

Я разрабатываю сервис, который обрабатывает файлы, используя объект Queue<string> для управления элементами.

public partial class BasicQueueService : ServiceBase
{
    private readonly EventWaitHandle completeHandle = 
      new EventWaitHandle(false, EventResetMode.ManualReset, "ThreadCompleters");

    public BasicQueueService()
    {
        QueueManager = new Queue<string>();
    }

    public bool Stopping { get; set; }

    private Queue<string> QueueManager { get; }

    protected override void OnStart(string[] args)
    {
        Stopping = false;

        ProcessFiles();
    }

    protected override void OnStop()
    {
        Stopping = true;
    }

    private void ProcessFiles()
    {
        while (!Stopping)
        {
            var count = QueueManager.Count;
            for (var i = 0; i < count; i++)
            {
                //Check the Stopping Variable again.
                if (Stopping) break;

                var fileName = QueueManager.Dequeue();
                if (string.IsNullOrWhiteSpace(fileName) || !File.Exists(fileName)) 
                       continue;

                Console.WriteLine($"Processing {fileName}");

                Task.Run(() =>
                    {
                        DoWork(fileName);
                    })
                    .ContinueWith(ThreadComplete);
            }
            if (Stopping) continue;

            Console.WriteLine("Waiting for thread to finish, or 1 minute.");
            completeHandle.WaitOne(new TimeSpan(0, 0, 15));
            completeHandle.Reset();
        }
    }

    partial void DoWork(string fileName);

    private void ThreadComplete(Task task)
    {
        completeHandle.Set();
    }

    public void AddToQueue(string file)
    {
        //Called by FileWatcher/Manual classes, not included for brevity.
        lock (QueueManager)
        {
            if (QueueManager.Contains(file)) return;

            QueueManager.Enqueue(file);
        }
    }
}

Исследуя, как ограничить количество потоков в этом (я пробовал ручной класс с увеличивающимся int, но есть проблема, когда он не уменьшается должным образом в моем коде), я наткнулся на TPL DataFlow, что кажется более подходящим для того, чего я пытаюсь достичь, в частности, это позволяет мне позволить фреймворку обрабатывать потоки/очереди и т. д.

Теперь это мой сервис:

public partial class BasicDataFlowService : ServiceBase
{
    private readonly ActionBlock<string> workerBlock;

    public BasicDataFlowService()
    {
        workerBlock = new ActionBlock<string>(file => DoWork(file), new ExecutionDataflowBlockOptions()
        {
            MaxDegreeOfParallelism = 32
        });
    }

    public bool Stopping { get; set; }

    protected override void OnStart(string[] args)
    {
        Stopping = false;
    }

    protected override void OnStop()
    {
        Stopping = true;
    }

    partial void DoWork(string fileName);

    private void AddToDataFlow(string file)
    {
        workerBlock.Post(file);
    }
}

Это хорошо работает. Однако я хочу убедиться, что файл добавляется в TPL DataFlow только один раз. С Queue я могу проверить это с помощью .Contains(). Есть ли механизм, который я могу использовать для TPL DataFlow?


person Obsidian Phoenix    schedule 16.03.2017    source источник
comment
Все, что потребляет и отправляет файлы, несет ответственность за то, чтобы не публиковать их дважды. Если вы читаете файлы из каталога, вы можете пометить их или кэшировать путь, как предложил @VMAtm. Но если пользователи или иные клиенты отправляют их, вам нужно рассматривать процессы как работу. Где каждый файл представляет собой одно задание с одним результатом.   -  person JSteward    schedule 24.03.2017


Ответы (2)


Ваше решение с Queue работает только в том случае, если файл попадает в вашу службу дважды за небольшой промежуток времени. Если он снова пришел, скажем, через несколько часов, в очереди его уже не будет, так как вы Dequeue оттуда.

Если ожидается это решение, вы можете использовать MemoryCache для хранения уже обработанных путей к файлам, например:

using System.Runtime.Caching;

private static object _lock = new object();

private void AddToDataFlow(string file)
{
    lock (_lock)
    {
        if (MemoryCache.Default.Contains(file))
        {
            return;
        }

        // no matter what to put into the cache
        MemoryCache.Default[file] = true;
    // we can now exit the lock
    }

    workerBlock.Post(file);
}

Однако, если ваше приложение должно работать в течение длительного времени (для чего и предназначена служба), у вас в конечном итоге закончится память. В этом случае вам, вероятно, нужно сохранить пути к файлам в базе данных или что-то в этом роде, поэтому даже после перезапуска службы ваш код восстановит состояние.

person VMAtm    schedule 23.03.2017

Вы можете проверить это внутри DoWork.

Вы должны сохранить в Hash уже работающие элементы и проверить, что текущее имя файла не существует в хэше.

person Ygalbel    schedule 16.03.2017