reduceByKey: ¿Cómo funciona internamente?


Soy nuevo en Spark y Scala. Estaba confundido sobre la forma en que la función reduceByKey funciona en Spark. Supongamos que tenemos el siguiente código:

val lines = sc.textFile("data.txt")
val pairs = lines.map(s => (s, 1))
val counts = pairs.reduceByKey((a, b) => a + b)

La función map es clara: s es la clave y apunta a la línea de data.txt y 1 es el valor.

Sin embargo, no entendí cómo funciona la reduceByKey internamente? ¿"a" apunta a la clave? Alternativamente, ¿"a" apunta a "s"? Entonces, ¿qué representa a + b? ¿cómo se llenan?

Author: Atticus Liu, 2015-05-10

3 answers

Vamos a dividirlo en métodos y tipos discretos. Que por lo general expone las complejidades para los nuevos desarrolladores:

pairs.reduceByKey((a, b) => a + b)

Se convierte en

pairs.reduceByKey((a: Int, b: Int) => a + b)

Y renombrar las variables lo hace un poco más explícito

pairs.reduceByKey((accumulatedValue: Int, currentValue: Int) => accumulatedValue + currentValue)

Por lo tanto, ahora podemos ver que simplemente estamos tomando un valor acumulado para la clave dada y sumándolo con el siguiente valor de esa clave. AHORA, vamos a romper más para que podamos entender la parte clave. Por lo tanto, vamos a visualizar el método más como esto:

pairs.reduce((accumulatedValue: List[(String, Int)], currentValue: (String, Int)) => {
  //Turn the accumulated value into a true key->value mapping
  val accumAsMap = accumulatedValue.toMap   
  //Try to get the key's current value if we've already encountered it
  accumAsMap.get(currentValue._1) match { 
    //If we have encountered it, then add the new value to the existing value and overwrite the old
    case Some(value : Int) => (accumAsMap + (currentValue._1 -> (value + currentValue._2))).toList
    //If we have NOT encountered it, then simply add it to the list
    case None => currentValue :: accumulatedValue 
  }
})

Por lo tanto, se puede ver que el reducir ByKey toma el patrón de encontrar la clave y el seguimiento de manera que usted no tiene que preocuparse por la gestión de esa parte.

Más profundo, más verdadero si quieres

Dicho todo esto, es una versión simplificada de lo que sucede, ya que hay algunas optimizaciones que se hacen aquí. Esta operación es asociativa, por lo que el motor spark realizará estas reducciones localmente primero (a menudo denominado reducción del lado del mapa) y luego, una vez más en el conductor. Esto ahorra tráfico de red; en lugar de enviar todos los datos y realizar la operación, puede reducirlo lo más pequeño posible y luego enviar esa reducción por cable.

 58
Author: Justin Pihony,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2017-04-30 19:43:05

Un requisito para la función reduceByKey es que debe ser asociativa. Para construir alguna intuición sobre cómo funciona reduceByKey, veamos primero cómo una función asociativa asociativa nos ayuda en un cálculo paralelo:

función asociativa en acción

Como podemos ver, podemos romper una colección original en pedazos y aplicando la función asociativa, podemos acumular un total. El caso secuencial es trivial, estamos acostumbrados: 1+2+3+4+5+6+7+8+9+10.

La asociatividad nos permite usar lo mismo función en secuencia y en paralelo. reduceByKey usa esa propiedad para calcular un resultado de un RDD, que es una colección distribuida que consiste en particiones.

Considere el siguiente ejemplo:

// collection of the form ("key",1),("key,2),...,("key",20) split among 4 partitions
val rdd =sparkContext.parallelize(( (1 to 20).map(x=>("key",x))), 4)
rdd.reduceByKey(_ + _)
rdd.collect()
> Array[(String, Int)] = Array((key,210))

En spark, los datos se distribuyen en particiones. Para la siguiente ilustración, (4) las particiones están a la izquierda, encerradas en líneas delgadas. Primero, aplicamos la función localmente a cada partición, secuencialmente en la partición, pero ejecutamos las 4 particiones en paralelo. Entonces, el resultado de cada cálculo local se agrega aplicando la misma función nuevamente y finalmente se obtiene un resultado.

introduzca la descripción de la imagen aquí

reduceByKey es una especialización de aggregateByKey aggregateByKey toma 2 funciones: una que se aplica a cada partición (secuencialmente) y otra que se aplica entre los resultados de cada partición (en paralelo). reduceByKey utiliza la misma función asociativa en ambos casos: para hacer una computación secuencial en cada partición y luego combinar esos resultados en un resultado final como tenemos ilustrado aquí.

 26
Author: maasg,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2015-05-10 13:06:59

En su ejemplo de

val counts = pairs.reduceByKey((a,b) => a+b)

a y b son ambos Int acumuladores para _2 de las tuplas en pairs. reduceKey tomará dos tuplas con el mismo valor s y usará sus valores _2 como a y b, produciendo un nuevo Tuple[String,Int]. Esta operación se repite hasta que solo hay una tupla para cada tecla s.

A diferencia de non - Spark (o, realmente, no paralelo) reduceByKey donde el primer elemento es siempre el acumulador y el segundo un valor, reduceByKey opera en un de manera distribuida, es decir, cada nodo reducirá su conjunto de tuplas en una colección de tuplas de clave única y luego reducirá las tuplas de múltiples nodos hasta que haya un conjunto final de de clave única de tuplas. Esto significa que a medida que los resultados de los nodos se reducen, a y b representan acumuladores ya reducidos.

 4
Author: Arne Claassen,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2015-05-09 22:26:25