¿Cómo agrego una nueva columna a un DataFrame de Spark (usando PySpark)?

Tengo un DataFrame de Spark (usando PySpark 1.5.1) y me gustaría agregar una nueva columna.

He intentado lo siguiente sin ningún éxito:

type(randomed_hours) # => list

# Create in Python and transform to RDD

new_col = pd.DataFrame(randomed_hours, columns=['new_col'])

spark_new_col = sqlContext.createDataFrame(new_col)

my_df_spark.withColumn("hours", spark_new_col["new_col"])

También obtuvo un error usando esto:

my_df_spark.withColumn("hours",  sc.parallelize(randomed_hours))

Entonces, ¿cómo agrego una nueva columna (basada en el vector Python) a un DataFrame existente con PySpark?

Author: user6910411, 2015-11-13

6 answers

No puede agregar una columna arbitraria a un DataFrame en Spark. Las columnas nuevas solo se pueden crear usando literales (otros tipos literales se describen en ¿Cómo agregar una columna constante en un DataFrame de Spark?)

from pyspark.sql.functions import lit

df = sqlContext.createDataFrame(
    [(1, "a", 23.0), (3, "B", -23.0)], ("x1", "x2", "x3"))

df_with_x4 = df.withColumn("x4", lit(0))

## +---+---+-----+---+
## | x1| x2|   x3| x4|
## +---+---+-----+---+
## |  1|  a| 23.0|  0|
## |  3|  B|-23.0|  0|
## +---+---+-----+---+

Transformando una columna existente:

from pyspark.sql.functions import exp

df_with_x5 = df_with_x4.withColumn("x5", exp("x3"))

## +---+---+-----+---+--------------------+
## | x1| x2|   x3| x4|                  x5|
## +---+---+-----+---+--------------------+
## |  1|  a| 23.0|  0| 9.744803446248903E9|
## |  3|  B|-23.0|  0|1.026187963170189...|
## +---+---+-----+---+--------------------+

Incluido usando join:

from pyspark.sql.functions import exp

lookup = sqlContext.createDataFrame([(1, "foo"), (2, "bar")], ("k", "v"))
df_with_x6 = (df_with_x5
    .join(lookup, col("x1") == col("k"), "leftouter")
    .withColumnRenamed("v", "x6"))

## +---+---+-----+---+--------------------+----+
## | x1| x2|   x3| x4|                  x5|  x6|
## +---+---+-----+---+--------------------+----+
## |  1|  a| 23.0|  0| 9.744803446248903E9| foo|
## |  3|  B|-23.0|  0|1.026187963170189...|null|
## +---+---+-----+---+--------------------+----+

O generado con la función / udf:

from pyspark.sql.functions import rand

df_with_x7 = df_with_x6.withColumn("x7", rand())

## +---+---+-----+---+--------------------+----+-------------------+
## | x1| x2|   x3| x4|                  x5|  x6|                 x7|
## +---+---+-----+---+--------------------+----+-------------------+
## |  1|  a| 23.0|  0| 9.744803446248903E9| foo|0.41930610446846617|
## |  3|  B|-23.0|  0|1.026187963170189...|null|0.37801881545497873|
## +---+---+-----+---+--------------------+----+-------------------+

En cuanto al rendimiento, las funciones integradas (pyspark.sql.functions), que se asignan a la expresión Catalyst, generalmente son preferidas sobre el usuario de Python funciones definidas.

Si desea agregar contenido de un RDD arbitrario como columna, puede

Author: zero323,
2017-05-23 10:31:29

Para agregar una columna usando un UDF:

df = sqlContext.createDataFrame(
    [(1, "a", 23.0), (3, "B", -23.0)], ("x1", "x2", "x3"))

from pyspark.sql.functions import udf
from pyspark.sql.types import *

def valueToCategory(value):
   if   value == 1: return 'cat1'
   elif value == 2: return 'cat2'
   else: return 'n/a'

# NOTE: it seems that calls to udf() must be after SparkContext() is called
udfValueToCategory = udf(valueToCategory, StringType())
df_with_cat = df.withColumn("category", udfValueToCategory("x1"))

## +---+---+-----+---------+
## | x1| x2|   x3| category|
## +---+---+-----+---------+
## |  1|  a| 23.0|     cat1|
## |  3|  B|-23.0|      n/a|
## +---+---+-----+---------+
Author: Mark Rajcok,
2017-03-24 17:52:18

Para Spark 2.0

# assumes schema has 'age' column 
df.select('*', (df.age + 10).alias('agePlusTen'))
Author: Luke W,
2017-01-13 18:35:58

Me gustaría ofrecer un ejemplo generalizado para un caso de uso muy similar:

Caso de uso: Tengo un csv que consiste en:

...billion more lines

Necesito realizar algunas transformaciones y el csv final debe parecerse a

...billion more lines

Necesito hacer esto porque este es el esquema definido por algún modelo y necesito que mis datos finales sean interoperables con Insertos SQL Bulk y cosas así.

Así que:

1) Leí el csv original usando spark.léelo y llámalo "df".

2) Le hago algo a los datos.

3) Agrego las columnas null usando este script:

outcols = []
for column in MY_COLUMN_LIST:
    if column in df.columns:

df = df.select(outcols)

De esta manera, puede estructurar su esquema después de cargar un csv (también funcionaría para reordenar columnas si tiene que hacer esto para muchas tablas).

Author: bloodrootfc,
2018-03-02 15:10:47

Puede definir un nuevo udf al agregar un column_name:

u_f = F.udf(lambda :yourstring,StringType())
Author: Allen211,
2016-12-27 09:02:59
from pyspark.sql.functions import udf
from pyspark.sql.types import *
func_name = udf(
    lambda val: val, # do sth to val
df.withColumn('new_col', func_name(df.old_col))
Author: DeFOX,
2017-10-18 19:23:46