Как запустить два запроса SparkSql параллельно в Apache Spark

Во-первых, позвольте мне написать часть кода, которую я хочу выполнить, в файле .scala на Spark.

Это мой исходный файл. Имеет структурированные данные с четырьмя полями.

val inputFile = sc.textFile("hdfs://Hadoop1:9000/user/hduser/test.csv")

Я объявил класс case для хранения данных из файла в таблице с четырьмя столбцами

case class Table1(srcIp: String, destIp: String, srcPrt: Int, destPrt: Int)

val inputValue = inputFile.map(_.split(",")).map(p => Table1(p(0),p(1),p(2).trim.toInt,p(3).trim.toInt)).toDF()

inputValue.registerTempTable("inputValue")

Теперь, скажем, я хочу выполнить следующие два запроса. Как я могу запускать эти запросы параллельно, поскольку они независимы друг от друга. Я чувствую, что если бы я мог запускать их параллельно, это могло бы сократить время выполнения. Прямо сейчас они выполняются серийно.

val primaryDestValues = sqlContext.sql("SELECT distinct destIp FROM inputValue")
primaryDestValues.registerTempTable("primaryDestValues")
val primarySrcValues = sqlContext.sql("SELECT distinct srcIp FROM inputValue")
primarySrcValues.registerTempTable("primarySrcValues")

primaryDestValues.join(primarySrcValues, $"destIp" === $"srcIp").select($"destIp",$"srcIp").show(

person Ahmad    schedule 17.03.2016    source источник


Ответы (4)


Может быть, вы можете посмотреть в направлении Futures / Promises. В SparkContext submitJob есть метод, который возвращает вам будущее с результатами. Так что вы можете уволить две работы, а затем собрать результаты по фьючерсам.

Еще не пробовал этот способ. Просто предположение.

person Zeke Fast    schedule 18.03.2016
comment
Я пробовал, но не смог получить возвращаемое значение из будущего. Постараюсь больше работать, и если Бог даст добро, удачно откликнется. - person Ahmad; 18.03.2016
comment
@Ahmad После увольнения двух или более заданий и получения обратно Futures, вы, вероятно, захотите получить агрегированное будущее или сопоставить результаты этих заданий. Самый правильный способ сделать это - определить обработчики в результирующей цепочке Futures для случаев успеха и неудачи, или вы можете просто сопоставить результаты или использовать для. Пожалуйста, проверьте эту ветку для получения дополнительных сведений о присоединении к фьючерсам. - person Zeke Fast; 20.03.2016

Понятия не имею, почему вы хотите использовать sqlContext в первую очередь, и не усложняете задачу.

val inputValue = inputFile.map(_.split(",")).map(p => (p(0),p(1),p(2).trim.toInt,p(3).trim.toInt))

Предполагая, что p (0) = destIp, p (1) = srcIp

val joinedValue = inputValue.map{case(destIp, srcIp, x, y) => (destIp, (x, y))}
                  .join(inputFile.map{case(destIp, srcIp, x, y) => (srcIp, (x, y))})
                  .map{case(ip, (x1, y1), (x2, y2)) => (ip, destX, destY, srcX, srcY)}

Теперь он будет распараллелен, и вы даже сможете контролировать количество разделов, которое хотите, используя colasce.

person Abhishek Anand    schedule 18.03.2016
comment
Я использую контекст sql, потому что у меня есть много сложных запросов для вычисления медианы, среднего и т. Д. Итак, я думаю, что это легче достичь с помощью SQL. Два запроса в вопросе являются частью многих запросов. - person Ahmad; 18.03.2016
comment
Я не буду предлагать этого, если только это не будет абсолютно необходимо. Идея проста: когда вы используете SQL, вы добавляете дополнительный слой для обработки искры, что накладно. В любом случае, перейдем к вашему вопросу. Spark может выполнять только одну задачу за раз, и та, которая вызывается вместе со всеми связанными операциями, когда вы достигаете триггерной функции в вашем случае, показывают. Я думаю, что основная причина того, что ваши запросы не выполняются параллельно, заключается в том, что sqlContext является потокобезопасным. - person Abhishek Anand; 19.03.2016

Вы можете пропустить два DISTINCT и сделать один в конце:

inputValue.select($"srcIp").join(
  inputValue.select($"destIp"), 
  $"srcIp" === $"destIp"
).distinct().show
person David Griffin    schedule 17.03.2016
comment
Спасибо, что ответили, но это был всего лишь пример. Мне нужно знать метод параллельного выполнения запросов. - person Ahmad; 18.03.2016
comment
Вы действительно не можете. Вы можете присоединиться к ним, вот и все. Это ваш вариант. Так или иначе, соединение. - person David Griffin; 18.03.2016
comment
Я читал о методе Future, но не понял его должным образом. Думаю, с его помощью можно запускать запросы параллельно. - person Ahmad; 18.03.2016

Хороший вопрос. Это можно выполнить параллельно, используя par в массиве. Для этого вы соответствующим образом настроили свой код.

Объявите массив с двумя элементами в нем (вы можете назвать это по своему желанию). Напишите свой код внутри каждого оператора case, который необходимо выполнять параллельно.

Array("destIp","srcIp").par.foreach { i => 
{
    i match {
      case "destIp" => {
        val primaryDestValues = sqlContext.sql("SELECT distinct destIp FROM inputValue")
        primaryDestValues.registerTempTable("primaryDestValues")
      }
      case "srcIp" => {
        val primarySrcValues = sqlContext.sql("SELECT distinct srcIp FROM inputValue")
        primarySrcValues.registerTempTable("primarySrcValues")
      }}}
}

После завершения выполнения обоих операторов case будет выполнен приведенный ниже код.

primaryDestValues.join(primarySrcValues, $"destIp" === $"srcIp").select($"destIp",$"srcIp").show()

Примечание. Если вы удалите par из кода, он будет выполняться последовательно

Другой вариант - создать внутри кода еще одну сессию искр и выполнить sql, используя эту переменную искровой сессии. Но это немного рискованно и должно использоваться очень осторожно.

person Sarath KS    schedule 30.09.2018