Например, у нас есть файл паркета с ценой закрытия 2000 символов акций за последние 3 года, и мы хотим рассчитать 5-дневную скользящую среднюю для каждого символа.
Итак, я создаю искру SQLContext, а затем
val marketData = sqlcontext.sql("select DATE, SYMBOL, PRICE from stockdata order by DATE").cache()
Чтобы получить список символов,
val symbols = marketData.select("SYMBOL").distinct().collect()
и вот цикл for:
for (symbol <- symbols) {
marketData.filter(symbol).rdd.sliding(5).map(...calculating the avg...).save()
}
Очевидно, что выполнение цикла for в spark выполняется медленно, и save()
для каждого небольшого результата также замедляет процесс (я пытался определить var result
вне цикла for и объединить все выходные данные, чтобы выполнить операцию ввода-вывода вместе, но я получил stackoverflow исключение), так как же распараллелить цикл for и оптимизировать операцию ввода-вывода?