¿Cómo agregar una columna constante en un DataFrame de Spark?

Quiero agregar una columna en un DataFrame con algún valor arbitrario (que es el mismo para cada fila). Obtengo un error cuando uso withColumn de la siguiente manera:

dt.withColumn('new_column', 10).head(5)

AttributeError                            Traceback (most recent call last)
<ipython-input-50-a6d0257ca2be> in <module>()
      1 dt = (messages
      2     .select(messages.fromuserid, messages.messagetype, floor(messages.datetime/(1000*60*5)).alias("dt")))
----> 3 dt.withColumn('new_column', 10).head(5)

/Users/evanzamir/spark-1.4.1/python/pyspark/sql/dataframe.pyc in withColumn(self, colName, col)
   1166         [Row(age=2, name=u'Alice', age2=4), Row(age=5, name=u'Bob', age2=7)]
   1167         """
-> 1168         return self.select('*', col.alias(colName))
   1170     @ignore_unicode_prefix

AttributeError: 'int' object has no attribute 'alias'

Parece que puedo engañar a la función para que funcione como quiera agregando y restando una de las otras columnas (para que se sumen a cero) y luego agregando el número que quiero (10 en este caso):

dt.withColumn('new_column', dt.messagetype - dt.messagetype + 10).head(5)

[Row(fromuserid=425, messagetype=1, dt=4809600.0, new_column=10),
 Row(fromuserid=47019141, messagetype=1, dt=4809600.0, new_column=10),
 Row(fromuserid=49746356, messagetype=1, dt=4809600.0, new_column=10),
 Row(fromuserid=93506471, messagetype=1, dt=4809600.0, new_column=10),
 Row(fromuserid=80488242, messagetype=1, dt=4809600.0, new_column=10)]

Esto es supremamente hacky, ¿verdad? Supongo que hay una forma más legítima de hacer esto?

Author: Community, 2015-09-25

2 answers

Chispa 2.2+

Spark 2.2 introduce typedLit para soportar Seq, Map, y Tuples (SPARK-19254 ) y las siguientes llamadas deben ser compatibles (Scala):

import org.apache.spark.sql.functions.typedLit

df.withColumn("some_array", typedLit(Seq(1, 2, 3)))
df.withColumn("some_struct", typedLit(("foo", 1, .0.3)))
df.withColumn("some_map", typedLit(Map("key1" -> 1, "key2" -> 2)))

Spark 1.3+ (lit), 1.4+ (array, struct), 2.0+ (map):

El segundo argumento para DataFrame.withColumn debería ser un Column así que tienes que usar un literal:

from pyspark.sql.functions import lit

df.withColumn('new_column', lit(10))

Si necesitas columnas complejas puedes construirlas usando bloques como array:

from pyspark.sql.functions import array, create_map, struct

df.withColumn("some_array", array(lit(1), lit(2), lit(3)))
df.withColumn("some_struct", struct(lit("foo"), lit(1), lit(.3)))
df.withColumn("some_map", create_map(lit("key1"), lit(1), lit("key2"), lit(2)))

Se pueden usar exactamente los mismos métodos en Scala.

import org.apache.spark.sql.functions.{array, lit, map, struct}

df.withColumn("new_column", lit(10))
df.withColumn("map", map(lit("key1"), lit(1), lit("key2"), lit(2)))

También es posible, aunque más lento, usar un UDF.

Author: zero323,
2017-03-05 17:07:51

En spark 2.2 hay dos formas de agregar valor constante en una columna en DataFrame:

1) Usando lit

2) Usando typedLit.

La diferencia entre los dos es que typedLit también puede manejar tipos de scala parametrizados, por ejemplo, List, Seq y Map

Ejemplo de DataFrame:

val df = spark.createDataFrame(Seq((0,"a"),(1,"b"),(2,"c"))).toDF("id", "col1")

| id|col1|
|  0|   a|
|  1|   b|

1) Usando lit: Agregar un valor de cadena constante en una nueva columna llamada newcol:

import org.apache.spark.sql.functions.lit
val newdf = df.withColumn("newcol",lit("myval"))


| id|col1|newcol|
|  0|   a| myval|
|  1|   b| myval|

2) Usando typedLit:

import org.apache.spark.sql.functions.typedLit
df.withColumn("newcol", typedLit(("sample", 10, .044)))


| id|col1|           newcol|
|  0|   a|[sample,10,0.044]|
|  1|   b|[sample,10,0.044]|
|  2|   c|[sample,10,0.044]|
Author: Ayush Vatsyayan,
2018-02-27 07:57:17