¿Cómo selecciono un rango de elementos en Spark RDD?


Me gustaría seleccionar un rango de elementos en un RDD de Spark. Por ejemplo, tengo un RDD con cien elementos, y necesito seleccionar elementos de 60 a 80. ¿Cómo hago eso?

Veo que RDD tiene un método take(i: int), que devuelve los primeros elementos i. Pero no hay un método correspondiente para tomar los últimos elementos i, o elementos i desde el medio a partir de un cierto índice.

Author: Sean Owen, 2014-07-10

4 answers

No creo que haya un método eficiente para hacer esto todavía. Pero la manera fácil es usar filter(), digamos que tienes un RDD, pairs con pares de valores clave y solo quieres elementos de 60 a 80 inclusive solo hazlo.

val 60to80 = pairs.filter {
    _ match {
        case (k,v) => k >= 60 && k <= 80
        case _ => false //incase of invalid input
    }
}

Creo que es posible que esto se pueda hacer de manera más eficiente en el futuro, utilizando sortByKey y guardando información sobre el rango de valores asignados a cada partición. Tenga en cuenta que este enfoque solo guardaría algo si estuviera planeando consultar el rango varias veces porque el tipo es obviamente caro.

Mirando la fuente spark definitivamente sería posible hacer consultas de rango eficientes usando RangePartitioner:

// An array of upper bounds for the first (partitions - 1) partitions
  private val rangeBounds: Array[K] = {

Este es un miembro privado de RangePartitioner con el conocimiento de todos los límites superiores de las particiones, sería fácil consultar solo las particiones necesarias. Parece que esto es algo que los usuarios de spark pueden ver en el futuro: SPARK-911

ACTUALIZACIÓN: Respuesta mucho mejor, basada en pull solicitud que estoy escribiendo para SPARK-911. Se ejecutará de manera eficiente si el RDD está ordenado y lo consulta varias veces.

val sorted = sc.parallelize((1 to 100).map(x => (x, x))).sortByKey().cache()
val p: RangePartitioner[Int, Int] = sorted.partitioner.get.asInstanceOf[RangePartitioner[Int, Int]];
val (lower, upper) = (10, 20)
val range = p.getPartition(lower) to p.getPartition(upper)
println(range)
val rangeFilter = (i: Int, iter: Iterator[(Int, Int)]) => {
  if (range.contains(i))
    for ((k, v) <- iter if k >= lower && k <= upper) yield (k, v)
  else
    Iterator.empty
}
for((k,v) <- sorted.mapPartitionsWithIndex(rangeFilter, preservesPartitioning = true).collect()) println(s"$k, $v")

Si tener toda la partición en memoria es aceptable, incluso podría hacer algo como esto.
val glommedAndCached = sorted.glom()cache(); glommedAndCached.map(a => a.slice(a.search(lower),a.search(upper)+1)).collect()

search no es un miembro Por cierto acabo de hacer una clase implícita que tiene una función de búsqueda binaria, no se muestra aquí

 12
Author: aaronman,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2016-12-12 18:29:42

¿Qué tan grande es su conjunto de datos? Usted puede ser capaz de hacer lo que necesita con:

data.take(80).drop(59)

Esto parece ineficiente, pero para datos de tamaño pequeño a mediano, debería funcionar.

¿Es posible resolver esto de otra manera? ¿Cuál es el caso para elegir exactamente un cierto rango del medio de sus datos? ¿Te serviría mejor?

 6
Author: DPM,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2014-07-10 17:26:05

Los siguientes deben ser capaces de obtener el rango. Tenga en cuenta que la caché le ahorrará un poco de sobrecarga, porque internamente zipWithIndex necesita escanear la partición RDD para obtener el número de elementos en cada partición.

scala>val r1 = sc.parallelize(List("a", "b", "c", "d", "e", "f", "g"), 3).cache
scala>val r2 = r1.zipWithIndex
scala>val r3 = r2.filter(x=> {x._2>2 && x._2 < 4}).map(x=>x._1)
scala>r3.foreach(println)
d
 4
Author: zhang zhan,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2014-09-28 04:51:20

Para aquellos que tropiezan en esta pregunta buscando Chispa 2.respuesta compatible con x, puede usar filterByRange

 0
Author: jrook,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2017-11-20 05:38:54