Spark: эффективный способ проверить, пуст ли RDD

В RDD нет метода isEmpty, так каков наиболее эффективный способ тестирования, если RDD пуст?


person Tobber    schedule 11.02.2015    source источник


Ответы (2)


RDD.isEmpty() будет частью Spark 1.3.0.

На основе предложений в эта почтовая ветка apache и позже некоторые комментарии к этому ответу, я провел несколько небольших локальных экспериментов. Лучший метод — использовать take(1).length==0.

def isEmpty[T](rdd : RDD[T]) = {
  rdd.take(1).length == 0 
}

Он должен работать в O(1), за исключением случаев, когда RDD пуст, и в этом случае он линейен по количеству разделов.

Спасибо Джошу Розену и Нику Чаммасу, которые указали мне на это.

Примечание. Это не удается, если RDD имеет тип RDD[Nothing], например. isEmpty(sc.parallelize(Seq())), но в реальной жизни это, скорее всего, не проблема. isEmpty(sc.parallelize(Seq[Any]())) работает нормально.


Редактирует:

  • Редактировать 1: Добавлен метод take(1)==0, благодаря комментариям.

Мое первоначальное предложение: используйте mapPartitions.

def isEmpty[T](rdd : RDD[T]) = {
  rdd.mapPartitions(it => Iterator(!it.hasNext)).reduce(_&&_) 
}

Он должен масштабироваться по количеству разделов и не так чист, как take(1). Однако он устойчив к RDD типа RDD[Nothing].


Эксперименты:

Я использовал этот код для таймингов.

def time(n : Long, f : (RDD[Long]) => Boolean): Unit = {
  val start = System.currentTimeMillis()
  val rdd = sc.parallelize(1L to n, numSlices = 100)
  val result = f(rdd)
  printf("Time: " + (System.currentTimeMillis() - start) + "   Result: " + result)
}

time(1000000000L, rdd => rdd.take(1).length == 0L)
time(1000000000L, rdd => rdd.mapPartitions(it => Iterator(!it.hasNext)).reduce(_&&_))
time(1000000000L, rdd => rdd.count() == 0L)
time(1000000000L, rdd => rdd.takeSample(true, 1).isEmpty)
time(1000000000L, rdd => rdd.fold(0)(_ + _) == 0L)

time(1L, rdd => rdd.take(1).length == 0L)
time(1L, rdd => rdd.mapPartitions(it => Iterator(!it.hasNext)).reduce(_&&_))
time(1L, rdd => rdd.count() == 0L)
time(1L, rdd => rdd.takeSample(true, 1).isEmpty)
time(1L, rdd => rdd.fold(0)(_ + _) == 0L)

time(0L, rdd => rdd.take(1).length == 0L)
time(0L, rdd => rdd.mapPartitions(it => Iterator(!it.hasNext)).reduce(_&&_))
time(0L, rdd => rdd.count() == 0L)
time(0L, rdd => rdd.takeSample(true, 1).isEmpty)
time(0L, rdd => rdd.fold(0)(_ + _) == 0L)

На моей локальной машине с 3 рабочими ядрами я получил следующие результаты

Time:    21   Result: false
Time:    75   Result: false
Time:  8664   Result: false
Time: 18266   Result: false
Time: 23836   Result: false

Time:   113   Result: false
Time:   101   Result: false
Time:    68   Result: false
Time:   221   Result: false
Time:    46   Result: false

Time:    79   Result: true
Time:    93   Result: true
Time:    79   Result: true
Time:   100   Result: true
Time:    64   Result: true
person Tobber    schedule 11.02.2015
comment
Spark недавно объединил запрос на вытягивание, чтобы добавить метод isEmpty в RDD: github.com/apache/spark/ тянуть/4074 - person Josh Rosen; 11.02.2015
comment
Хорошие новости. На самом деле запрос на вытягивание содержал ошибку. Я отправил исправление на github.com/apache/spark/pull/4534. - person Tobber; 11.02.2015
comment
Тоббер, а не проще ли и так же быстро сделать .take(1) на RDD и посмотреть, пустой ли результат? - person Nick Chammas; 11.02.2015
comment
@НикЧаммас. Короче да. Однако есть ошибка, когда ваш RDD имеет тип RDD[Nothing]. Однако это крайний случай, поскольку RDD[Nothing] практически бесполезен. На самом деле у нас идет обсуждение второго запроса на включение. - person Tobber; 11.02.2015
comment
@Tobber, что должно быть эквивалентно в Java? - person ben; 12.09.2019

Начиная с Spark 1.3 isEmpty() является частью RDD API. Исправление, приводившее к сбою isEmpty, позже было исправлено в Spark 1.4.

Для DataFrames вы можете сделать:

val df: DataFrame = ...
df.rdd.isEmpty()

Вот вставка кода сразу из реализации RDD (начиная с 1.4.1).

  /**
   * @note due to complications in the internal implementation, this method will raise an
   * exception if called on an RDD of `Nothing` or `Null`. This may be come up in practice
   * because, for example, the type of `parallelize(Seq())` is `RDD[Nothing]`.
   * (`parallelize(Seq())` should be avoided anyway in favor of `parallelize(Seq[T]())`.)
   * @return true if and only if the RDD contains no elements at all. Note that an RDD
   *         may be empty even when it has at least 1 partition.
   */
  def isEmpty(): Boolean = withScope {
    partitions.length == 0 || take(1).length == 0
  }
person marios    schedule 08.12.2015