¿Cómo divido un RDD en dos o más RDD?


Estoy buscando una manera de dividir un RDD en dos o más RDD. Lo más cercano que he visto es Scala Spark: ¿Dividir la colección en varios RDD? que sigue siendo un solo RDD.

Si estás familiarizado con SAS, algo como esto:

data work.split1, work.split2;
    set work.preSplit;

    if (condition1)
        output work.split1
    else if (condition2)
        output work.split2
run;

Que resultó en dos conjuntos de datos distintos. Tendría que ser persistido inmediatamente para obtener los resultados que pretendo...

Author: Peter Mortensen, 2015-10-06

4 answers

No es posible producir varios RDD a partir de una sola transformación*. Si desea dividir un RDD, debe aplicar un filter para cada condición de división. Por ejemplo:

def even(x): return x % 2 == 0
def odd(x): return not even(x)
rdd = sc.parallelize(range(20))

rdd_odd, rdd_even = (rdd.filter(f) for f in (odd, even))

Si solo tiene una condición binaria y la computación es costosa, puede preferir algo como esto: {[15]]}

kv_rdd = rdd.map(lambda x: (x, odd(x)))
kv_rdd.cache()

rdd_odd = kv_rdd.filter(lambda kv: kv[1]).keys()
rdd_even = kv_rdd.filter(lambda kv: not kv[1]).keys()

Significa solo un solo cálculo de predicados, pero requiere una transferencia adicional sobre todos los datos.

Es importante tener en cuenta que siempre y cuando un RDD de entrada se almacene correctamente en caché y no no hay suposiciones adicionales con respecto a la distribución de datos no hay diferencia significativa cuando se trata de la complejidad del tiempo entre el filtro repetido y el bucle for con if-else anidado.

Con N elementos y M condiciones el número de operaciones que tiene que realizar es claramente proporcional a N veces M. En el caso del bucle for, debe estar más cerca de (N + MN) / 2 y el filtro repetido es exactamente NM, pero al final del día no es nada más que O(NM). Puedes ver mi discusión* * con Jason Lenderman para leer sobre algunos pros y contras.

En el nivel muy alto usted debe considerar dos cosas: {[15]]}

  1. Las transformaciones de chispa son perezosas, hasta que ejecuta una acción, su RDD no se materializa

    ¿por Qué importa? Volviendo a mi ejemplo:

    rdd_odd, rdd_even = (rdd.filter(f) for f in (odd, even))
    

    Si después decido que solo necesito rdd_odd entonces no hay razón para materializarme rdd_even.

    Si echa un vistazo a su ejemplo de SAS para calcular work.split2 necesita materializar ambos datos de entrada y work.split1.

  2. Los RDD proporcionan una API declarativa. Cuando se utiliza filter o map depende completamente de Spark engine cómo se realiza esta operación. Siempre y cuando las funciones pasadas a las transformaciones estén libres de efectos secundarios, crea múltiples posibilidades para optimizar toda una tubería.

A fin de cuentas, este caso no es lo suficientemente especial como para justificar su propia transformación.

Este mapa con patrón de filtro se utiliza realmente en un núcleo Chispa. Ver mi respuesta a Cómo funciona Sparks RDD.randomSplit realmente divide el RDD y una parte relevante del método randomSplit.

Si el único objetivo es lograr una división en la entrada, es posible usar la cláusula partitionBy para DataFrameWriter qué formato de salida de texto:

def makePairs(row: T): (String, String) = ???

data
  .map(makePairs).toDF("key", "value")
  .write.partitionBy($"key").format("text").save(...)

* Solo hay 3 tipos básicos de transformaciones en Spark:

  • RDD [T] = > RDD [T]
  • RDD [T] = > RDD [U]
  • (RDD[T], RDD[U]) => RDD[W]

Donde T, U, W pueden ser tipos atómicos o productos / tuplas (K, V). Cualquier otra operación tiene que ser expresada usando alguna combinación de lo anterior. Puede consultar el documento RDD original para más detalles.

** http://chat.stackoverflow.com/rooms/91928/discussion-between-zero323-and-jason-lenderman

* * * Véase también Scala Spark: ¿Dividir la colección en varios RDD?

 49
Author: zero323,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2017-05-23 12:34:26

Como otros carteles mencionados anteriormente, no hay una sola transformación RDD nativa que divida RDDs, pero aquí hay algunas operaciones "multiplex" que pueden emular eficientemente una amplia variedad de "división" en RDDs, sin leer varias veces:

Http://silex.freevariable.com/latest/api/#com.redhat.et.silex.rdd.multiplex.MuxRDDFunctions

Algunos métodos específicos al azar división:

Http://silex.freevariable.com/latest/api/#com.redhat.et.silex.sample.split.SplitSampleRDDFunctions

Los métodos están disponibles en el proyecto silex de código abierto:

Https://github.com/willb/silex

Una entrada de blog explicando cómo funcionan:

Http://erikerlandson.github.io/blog/2016/02/08/efficient-multiplexing-for-spark-rdds/

def muxPartitions[U :ClassTag](n: Int, f: (Int, Iterator[T]) => Seq[U],
  persist: StorageLevel): Seq[RDD[U]] = {
  val mux = self.mapPartitionsWithIndex { case (id, itr) =>
    Iterator.single(f(id, itr))
  }.persist(persist)
  Vector.tabulate(n) { j => mux.mapPartitions { itr => Iterator.single(itr.next()(j)) } }
}

def flatMuxPartitions[U :ClassTag](n: Int, f: (Int, Iterator[T]) => Seq[TraversableOnce[U]],
  persist: StorageLevel): Seq[RDD[U]] = {
  val mux = self.mapPartitionsWithIndex { case (id, itr) =>
    Iterator.single(f(id, itr))
  }.persist(persist)
  Vector.tabulate(n) { j => mux.mapPartitions { itr => itr.next()(j).toIterator } }
}

Como se mencionó en otra parte, estos métodos implican una compensación de la memoria para la velocidad, ya que operan mediante el cálculo de resultados de partición completa " con impaciencia "en lugar de" perezosamente."Por lo tanto, es posible que estos métodos se encuentren con problemas de memoria en particiones grandes, donde las transformaciones perezosas más tradicionales no lo harán.

 4
Author: eje,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2017-03-03 21:49:05

Si divide un RDD usando la llamada a la API randomSplit, obtendrá una matriz de RDDs.

Si desea que se devuelvan 5 RDD, pase 5 valores de peso.

Por ejemplo

val sourceRDD = val sourceRDD = sc.parallelize(1 to 100, 4)
val seedValue = 5
val splitRDD = sourceRDD.randomSplit(Array(1.0,1.0,1.0,1.0,1.0), seedValue)

splitRDD(1).collect()
res7: Array[Int] = Array(1, 6, 11, 12, 20, 29, 40, 62, 64, 75, 77, 83, 94, 96, 100)
 0
Author: Ewan Leith,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2015-10-15 14:27:10

Una forma es usar un particionador personalizado para particionar los datos dependiendo de la condición del filtro. Esto se puede lograr extendiendo Partitioner e implementando algo similar a RangePartitioner.

Las particiones de mapa se pueden usar para construir múltiples RDDs a partir del RDD particionado sin leer todos los datos.

val filtered = partitioned.mapPartitions { iter => {

  new Iterator[Int](){
    override def hasNext: Boolean = {
      if(rangeOfPartitionsToKeep.contains(TaskContext.get().partitionId)) {
        false
      } else {
        iter.hasNext
      }
    }

    override def next():Int = iter.next()
  }

Solo tenga en cuenta que el número de particiones en los RDD filtrados será el mismo que el número en el RDD particionado, por lo que se debe usar una fusión para reducir esto hacia abajo y eliminar las particiones vacías.

 -2
Author: Jem Tucker,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2015-10-19 20:33:03