Spark: Manera eficiente de probar si un RDD está vacío


No hay un método isEmpty en los RDD, así que ¿cuál es la forma más eficiente de probar si un RDD está vacío?

Author: marios, 2015-02-11

2 answers

RDD.isEmpty() será parte de Spark 1.3.0.

Basado en sugerencias en este hilo de correo apache y más tarde algunos comentarios a esta respuesta, he hecho algunos pequeños experimentos locales. El mejor método es usar take(1).length==0.

def isEmpty[T](rdd : RDD[T]) = {
  rdd.take(1).length == 0 
}

Debe ejecutarse en O(1) excepto cuando el RDD está vacío, en cuyo caso es lineal en el número de particiones.

Gracias a Josh Rosen y Nick Chammas por señalarme esto.

Nota: Esto falla si el RDD es de tipo RDD[Nothing] por ejemplo, isEmpty(sc.parallelize(Seq())), pero esto probablemente no es un problema en la vida real. isEmpty(sc.parallelize(Seq[Any]())) funciona bien.


Ediciones:

  • Edit 1: Added take(1)==0 method, thanks to comments.

Mi sugerencia original: Use mapPartitions.

def isEmpty[T](rdd : RDD[T]) = {
  rdd.mapPartitions(it => Iterator(!it.hasNext)).reduce(_&&_) 
}

Debe escalar en el número de particiones y no es tan limpio como take(1). Sin embargo, es robusto para RDD de tipo RDD[Nothing].


Experimentos:

Utilicé este código para el cronometraje.

def time(n : Long, f : (RDD[Long]) => Boolean): Unit = {
  val start = System.currentTimeMillis()
  val rdd = sc.parallelize(1L to n, numSlices = 100)
  val result = f(rdd)
  printf("Time: " + (System.currentTimeMillis() - start) + "   Result: " + result)
}

time(1000000000L, rdd => rdd.take(1).length == 0L)
time(1000000000L, rdd => rdd.mapPartitions(it => Iterator(!it.hasNext)).reduce(_&&_))
time(1000000000L, rdd => rdd.count() == 0L)
time(1000000000L, rdd => rdd.takeSample(true, 1).isEmpty)
time(1000000000L, rdd => rdd.fold(0)(_ + _) == 0L)

time(1L, rdd => rdd.take(1).length == 0L)
time(1L, rdd => rdd.mapPartitions(it => Iterator(!it.hasNext)).reduce(_&&_))
time(1L, rdd => rdd.count() == 0L)
time(1L, rdd => rdd.takeSample(true, 1).isEmpty)
time(1L, rdd => rdd.fold(0)(_ + _) == 0L)

time(0L, rdd => rdd.take(1).length == 0L)
time(0L, rdd => rdd.mapPartitions(it => Iterator(!it.hasNext)).reduce(_&&_))
time(0L, rdd => rdd.count() == 0L)
time(0L, rdd => rdd.takeSample(true, 1).isEmpty)
time(0L, rdd => rdd.fold(0)(_ + _) == 0L)

En mi máquina local con 3 núcleos de trabajo obtuve estos resultados

Time:    21   Result: false
Time:    75   Result: false
Time:  8664   Result: false
Time: 18266   Result: false
Time: 23836   Result: false

Time:   113   Result: false
Time:   101   Result: false
Time:    68   Result: false
Time:   221   Result: false
Time:    46   Result: false

Time:    79   Result: true
Time:    93   Result: true
Time:    79   Result: true
Time:   100   Result: true
Time:    64   Result: true
 26
Author: Tobber,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2017-09-06 13:26:05

A partir de Spark 1.3 el isEmpty() es parte de la api RDD. Una corrección que causaba que isEmpty fallara se solucionó más tarde en Spark 1.4.

Para los DataFrames puedes hacer:

val df: DataFrame = ...
df.rdd.isEmpty()

Aquí está pegar el código justo fuera de la implementación RDD (a partir de 1.4.1).

  /**
   * @note due to complications in the internal implementation, this method will raise an
   * exception if called on an RDD of `Nothing` or `Null`. This may be come up in practice
   * because, for example, the type of `parallelize(Seq())` is `RDD[Nothing]`.
   * (`parallelize(Seq())` should be avoided anyway in favor of `parallelize(Seq[T]())`.)
   * @return true if and only if the RDD contains no elements at all. Note that an RDD
   *         may be empty even when it has at least 1 partition.
   */
  def isEmpty(): Boolean = withScope {
    partitions.length == 0 || take(1).length == 0
  }
 3
Author: marios,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2015-12-08 21:53:31