Потоки в С# для создания распределенной DFS

Я пытался реализовать распределенный поиск в глубину в С#. Я был успешным до определенного момента, но получил ошибку синхронизации. Я не могу исправить ошибку. То, что я пытаюсь сделать, это заставить каждый узел общаться друг с другом, используя параллельный поток данных задачи, и тем самым я достигаю параллелизма в DFS. Ниже мой код:

public class DFS
{
static List<string> traversedList = new List<string>();

static List<string> parentList = new List<string>();
static Thread[] thread_array;
static BufferBlock<Object> buffer1 = new BufferBlock<Object>();

public static void Main(string[] args)
{

    int N = 100;
    int M = N * 4;
    int P = N * 16;

    Stopwatch stopwatch = new Stopwatch();
    stopwatch.Start();

    List<string> global_list = new List<string>();

    StreamReader file = new StreamReader(args[args.Length - 2]);


    string text = file.ReadToEnd();

    string[] lines = text.Split('\n');



    string[][] array1 = new string[lines.Length][];

    for (int i = 0; i < lines.Length; i++)
    {
        lines[i] = lines[i].Trim();
        string[] words = lines[i].Split(' ');

        array1[i] = new string[words.Length];

        for (int j = 0; j < words.Length; j++)
        {
            array1[i][j] = words[j];
        }
    }

    StreamWriter sr = new StreamWriter("E:\\Newtext1.txt");

    for (int i = 0; i < array1.Length; i++)
    {
        for (int j = 0; j < array1[i].Length; j++)
        {
            if (j != 0)
            {
                sr.Write(array1[i][0] + ":" + array1[i][j]);
                Console.WriteLine(array1[i][0] + ":" + array1[i][j]);
                sr.Write(sr.NewLine);
            }
        }

    }
    int start_no = Convert.ToInt32(args[args.Length - 1]);
    thread_array = new Thread[lines.Length];
    string first_message = "root";
    buffer1.Post(first_message);
    buffer1.Post(array1);
    buffer1.Post(start_no);
    buffer1.Post(1);

    for (int t = 1; t < lines.Length; t++)
    {
        Console.WriteLine("thread" + t);
        thread_array[t] = new Thread(new ThreadStart(thread_run));
        thread_array[t].Name = t.ToString();
        lock (thread_array[t])
        {
            Console.WriteLine("working");
            thread_array[t].Start();
            thread_array[t].Join();
        }

    }
    stopwatch.Stop();

    Console.WriteLine(stopwatch.Elapsed);
    Console.ReadLine();
}

private static void dfs(string[][] array, int point)
{
    for (int z = 1; z < array[point].Length; z++)
    {
        if ((!traversedList.Contains(array[point][z])))
        {
            traversedList.Add(array[point][z]);
            parentList.Add(point.ToString());
            dfs(array, int.Parse(array[point][z]));
        }

    }
    return;


}
public static void thread_run()
{
    try
    {
        string parent;
        string[][] array1;
        int point;
        int id;
        parent = (string)buffer1.Receive();
        array1 = (string[][])buffer1.Receive();
        point = (int)buffer1.Receive();
        id = (int)buffer1.Receive();
        object value;
        Console.WriteLine("times");

        if (Thread.CurrentThread.Name.Equals(point.ToString()))
        {
            if (!traversedList.Contains(point.ToString()))
            {
                Console.WriteLine("Node:" + point + " Parent:" + parent + " Id:" + id);
                traversedList.Add(point.ToString());
                parent = point.ToString();
                for (int x = 1; x < array1[point].Length; x++)
                {
                    Console.WriteLine("times");
                    if (buffer1.TryReceive(out value))
                    {
                        array1 = (string[][])value;
                    }
                    if (buffer1.TryReceive(out value))
                    {
                        id = (int)buffer1.Receive();
                    }
                    id++;
                    buffer1.Post(parent);
                    buffer1.Post(array1);
                    buffer1.Post(x);
                    buffer1.Post(id);
                    Console.WriteLine("times");
                    Monitor.PulseAll(Thread.CurrentThread);
                }

                //return;
            }
            else
            {
                buffer1.Post(parent);
                buffer1.Post(array1);
                buffer1.Post(point);
                buffer1.Post(id);
                Console.WriteLine("working 1");
                Monitor.PulseAll(Thread.CurrentThread);
            }
        }
        else
        {
            Console.WriteLine("working 2");
            Monitor.Wait(Thread.CurrentThread);
        }
        //Console.WriteLine(parent);
    }
    catch (Exception ex)
    {
        Console.WriteLine(ex.Message);
    }

}

}

введите здесь описание изображения


person Rigorous implementation    schedule 05.06.2012    source источник
comment
чем это отличается от вопроса, который вы задали 3 дня назад @ stackoverflow.com/questions/10852317/ ?   -  person James Manning    schedule 05.06.2012
comment
@JamesManning: Пожалуйста, не делайте этого, я выполнил последовательную реализацию в последнем вопросе, но нашел способ распределенной реализации (используемые потоки), и здесь я застрял с этой ошибкой. В Java это проще, потому что мы в основном используем ключевое слово synchronized, но в C # я не нахожу ничего простого.   -  person Rigorous implementation    schedule 05.06.2012
comment
Из какой строки кода исходит эта ошибка?   -  person Faraday    schedule 05.06.2012
comment
@Vijay: на Monitor.Wait(Thread.currentThread).. когда я пытаюсь запустить поток внутри основного ожидания...   -  person Rigorous implementation    schedule 05.06.2012
comment
Ваш код НЕ использует поток данных. Весь смысл потока данных заключается в использовании ActionBlocks вместо потоков для упрощения обработки. Вместо этого вы используете BufferBlock аналогично ConcurrentQueue или любой другой параллельной коллекции.   -  person Panagiotis Kanavos    schedule 05.06.2012


Ответы (3)


Там различные проблемы с вашим кодом.

Неправильное использование блокировки и «прикосновения» к traversedList из нескольких потоков — наиболее очевидная проблема.

Что еще более важно, ваш код на самом деле не использует поток данных, он использует BufferBlock аналогично ConcurrentQueue или любой другой параллельной коллекции. Весь смысл потока данных заключается в использовании ActionBlocks. вместо потоков для упрощения обработки. По умолчанию блок действий будет использовать для обработки только один поток, но вы можете указать столько потоков, сколько хотите, через DataflowBlockOptions.

Блоки действий имеют свои собственные входные и выходные буферы, поэтому вам не нужно добавлять дополнительные блоки буферов только для буферизации.

Передача нескольких связанных значений в блок — еще одна проблема, так как это может привести к ошибкам и сделать код запутанным. Создание структуры данных для хранения всех значений ничего не стоит.

Предполагая, что вы используете этот класс для хранения сообщения обработки:

    public class PointMessage
    {
        public string Message { get; set; }
        public string[][] Lines{get;set;}
        public int Point { get; set; }
        public int ID { get; set; }
    }

Вы можете создать ActionBlock для обработки этих сообщений следующим образом:

static ActionBlock<PointMessage> _block;
...
var options = new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = ExecutionDataflowBlockOptions.Unbounded };
_block=new ActionBlock<PointMessage>(msg=>ProcessMessage(msg),options);

И обработайте каждое сообщение следующим образом:

    private static void ProcessMessage(PointMessage arg)
    {
        if (...)
        {
            ...
            arg.ID++;
            _block.Post(arg);
        }
        else
        {
             ...
            _block.Post(arg);
        }
    }

Если ваша функция возвращает значение, вы можете использовать TransformBlock вместо ActionBlock.

Я не понимаю, что делает ваш код, поэтому не буду пытаться переписывать его с помощью DataFlow. Если немного почистить, помочь будет легче.

person Panagiotis Kanavos    schedule 05.06.2012
comment
Я просто хочу отправить уведомление и получить подтверждения от каждого из узлов, присутствующих в графе. Вот почему я использую параллельную библиотеку задач в качестве модели актера. - person Rigorous implementation; 05.06.2012
comment
Дело в том, что вы НЕ используете актеров. BufferBlock не обрабатывает сообщения, в отличие от ActionBlock или TransformBlock. Кроме того, что вы имеете в виду под актерами? Вы пытаетесь преобразовать каждый узел в актера? Это огромная трата ресурсов. Было бы проще оценить функцию Contains для каждого узла в цикле Parallel.For, чтобы позволить среде выполнения выбрать приемлемое количество потоков. - person Panagiotis Kanavos; 05.06.2012
comment
Но у буферного блока есть функции отправки и получения, где я их использую... Да! Основная идея заключается в том, что каждый узел будет действовать как независимая система и будет взаимодействовать друг с другом. - person Rigorous implementation; 05.06.2012
comment
Так же как и ConcurrentQueue, ConcurrentBag, BlockingCollection и любой другой класс, используемый в сценариях Producer/Consumer, но они не называются акторами. Если вы хотите реализовать свой собственный класс акторов, BufferBlock совсем не поможет. Актеры реализованы как ActionBlocks или TransformBlocks. - person Panagiotis Kanavos; 05.06.2012
comment
Небольшая поправка: ActionBlock<T> не имеет очереди вывода, потому что он ничего не производит, в отличие от большинства других блоков. - person svick; 05.06.2012
comment
Кажется, я упомянул, что в основном ответе для возврата значения вам нужен TransformBlock - person Panagiotis Kanavos; 05.06.2012

Проблема в том, что поток должен владеть монитором, чтобы вызвать ожидание. Поэтому вам нужно заблокировать Monitor.PulseAll, а также Monitor.Wait, чтобы убедиться, что вы больше не получите подобных ошибок.

Если вам нужно, чтобы я объяснил вам блокировку, откройте еще один вопрос, и я объясню его полностью! :)

person Faraday    schedule 05.06.2012

РЕДАКТИРОВАТЬ: игнорировать мой пост - вместо этого прочитайте пост от @PanagiotisKanavos...

Это не скомпилируется, но направит вас в правильном направлении для использования блокировок:

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Threading;

public class DFS
{
    static List<string> traversedList = new List<string>();

    static List<string> parentList = new List<string>();
    static Thread[] thread_array;
    //static BufferBlock<Object> buffer1 = new BufferBlock<Object>();

    public static void Main(string[] args)
    {

        int N = 100;
        int M = N * 4;
        int P = N * 16;

        Stopwatch stopwatch = new Stopwatch();
        stopwatch.Start();

        List<string> global_list = new List<string>();

        StreamReader file = new StreamReader(args[args.Length - 2]);


        string text = file.ReadToEnd();

        string[] lines = text.Split('\n');



        string[][] array1 = new string[lines.Length][];

        for (int i = 0; i < lines.Length; i++)
        {
            lines[i] = lines[i].Trim();
            string[] words = lines[i].Split(' ');

            array1[i] = new string[words.Length];

            for (int j = 0; j < words.Length; j++)
            {
                array1[i][j] = words[j];
            }
        }

        StreamWriter sr = new StreamWriter("E:\\Newtext1.txt");

        for (int i = 0; i < array1.Length; i++)
        {
            for (int j = 0; j < array1[i].Length; j++)
            {
                if (j != 0)
                {
                    sr.Write(array1[i][0] + ":" + array1[i][j]);
                    Console.WriteLine(array1[i][0] + ":" + array1[i][j]);
                    sr.Write(sr.NewLine);
                }
            }

        }
        int start_no = Convert.ToInt32(args[args.Length - 1]);
        thread_array = new Thread[lines.Length];
        string first_message = "root";
        //buffer1.Post(first_message);
        //buffer1.Post(array1);
        //buffer1.Post(start_no);
        //buffer1.Post(1);

        for (int t = 1; t < lines.Length; t++)
        {
            Console.WriteLine("thread" + t);
            thread_array[t] = new Thread(new ThreadStart(thread_run));
            thread_array[t].Name = t.ToString();
            lock (thread_array[t])
            {
                Console.WriteLine("working");
                thread_array[t].Start();
                thread_array[t].Join();
            }

        }
        stopwatch.Stop();

        Console.WriteLine(stopwatch.Elapsed);
        Console.ReadLine();
    }

    private static void dfs(string[][] array, int point)
    {
        for (int z = 1; z < array[point].Length; z++)
        {
            if ((!traversedList.Contains(array[point][z])))
            {
                traversedList.Add(array[point][z]);
                parentList.Add(point.ToString());
                dfs(array, int.Parse(array[point][z]));
            }

        }
        return;


    }

    bool busy;
    private readonly object syncLock = new object();

    public static void thread_run()
    {
        try
        {
            string parent;
            string[][] array1;
            int point;
            int id;
            //parent = (string)buffer1.Receive();
            //array1 = (string[][])buffer1.Receive();
            //point = (int)buffer1.Receive();
            //id = (int)buffer1.Receive();
            object value;
            Console.WriteLine("times");

            if (Thread.CurrentThread.Name.Equals("Point.ToString()"))
            {
                if (!traversedList.Contains("Point.ToString()"))
                {
                    for (int x = 1; x < 99999; x++)
                    {
                        Console.WriteLine("times");
                        //if (buffer1.TryReceive(out value))
                        //{
                        //    array1 = (string[][])value;
                        //}
                        //if (buffer1.TryReceive(out value))
                        //{
                        //    id = (int)buffer1.Receive();
                        //}
                        //id++;
                        //buffer1.Post(parent);
                        //buffer1.Post(array1);
                        //buffer1.Post(x);
                        //buffer1.Post(id);
                        Console.WriteLine("times");

                        lock (syncLock)
                        {
                            while (busy)
                            {
                                busy = false;
                                Monitor.PulseAll(Thread.CurrentThread);
                            }
                            busy = true; // we've got it!
                        }


                    }

                    //return;
                }
                else
                {
                    //buffer1.Post(parent);
                    //buffer1.Post(array1);
                    //buffer1.Post(point);
                    //buffer1.Post(id);
                    lock (syncLock)
                    {
                        while (busy)
                        {
                            busy = false;
                            Monitor.PulseAll(Thread.CurrentThread);
                        }
                        busy = true; // we've got it!
                    }
                }
            }
            else
            {
                Console.WriteLine("working 2");
                lock (syncLock)
                {
                    while (busy)
                    {
                        Monitor.Wait(Thread.CurrentThread);
                    }
                    busy = true; // we've got it!
                }

            }
            //Console.WriteLine(parent);
        }
        catch (Exception ex)
        {
            Console.WriteLine(ex.Message);
        }

    }

}
person Faraday    schedule 05.06.2012
comment
что я делаю в коде, так это то, что я создаю 10 разных потоков для вызова одной и той же функции. Вышеупомянутая идея, кажется, не работает. - person Rigorous implementation; 05.06.2012
comment
Если вы хотите использовать монитор, вы должны правильно заблокировать... Может быть, кто-то еще может пролить свет на это... - person Faraday; 05.06.2012
comment
Попробуйте изменить все ваши Monitor.Wait/Pulse/etc, чтобы вместо этого использовать syncLock. Проблема, с которой вы столкнулись, заключается в том, что вы блокируете поток и разблокируете сам поток... Читайте о потоках и шаблонах блокировки. Слишком много всего, чтобы уместиться в это поле из 500 символов, но попробуйте то, что я предложил, и оно будет работать лучше, чем вы думаете... Читайте здесь: albahari.com/threading/part4.aspx - person Faraday; 05.06.2012