Sobrescribir particiones específicas en el método de escritura de spark dataframe


Quiero sobrescribir particiones específicas en lugar de todas en spark. Estoy intentando el siguiente comando:

df.write.orc('maprfs:///hdfs-base-path','overwrite',partitionBy='col4')

Donde df es dataframe que tiene los datos incrementales que se sobrescribirán.

Hdfs-base-path contiene los datos maestros.

Cuando pruebo el comando anterior, borra todas las particiones e inserta las presentes en df en la ruta hdfs.

Lo que mi requisito es sobrescribir solo aquellas particiones presentes en df en la ruta hdfs especificada. Puede alguien por favor, ayúdame en esto.

Author: Prasad Khode, 2016-07-20

8 answers

Este es un problema común. La única solución con Spark hasta 2.0 es escribir directamente en el directorio de particiones, por ejemplo,

df.write.mode(SaveMode.Overwrite).save("/root/path/to/data/partition_col=value")

Si está utilizando Spark antes de la versión 2.0, deberá evitar que Spark emita archivos de metadatos (porque interrumpirán la detección automática de particiones) utilizando:

sc.hadoopConfiguration.set("parquet.enable.summary-metadata", "false")

Si está utilizando Spark antes de la versión 1.6.2, también deberá eliminar el archivo _SUCCESS en /root/path/to/data/partition_col=value o su presencia interrumpirá la detección automática de particiones. (Recomiendo encarecidamente usar 1.6.2 o posterior.)

Puede obtener algunos detalles más sobre cómo administrar tablas con particiones grandes en mi charla de Spark Summit sobre Trabajos a prueba de balas.

 23
Author: Sim,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2016-07-25 20:15:38

Finalmente! Ahora es una característica de Spark 2.3.0: https://issues.apache.org/jira/browse/SPARK-20236

Para usarlo, debe configurar la chispa .SQL.fuente.partitionOverwriteMode si se establece en dinámico, el conjunto de datos debe particionarse y el modo de escritura sobrescribir. Ejemplo:

spark.conf.set("spark.sql.sources.partitionOverwriteMode","dynamic")
data.write.mode("overwrite").insertInto("partitioned_table")

Recomiendo hacer una repartición basada en su columna de partición antes de escribir, para que no termine con 400 archivos por carpeta.

Antes de Spark 2.3.0, el mejor la solución sería lanzar sentencias SQL para eliminar esas particiones y luego escribirlas con mode append.

 16
Author: Madhava Carrillo,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2018-07-19 14:45:53

Usando Spark 1.6...

El HiveContext puede simplificar este proceso en gran medida. La clave es que primero debe crear la tabla en el subárbol usando una instrucción CREATE EXTERNAL TABLE con el particionamiento definido. Por ejemplo:

# Hive SQL
CREATE EXTERNAL TABLE test
(name STRING)
PARTITIONED BY
(age INT)
STORED AS PARQUET
LOCATION 'hdfs:///tmp/tables/test'

Desde aquí, digamos que tiene un Dataframe con nuevos registros para una partición específica (o varias particiones). Puede usar una instrucción SQL de HiveContext para realizar un INSERT OVERWRITE usando este Dataframe, que sobrescribirá la tabla solo para las particiones contenidas en Dataframe:

# PySpark
hiveContext = HiveContext(sc)
update_dataframe.registerTempTable('update_dataframe')

hiveContext.sql("""INSERT OVERWRITE TABLE test PARTITION (age)
                   SELECT name, age
                   FROM update_dataframe""")

Nota: update_dataframe en este ejemplo tiene un esquema que coincide con el de la tabla de destino test.

Un error fácil de cometer con este enfoque es omitir el paso CREATE EXTERNAL TABLE en la colmena y simplemente hacer la tabla utilizando los métodos de escritura de la API de Dataframe. Para las tablas basadas en parquet en particular, la tabla no se definirá adecuadamente para soportar la función INSERT OVERWRITE... PARTITION de Hive.

Espero que esto ayude.

 6
Author: vertigokidd,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2017-01-24 20:24:35

Si usa DataFrame, posiblemente desee usar la tabla de la colmena sobre los datos. En este caso, solo necesita llamar al método

df.write.mode(SaveMode.Overwrite).partitionBy("partition_col").insertInto(table_name)

Sobrescribirá las particiones que contiene DataFrame.

No es necesario especificar format (orc), porque Spark utilizará el formato de tabla Hive.

Funciona bien en Spark versión 1.6

 1
Author: L. Viktor,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2017-12-20 07:33:01

Probé el siguiente enfoque para sobrescribir una partición particular en la tabla de la COLMENA.

### load Data and check records
    raw_df = spark.table("test.original")
    raw_df.count()

lets say this table is partitioned based on column : **c_birth_year** and we would like to update the partition for year less than 1925


### Check data in few partitions.
    sample = raw_df.filter(col("c_birth_year") <= 1925).select("c_customer_sk", "c_preferred_cust_flag")
    print "Number of records: ", sample.count()
    sample.show()


### Back-up the partitions before deletion
    raw_df.filter(col("c_birth_year") <= 1925).write.saveAsTable("test.original_bkp", mode = "overwrite")


### UDF : To delete particular partition.
    def delete_part(table, part):
        qry = "ALTER TABLE " + table + " DROP IF EXISTS PARTITION (c_birth_year = " + str(part) + ")"
        spark.sql(qry)


### Delete partitions
    part_df = raw_df.filter(col("c_birth_year") <= 1925).select("c_birth_year").distinct()
    part_list = part_df.rdd.map(lambda x : x[0]).collect()

    table = "test.original"
    for p in part_list:
        delete_part(table, p)


### Do the required Changes to the columns in partitions
    df = spark.table("test.original_bkp")
    newdf = df.withColumn("c_preferred_cust_flag", lit("Y"))
    newdf.select("c_customer_sk", "c_preferred_cust_flag").show()


### Write the Partitions back to Original table
    newdf.write.insertInto("test.original")


### Verify data in Original table
    orginial.filter(col("c_birth_year") <= 1925).select("c_customer_sk", "c_preferred_cust_flag").show()



Hope it helps.

Regards,

Neeraj
 1
Author: neeraj bhadani,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2018-07-26 08:30:13

Podría hacer algo como esto para hacer que el trabajo vuelva a entrar (idempotent): (lo probé en spark 2.2)

# drop the partition
drop_query = "ALTER TABLE table_name DROP IF EXISTS PARTITION (partition_col='{val}')".format(val=target_partition)
print drop_query
spark.sql(drop_query)

# delete directory
dbutils.fs.rm(<partition_directoy>,recurse=True)

# Load the partition
df.write\
  .partitionBy("partition_col")\
  .saveAsTable(table_name, format = "parquet", mode = "append", path = <path to parquet>)
 0
Author: jatin,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2018-02-27 02:32:37

Te sugiero que hagas limpieza y luego escribas nuevas particiones con el modo Append:

import scala.sys.process._
def deletePath(path: String): Unit = {
    s"hdfs dfs -rm -r -skipTrash $path".!
}

df.select(partitionColumn).distinct.collect().foreach(p => {
    val partition = p.getAs[String](partitionColumn)
    deletePath(s"$path/$partitionColumn=$partition")
})

df.write.partitionBy(partitionColumn).mode(SaveMode.Append).orc(path)

Esto eliminará solo las particiones nuevas. Después de escribir los datos ejecute este comando si necesita actualizar metastore:

sparkSession.sql(s"MSCK REPAIR TABLE $db.$table")

Nota: deletePath asume que el comando hfds está disponible en su sistema.

 0
Author: gorros,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2018-05-04 08:27:26

En lugar de escribir directamente en la tabla de destino, le sugeriría que cree una tabla temporal como la tabla de destino e inserte sus datos allí.

CREATE TABLE tmpTbl LIKE trgtTbl LOCATION '<tmpLocation';

Una vez creada la tabla, escribirías tus datos en el tmpLocation

df.write.mode("overwrite").partitionBy("p_col").orc(tmpLocation)

Luego recuperaría las rutas de partición de la tabla ejecutando:

MSCK REPAIR TABLE tmpTbl;

Obtenga las rutas de partición consultando los metadatos de la colmena como:

SHOW PARTITONS tmpTbl;

Elimine estas particiones de trgtTbl y mueva los directorios de tmpTbl a trgtTbl

 0
Author: Joha,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2018-09-07 06:51:06