Buscar fila máxima por grupo en Spark DataFrame


Estoy tratando de usar marcos de datos de Spark en lugar de RDDs, ya que parecen ser de más alto nivel que RDDs y tienden a producir código más legible, pero estaría más que feliz de recibir sugerencias para algo más idiomático para la tarea en cuestión.

En un clúster de 14 nodos de Google Dataproc, tengo alrededor de 6 millones de nombres que se traducen a ids por dos sistemas diferentes: sa y sb. Cada Row contiene name, id_sa y id_sb. Mi objetivo es producir un mapeo de id_sa a id_sb tal que para cada id_sa, el id_sb correspondiente es el id más frecuente entre todos los nombres adjuntos a id_sa.

Vamos a tratar de aclarar con un ejemplo. Si tengo las siguientes filas:

[Row(name='n1', id_sa='a1', id_sb='b1'),
 Row(name='n2', id_sa='a1', id_sb='b2'),
 Row(name='n3', id_sa='a1', id_sb='b2'),
 Row(name='n4', id_sa='a2', id_sb='b2')]

Mi objetivo es producir un mapeo de a1 a b2. De hecho, los nombres asociados a a1 son n1, n2 and n3, which map respectively to b1, b2 y b2, así que b2 es el mapeo más frecuente en los nombres asociados a a1. De la misma manera, a2 se asignará a b2. Está bien asumir que siempre habrá un ganador: no hay necesidad de romper los lazos.

Esperaba poder usar groupBy(df.id_sa) en mi dataframe, pero no se que hacer a continuación. Esperaba una agregación que pudiera producir, al final, las siguientes filas:

[Row(id_sa=a1, max_id_sb=b2),
 Row(id_sa=a2, max_id_sb=b2)]

Pero tal vez estoy tratando de usar la herramienta equivocada y debería volver a usar RDDs.

Author: Quentin Pradet, 2016-02-05

2 answers

Usando join (resultará en más de una fila en el grupo en caso de empates):

import pyspark.sql.functions as F
from pyspark.sql.functions import count, col 

cnts = df.groupBy("id_sa", "id_sb").agg(count("*").alias("cnt")).alias("cnts")
maxs = cnts.groupBy("id_sa").agg(F.max("cnt").alias("mx")).alias("maxs")

cnts.join(maxs, 
  (col("cnt") == col("mx")) & (col("cnts.id_sa") == col("maxs.id_sa"))
).select(col("cnts.id_sa"), col("cnts.id_sb"))

Usando las funciones de ventana (eliminará los lazos):

from pyspark.sql.functions import row_number
from pyspark.sql.window import Window

w = Window().partitionBy("id_sa").orderBy(col("cnt").desc())

(cnts
  .withColumn("rn", row_number().over(w))
  .where(col("rn") == 1)
  .select("id_sa", "id_sb"))

Usando struct ordenando:

from pyspark.sql.functions import struct

(cnts
  .groupBy("id_sa")
  .agg(F.max(struct(col("cnt"), col("id_sb"))).alias("max"))
  .select(col("id_sa"), col("max.id_sb")))

Ver también Cómo seleccionar la primera fila de cada grupo?

 34
Author: zero323,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2018-05-02 12:08:53

Creo que lo que podría estar buscando son funciones de ventana: http://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=window#pyspark.sql.Window

Https://databricks.com/blog/2015/07/15/introducing-window-functions-in-spark-sql.html

Aquí hay un ejemplo en Scala (no tengo un Shell de Spark con Hive disponible en este momento, así que no pude probar el código, pero creo que debería funcionar):

case class MyRow(name: String, id_sa: String, id_sb: String)

val myDF = sc.parallelize(Array(
    MyRow("n1", "a1", "b1"),
    MyRow("n2", "a1", "b2"),
    MyRow("n3", "a1", "b2"),
    MyRow("n1", "a2", "b2")
)).toDF("name", "id_sa", "id_sb")

import org.apache.spark.sql.expressions.Window

val windowSpec = Window.partitionBy(myDF("id_sa")).orderBy(myDF("id_sb").desc)

myDF.withColumn("max_id_b", first(myDF("id_sb")).over(windowSpec).as("max_id_sb")).filter("id_sb = max_id_sb")

Probablemente hay más eficientes formas de lograr los mismos resultados con las funciones de ventana, pero espero que esto te apunte en la dirección correcta.

 7
Author: alghimo,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2016-02-05 15:00:09