¿Cómo agrego una nueva columna a un DataFrame de Spark (usando PySpark)?


Tengo un DataFrame de Spark (usando PySpark 1.5.1) y me gustaría agregar una nueva columna.

He intentado lo siguiente sin ningún éxito:

type(randomed_hours) # => list

# Create in Python and transform to RDD

new_col = pd.DataFrame(randomed_hours, columns=['new_col'])

spark_new_col = sqlContext.createDataFrame(new_col)

my_df_spark.withColumn("hours", spark_new_col["new_col"])

También obtuvo un error usando esto:

my_df_spark.withColumn("hours",  sc.parallelize(randomed_hours))

Entonces, ¿cómo agrego una nueva columna (basada en el vector Python) a un DataFrame existente con PySpark?

Author: user6910411, 2015-11-13

6 answers

No puede agregar una columna arbitraria a un DataFrame en Spark. Las columnas nuevas solo se pueden crear usando literales (otros tipos literales se describen en ¿Cómo agregar una columna constante en un DataFrame de Spark?)

from pyspark.sql.functions import lit

df = sqlContext.createDataFrame(
    [(1, "a", 23.0), (3, "B", -23.0)], ("x1", "x2", "x3"))

df_with_x4 = df.withColumn("x4", lit(0))
df_with_x4.show()

## +---+---+-----+---+
## | x1| x2|   x3| x4|
## +---+---+-----+---+
## |  1|  a| 23.0|  0|
## |  3|  B|-23.0|  0|
## +---+---+-----+---+

Transformando una columna existente:

from pyspark.sql.functions import exp

df_with_x5 = df_with_x4.withColumn("x5", exp("x3"))
df_with_x5.show()

## +---+---+-----+---+--------------------+
## | x1| x2|   x3| x4|                  x5|
## +---+---+-----+---+--------------------+
## |  1|  a| 23.0|  0| 9.744803446248903E9|
## |  3|  B|-23.0|  0|1.026187963170189...|
## +---+---+-----+---+--------------------+

Incluido usando join:

from pyspark.sql.functions import exp

lookup = sqlContext.createDataFrame([(1, "foo"), (2, "bar")], ("k", "v"))
df_with_x6 = (df_with_x5
    .join(lookup, col("x1") == col("k"), "leftouter")
    .drop("k")
    .withColumnRenamed("v", "x6"))

## +---+---+-----+---+--------------------+----+
## | x1| x2|   x3| x4|                  x5|  x6|
## +---+---+-----+---+--------------------+----+
## |  1|  a| 23.0|  0| 9.744803446248903E9| foo|
## |  3|  B|-23.0|  0|1.026187963170189...|null|
## +---+---+-----+---+--------------------+----+

O generado con la función / udf:

from pyspark.sql.functions import rand

df_with_x7 = df_with_x6.withColumn("x7", rand())
df_with_x7.show()

## +---+---+-----+---+--------------------+----+-------------------+
## | x1| x2|   x3| x4|                  x5|  x6|                 x7|
## +---+---+-----+---+--------------------+----+-------------------+
## |  1|  a| 23.0|  0| 9.744803446248903E9| foo|0.41930610446846617|
## |  3|  B|-23.0|  0|1.026187963170189...|null|0.37801881545497873|
## +---+---+-----+---+--------------------+----+-------------------+

En cuanto al rendimiento, las funciones integradas (pyspark.sql.functions), que se asignan a la expresión Catalyst, generalmente son preferidas sobre el usuario de Python funciones definidas.

Si desea agregar contenido de un RDD arbitrario como columna, puede

 131
Author: zero323,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2017-05-23 10:31:29

Para agregar una columna usando un UDF:

df = sqlContext.createDataFrame(
    [(1, "a", 23.0), (3, "B", -23.0)], ("x1", "x2", "x3"))

from pyspark.sql.functions import udf
from pyspark.sql.types import *

def valueToCategory(value):
   if   value == 1: return 'cat1'
   elif value == 2: return 'cat2'
   ...
   else: return 'n/a'

# NOTE: it seems that calls to udf() must be after SparkContext() is called
udfValueToCategory = udf(valueToCategory, StringType())
df_with_cat = df.withColumn("category", udfValueToCategory("x1"))
df_with_cat.show()

## +---+---+-----+---------+
## | x1| x2|   x3| category|
## +---+---+-----+---------+
## |  1|  a| 23.0|     cat1|
## |  3|  B|-23.0|      n/a|
## +---+---+-----+---------+
 41
Author: Mark Rajcok,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2017-03-24 17:52:18

Para Spark 2.0

# assumes schema has 'age' column 
df.select('*', (df.age + 10).alias('agePlusTen'))
 19
Author: Luke W,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2017-01-13 18:35:58

Me gustaría ofrecer un ejemplo generalizado para un caso de uso muy similar:

Caso de uso: Tengo un csv que consiste en:

First|Third|Fifth
data|data|data
data|data|data
...billion more lines

Necesito realizar algunas transformaciones y el csv final debe parecerse a

First|Second|Third|Fourth|Fifth
data|null|data|null|data
data|null|data|null|data
...billion more lines

Necesito hacer esto porque este es el esquema definido por algún modelo y necesito que mis datos finales sean interoperables con Insertos SQL Bulk y cosas así.

Así que:

1) Leí el csv original usando spark.léelo y llámalo "df".

2) Le hago algo a los datos.

3) Agrego las columnas null usando este script:

outcols = []
for column in MY_COLUMN_LIST:
    if column in df.columns:
        outcols.append(column)
    else:
        outcols.append(lit(None).cast(StringType()).alias('{0}'.format(column)))

df = df.select(outcols)

De esta manera, puede estructurar su esquema después de cargar un csv (también funcionaría para reordenar columnas si tiene que hacer esto para muchas tablas).

 0
Author: bloodrootfc,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2018-03-02 15:10:47

Puede definir un nuevo udf al agregar un column_name:

u_f = F.udf(lambda :yourstring,StringType())
a.select(u_f().alias('column_name')
 -1
Author: Allen211,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2016-12-27 09:02:59
from pyspark.sql.functions import udf
from pyspark.sql.types import *
func_name = udf(
    lambda val: val, # do sth to val
    StringType()
)
df.withColumn('new_col', func_name(df.old_col))
 -1
Author: DeFOX,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2017-10-18 19:23:46