Сколько обычно накладных расходов при распределении обработки?

Для нетерпеливых читателей: это незавершенная работа, в процессе которой я прошу помощи. Пожалуйста, не судите об инструментах по моим временным данным, так как они могут измениться, пока я пытаюсь добиться лучших результатов.

Мы находимся в середине процесса принятия решения об архитектуре инструмента для анализа результатов совместного моделирования.

В рамках этого процесса меня попросили написать инструмент тестирования и получить данные о скоростях нескольких фреймворков распределенной обработки.

Я тестировал следующие фреймворки: Apache Spark, Apache Flink, Hazelcast Jet. И в качестве сравнения базовой простой Java.

Тестовый пример, который я использовал, был простым: «Вот список Pojos с одним полем в pojo с двойным значением. Найдите наименьшее (минимальное) значение».

Простой, понятный и, надеюсь, очень сопоставимый.

Три из четырех тестов используют простой компаратор, четвертый (мигающий) использует редуктор, который в основном идентичен компараторам. Функции анализа выглядят так:

Java: double min = logs.stream().min(new LogPojo.Comp()).get().getValue();

Spark: JavaRDD<LogPojo> logData = sc.parallelize(logs, num_partitions);
double min = logData.min(new LogPojo.Comp()).getValue();

Hazel: IStreamList<LogPojo> iLogs = jet.getList("logs");
iLogs.addAll(logs);
double min = iLogs.stream().min(new LogPojo.Comp()).get().getValue();

Flink: DataSet<LogPojo> logSet = env.fromCollection(logs);
double min = logSet.reduce(new LogReducer()).collect().get(0).getValue();

Я тщательно протестировал это, варьируя размер списка тестов, а также выделенные ресурсы. И результаты поразили меня. ЛУЧШИЕ результаты можно увидеть ниже (все числа в миллисекундах, 1 миллион pojos, 10 тестов каждое):

  • экземпляры: сколько времени потребовалось, чтобы объявить и запустить экземпляр фреймворков
  • list: сколько времени потребовалось, чтобы разобрать / передать список в "список" фреймворков
  • процесс: сколько времени потребовалось для обработки данных для получения минимального
  • в целом: от начала до конца каждого теста

Исход:

java:
Instances: 
List: 
Process: 37, 24, 16, 17, 16, 16, 16, 16, 16, 16, 
Overall: 111, 24, 16, 17, 16, 16, 16, 16, 16, 16, 

spark:
Instances: 2065, 89, 62, 69, 58, 49, 56, 47, 41, 52, 
List: 166, 5, 1, 1, 2, 1, 0, 0, 0, 0, 
Process: 2668, 2768, 1936, 2016, 1950, 1936, 2105, 2674, 1913, 1882, 
Overall: 4943, 2871, 2011, 2094, 2020, 1998, 2172, 2728, 1961, 1943, 

hazel:
Instances: 6347, 2891, 2817, 3106, 2636, 2936, 3018, 2969, 2622, 2799, 
List: 1984, 1656, 1470, 1505, 1524, 1429, 1512, 1445, 1394, 1427, 
Process: 4348, 3809, 3655, 3751, 3927, 3887, 3592, 3810, 3673, 3769, 
Overall: 12850, 8373, 7959, 8384, 8110, 8265, 8133, 8239, 7701, 8007

flink:
Instances: 45, 0, 0, 0, 0, 0, 0, 0, 0, 0, 
List: 92, 35, 16, 13, 17, 15, 19, 11, 19, 24, 
Process: 34292, 20822, 20870, 19268, 17780, 17390, 17124, 19628, 17487, 18586, 
Overall: 34435, 20857, 20886, 19281, 17797, 17405, 17143, 19639, 17506, 18610, 

Самые интересные части:

  • лучшие результаты ВСЕ получены из чисто локальных тестов (один экземпляр)
  • любые тесты, в которых использовалась распределенная механика (дополнительные узлы и т. д.), были на порядок медленнее (например, искра в 2,5 раза медленнее при распределении).

Не поймите меня неправильно, основная логика состоит в том, что распределенная обработка должна быть медленнее на ядро, чем однопоточная обработка.

Но на 2 порядка ДАЖЕ если использовать на монопотоке? И на 3 порядка, если раздадут? Может ли кто-нибудь увидеть ошибку, которую я, по-видимому, допустил во всех трех распределенных процессах? Я ожидал некоторого коэффициента <10, поэтому было бы неплохо убить его дополнительным оборудованием.

Так есть ли способ уменьшить накладные расходы этих фреймворков до, хм, может быть, до x9 вместо x999?

Я знаю, что знаю, что тестовые данные, которые я использую, слишком малы, но даже если увеличить их, я не заметил никакого снижения накладных расходов по сравнению с производительностью. И это примерно размер пакетов данных, которые нам нужно проанализировать (0,1–1 млн объектов в секунду на симуляцию). Так что ваша помощь в поиске моей ошибки приветствуется. : D

ОБНОВЛЕНИЕ Spark:

После более тщательного тестирования Spark я все еще не впечатлен. Настройка была следующей:

java-клиент на одной машине в 64-ядерном, 480 ГБ RAM мастер задания и 7 подчиненных устройств на отдельной стойке, 32 ядра по 20 ГБ каждый

    1 mio objects, 256 tasks, 64 cpus local[*]
    java:
      Instances: 
      List: 
      Process: 622, 448, 68, 45, 22, 32, 15, 27, 22, 29, 
    spark:
      Instances: 4865, 186, 160, 133, 121, 112, 106, 78, 121, 106, 
      List: 310, 2, 2, 1, 2, 4, 2, 1, 2, 1, 
      Process: 8190, 4433, 4200, 4073, 4201, 4092, 3822, 3852, 3921, 4051, 

    10 mio objects, 256 tasks, 64 cpus local[*]
    java:
      Instances: 
      List: 
      Process: 2329, 144, 50, 65, 75, 70, 69, 66, 66, 66, 
    spark:
      Instances: 20345, 
      List: 258, 2, 1, 1, 1, 4, 1, 1, 1, 1, 
      Process: 55671, 49629, 48612, 48090, 47897, 47857, 48319, 48274, 48199, 47516

    1 mio objects, 5.2k tasks, 64 cpus local, 32 cpus each on 1+1 Spark machines (different rack)
    java:
      Instances: 
      List: 
      Process: 748, 376, 70, 31, 69, 64, 46, 17, 50, 53, 
    spark:
      Instances: 4631, 
      List: 249, 1, 2, 2, 3, 3, 1, 1, 2, 1, 
      Process: 12273, 7471, 6314, 6083, 6228, 6158, 5990, 5953, 5981, 5972

    1 mio objects, 5.2k tasks, 64 cpus local, 32 cpus each on 7+1 Spark machines (different rack)
    java:
      Instances: 
      List: 
      Process: 820, 494, 66, 29, 5, 30, 29, 43, 45, 21, 
    spark:
      Instances: 4513, 
      List: 254, 2, 2, 2, 2, 4, 2, 2, 1, 1, 
      Process: 17007, 6545, 7174, 7040, 6356, 6502, 6482, 6348, 7067, 6335

    10 mio objects, 52k tasks, 64 cpus local, 32 cpus each on 7+1 Spark machines (different rack)
    java Process: 3037, 78, 48, 45, 53, 73, 72, 73, 74, 64, 
    spark:
      Instances: 20181, 
      List: 264, 3, 2, 2, 1, 4, 2, 2, 1, 1, 
      Process: 77830, 67563, 65389, 63321, 61416, 63007, 64760, 63341, 63440, 65320

    1 mio objects, 224*i tasks, 64 cpus local, 32 cpus each on 7+1 Spark machines (different rack), i =0 to 100
    java Process: 722, 631, 62, 26, 25, 42, 26, 11, 12, 29, 40, 16, 14, 23, 29, 18, 14, 11, 71, 76, 37, 52, 32, 15, 51, 54, 19, 74, 62, 54, 7, 60, 37, 54, 42, 3, 7, 60, 33, 44, 50, 50, 39, 34, 34, 13, 47, 63, 46, 4, 52, 20, 19, 24, 6, 53, 4, 3, 68, 10, 59, 52, 48, 3, 48, 37, 5, 38, 10, 47, 4, 53, 36, 41, 31, 57, 7, 64, 45, 33, 14, 53, 5, 41, 40, 48, 4, 60, 49, 37, 20, 34, 53, 4, 58, 36, 12, 35, 35, 4, 
    spark:
      Instances: 4612, 
      List: 279, 3, 2, 1, 2, 5, 3, 1, 1, 1, 2, 1, 1, 1, 1, 2, 2, 1, 1, 1, 1, 2, 1, 1, 1, 1, 2, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 2, 1, 1, 1, 1, 1, 1, 1, 0, 2, 1, 1, 1, 1, 1, 0, 2, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 2, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 2, 1, 1, 2, 1, 1, 1, 1, 1, 1, 1, 0, 1, 1, 1, 1, 1, 1, 
      Process: 16300, 6577, 5802, 6136, 5389, 5912, 5885, 6157, 5440, 6199, 5902, 6299, 5919, 6066, 5803, 6612, 6120, 6775, 6585, 6146, 6860, 6955, 6661, 6819, 6868, 6700, 7140, 7532, 7077, 7180, 7360, 7526, 7770, 7877, 8048, 7678, 8260, 8131, 7837, 7526, 8261, 8404, 8431, 8340, 9000, 8825, 8624, 9340, 9418, 8677, 8480, 8678, 9003, 9036, 8912, 9235, 9401, 9577, 9808, 9485, 9955, 10029, 9506, 9387, 9794, 9998, 9580, 9963, 9273, 9411, 10113, 10004, 10369, 9880, 10532, 10815, 11039, 10717, 11251, 11475, 10854, 11468, 11530, 11488, 11077, 11245, 10936, 11274, 11233, 11409, 11527, 11897, 11743, 11786, 11086, 11782, 12001, 11795, 12075, 12422

    2 mio objects, 224*i tasks, 64 cpus local, 32 cpus each on 7+1 Spark machines (different rack), i = 0 to 30
    java Process: 1759, 82, 31, 18, 30, 41, 47, 28, 27, 13, 28, 46, 5, 72, 50, 81, 66, 44, 36, 72, 44, 11, 65, 67, 58, 47, 54, 60, 46, 34, 
    spark:
      Instances: 6316, 
      List: 265, 3, 3, 2, 2, 6, 1, 2, 1, 1, 1, 2, 2, 1, 1, 1, 1, 1, 2, 1, 1, 5, 1, 1, 1, 1, 2, 1, 1, 1, 
      Process: 24084, 13041, 11451, 11274, 10919, 10972, 10677, 11048, 10659, 10984, 10820, 11057, 11355, 10874, 10896, 11725, 11580, 11149, 11823, 11799, 12414, 11265, 11617, 11762, 11561, 12443, 12448, 11809, 11928, 12095

    10 mio objects, 224*i tasks, 64 cpus local, 32 cpus each on 7+1 Spark machines (different rack), i = 5 to 30
    java Process: 1753, 91, 57, 71, 86, 86, 151, 80, 85, 72, 61, 78, 80, 87, 93, 89, 70, 83, 166, 84, 87, 94, 90, 88, 92, 89, 196, 96, 97, 89, 
    spark:
      Instances: 21192, 
      List: 282, 3, 2, 2, 3, 4, 2, 2, 1, 0, 1, 1, 1, 1, 2, 2, 1, 1, 1, 1, 1, 2, 2, 1, 1, 1, 
      Process: 60552, 53960, 53166, 54971, 52827, 54196, 51153, 52626, 54138, 51134, 52427, 53618, 50815, 50807, 52398, 54315, 54411, 51176, 53843, 54736, 55313, 56267, 50837, 54996, 52230, 52845

Результаты: независимо от того, сколько оборудования было загружено на него и как кластеризовались задачи, при использовании Spark на каждый миллион pojos в списке приходилось 5-6 секунд.

С другой стороны, Java справилась с той же суммой за 5–30 мс. Так что в основном множитель 200–1000.

Есть ли у кого-нибудь предложения, как "разогнать" Spark для такой несложной работы?

ОБНОВЛЕНИЕ Hazel:

Теперь я начинаю впечатляться. Хотя я все еще борюсь с некоторыми странными проблемами, по крайней мере, Hazelcast Jet, похоже, понимает, что локальные данные могут обрабатываться локально, если это возможно. Только 100% (коэффициент x2) накладных расходов, что вполне приемлемо.

10 млн объектов

java:
   Instances: 
   List: 68987, 
   Process: 2288, 99, 54, 52, 54, 64, 89, 83, 79, 88, 
hazel:
  Instances: 6136, 
  List: 97225, 
  Process: 1112, 375, 131, 123, 148, 131, 137, 119, 176, 140

ОБНОВЛЕНИЕ Flink:

На данный момент исключил его из тестирования, так как он вызывает слишком много проблем, но не дает хороших результатов.

РЕДАКТИРОВАТЬ: весь тест можно найти в разделе: https://github.com/anderschbe/clusterbench

В настройке кластера для Spark используется spark-2.1.0-bin-hadoop2.7, поскольку он поставляется из коробки. С одним незначительным изменением в spark_env.sh: SPARK_NO_DAEMONIZE = true

единственное изменение, необходимое для его работы в кластере, - это замена «localhost» в строке 25 SparcProc на «spark: //I_cant_give_you_my_cluster_IP.doo»


person Anders Bernard    schedule 13.03.2017    source источник
comment
Есть ли способ поделиться своим кодом? Например, в частном репозитории Github.   -  person Neil Stevenson    schedule 13.03.2017
comment
Я прикреплю сюда самые важные части. Честно говоря, это просто базовые вещи, прямо из руководств по фреймворкам.   -  person Anders Bernard    schedule 13.03.2017
comment
@AndersBernard В мой ответ добавлено важное изменение - вы используете local, что означает только один рабочий поток   -  person T. Gawęda    schedule 13.03.2017
comment
кивнуть Как сказано, это самый быстрый. Конечно, я тестировал и нелокальные ... и это даже хуже. Однако я не публикую данные о нашем внутреннем кластере. ;)   -  person Anders Bernard    schedule 13.03.2017
comment
Может быть, это было недостаточно ясно;) Пожалуйста, предупредите меня, когда у вас будет настройка кластера;)   -  person T. Gawęda    schedule 13.03.2017
comment
Хорошо, загрузил тест на github после очистки от всех конфиденциальных данных. К сожалению, сюда входят все данные кластера ... но он действительно работает на нескольких ядрах кластера.   -  person Anders Bernard    schedule 13.03.2017
comment
Hazelcast List не является распространяемым источником - вместо этого вам следует использовать карту. Мне было бы любопытно узнать, как это повлияет на ваши результаты.   -  person Can Gencer    schedule 13.03.2017
comment
Если ваши данные происходят на одном узле, в первую очередь следует учитывать стоимость их распространения по сети по сравнению с затратами на вычисления. Поскольку в вашем тесте используется простая функция min, это на несколько порядков дешевле, чем задержка в сети. Если окажется, что ваши фактические вычисления по-прежнему дешевле, чем накладные расходы сети, вы не получите выгоды от механизма распределенных вычислений.   -  person Marko Topolnik    schedule 15.03.2017


Ответы (1)


Когда вы что-то рассчитываете в кластерной структуре, например Spark или Flink, framework:

  • сериализует ваш код
  • отправить запрос ресурса
  • отправьте свой код по сети
  • выполнение графика
  • ждать результата

Как видите, выполняется много шагов - не только ваш расчет! Распределенные вычисления имеют смысл, если вы:

  • можете разделить ваш расчет на небольшие задачи, которые можно выполнять параллельно
  • слишком много данных для обработки на одной машине, или обработка на одной машине может быть слишком медленной - дисковый ввод-вывод, некоторые другие специфические факторы в расчетах проекта ИЛИ очень специфичны и требуют много процессоров, как правило, более чем на одной машине - но тогда расчет одной части данных должен быть очень длинным

Попробуйте вычислить максимальное количество слов в текстовом файле размером 10 ГБ - тогда Spark и Flink превзойдут одноузловую Java

Иногда пользовательский код может вызывать замедление распределенных вычислений. Типичные ошибки:

  • пользователь записывает лямбды в классах со многими ссылками - все остальные классы сериализованы, сериализация занимает много времени
  • задачи на самом деле не параллельны - они должны ждать друг друга или должны обрабатывать большую часть данных
  • асимметрия данных - объекты могут иметь неправильную hashCode реализацию и HashPartitioner приводят к тому, что все данные попадают в один раздел = один узел
  • неправильное количество разделов - вы можете добавить еще 1000 машин, но если у вас все еще есть 4 раздела, вы можете архивировать не более 4 параллельных задач за один раз
  • слишком много сетевого взаимодействия - в вашем случае это не проблема, но иногда пользователь делает много join и reduce

РЕДАКТИРОВАТЬ после редактирования вопроса: в вашем примере Spark работает на local, что означает только 1 поток! Используйте как минимум local[*] или другой менеджер кластера. У вас есть накладные расходы, перечисленные в этом ответе, и только один поток

person T. Gawęda    schedule 13.03.2017
comment
кивает Я все это понимаю. Я просто недоумеваю, насколько велики накладные расходы. Как уже говорилось, нам, возможно, придется обрабатывать данные, которых может быть слишком много для одной машины ... но мы не можем заплатить за этот факт увеличением требований к оборудованию в 1000 раз. Коэффициент 10 будет приемлемым. - person Anders Bernard; 13.03.2017
comment
@AndersBernard Распределенные вычисления подходят не для всех случаев использования. Через минуту я опубликую подробности, почему Spark может работать медленнее, подождите секунду;) - person T. Gawęda; 13.03.2017
comment
кивнул тоже понял. Хотя пока я тестирую с помощью банкомата с одним пакетом, нужно анализировать только подготовку к непрерывному потоку таких пакетов. Десятки из них в секунду. - person Anders Bernard; 13.03.2017
comment
Десятки? Миллионы - хорошо, тысячи - хорошо, но десятки - это довольно мало;) Запускайте тесты с гораздо большим объемом данных. Если это не даст лучших результатов, опубликуйте свою конфигурацию, возможно, у вас есть ошибка в вашей конфигурации - person T. Gawęda; 13.03.2017
comment
Ну, мы говорим об 1-15 пакетах на симуляцию (10-100 симуляций) в секунду. Каждый пакет состоит из 10-200 тыс. Объектов, каждый из которых имеет примерно 120 байт. Таким образом, нижний счет будет в общей сложности от 12 Мбайт до 36 Гбайт в секунду. - person Anders Bernard; 13.03.2017
comment
@AndersBernard Пожалуйста, опубликуйте свою конфигурацию и код - возможно, есть какая-то конкретная ошибка - person T. Gawęda; 13.03.2017