Spark Dataframe distinguir columnas con nombre duplicado


Así que como sé en Spark Dataframe, que para varias columnas puede tener el mismo nombre como se muestra en la siguiente instantánea de dataframe:

[
Row(a=107831, f=SparseVector(5, {0: 0.0, 1: 0.0, 2: 0.0, 3: 0.0, 4: 0.0}), a=107831, f=SparseVector(5, {0: 0.0, 1: 0.0, 2: 0.0, 3: 0.0, 4: 0.0})),
Row(a=107831, f=SparseVector(5, {0: 0.0, 1: 0.0, 2: 0.0, 3: 0.0, 4: 0.0}), a=125231, f=SparseVector(5, {0: 0.0, 1: 0.0, 2: 0.0047, 3: 0.0, 4: 0.0043})),
Row(a=107831, f=SparseVector(5, {0: 0.0, 1: 0.0, 2: 0.0, 3: 0.0, 4: 0.0}), a=145831, f=SparseVector(5, {0: 0.0, 1: 0.2356, 2: 0.0036, 3: 0.0, 4: 0.4132})),
Row(a=107831, f=SparseVector(5, {0: 0.0, 1: 0.0, 2: 0.0, 3: 0.0, 4: 0.0}), a=147031, f=SparseVector(5, {0: 0.0, 1: 0.0, 2: 0.0, 3: 0.0, 4: 0.0})),
Row(a=107831, f=SparseVector(5, {0: 0.0, 1: 0.0, 2: 0.0, 3: 0.0, 4: 0.0}), a=149231, f=SparseVector(5, {0: 0.0, 1: 0.0032, 2: 0.2451, 3: 0.0, 4: 0.0042}))
]

El resultado anterior se crea al unirse con un dataframe a sí mismo, puede ver que hay 4 columnas con ambos a y f.

El problema es que cuando intento hacer más cálculo con la columna a, no puedo encontrar una manera de seleccionar el a, he intentado df[0] y df.select('a'), ambos me devolvieron el error debajo mesaage:

AnalysisException: Reference 'a' is ambiguous, could be: a#1333L, a#1335L.

¿Hay de alguna manera en Spark API que pueda distinguir las columnas de los nombres duplicados de nuevo? ¿o tal vez alguna forma de dejarme cambiar los nombres de las columnas?

Author: resec, 2015-11-18

7 answers

Le recomendaría que cambie los nombres de las columnas para su join

df1.select('a as "df1_a", 'f as "df1_f")
   .join(df2.select('a as "df2_a", 'f as "df2_f"), 'df1_a === 'df2_a)

El DataFrame resultante tendrá schema

(df1_a, df1_f, df2_a, df2_f)
 26
Author: Glennie Helles Sindholt,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2015-11-18 11:33:08

Comencemos con algunos datos:

from pyspark.mllib.linalg import SparseVector
from pyspark.sql import Row

df1 = sqlContext.createDataFrame([
    Row(a=107831, f=SparseVector(
        5, {0: 0.0, 1: 0.0, 2: 0.0, 3: 0.0, 4: 0.0})),
    Row(a=125231, f=SparseVector(
        5, {0: 0.0, 1: 0.0, 2: 0.0047, 3: 0.0, 4: 0.0043})),
])

df2 = sqlContext.createDataFrame([
    Row(a=107831, f=SparseVector(
        5, {0: 0.0, 1: 0.0, 2: 0.0, 3: 0.0, 4: 0.0})),
    Row(a=107831, f=SparseVector(
        5, {0: 0.0, 1: 0.0, 2: 0.0, 3: 0.0, 4: 0.0})),
])

Hay algunas maneras de abordar este problema. En primer lugar, puede hacer referencia inequívoca a las columnas de la tabla secundaria utilizando las columnas principales:

df1.join(df2, df1['a'] == df2['a']).select(df1['f']).show(2)

##  +--------------------+
##  |                   f|
##  +--------------------+
##  |(5,[0,1,2,3,4],[0...|
##  |(5,[0,1,2,3,4],[0...|
##  +--------------------+

También puede usar alias de tabla:

from pyspark.sql.functions import col

df1_a = df1.alias("df1_a")
df2_a = df2.alias("df2_a")

df1_a.join(df2_a, col('df1_a.a') == col('df2_a.a')).select('df1_a.f').show(2)

##  +--------------------+
##  |                   f|
##  +--------------------+
##  |(5,[0,1,2,3,4],[0...|
##  |(5,[0,1,2,3,4],[0...|
##  +--------------------+

Finalmente puede cambiar el nombre de las columnas mediante programación:

df1_r = df1.select(*(col(x).alias(x + '_df1') for x in df1.columns))
df2_r = df1.select(*(col(x).alias(x + '_df2') for x in df2.columns))

df1_r.join(df2_r, col('a_df1') == col('a_df2')).select(col('f_df1')).show(2)

## +--------------------+
## |               f_df1|
## +--------------------+
## |(5,[0,1,2,3,4],[0...|
## |(5,[0,1,2,3,4],[0...|
## +--------------------+
 53
Author: zero323,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2015-11-18 11:52:26

Después de indagar en la API de Spark, descubrí que primero puedo usar alias para crear un alias para el dataframe original y luego usar withColumnRename para cambiar el nombre manualmente de cada columna en el alias, al final para hacer el join sin causar la duplicación del nombre de la columna.

Se pueden consultar más detalles a continuación Spark Dataframe API :

Pyspark.SQL.DataFrame.alias

Pyspark.SQL.DataFrame.withColumnRenamed

Sin embargo, creo que esto es sólo un problema solución, y preguntándose si hay alguna mejor manera para mi pregunta.

 4
Author: resec,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2015-11-18 11:26:18

Puede usar el método def drop(col: Column) para eliminar la columna duplicada, por ejemplo:

DataFrame:df1

+-------+-----+
| a     | f   |
+-------+-----+
|107831 | ... |
|107831 | ... |
+-------+-----+

DataFrame:df2

+-------+-----+
| a     | f   |
+-------+-----+
|107831 | ... |
|107831 | ... |
+-------+-----+

Cuando uno df1 con df2, el DataFrame será como a continuación:

val newDf = df1.join(df2,df1("a")===df2("a"))

DataFrame:newDf

+-------+-----+-------+-----+
| a     | f   | a     | f   |
+-------+-----+-------+-----+
|107831 | ... |107831 | ... |
|107831 | ... |107831 | ... |
+-------+-----+-------+-----+

Ahora, podemos usar el método def drop(col: Column) para eliminar la columna duplicada ' a 'o' f', de la siguiente manera:

val newDfWithoutDuplicate = df1.join(df2,df1("a")===df2("a")).drop(df2("a")).drop(df2("f"))
 3
Author: StrongYoung,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2016-08-22 09:14:33

Hay una forma más sencilla que escribir alias para todas las columnas a las que te unes haciendo:

df1.join(df2,['a'])

Esto funciona si la clave con la que se está uniendo es la misma en ambas tablas.

Véase https://docs.databricks.com/spark/latest/faq/join-two-dataframes-duplicated-column.html

 2
Author: Paul Bendevis,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2018-06-19 16:55:01

Así es como podemos unir dos Dataframes en los mismos nombres de columna en PySpark.

df = df1.join(df2, ['col1','col2','col3'])

Si lo hace printSchema() después de esto, puede ver que se han eliminado las columnas duplicadas.

 1
Author: Nikhil R,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2018-07-26 12:26:52

Supongamos que los DataFrames que desea unir son df1 y df2, y los está uniendo en la columna 'a', entonces tiene 2 métodos

Método 1

Df1.join (df2,'a','left_outer')

Este es un método impresionante y es muy recomendable.

Método 2

Df1.unirse (df2, df1.a = = df2.a, 'left_outer').drop (df2.a)

 0
Author: typhoonbxq,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2018-04-27 02:26:14