Как я могу распараллелить цикл for в spark с помощью scala?

Например, у нас есть файл паркета с ценой закрытия 2000 символов акций за последние 3 года, и мы хотим рассчитать 5-дневную скользящую среднюю для каждого символа.

Итак, я создаю искру SQLContext, а затем

val marketData = sqlcontext.sql("select DATE, SYMBOL, PRICE from stockdata order by DATE").cache()

Чтобы получить список символов,

val symbols = marketData.select("SYMBOL").distinct().collect()

и вот цикл for:

for (symbol <- symbols) {
  marketData.filter(symbol).rdd.sliding(5).map(...calculating the avg...).save()
}

Очевидно, что выполнение цикла for в spark выполняется медленно, и save() для каждого небольшого результата также замедляет процесс (я пытался определить var result вне цикла for и объединить все выходные данные, чтобы выполнить операцию ввода-вывода вместе, но я получил stackoverflow исключение), так как же распараллелить цикл for и оптимизировать операцию ввода-вывода?


person Rongjie Zhang    schedule 03.05.2016    source источник


Ответы (2)


Программа, которую вы пишете, запускается в искровом узле драйвера ("главного"). Выражения в этой программе можно распараллелить, только если вы работаете с параллельными структурами (RDD).

Попробуй это:

marketdata.rdd.map(symbolize).reduceByKey{ case (symbol, days) => days.sliding(5).map(makeAvg)  }.foreach{ case (symbol,averages) => averages.save() }

где symbolize берет строку символа x день и возвращает кортеж (символ, день).

person Carlos López-Camey    schedule 03.05.2016
comment
Спасибо за Ваш ответ. Однако marketdata содержит все рыночные данные (2000 символов × 900 дней = 1800000 строк), и если мы sliding(5) на этом rdd без filter(symbol) получим неверный результат о скользящей средней? Я ясно выразился? - person Rongjie Zhang; 03.05.2016
comment
Спасибо за ваше терпение. И насколько я знаю, если у нас есть symbolize как { row => (row.getAs[String]("SYMBOL"), row) } и reduceByKey на rdd, который возвращает map(symbolize), нам придется reduceByKey{ case (row_x, row_y) => ...} вместо reduceByKey{ case (symbol, days) => ...}, наконец, я groupByKey() на rdd, который возвращает map(symbolize) и mapValues(x => x.sliding(5).map(makeAvg)).save() и это работает. Еще раз спасибо за вашу помощь! - person Rongjie Zhang; 03.05.2016

В первой части ответа я не согласен с Карлосом. Программа не запускается в драйвере ("мастер").

Цикл выполняется последовательно, но для каждого символа выполняется:

marketData.filter(symbol).rdd.sliding(5).map(...calculating the avg...).save()

выполняется параллельно, поскольку markedData является фреймом данных Spark и распространяется.

person MomoAG    schedule 17.06.2016