Actualización de una columna dataframe en spark


Mirando la nueva api de spark dataframe, no está claro si es posible modificar las columnas de dataframe.

¿Cómo cambiaría un valor en la fila x columna y de un dataframe?

{[8] {} En[5]} esto sería df.ix[x,y] = new_value

Editar: Consolidando lo que se dijo a continuación, no puede modificar el dataframe existente ya que es inmutable, pero puede devolver un nuevo dataframe con las modificaciones deseadas.

Si solo desea reemplazar un valor en una columna basado en un condición, como np.where:

from pyspark.sql import functions as F

update_func = (F.when(F.col('update_col') == replace_val, new_value)
                .otherwise(F.col('update_col')))
df = df.withColumn('new_column_name', update_func)

Si desea realizar alguna operación en una columna y crear una nueva columna que se agregue al dataframe:

import pyspark.sql.functions as F
import pyspark.sql.types as T

def my_func(col):
    do stuff to column here
    return transformed_value

# if we assume that my_func returns a string
my_udf = F.UserDefinedFunction(my_func, T.StringType())

df = df.withColumn('new_column_name', my_udf('update_col'))

Si desea que la nueva columna tenga el mismo nombre que la columna anterior, puede agregar el paso adicional:

df = df.drop('update_col').withColumnRenamed('new_column_name', 'update_col')
Author: Luke, 2015-03-18

4 answers

Si bien no puede modificar una columna como tal, puede operar en una columna y devolver un nuevo DataFrame que refleje ese cambio. Para eso primero crearía un UserDefinedFunction implementando la operación a aplicar y luego aplicaría selectivamente esa función solo a la columna de destino. En Python:

from pyspark.sql.functions import UserDefinedFunction
from pyspark.sql.types import StringType

name = 'target_column'
udf = UserDefinedFunction(lambda x: 'new_value', StringType())
new_df = old_df.select(*[udf(column).alias(name) if column == name else column for column in old_df.columns])

new_df ahora tiene el mismo esquema que old_df (asumiendo que old_df.target_column también era de tipo StringType) pero todos los valores en la columna target_column serán new_value.

 52
Author: karlson,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2017-02-21 22:02:49

Comúnmente al actualizar una columna, queremos asignar un valor antiguo a un nuevo valor. Aquí hay una manera de hacer eso en pyspark sin UDF:

# update df[update_col], mapping old_value --> new_value
from pyspark.sql import functions as F
df = df.withColumn(update_col,
    F.when(df[update_col]==old_value,new_value).
    otherwise(df[update_col])).
 29
Author: Paul,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2015-12-21 22:23:26

DataFrames se basan en RDDs. Los RDD son estructuras inmutables y no permiten actualizar elementos in situ. Para cambiar los valores, necesitará crear un nuevo DataFrame transformando el original ya sea usando las operaciones DSL tipo SQL o RDD como map.

Una cubierta de diapositivas muy recomendable: Introducción de DataFrames en Spark para la Ciencia de Datos a Gran Escala.

 12
Author: maasg,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2016-02-24 21:56:18

Al igual que maasg dice que puede crear un nuevo DataFrame a partir del resultado de un mapa aplicado al DataFrame antiguo. Un ejemplo para un DataFrame dado df con dos filas:

val newDf = sqlContext.createDataFrame(df.map(row => 
  Row(row.getInt(0) + SOMETHING, applySomeDef(row.getAs[Double]("y")), df.schema)

Tenga en cuenta que si los tipos de las columnas cambian, debe darle un esquema correcto en lugar de df.schema. Echa un vistazo a la api de org.apache.spark.sql.Row para los métodos disponibles: https://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/Row.html

[Actualizar] O usar UDFs en Scala:

import org.apache.spark.sql.functions._

val toLong = udf[Long, String] (_.toLong)

val modifiedDf = df.withColumn("modifiedColumnName", toLong(df("columnName"))).drop("columnName")

Y si el nombre de la columna necesita permanecer igual, puede cambiarle el nombre:

modifiedDf.withColumnRenamed("modifiedColumnName", "columnName")
 11
Author: radek1st,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2017-05-23 11:33:15