Spark especificar múltiples condiciones de columna para la unión de dataframe
Cómo dar más condiciones de columna al unir dos dataframes. Por ejemplo, quiero ejecutar lo siguiente:
val Lead_all = Leads.join(Utm_Master,
Leaddetails.columns("LeadSource","Utm_Source","Utm_Medium","Utm_Campaign") ==
Utm_Master.columns("LeadSource","Utm_Source","Utm_Medium","Utm_Campaign"),
"left")
Quiero unirme solo cuando estas columnas coincidan. Pero la sintaxis anterior no es válida ya que cols solo toma una cadena. Entonces, ¿cómo consigo lo que quiero?
7 answers
Hay un Spark columna/expresión API join para tal caso:
Leaddetails.join(
Utm_Master,
Leaddetails("LeadSource") <=> Utm_Master("LeadSource")
&& Leaddetails("Utm_Source") <=> Utm_Master("Utm_Source")
&& Leaddetails("Utm_Medium") <=> Utm_Master("Utm_Medium")
&& Leaddetails("Utm_Campaign") <=> Utm_Master("Utm_Campaign"),
"left"
)
El operador <=>
en el ejemplo significa " Prueba de igualdad que es segura para valores nulos".
La diferencia principal con la prueba de igualdad simple (===
) es que la primera es segura de usar en caso de que una de las columnas pueda tener valores nulos.
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2016-09-21 07:49:06
A partir de la versión 1.5.0 de Spark (que actualmente no se ha publicado), puede unirse en varias columnas de DataFrame. Refiérase a SPARK-7990: Agregue métodos para facilitar equi-join en múltiples claves de unión .
Python
Leads.join(
Utm_Master,
["LeadSource","Utm_Source","Utm_Medium","Utm_Campaign"],
"left_outer"
)
Scala
La pregunta pedía una respuesta Scala, pero no uso Scala. Esta es mi mejor suposición....
Leads.join(
Utm_Master,
Seq("LeadSource","Utm_Source","Utm_Medium","Utm_Campaign"),
"left_outer"
)
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2015-08-08 02:59:11
Una cosa que puede hacer es usar SQL raw:
case class Bar(x1: Int, y1: Int, z1: Int, v1: String)
case class Foo(x2: Int, y2: Int, z2: Int, v2: String)
val bar = sqlContext.createDataFrame(sc.parallelize(
Bar(1, 1, 2, "bar") :: Bar(2, 3, 2, "bar") ::
Bar(3, 1, 2, "bar") :: Nil))
val foo = sqlContext.createDataFrame(sc.parallelize(
Foo(1, 1, 2, "foo") :: Foo(2, 1, 2, "foo") ::
Foo(3, 1, 2, "foo") :: Foo(4, 4, 4, "foo") :: Nil))
foo.registerTempTable("foo")
bar.registerTempTable("bar")
sqlContext.sql(
"SELECT * FROM foo LEFT JOIN bar ON x1 = x2 AND y1 = y2 AND z1 = z2")
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2016-09-21 07:59:51
En Pyspark simplemente puede especificar cada condición por separado:
val Lead_all = Leads.join(Utm_Master,
(Leaddetails.LeadSource == Utm_Master.LeadSource) &
(Leaddetails.Utm_Source == Utm_Master.Utm_Source) &
(Leaddetails.Utm_Medium == Utm_Master.Utm_Medium) &
(Leaddetails.Utm_Campaign == Utm_Master.Utm_Campaign))
Solo asegúrese de usar operadores y paréntesis correctamente.
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2017-05-03 11:57:43
Scala:
Leaddetails.join(
Utm_Master,
Leaddetails("LeadSource") <=> Utm_Master("LeadSource")
&& Leaddetails("Utm_Source") <=> Utm_Master("Utm_Source")
&& Leaddetails("Utm_Medium") <=> Utm_Master("Utm_Medium")
&& Leaddetails("Utm_Campaign") <=> Utm_Master("Utm_Campaign"),
"left"
)
Para hacerlo insensible a mayúsculas y minúsculas ,
import org.apache.spark.sql.functions.{lower, upper}
Entonces simplemente use lower(value)
en la condición del método join.
Eg: dataFrame.filter(lower(dataFrame.col("vendor")).equalTo("fortinet"))
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2016-09-21 08:06:52
Las opciones ===
me dan columnas duplicadas. Así que uso Seq
en su lugar.
val Lead_all = Leads.join(Utm_Master,
Seq("Utm_Source","Utm_Medium","Utm_Campaign"),"left")
Por supuesto, esto solo funciona cuando los nombres de las columnas de unión son los mismos.
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2018-04-13 17:09:09
Spark SQL soporta join en tupla de columnas cuando está entre paréntesis, como
... WHERE (list_of_columns1) = (list_of_columns2)
Que es una forma más corta que especificar expresiones iguales ( = ) para cada par de columnas combinadas por un conjunto de "Y"s.
Por ejemplo:
SELECT a,b,c
FROM tab1 t1
WHERE
NOT EXISTS
( SELECT 1
FROM t1_except_t2_df e
WHERE (t1.a, t1.b, t1.c) = (e.a, e.b, e.c)
)
En lugar de
SELECT a,b,c
FROM tab1 t1
WHERE
NOT EXISTS
( SELECT 1
FROM t1_except_t2_df e
WHERE t1.a=e.a AND t1.b=e.b AND t1.c=e.c
)
Que también es menos legible, especialmente cuando la lista de columnas es grande y desea tratar con NULLs fácilmente.
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2017-09-06 04:28:55