renombrar columnas para agregados de marcos de datos pyspark
Estoy analizando algunos datos con dataframes pyspark, supongamos que tengo un dataframe df
que estoy agregando:
df.groupBy("group")\
.agg({"money":"sum"})\
.show(100)
Esto me dará:
group SUM(money#2L)
A 137461285853
B 172185566943
C 271179590646
La agregación funciona bien, pero no me gusta el nuevo nombre de columna "SUM(money#2L)". ¿Hay una manera ordenada de cambiar el nombre de esta columna a algo legible por humanos desde el método .agg
? Tal vez algo más similar a lo que uno haría en dplyr
:
df %>% group_by(group) %>% summarise(sum_money = sum(money))
4 answers
Aunque todavía prefiero la sintaxis dplyr , este fragmento de código servirá:
import pyspark.sql.functions as sf
df.groupBy("group")\
.agg(sf.sum('money').alias('money'))\
.show(100)
Se vuelve detallado.
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2017-05-08 09:41:05
withColumnRenamed
debería funcionar. Aquí está el enlace al pyspark.sql API .
df.groupBy("group")\
.agg({"money":"sum"})\
.withColumnRenamed("SUM(money)", "money")
.show(100)
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2015-06-18 01:02:19
Hice una pequeña función de ayuda para esto que podría ayudar a algunas personas.
import re
from functools import partial
def rename_cols(agg_df, ignore_first_n=1):
"""changes the default spark aggregate names `avg(colname)`
to something a bit more useful. Pass an aggregated dataframe
and the number of aggregation columns to ignore.
"""
delimiters = "(", ")"
split_pattern = '|'.join(map(re.escape, delimiters))
splitter = partial(re.split, split_pattern)
split_agg = lambda x: '_'.join(splitter(x))[0:-ignore_first_n]
renamed = map(split_agg, agg_df.columns[ignore_first_n:])
renamed = zip(agg_df.columns[ignore_first_n:], renamed)
for old, new in renamed:
agg_df = agg_df.withColumnRenamed(old, new)
return agg_df
Un ejemplo:
gb = (df.selectExpr("id", "rank", "rate", "price", "clicks")
.groupby("id")
.agg({"rank": "mean",
"*": "count",
"rate": "mean",
"price": "mean",
"clicks": "mean",
})
)
>>> gb.columns
['id',
'avg(rate)',
'count(1)',
'avg(price)',
'avg(rank)',
'avg(clicks)']
>>> rename_cols(gb).columns
['id',
'avg_rate',
'count_1',
'avg_price',
'avg_rank',
'avg_clicks']
Haciendo al menos un poco para evitar que la gente escriba tanto.
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2016-10-06 18:10:00
Teniendo en cuenta que tienes un diccionario columns_and_operations
y, después de la agregación, quieres hacer el cambio de nombre sin el hardcoding, una forma más sencilla sería:
from functools import reduce
columns_and_operations = {
"rank": "mean",
"*": "count",
"rate": "mean",
"price": "mean",
"clicks": "mean"}
df = df.groupBy("group").agg(columns_and_operations)
old_names = ["{}({})".format(v, k) for k, v in columns_and_operations.items()]
new_names = list(columns_and_operations.keys())
df = reduce(lambda df, i: df.withColumnRenamed(old_names[i],
new_names[i]),
range(len(old_names)),
df)
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2018-08-24 02:09:49