У меня есть 2 класса, выполняющих аналогичную задачу в Apache Spark, но тот, который использует фрейм данных, во много раз медленнее, чем «обычный», использующий RDD. (30x)
Я хотел бы использовать фрейм данных, так как он устранит много кода и классов, которые у нас есть, но, очевидно, я не могу сделать так, чтобы он был намного медленнее.
Набор данных не является чем-то большим. У нас есть 30 файлов с данными JSON в каждом о событиях, вызванных действиями в другом программном обеспечении. В каждом файле содержится от 0 до 100 событий.
Для обработки набора данных с 82 событиями с кадрами данных потребуется около 5 минут.
Образец кода:
public static void main(String[] args) throws ParseException, IOException {
SparkConf sc = new SparkConf().setAppName("POC");
JavaSparkContext jsc = new JavaSparkContext(sc);
SQLContext sqlContext = new SQLContext(jsc);
conf = new ConfImpl();
HashSet<String> siteSet = new HashSet<>();
// last month
Date yesterday = monthDate(DateUtils.addDays(new Date(), -1)); // method that returns the date on the first of the month
Date startTime = startofYear(new Date(yesterday.getTime())); // method that returns the date on the first of the year
// list all the sites with a metric file
JavaPairRDD<String, String> allMetricFiles = jsc.wholeTextFiles("hdfs:///somePath/*/poc.json");
for ( Tuple2<String, String> each : allMetricFiles.toArray() ) {
logger.info("Reading from " + each._1);
DataFrame metric = sqlContext.read().format("json").load(each._1).cache();
metric.count();
boolean siteNameDisplayed = false;
boolean dateDisplayed = false;
do {
Date endTime = DateUtils.addMonths(startTime, 1);
HashSet<Row> totalUsersForThisMonth = new HashSet<>();
for (String dataPoint : Conf.DataPoints) { // This is a String[] with 4 elements for this specific case
try {
if (siteNameDisplayed == false) {
String siteName = parseSiteFromPath(each._1); // method returning a parsed String
logger.info("Data for site: " + siteName);
siteSet.add(siteName);
siteNameDisplayed = true;
}
if ( dateDisplayed == false ) {
logger.info("Month: " + formatDate(startTime)); // SimpleFormatDate("yyyy-MM-dd")
dateDisplayed = true;
}
DataFrame lastMonth = metric.filter("event.eventId=\"" + dataPoint + "\"").filter("creationDate >= " + startTime.getTime()).filter("creationDate < " + endTime.getTime()).select("event.data.UserId").distinct();
logger.info("Distinct for last month for " + dataPoint + ": " + lastMonth.count());
totalUsersForThisMonth.addAll(lastMonth.collectAsList());
} catch (Exception e) {
// data does not fit the expected model so there is nothing to print
}
}
logger.info("Total Unique for the month: " + totalStudentForThisMonth.size());
startTime = DateUtils.addMonths(startTime, 1);
dateDisplayed = false;
} while ( startTime.getTime() < commonTmsMetric.monthDate(yesterday).getTime());
// reset startTime for the next site
startTime = commonTmsMetric.StartofYear(new Date(yesterday.getTime()));
}
}
В этом коде есть несколько вещей, которые неэффективны, но когда я смотрю на журналы, это добавляет всего несколько секунд ко всей обработке.
Должно быть, я упускаю что-то большое.
Я запустил это с 2 исполнителями и 1 исполнителем, и разница составляет 20 секунд на 5 минут.
Это работает с Java 1.7 и Spark 1.4.1 на Hadoop 2.5.0.
Благодарю вас!