Spark: Manera eficiente de probar si un RDD está vacío
No hay un método isEmpty
en los RDD, así que ¿cuál es la forma más eficiente de probar si un RDD está vacío?
2 answers
RDD.isEmpty()
será parte de Spark 1.3.0.
Basado en sugerencias en este hilo de correo apache y más tarde algunos comentarios a esta respuesta, he hecho algunos pequeños experimentos locales. El mejor método es usar take(1).length==0
.
def isEmpty[T](rdd : RDD[T]) = {
rdd.take(1).length == 0
}
Debe ejecutarse en O(1)
excepto cuando el RDD está vacío, en cuyo caso es lineal en el número de particiones.
Gracias a Josh Rosen y Nick Chammas por señalarme esto.
Nota: Esto falla si el RDD es de tipo RDD[Nothing]
por ejemplo, isEmpty(sc.parallelize(Seq()))
, pero esto probablemente no es un problema en la vida real. isEmpty(sc.parallelize(Seq[Any]()))
funciona bien.
Ediciones:
-
Edit 1: Added
take(1)==0
method, thanks to comments.
Mi sugerencia original: Use mapPartitions
.
def isEmpty[T](rdd : RDD[T]) = {
rdd.mapPartitions(it => Iterator(!it.hasNext)).reduce(_&&_)
}
Debe escalar en el número de particiones y no es tan limpio como take(1)
. Sin embargo, es robusto para RDD de tipo RDD[Nothing]
.
Experimentos:
Utilicé este código para el cronometraje.
def time(n : Long, f : (RDD[Long]) => Boolean): Unit = {
val start = System.currentTimeMillis()
val rdd = sc.parallelize(1L to n, numSlices = 100)
val result = f(rdd)
printf("Time: " + (System.currentTimeMillis() - start) + " Result: " + result)
}
time(1000000000L, rdd => rdd.take(1).length == 0L)
time(1000000000L, rdd => rdd.mapPartitions(it => Iterator(!it.hasNext)).reduce(_&&_))
time(1000000000L, rdd => rdd.count() == 0L)
time(1000000000L, rdd => rdd.takeSample(true, 1).isEmpty)
time(1000000000L, rdd => rdd.fold(0)(_ + _) == 0L)
time(1L, rdd => rdd.take(1).length == 0L)
time(1L, rdd => rdd.mapPartitions(it => Iterator(!it.hasNext)).reduce(_&&_))
time(1L, rdd => rdd.count() == 0L)
time(1L, rdd => rdd.takeSample(true, 1).isEmpty)
time(1L, rdd => rdd.fold(0)(_ + _) == 0L)
time(0L, rdd => rdd.take(1).length == 0L)
time(0L, rdd => rdd.mapPartitions(it => Iterator(!it.hasNext)).reduce(_&&_))
time(0L, rdd => rdd.count() == 0L)
time(0L, rdd => rdd.takeSample(true, 1).isEmpty)
time(0L, rdd => rdd.fold(0)(_ + _) == 0L)
En mi máquina local con 3 núcleos de trabajo obtuve estos resultados
Time: 21 Result: false
Time: 75 Result: false
Time: 8664 Result: false
Time: 18266 Result: false
Time: 23836 Result: false
Time: 113 Result: false
Time: 101 Result: false
Time: 68 Result: false
Time: 221 Result: false
Time: 46 Result: false
Time: 79 Result: true
Time: 93 Result: true
Time: 79 Result: true
Time: 100 Result: true
Time: 64 Result: true
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2017-09-06 13:26:05
A partir de Spark 1.3 el isEmpty()
es parte de la api RDD. Una corrección que causaba que isEmpty
fallara se solucionó más tarde en Spark 1.4.
Para los DataFrames puedes hacer:
val df: DataFrame = ...
df.rdd.isEmpty()
Aquí está pegar el código justo fuera de la implementación RDD (a partir de 1.4.1).
/**
* @note due to complications in the internal implementation, this method will raise an
* exception if called on an RDD of `Nothing` or `Null`. This may be come up in practice
* because, for example, the type of `parallelize(Seq())` is `RDD[Nothing]`.
* (`parallelize(Seq())` should be avoided anyway in favor of `parallelize(Seq[T]())`.)
* @return true if and only if the RDD contains no elements at all. Note that an RDD
* may be empty even when it has at least 1 partition.
*/
def isEmpty(): Boolean = withScope {
partitions.length == 0 || take(1).length == 0
}
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2015-12-08 21:53:31