Поток данных TPL, запутавшийся в дизайне ядра

Я использую TPL Dataflow совсем немного, но спотыкаюсь о проблеме, которую не могу решить:

У меня есть следующая архитектура:

BroadCastBlock<List<object1>> -> 2 разных TransformBlock<List<Object1>, Tuple<int, List<Object1>>> -> оба ссылаются на TransformManyBlock<Tuple<int, List<Object1>>, Object2>

Я меняю лямбда-выражение в TransformManyBlock в конце цепочки: (а) код, выполняющий операции над потоковым кортежем, (б) вообще никакого кода.

В TransformBlocks я измеряю время, начиная с прибытия первого элемента и заканчивая остановкой, когда TransformBlock.Completion указывает на завершение блока (broadCastBlock ссылается на блоки преобразования с параметром propagateCompletion, установленным в true).

С чем я не могу смириться, так это с тем, почему в случае (б) блоки преобразования завершаются примерно в 5-6 раз быстрее, чем в случае (а). Это полностью противоречит замыслу всего проекта TDF. Элементы из блоков преобразования были переданы в блок transfromManyBlock, поэтому совершенно не должно иметь значения, что делает блок transformManyBlock с элементами, влияющими на завершение блоков преобразования. Я не вижу ни единой причины, по которой все, что происходит в блоке transfromManyBlock, может иметь отношение к предшествующим блокам TransformBlock.

Кто-нибудь может согласиться с этим странным наблюдением?

Вот некоторый код, чтобы показать разницу. При запуске кода обязательно измените следующие две строки:

        tfb1.transformBlock.LinkTo(transformManyBlock);
        tfb2.transformBlock.LinkTo(transformManyBlock);

to:

        tfb1.transformBlock.LinkTo(transformManyBlockEmpty);
        tfb2.transformBlock.LinkTo(transformManyBlockEmpty);

чтобы наблюдать разницу во времени выполнения предыдущих блоков преобразования.

class Program
{
    static void Main(string[] args)
    {
        Test test = new Test();
        test.Start();
    }
}

class Test
{
    private const int numberTransformBlocks = 2;
    private int currentGridPointer;
    private Dictionary<int, List<Tuple<int, List<Object1>>>> grid;

    private BroadcastBlock<List<Object1>> broadCastBlock;
    private TransformBlockClass tfb1;
    private TransformBlockClass tfb2;

    private TransformManyBlock<Tuple<int, List<Object1>>, Object2> 
               transformManyBlock;
    private TransformManyBlock<Tuple<int, List<Object1>>, Object2> 
               transformManyBlockEmpty;
    private ActionBlock<Object2> actionBlock;

    public Test()
    {
        grid = new Dictionary<int, List<Tuple<int, List<Object1>>>>();

        broadCastBlock = new BroadcastBlock<List<Object1>>(list => list);

        tfb1 = new TransformBlockClass();
        tfb2 = new TransformBlockClass();

        transformManyBlock = new TransformManyBlock<Tuple<int, List<Object1>>, Object2>
                (newTuple =>
            {
                for (int counter = 1; counter <= 10000000;  counter++)
                {
                    double result = Math.Sqrt(counter + 1.0);
                }

                return new Object2[0];

            });

        transformManyBlockEmpty 
            = new TransformManyBlock<Tuple<int, List<Object1>>, Object2>(
                  tuple =>
            {
                return new Object2[0];
            });

        actionBlock = new ActionBlock<Object2>(list =>
            {
                int tester = 1;
                //flush transformManyBlock
            });

        //linking
        broadCastBlock.LinkTo(tfb1.transformBlock
                              , new DataflowLinkOptions 
                                  { PropagateCompletion = true }
                              );
        broadCastBlock.LinkTo(tfb2.transformBlock
                              , new DataflowLinkOptions 
                                  { PropagateCompletion = true }
                              );

        //link either to ->transformManyBlock or -> transformManyBlockEmpty
        tfb1.transformBlock.LinkTo(transformManyBlock);
        tfb2.transformBlock.LinkTo(transformManyBlock);

        transformManyBlock.LinkTo(actionBlock
                                  , new DataflowLinkOptions 
                                       { PropagateCompletion = true }
                                  );
        transformManyBlockEmpty.LinkTo(actionBlock
                                       , new DataflowLinkOptions 
                                            { PropagateCompletion = true }
                                       );

        //completion
        Task.WhenAll(tfb1.transformBlock.Completion
                     , tfb2.transformBlock.Completion)
                       .ContinueWith(_ =>
            {
                transformManyBlockEmpty.Complete();
                transformManyBlock.Complete();
            });

        transformManyBlock.Completion.ContinueWith(_ =>
            {
                Console.WriteLine("TransformManyBlock (with code) completed");
            });

        transformManyBlockEmpty.Completion.ContinueWith(_ =>
        {
            Console.WriteLine("TransformManyBlock (empty) completed");
        });

    }

    public void Start()
    {
        const int numberBlocks = 100;
        const int collectionSize = 300000;


        //send collection numberBlock-times
        for (int i = 0; i < numberBlocks; i++)
        {
            List<Object1> list = new List<Object1>();
            for (int j = 0; j < collectionSize; j++)
            {
                list.Add(new Object1(j));
            }

            broadCastBlock.Post(list);
        }

        //mark broadCastBlock complete
        broadCastBlock.Complete();

        Console.WriteLine("Core routine finished");
        Console.ReadLine();
    }
}

class TransformBlockClass
{
    private Stopwatch watch;
    private bool isStarted;
    private int currentIndex;

    public TransformBlock<List<Object1>, Tuple<int, List<Object1>>> transformBlock;

    public TransformBlockClass()
    {
        isStarted = false;
        watch = new Stopwatch();

        transformBlock = new TransformBlock<List<Object1>, Tuple<int, List<Object1>>>
           (list =>
        {
            if (!isStarted)
            {
                StartUp();
                isStarted = true;
            }

            return new Tuple<int, List<Object1>>(currentIndex++, list);
        });

        transformBlock.Completion.ContinueWith(_ =>
            {
                ShutDown();
            });
    }

    private void StartUp()
    {
        watch.Start();
    }

    private void ShutDown()
    {
        watch.Stop();

        Console.WriteLine("TransformBlock : Time elapsed in ms: " 
                              + watch.ElapsedMilliseconds);
    }
}

class Object1
{
    public int val { get; private set; }

    public Object1(int val)
    {
        this.val = val;
    }
}

class Object2
{
    public int value { get; private set; }
    public List<Object1> collection { get; private set; }

    public Object2(int value, List<Object1> collection)
    {
        this.value = value;
        this.collection = collection;
    }    
}

*EDIT: я опубликовал еще один фрагмент кода, на этот раз с использованием коллекций типов значений, и я не могу воспроизвести проблему, которую я наблюдаю в приведенном выше коде. Может ли быть так, что передача ссылочных типов и одновременная работа с ними (даже в пределах разных блоков потока данных) может блокировать и вызывать конфликты? *

class Program
{
    static void Main(string[] args)
    {
        Test test = new Test();
        test.Start();
    }
}

class Test
{
    private BroadcastBlock<List<int>> broadCastBlock;
    private TransformBlock<List<int>, List<int>> tfb11;
    private TransformBlock<List<int>, List<int>> tfb12;
    private TransformBlock<List<int>, List<int>> tfb21;
    private TransformBlock<List<int>, List<int>> tfb22;
    private TransformManyBlock<List<int>, List<int>> transformManyBlock1;
    private TransformManyBlock<List<int>, List<int>> transformManyBlock2;
    private ActionBlock<List<int>> actionBlock1;
    private ActionBlock<List<int>> actionBlock2;

    public Test()
    {
        broadCastBlock = new BroadcastBlock<List<int>>(item => item);

        tfb11 = new TransformBlock<List<int>, List<int>>(item =>
            {
                return item;
            });

        tfb12 = new TransformBlock<List<int>, List<int>>(item =>
            {
                return item;
            });

        tfb21 = new TransformBlock<List<int>, List<int>>(item =>
            {
                return item;
            });

        tfb22 = new TransformBlock<List<int>, List<int>>(item =>
            {
                return item;
            });

        transformManyBlock1 = new TransformManyBlock<List<int>, List<int>>(item =>
            {
                Thread.Sleep(100);
                //or you can replace the Thread.Sleep(100) with actual work, 
                //no difference in results. This shows that the issue at hand is 
                //unrelated to starvation of threads.

                return new List<int>[1] { item };
            });

        transformManyBlock2 = new TransformManyBlock<List<int>, List<int>>(item =>
            {
                return new List<int>[1] { item };
            });

        actionBlock1 = new ActionBlock<List<int>>(item =>
            {
                //flush transformManyBlock
            });

        actionBlock2 = new ActionBlock<List<int>>(item =>
        {
            //flush transformManyBlock
        });

        //linking
        broadCastBlock.LinkTo(tfb11, new DataflowLinkOptions 
                                      { PropagateCompletion = true });
        broadCastBlock.LinkTo(tfb12, new DataflowLinkOptions 
                                      { PropagateCompletion = true });
        broadCastBlock.LinkTo(tfb21, new DataflowLinkOptions 
                                      { PropagateCompletion = true });
        broadCastBlock.LinkTo(tfb22, new DataflowLinkOptions 
                                      { PropagateCompletion = true });

        tfb11.LinkTo(transformManyBlock1);
        tfb12.LinkTo(transformManyBlock1);
        tfb21.LinkTo(transformManyBlock2);
        tfb22.LinkTo(transformManyBlock2);

        transformManyBlock1.LinkTo(actionBlock1
                                   , new DataflowLinkOptions 
                                     { PropagateCompletion = true }
                                   );
        transformManyBlock2.LinkTo(actionBlock2
                                   , new DataflowLinkOptions 
                                     { PropagateCompletion = true }
                                   );

        //completion
        Task.WhenAll(tfb11.Completion, tfb12.Completion).ContinueWith(_ =>
            {
                Console.WriteLine("TransformBlocks 11 and 12 completed");
                transformManyBlock1.Complete();
            });

        Task.WhenAll(tfb21.Completion, tfb22.Completion).ContinueWith(_ =>
            {
                Console.WriteLine("TransformBlocks 21 and 22 completed");
                transformManyBlock2.Complete();
            });

        transformManyBlock1.Completion.ContinueWith(_ =>
            {
                Console.WriteLine
                    ("TransformManyBlock (from tfb11 and tfb12) finished");
            });

        transformManyBlock2.Completion.ContinueWith(_ =>
            {
                Console.WriteLine
                    ("TransformManyBlock (from tfb21 and tfb22) finished");
            });
    }

    public void Start()
    {
        const int numberBlocks = 100;
        const int collectionSize = 300000;

        //send collection numberBlock-times
        for (int i = 0; i < numberBlocks; i++)
        {
            List<int> list = new List<int>();
            for (int j = 0; j < collectionSize; j++)
            {
                list.Add(j);
            }

            broadCastBlock.Post(list);
        }

        //mark broadCastBlock complete
        broadCastBlock.Complete();

        Console.WriteLine("Core routine finished");
        Console.ReadLine();
    }
}

person Matt    schedule 12.12.2012    source источник
comment
Спасибо, сейчас посмотрю.   -  person casperOne    schedule 13.12.2012
comment
@casperOne, я добавил еще один фрагмент кода, на этот раз с использованием коллекций типов значений, может ли быть так, что одновременный доступ к типам ссылок (даже в разных блоках потока данных) вызывает блокировку и, следовательно, конкуренцию, которая может отвечать за задержки даже в блоках данных которые находятся выше в пищевой цепочке?   -  person Matt    schedule 14.12.2012
comment
Аргх, не могу проверить. Не будет доступа к потоку данных TPL не раньше понедельника (машина с VS.NET 2012 вышла из строя).... Я пытаюсь!   -  person casperOne    schedule 14.12.2012
comment
@casperOne, не беспокойтесь, я забыл добавить, что второй фрагмент кода приводит к тому, что оба transformManyBlocks завершаются примерно в одно и то же время, независимо от того, сколько работы выполняется в каждом блоке. Я не вижу различий между первой кодовой базой и второй, что может объяснить задержку завершения задачи в первой кодовой базе.   -  person Matt    schedule 16.12.2012


Ответы (1)


Ладно, последняя попытка ;-)

Сводка:

Наблюдаемую разницу во времени в сценарии 1 можно полностью объяснить различным поведением сборщика мусора.

При выполнении сценария 1, связывающего transformManyBlocks, поведение во время выполнения таково, что сборка мусора запускается во время создания новых элементов (списков) в основном потоке, чего не происходит при запуске сценария 1 со связанными transformManyBlockEmpty.

Обратите внимание, что создание нового экземпляра ссылочного типа (Object1) приводит к вызову выделения памяти в куче GC, что, в свою очередь, может инициировать запуск сбора GC. Поскольку создается довольно много экземпляров Object1 (и списков), у сборщика мусора появляется немного больше работы по сканированию кучи на наличие (потенциально) недостижимых объектов.

Следовательно, наблюдаемая разница может быть минимизирована любым из следующих способов:

  • Превращение Object1 из класса в структуру (таким образом гарантируя, что память для экземпляров не будет выделена в куче).
  • Сохранение ссылки на сгенерированные списки (тем самым сокращая время, необходимое сборщику мусора для выявления недоступных объектов).
  • Генерация всех элементов перед их размещением в сети.

(Примечание: я не могу объяснить, почему сборщик мусора ведет себя по-разному в сценарии 1 «transformManyBlock» по сравнению со сценарием 1 «transformManyBlockEmpty», но данные, собранные с помощью ConcurrencyVisualizer, ясно показывают разницу.)

Результаты:

(Тесты проводились на Core i7 980X, 6 ядер, включен HT):

Я изменил сценарий 2 следующим образом:

// Start a stopwatch per tfb
int tfb11Cnt = 0;
Stopwatch sw11 = new Stopwatch();
tfb11 = new TransformBlock<List<int>, List<int>>(item =>
{
    if (Interlocked.CompareExchange(ref tfb11Cnt, 1, 0) == 0)
        sw11.Start();

    return item;
});

// [...]

// completion
Task.WhenAll(tfb11.Completion, tfb12.Completion).ContinueWith(_ =>
{

     Console.WriteLine("TransformBlocks 11 and 12 completed. SW11: {0}, SW12: {1}",
     sw11.ElapsedMilliseconds, sw12.ElapsedMilliseconds);
     transformManyBlock1.Complete();
});

Результаты:

  1. Сценарий 1 (как опубликовано, т. е. связано с transformManyBlock):
    TransformBlock : истекшее время в мс: 6826
    TransformBlock : истекшее время в мс: 6826
  2. Сценарий 1 (связан с transformManyBlockEmpty):
    TransformBlock : истекшее время в мс: 3140
    TransformBlock : истекшее время в мс: 3140
  3. Сценарий 1 (transformManyBlock, Thread.Sleep(200) в теле цикла):
    TransformBlock : истекшее время в мс: 4949
    TransformBlock : истекшее время в мс: 4950
  4. Сценарий 2 (опубликовано, но изменено, чтобы сообщить время):
    TransformBlocks 21 и 22 завершены. SW21: 619 мс, SW22: 669 мс
    Блоки преобразования 11 и 12 завершены. SW11: 669 мс, SW12: 667 мс

Затем я изменил сценарий 1 и 2, чтобы подготовить входные данные перед их публикацией в сети:

// Scenario 1
//send collection numberBlock-times
var input = new List<List<Object1>>(numberBlocks);
for (int i = 0; i < numberBlocks; i++)
{
    var list = new List<Object1>(collectionSize);
    for (int j = 0; j < collectionSize; j++)
    {
        list.Add(new Object1(j));
    }
    input.Add(list);
}

foreach (var inp in input)
{
    broadCastBlock.Post(inp);
    Thread.Sleep(10);
}

// Scenario 2
//send collection numberBlock-times
var input = new List<List<int>>(numberBlocks);
for (int i = 0; i < numberBlocks; i++)
{
    List<int> list = new List<int>(collectionSize);
    for (int j = 0; j < collectionSize; j++)
    {
        list.Add(j);
    }

    //broadCastBlock.Post(list);
    input.Add(list);
 }

 foreach (var inp in input)
 {
     broadCastBlock.Post(inp);
     Thread.Sleep(10);
 }

Результаты:

  1. Сценарий 1 (transformManyBlock):
    TransformBlock : истекшее время в мс: 1029
    TransformBlock : истекшее время в мс: 1029
  2. Сценарий 1 (transformManyBlockEmpty):
    TransformBlock : истекшее время в мс: 975
    TransformBlock : истекшее время в мс: 975
  3. Сценарий 1 (transformManyBlock, Thread.Sleep(200) в теле цикла):
    TransformBlock : истекшее время в мс: 972
    TransformBlock : истекшее время в мс: 972

Наконец, я изменил код обратно на исходную версию, но сохранил ссылку на созданный список:

var lists = new List<List<Object1>>();
for (int i = 0; i < numberBlocks; i++)
{
    List<Object1> list = new List<Object1>();
    for (int j = 0; j < collectionSize; j++)
    {
        list.Add(new Object1(j));
    }
    lists.Add(list);                
    broadCastBlock.Post(list);
}

Результаты:

  1. Сценарий 1 (transformManyBlock):
    TransformBlock : истекшее время в мс: 6052
    TransformBlock : истекшее время в мс: 6052
  2. Сценарий 1 (transformManyBlockEmpty):
    TransformBlock : истекшее время в мс: 5524
    TransformBlock : истекшее время в мс: 5524
  3. Сценарий 1 (transformManyBlock, Thread.Sleep(200) в теле цикла):
    TransformBlock : истекшее время в мс: 5098
    TransformBlock : истекшее время в мс: 5098

Аналогично, изменение Object1 с класса на структуру приводит к тому, что оба блока завершаются примерно в одно и то же время (и примерно в 10 раз быстрее).


Обновление: приведенного ниже ответа недостаточно для объяснения наблюдаемого поведения.

В первом сценарии внутри лямбда-выражения TransformMany выполняется замкнутый цикл, который загружает ЦП и лишает другие потоки ресурсов процессора. По этой причине может наблюдаться задержка выполнения задачи продолжения завершения. Во втором сценарии Thread.Sleep выполняется внутри лямбды TransformMany, что дает другим потокам возможность выполнить задачу продолжения завершения. Наблюдаемая разница в поведении во время выполнения не связана с потоком данных TPL. Чтобы улучшить наблюдаемые дельты, достаточно ввести Thread.Sleep внутри тела цикла в сценарии 1:

for (int counter = 1; counter <= 10000000;  counter++)
{
   double result = Math.Sqrt(counter + 1.0);
   // Back off for a little while
   Thread.Sleep(200);
}

(Ниже приведен мой первоначальный ответ. Я недостаточно внимательно прочитал вопрос ОП и понял, о чем он спрашивал, только прочитав его комментарии. Я все еще оставляю его здесь в качестве ссылки.)

Вы уверены, что правильно измеряете? Обратите внимание, что когда вы делаете что-то вроде этого: transformBlock.Completion.ContinueWith(_ => ShutDown());, ваше измерение времени будет зависеть от поведения TaskScheduler (например, сколько времени потребуется, пока задача продолжения не начнет выполняться). Хотя я не смог наблюдать разницу, которую вы видели на моей машине, я получил более точные результаты (с точки зрения разницы между временем завершения tfb1 и tfb2) при использовании выделенных потоков для измерения времени:

       // Within your Test.Start() method...
       Thread timewatch = new Thread(() =>
       {
           var sw = Stopwatch.StartNew();
           tfb1.transformBlock.Completion.Wait();
           Console.WriteLine("tfb1.transformBlock completed within {0} ms",
                              sw.ElapsedMilliseconds);
        });

        Thread timewatchempty = new Thread(() =>
        {
            var sw = Stopwatch.StartNew();
            tfb2.transformBlock.Completion.Wait();
            Console.WriteLine("tfb2.transformBlock completed within {0} ms", 
                               sw.ElapsedMilliseconds);
        });

        timewatch.Start();
        timewatchempty.Start();

        //send collection numberBlock-times
        for (int i = 0; i < numberBlocks; i++)
        {
          // ... rest of the code
person afrischke    schedule 20.12.2012
comment
Я не могу использовать выделенные задачи, потому что, в конце концов, будет намного больше блоков преобразования, а использование выделенных задач приведет к голоданию ЦП. Я знаю о проблемах со временем, которые вы подняли, и о планировщике задач, однако меня именно это интересует, разница между завершением tfb1 и tfb2. Результаты согласуются, поэтому я могу с уверенностью сказать, что поведение блоков transfromMany в кодовой базе 1 ВЛИЯЕТ на время завершения предшествующих им блоков преобразования. Таким образом, я ценю ваш код, но на самом деле он не помогает решить мою проблему. - person Matt; 20.12.2012
comment
Также обратите внимание, что в кодовой базе 2, которая полностью состоит из типов значений, не наблюдается дельты. Таким образом, это не проблема планировщика задач или плохих способов измерения времени выполнения. - person Matt; 20.12.2012
comment
Ах, так вы хотите знать, когда задачи продолжения после завершения блоков начинают выполняться? Но это не имеет прямого отношения к потоку данных TPL. Кстати, я протестировал только первый пример. - person afrischke; 20.12.2012
comment
Нет, я хочу знать, почему transformBlocks в кодовой базе 1 завершаются в совершенно разное время в зависимости от накладных расходов в transformManyBlock, в то время как они завершаются в одно и то же время в кодовой базе 2. Вопрос: Почему работа в transformManyBlock влияет на время завершения transformBlocks, и почему только в кодовой базе 1, но не в кодовой базе 2. Это вопрос, напрямую связанный с потоком данных TPL. - person Matt; 20.12.2012
comment
Та же причина: ваш код перегружает процессор(ы), выполняя узкий цикл for. В промежутках планировщик потоков решает, что одна из запланированных задач завершения должна быть запущена, а затем предоставляет занятым потокам, которые выполняют цикл, больше процессорного времени. Затем несколько раз спустя он решает, что должны выполняться другие запланированные задачи завершения, что он и делает. С другой стороны, в сценарии 2 ничего не происходит, потоки, выполняющие лямбда-выражение TransformMany, просто спят, поэтому планировщик потоков решает, что задачи завершения могут выполняться немедленно. - person afrischke; 20.12.2012
comment
Хм, я сомневаюсь, что это проблема, для сценария II я ранее позволял transformManyBlock работать вместо того, чтобы спать, и никакой разницы. Вы можете попробовать это сами, просто заменив Thread.Sleep(x) чем-то простым, что заставит ЦП быть загруженным. Вы увидите, что все блоки transformBlock завершаются в одно и то же время. - person Matt; 20.12.2012
comment
Без проблем. Просто попробуйте первый пример с засыпанием в цикле (или замените весь цикл засыпанием, как во втором примере) и посмотрите, изменит ли это наблюдаемое поведение или нет. Если это так, то, скорее всего, мое объяснение верно, если нет, то вы доказали, что я ошибаюсь. - person afrischke; 20.12.2012
comment
ваш ответ определенно неверен в том виде, в каком он у вас есть сейчас. Я уже говорил вам, что вы можете заменить Thread.Sleep реальной работой, и вы увидите, что изменений не будет. Ваш аргумент о голодающих тредах здесь не применим. - person Matt; 21.12.2012
comment
Большое спасибо за ваш подробный ответ, и мы все еще работаем над его обновлением. Итак, если я правильно понимаю, вы говорите, что проблема не в том, сколько работы выполняется в TransformManyBlock, верно? Затем ваши результаты (когда вы завершаете подготовку входных элементов перед их отправкой в ​​BroadCastBlock) немного смущают меня, поскольку я не понимаю, почему более медленная отправка элементов (с задержкой в ​​​​10 мс) внезапно приведет к тому, что оба transformBlocks завершатся примерно в одно и то же время. . Не знаю, какие выводы из этого сделать. Любые подсказки или предложения? - person Matt; 21.12.2012
comment
... Я говорю, что, поскольку оба блока transformBlock получают одни и те же элементы примерно в одно и то же время, оба выполняют в этом примере идентичную работу. Почему в дельте завершения есть разница, отправляю ли я элементы быстрее или медленнее? Все, что это говорит мне прямо сейчас, это то, что часть накладных расходов приходится на подготовку элементов (о чем я знал раньше), но почему дельта при создании элементов на лету? В замешательстве (о результате)... - person Matt; 21.12.2012
comment
Я по-прежнему начисляю баллы за ваш ответ, потому что вы действительно приложили усилия, чтобы расследовать это. Извините, если я временами казался грубым или слишком прямым. Это не было моим намерением. Мне все еще любопытны результаты, которые вы нашли, а также дальнейшее профилирование кода, я был бы признателен, если бы вы могли поделиться какими-либо дополнительными выводами или помочь мне лучше понять ваши последние результаты. - person Matt; 21.12.2012
comment
давайте продолжим это обсуждение в чате - person afrischke; 22.12.2012