В RDD нет метода isEmpty
, так каков наиболее эффективный способ тестирования, если RDD пуст?
Spark: эффективный способ проверить, пуст ли RDD
Ответы (2)
RDD.isEmpty()
будет частью Spark 1.3.0.
На основе предложений в эта почтовая ветка apache и позже некоторые комментарии к этому ответу, я провел несколько небольших локальных экспериментов. Лучший метод — использовать take(1).length==0
.
def isEmpty[T](rdd : RDD[T]) = {
rdd.take(1).length == 0
}
Он должен работать в O(1)
, за исключением случаев, когда RDD пуст, и в этом случае он линейен по количеству разделов.
Спасибо Джошу Розену и Нику Чаммасу, которые указали мне на это.
Примечание. Это не удается, если RDD имеет тип RDD[Nothing]
, например. isEmpty(sc.parallelize(Seq()))
, но в реальной жизни это, скорее всего, не проблема. isEmpty(sc.parallelize(Seq[Any]()))
работает нормально.
Редактирует:
- Редактировать 1: Добавлен метод
take(1)==0
, благодаря комментариям.
Мое первоначальное предложение: используйте mapPartitions
.
def isEmpty[T](rdd : RDD[T]) = {
rdd.mapPartitions(it => Iterator(!it.hasNext)).reduce(_&&_)
}
Он должен масштабироваться по количеству разделов и не так чист, как take(1)
. Однако он устойчив к RDD типа RDD[Nothing]
.
Эксперименты:
Я использовал этот код для таймингов.
def time(n : Long, f : (RDD[Long]) => Boolean): Unit = {
val start = System.currentTimeMillis()
val rdd = sc.parallelize(1L to n, numSlices = 100)
val result = f(rdd)
printf("Time: " + (System.currentTimeMillis() - start) + " Result: " + result)
}
time(1000000000L, rdd => rdd.take(1).length == 0L)
time(1000000000L, rdd => rdd.mapPartitions(it => Iterator(!it.hasNext)).reduce(_&&_))
time(1000000000L, rdd => rdd.count() == 0L)
time(1000000000L, rdd => rdd.takeSample(true, 1).isEmpty)
time(1000000000L, rdd => rdd.fold(0)(_ + _) == 0L)
time(1L, rdd => rdd.take(1).length == 0L)
time(1L, rdd => rdd.mapPartitions(it => Iterator(!it.hasNext)).reduce(_&&_))
time(1L, rdd => rdd.count() == 0L)
time(1L, rdd => rdd.takeSample(true, 1).isEmpty)
time(1L, rdd => rdd.fold(0)(_ + _) == 0L)
time(0L, rdd => rdd.take(1).length == 0L)
time(0L, rdd => rdd.mapPartitions(it => Iterator(!it.hasNext)).reduce(_&&_))
time(0L, rdd => rdd.count() == 0L)
time(0L, rdd => rdd.takeSample(true, 1).isEmpty)
time(0L, rdd => rdd.fold(0)(_ + _) == 0L)
На моей локальной машине с 3 рабочими ядрами я получил следующие результаты
Time: 21 Result: false
Time: 75 Result: false
Time: 8664 Result: false
Time: 18266 Result: false
Time: 23836 Result: false
Time: 113 Result: false
Time: 101 Result: false
Time: 68 Result: false
Time: 221 Result: false
Time: 46 Result: false
Time: 79 Result: true
Time: 93 Result: true
Time: 79 Result: true
Time: 100 Result: true
Time: 64 Result: true
isEmpty
в RDD: github.com/apache/spark/ тянуть/4074
- person Josh Rosen; 11.02.2015
.take(1)
на RDD и посмотреть, пустой ли результат?
- person Nick Chammas; 11.02.2015
RDD[Nothing]
. Однако это крайний случай, поскольку RDD[Nothing]
практически бесполезен. На самом деле у нас идет обсуждение второго запроса на включение.
- person Tobber; 11.02.2015
Начиная с Spark 1.3 isEmpty()
является частью RDD API. Исправление, приводившее к сбою isEmpty
, позже было исправлено в Spark 1.4.
Для DataFrames вы можете сделать:
val df: DataFrame = ...
df.rdd.isEmpty()
Вот вставка кода сразу из реализации RDD (начиная с 1.4.1).
/**
* @note due to complications in the internal implementation, this method will raise an
* exception if called on an RDD of `Nothing` or `Null`. This may be come up in practice
* because, for example, the type of `parallelize(Seq())` is `RDD[Nothing]`.
* (`parallelize(Seq())` should be avoided anyway in favor of `parallelize(Seq[T]())`.)
* @return true if and only if the RDD contains no elements at all. Note that an RDD
* may be empty even when it has at least 1 partition.
*/
def isEmpty(): Boolean = withScope {
partitions.length == 0 || take(1).length == 0
}