Как распараллелить / распределить запросы / счетчики в Spark DataFrame?

У меня есть DataFrame, к которому я должен применить серию фильтровальных запросов. Например, я загружаю свой DataFrame следующим образом.

val df = spark.read.parquet("hdfs://box/some-parquet")

Затем у меня есть куча "произвольных" фильтров, как показано ниже.

  • C0 = "истина" и C1 = "ложь"
  • C0 = «ложь» и C3 = «истина»
  • и так далее...

Обычно я получаю эти фильтры динамически, используя метод util.

val filters: List[String] = getFilters()

Все, что я делаю, это применяю эти фильтры к DataFrame, чтобы получить счет. Например.

val counts = filters.map(filter => {
 df.where(filter).count
})

Я заметил, что это НЕ параллельная / распределенная операция при сопоставлении фильтров. Если я вставлю фильтры в RDD / DataFrame, этот подход тоже не будет работать, потому что тогда я буду выполнять операции с вложенными фреймами данных (которые, как я читал в SO, не разрешены в Spark). Примерно следующее дает исключение NullPointerException (NPE).

val df = spark.read.parquet("hdfs://box/some-parquet")
val filterRDD = spark.sparkContext.parallelize(List("C0='false'", "C1='true'"))
val counts = filterRDD.map(df.filter(_).count).collect
Caused by: java.lang.NullPointerException
  at org.apache.spark.sql.Dataset.filter(Dataset.scala:1127)
  at $anonfun$1.apply(:27)
  at $anonfun$1.apply(:27)
  at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
  at scala.collection.Iterator$class.foreach(Iterator.scala:893)
  at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
  at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
  at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
  at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
  at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
  at scala.collection.AbstractIterator.to(Iterator.scala:1336)
  at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
  at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1336)
  at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
  at scala.collection.AbstractIterator.toArray(Iterator.scala:1336)
  at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:912)
  at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:912)
  at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1899)
  at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1899)
  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
  at org.apache.spark.scheduler.Task.run(Task.scala:86)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
  at java.lang.Thread.run(Thread.java:745)

Есть ли способ распараллелить / распределить фильтры подсчета на DataFrame в Spark? Кстати, я использую Spark v2.0.2.


person Jane Wayne    schedule 20.12.2016    source источник
comment
Предполагая, что вы хотите достичь одного прохода по входным данным (в противном случае, можно не ожидать никакого выигрыша от этого), я бы переделал функции фильтрации для UDF, которые возвращают 1 (соответствие фильтра) или 0 (без фильтра match), добавьте 1 столбец с помощью UDF в фрейм данных и выполните groupBy / count для добавленных столбцов, что приведет к фрейму данных из 1 строки, содержащему все счетчики.   -  person GPI    schedule 20.12.2016
comment
не могли бы вы показать пример?   -  person Jane Wayne    schedule 20.12.2016


Ответы (1)


Таким образом, единственный ожидаемый выигрыш (который может быть очень существенным) - это однократная передача входных данных.

Я бы так сделал (программное решение, но возможен эквивалентный SQL):

  1. Преобразуйте свои фильтры в UDF, которые возвращают 1 или 0
  2. Добавьте по одному столбцу для каждой из этих UDFS
  3. Группируйте / суммируйте свои данные.

Пример сеанса искры выглядит так:

scala> val data = spark.createDataFrame(Seq("A", "BB", "CCC").map(Tuple1.apply)).withColumnRenamed("_1", "input")

data: org.apache.spark.sql.DataFrame = [input: string]

scala> data.show
+-----+
|input|
+-----+
|    A|
|   BB|
|  CCC|
+-----+

scala> val containsBFilter = udf((input: String) => if(input.contains("B")) 1 else 0)
containsBFilter: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,IntegerType,Some(List(StringType)))

scala> val lengthFilter = udf((input: String) => if (input.length < 3) 1 else 0)
lengthFilter: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,IntegerType,Some(List(StringType)))

scala> data.withColumn("inputLength", lengthFilter($"input")).withColumn("containsB", containsBFilter($"input")).select(sum($"inputLength"), sum($"containsB")).show

+----------------+--------------+
|sum(inputLength)|sum(containsB)|
+----------------+--------------+
|               2|             1|
+----------------+--------------+
person GPI    schedule 20.12.2016
comment
это очень умно. Мне это нравится. Как вы думаете, использование класса параллельной коллекции Scala также может помочь в распараллеливании? с вашим подходом, как вы заявили, это один проход по данным, но UDF будут линейно расти с количеством фильтров. но я думаю, это нормально, если производительность не снижается. - person Jane Wayne; 20.12.2016
comment
Я предполагаю, что Spark достаточно умен, чтобы точно обрабатывать дополнительные столбцы (например, вычислять их, передавая данные только один раз), потому что они не имеют зависимости. Это действительно занимает больше места, но мы говорим долго, это очень эффективно. Использование параллельных коллекций противоречит цели Spark: разделы Spark - это единица параллелизма (1 ядро ​​исполнителя = 1 раздел), не подключайте свои собственные (вы будете бороться только за процессоры с разделами Spark), просто правильно разделите. - person GPI; 20.12.2016
comment
Я возился с этим подходом несколько дней. Не думаю, что в моей ситуации это сработает. Во-первых, что касается параллельного сбора, этот подход будет распараллеливаться только в моей программе драйвера, а не в кластере, что я не планировал. Во-вторых, я должен создавать эти UDF динамически (я задал еще один вопрос SO). Но даже если я решил проблему создания UDF динамически, операция df = df.withColumn("someFilter", someUdf(...)) будет недопустимо долгой. У меня может быть любое количество динамических UDF (фильтров), и это добавит столько столбцов. - person Jane Wayne; 22.12.2016
comment
Я действительно не понимаю, что происходит. Добавление столбца должно происходить практически мгновенно, это всего лишь обновление логического плана, никаких вычислений производить не следует. И я не понимаю, какова ваша параллельная коллекция и как она ко всему этому вписывается. Хотя, может быть, это для другого вопроса. Что касается преобразования фильтра в UDF 1/0, то это, вероятно, можно было бы сделать в общем случае. - person GPI; 22.12.2016