Кадры данных медленно анализируют небольшой объем данных

У меня есть 2 класса, выполняющих аналогичную задачу в Apache Spark, но тот, который использует фрейм данных, во много раз медленнее, чем «обычный», использующий RDD. (30x)

Я хотел бы использовать фрейм данных, так как он устранит много кода и классов, которые у нас есть, но, очевидно, я не могу сделать так, чтобы он был намного медленнее.

Набор данных не является чем-то большим. У нас есть 30 файлов с данными JSON в каждом о событиях, вызванных действиями в другом программном обеспечении. В каждом файле содержится от 0 до 100 событий.

Для обработки набора данных с 82 событиями с кадрами данных потребуется около 5 минут.

Образец кода:

    public static void main(String[] args) throws ParseException, IOException {
  SparkConf sc = new SparkConf().setAppName("POC");
  JavaSparkContext jsc = new JavaSparkContext(sc);
  SQLContext sqlContext = new SQLContext(jsc);

  conf = new ConfImpl();

  HashSet<String> siteSet = new HashSet<>();

  // last month
  Date yesterday = monthDate(DateUtils.addDays(new Date(), -1)); // method that returns the date on the first of the month
  Date startTime = startofYear(new Date(yesterday.getTime())); // method that returns the date on the first of the year

  // list all the sites with a metric file
  JavaPairRDD<String, String> allMetricFiles = jsc.wholeTextFiles("hdfs:///somePath/*/poc.json");
  for ( Tuple2<String, String> each : allMetricFiles.toArray() ) {
    logger.info("Reading from " + each._1);
    DataFrame metric = sqlContext.read().format("json").load(each._1).cache();
    metric.count();
    boolean siteNameDisplayed = false;
    boolean dateDisplayed = false;

    do {
      Date endTime = DateUtils.addMonths(startTime, 1);
      HashSet<Row> totalUsersForThisMonth = new HashSet<>();
      for (String dataPoint : Conf.DataPoints) { // This is a String[] with 4 elements for this specific case
        try {
          if (siteNameDisplayed == false) {
            String siteName = parseSiteFromPath(each._1); // method returning a parsed String
            logger.info("Data for site: " + siteName);
            siteSet.add(siteName);
            siteNameDisplayed = true;
          }
          if ( dateDisplayed == false ) {
            logger.info("Month: " + formatDate(startTime)); // SimpleFormatDate("yyyy-MM-dd")
            dateDisplayed = true;
          }
          DataFrame lastMonth = metric.filter("event.eventId=\"" + dataPoint + "\"").filter("creationDate >= " + startTime.getTime()).filter("creationDate < " + endTime.getTime()).select("event.data.UserId").distinct();
          logger.info("Distinct for last month for " + dataPoint + ": " + lastMonth.count());
          totalUsersForThisMonth.addAll(lastMonth.collectAsList());
        } catch (Exception e) {
          // data does not fit the expected model so there is nothing to print
        }
      }
      logger.info("Total Unique for the month: " + totalStudentForThisMonth.size());
      startTime = DateUtils.addMonths(startTime, 1);
      dateDisplayed = false;
    } while ( startTime.getTime() < commonTmsMetric.monthDate(yesterday).getTime());

    // reset startTime for the next site
    startTime = commonTmsMetric.StartofYear(new Date(yesterday.getTime()));
  }
}

В этом коде есть несколько вещей, которые неэффективны, но когда я смотрю на журналы, это добавляет всего несколько секунд ко всей обработке.

Должно быть, я упускаю что-то большое.

Я запустил это с 2 исполнителями и 1 исполнителем, и разница составляет 20 секунд на 5 минут.

Это работает с Java 1.7 и Spark 1.4.1 на Hadoop 2.5.0.

Благодарю вас!


person mc5    schedule 26.08.2015    source источник


Ответы (1)


Итак, есть несколько вещей, но трудно сказать, не видя разбивки по различным задачам и их времени. Короткая версия заключается в том, что вы много работаете с драйвером и не используете преимущества распределенных возможностей Spark.

Например, вы собираете все данные обратно в программу-драйвер (toArray() и цикл for). Вместо этого вы должны просто указать Spark SQL на файлы, которые необходимо загрузить.

Для операторов кажется, что вы выполняете много агрегаций в драйвере, вместо этого вы можете использовать драйвер для создания агрегаций и выполнения их Spark SQL.

Еще одно большое различие между вашим внутренним кодом и кодом DataFrame заключается в выводе схемы. Поскольку вы уже создали классы для представления своих данных, вполне вероятно, что вы знаете схему своих данных JSON. Вероятно, вы можете ускорить свой код, добавив информацию о схеме во время чтения, чтобы Spark SQL мог пропустить вывод.

Я бы предложил вернуться к этому подходу и попытаться создать что-то, используя распределенные операторы Spark SQL.

person Holden    schedule 26.08.2015
comment
Есть много вещей, которые нужно решить, и я начну работать над ними, чтобы увидеть, как это улучшит производительность. Как я могу измерить стоимость вывода, о котором я бы предпочел, чтобы позаботился фрейм данных. Одной из целей перехода на фрейм данных было удаление множества классов, определяющих json. - person mc5; 27.08.2015