renombrar columnas para agregados de marcos de datos pyspark


Estoy analizando algunos datos con dataframes pyspark, supongamos que tengo un dataframe df que estoy agregando:

df.groupBy("group")\
  .agg({"money":"sum"})\
  .show(100)

Esto me dará:

group                SUM(money#2L)
A                    137461285853
B                    172185566943
C                    271179590646

La agregación funciona bien, pero no me gusta el nuevo nombre de columna "SUM(money#2L)". ¿Hay una manera ordenada de cambiar el nombre de esta columna a algo legible por humanos desde el método .agg? Tal vez algo más similar a lo que uno haría en dplyr:

df %>% group_by(group) %>% summarise(sum_money = sum(money))
Author: cantdutchthis, 2015-05-01

4 answers

Aunque todavía prefiero la sintaxis dplyr , este fragmento de código servirá:

import pyspark.sql.functions as sf

df.groupBy("group")\
  .agg(sf.sum('money').alias('money'))\
  .show(100)

Se vuelve detallado.

 64
Author: cantdutchthis,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2017-05-08 09:41:05

withColumnRenamed debería funcionar. Aquí está el enlace al pyspark.sql API .

df.groupBy("group")\
  .agg({"money":"sum"})\
  .withColumnRenamed("SUM(money)", "money")
  .show(100)
 36
Author: dnlbrky,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2015-06-18 01:02:19

Hice una pequeña función de ayuda para esto que podría ayudar a algunas personas.

import re

from functools import partial

def rename_cols(agg_df, ignore_first_n=1):
    """changes the default spark aggregate names `avg(colname)` 
    to something a bit more useful. Pass an aggregated dataframe
    and the number of aggregation columns to ignore.
    """
    delimiters = "(", ")"
    split_pattern = '|'.join(map(re.escape, delimiters))
    splitter = partial(re.split, split_pattern)
    split_agg = lambda x: '_'.join(splitter(x))[0:-ignore_first_n]
    renamed = map(split_agg, agg_df.columns[ignore_first_n:])
    renamed = zip(agg_df.columns[ignore_first_n:], renamed)
    for old, new in renamed:
        agg_df = agg_df.withColumnRenamed(old, new)
    return agg_df

Un ejemplo:

gb = (df.selectExpr("id", "rank", "rate", "price", "clicks")
 .groupby("id")
 .agg({"rank": "mean",
       "*": "count",
       "rate": "mean", 
       "price": "mean", 
       "clicks": "mean", 
       })
)

>>> gb.columns
['id',
 'avg(rate)',
 'count(1)',
 'avg(price)',
 'avg(rank)',
 'avg(clicks)']

>>> rename_cols(gb).columns
['id',
 'avg_rate',
 'count_1',
 'avg_price',
 'avg_rank',
 'avg_clicks']

Haciendo al menos un poco para evitar que la gente escriba tanto.

 4
Author: Aaron Gonzales,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2016-10-06 18:10:00

Teniendo en cuenta que tienes un diccionario columns_and_operations y, después de la agregación, quieres hacer el cambio de nombre sin el hardcoding, una forma más sencilla sería:

from functools import reduce

columns_and_operations = {
        "rank": "mean",
        "*": "count",
        "rate": "mean", 
        "price": "mean", 
         "clicks": "mean"}

df = df.groupBy("group").agg(columns_and_operations)

old_names = ["{}({})".format(v, k) for k, v in columns_and_operations.items()]
new_names = list(columns_and_operations.keys())

df = reduce(lambda df, i: df.withColumnRenamed(old_names[i],
                                               new_names[i]),
            range(len(old_names)),
            df)
 0
Author: Joao Francisco Martins,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2018-08-24 02:09:49