Spark especificar múltiples condiciones de columna para la unión de dataframe


Cómo dar más condiciones de columna al unir dos dataframes. Por ejemplo, quiero ejecutar lo siguiente:

val Lead_all = Leads.join(Utm_Master,  
    Leaddetails.columns("LeadSource","Utm_Source","Utm_Medium","Utm_Campaign") ==
    Utm_Master.columns("LeadSource","Utm_Source","Utm_Medium","Utm_Campaign"),
"left")

Quiero unirme solo cuando estas columnas coincidan. Pero la sintaxis anterior no es válida ya que cols solo toma una cadena. Entonces, ¿cómo consigo lo que quiero?

Author: zero323, 2015-07-06

7 answers

Hay un Spark columna/expresión API join para tal caso:

Leaddetails.join(
    Utm_Master, 
    Leaddetails("LeadSource") <=> Utm_Master("LeadSource")
        && Leaddetails("Utm_Source") <=> Utm_Master("Utm_Source")
        && Leaddetails("Utm_Medium") <=> Utm_Master("Utm_Medium")
        && Leaddetails("Utm_Campaign") <=> Utm_Master("Utm_Campaign"),
    "left"
)

El operador <=> en el ejemplo significa " Prueba de igualdad que es segura para valores nulos".

La diferencia principal con la prueba de igualdad simple (===) es que la primera es segura de usar en caso de que una de las columnas pueda tener valores nulos.

 63
Author: rchukh,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2016-09-21 07:49:06

A partir de la versión 1.5.0 de Spark (que actualmente no se ha publicado), puede unirse en varias columnas de DataFrame. Refiérase a SPARK-7990: Agregue métodos para facilitar equi-join en múltiples claves de unión .

Python

Leads.join(
    Utm_Master, 
    ["LeadSource","Utm_Source","Utm_Medium","Utm_Campaign"],
    "left_outer"
)

Scala

La pregunta pedía una respuesta Scala, pero no uso Scala. Esta es mi mejor suposición....

Leads.join(
    Utm_Master,
    Seq("LeadSource","Utm_Source","Utm_Medium","Utm_Campaign"),
    "left_outer"
)
 10
Author: dnlbrky,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2015-08-08 02:59:11

Una cosa que puede hacer es usar SQL raw:

case class Bar(x1: Int, y1: Int, z1: Int, v1: String)
case class Foo(x2: Int, y2: Int, z2: Int, v2: String)

val bar = sqlContext.createDataFrame(sc.parallelize(
    Bar(1, 1, 2, "bar") :: Bar(2, 3, 2, "bar") ::
    Bar(3, 1, 2, "bar") :: Nil))

val foo = sqlContext.createDataFrame(sc.parallelize(
    Foo(1, 1, 2, "foo") :: Foo(2, 1, 2, "foo") ::
    Foo(3, 1, 2, "foo") :: Foo(4, 4, 4, "foo") :: Nil))

foo.registerTempTable("foo")
bar.registerTempTable("bar")

sqlContext.sql(
    "SELECT * FROM foo LEFT JOIN bar ON x1 = x2 AND y1 = y2 AND z1 = z2")
 6
Author: zero323,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2016-09-21 07:59:51

En Pyspark simplemente puede especificar cada condición por separado:

val Lead_all = Leads.join(Utm_Master,  
    (Leaddetails.LeadSource == Utm_Master.LeadSource) &
    (Leaddetails.Utm_Source == Utm_Master.Utm_Source) &
    (Leaddetails.Utm_Medium == Utm_Master.Utm_Medium) &
    (Leaddetails.Utm_Campaign == Utm_Master.Utm_Campaign))

Solo asegúrese de usar operadores y paréntesis correctamente.

 5
Author: Patricia F.,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2017-05-03 11:57:43

Scala:

Leaddetails.join(
    Utm_Master, 
    Leaddetails("LeadSource") <=> Utm_Master("LeadSource")
        && Leaddetails("Utm_Source") <=> Utm_Master("Utm_Source")
        && Leaddetails("Utm_Medium") <=> Utm_Master("Utm_Medium")
        && Leaddetails("Utm_Campaign") <=> Utm_Master("Utm_Campaign"),
    "left"
)

Para hacerlo insensible a mayúsculas y minúsculas ,

import org.apache.spark.sql.functions.{lower, upper}

Entonces simplemente use lower(value) en la condición del método join.

Eg: dataFrame.filter(lower(dataFrame.col("vendor")).equalTo("fortinet"))

 3
Author: Ani Menon,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2016-09-21 08:06:52

Las opciones === me dan columnas duplicadas. Así que uso Seq en su lugar.

val Lead_all = Leads.join(Utm_Master,
    Seq("Utm_Source","Utm_Medium","Utm_Campaign"),"left")

Por supuesto, esto solo funciona cuando los nombres de las columnas de unión son los mismos.

 1
Author: Climbs_lika_Spyder,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2018-04-13 17:09:09

Spark SQL soporta join en tupla de columnas cuando está entre paréntesis, como

... WHERE (list_of_columns1) = (list_of_columns2)

Que es una forma más corta que especificar expresiones iguales ( = ) para cada par de columnas combinadas por un conjunto de "Y"s.

Por ejemplo:

SELECT a,b,c
FROM    tab1 t1
WHERE 
   NOT EXISTS
   (    SELECT 1
        FROM    t1_except_t2_df e
        WHERE (t1.a, t1.b, t1.c) = (e.a, e.b, e.c)
   )

En lugar de

SELECT a,b,c
FROM    tab1 t1
WHERE 
   NOT EXISTS
   (    SELECT 1
        FROM    t1_except_t2_df e
        WHERE t1.a=e.a AND t1.b=e.b AND t1.c=e.c
   )

Que también es menos legible, especialmente cuando la lista de columnas es grande y desea tratar con NULLs fácilmente.

 0
Author: Tagar,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2017-09-06 04:28:55