Spark admite escaneos de columna verdaderos sobre archivos de parquet en S3?


Una de las grandes ventajas del formato de almacenamiento de datos de Parquet es que es columnar. Si tengo un conjunto de datos' ancho ' con cientos de columnas, pero mi consulta solo toca algunas de ellas, entonces es posible leer solo los datos que almacenan esas pocas columnas y omitir el resto.

Presumiblemente esta característica funciona leyendo un poco de metadatos en la cabeza de un archivo de parquet que indica las ubicaciones en el sistema de archivos para cada columna. El lector puede entonces buscar en el disco para leer solo las columnas necesarias.

¿Alguien sabe si el lector de parquet predeterminado de spark implementa correctamente este tipo de búsqueda selectiva en S3? Creo que es compatible con S3, pero hay una gran diferencia entre el soporte teórico y una implementación que explota adecuadamente ese soporte.

Author: Jacek Laskowski, 2016-09-26

4 answers

Esto necesita ser desglosado

  1. El código de Parquet obtiene los predicados de spark (yes)
  2. parquet intenta entonces leer selectivamente solo esas columnas, utilizando el Hadoop FileSystem seek() + read() o readFully(position, buffer, length) llamadas? Sí
  3. ¿El conector S3 traduce estas Operaciones de archivo en solicitudes HTTP GET eficientes? En Amazon EMR: Sí. En Apache Hadoop, necesita hadoop 2.8 en el classpath y configure correctamente spark.hadoop.fs.s3a.experimental.fadvise=random para activar aleatoriamente acceso.

Hadoop 2.7 y anteriores manejan mal la agresiva seek() alrededor del archivo, porque siempre inician un GET offset-end-of-file, se sorprenden por la próxima seek, tienen que abortar esa conexión, reabrir una nueva conexión TCP/HTTPS 1.1 (lenta, CPU pesada), hacerlo de nuevo, repetidamente. La operación IO aleatoria duele en la carga a granel de cosas como .csv.gz, pero es fundamental para obtener ORC / Parquet perf.

No obtienes la aceleración en Hadoop-aws JAR de Hadoop 2.7. Si lo necesitas necesita actualizar hadoop*.jar y dependencias, o construir Spark desde cero contra Hadoop 2.8

Tenga en cuenta que Hadoop 2.8+ también tiene una pequeña característica: si llama a toString() en un cliente de sistema de archivos S3A en una instrucción de registro, imprime todas las estadísticas de E / s del sistema de archivos, incluida la cantidad de datos que se descartaron en seeks, conexiones TCP abortadas, etc. Le ayuda a averiguar qué está pasando.

2018-04-13 advertencia: : No intente dejar caer el JAR Hadoop 2.8 + hadoop-aws en el classpath a lo largo de con el resto del conjunto de JARRAS hadoop-2.7 y esperar ver cualquier aumento de velocidad. Todo lo que verás son rastros de pila. Es necesario actualizar todos los JARs de hadoop y sus dependencias transitivas.

 6
Author: Steve Loughran,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2018-03-13 11:21:15

DESCARGO DE RESPONSABILIDAD: No tengo una respuesta definitiva y tampoco quiero actuar como una fuente autorizada, pero he pasado algún tiempo en el soporte de parquet en Spark 2.2+ y espero que mi respuesta pueda ayudarnos a todos a acercarnos a la respuesta correcta.


¿Parquet en S3 evita extraer los datos de las columnas no utilizadas de S3 y solo recupera los trozos de archivo que necesita, o extrae todo el archivo?

Uso Spark 2.3.0-SNAPSHOT que construí hoy desde el maestro .

parquet el formato de fuente de datos es manejado por ParquetFileFormat que es un FileFormat .

Si estoy en lo correcto, la parte de lectura es manejada por el método buildReaderWithPartitionValues (que anula los FileFormat's).

buildReaderWithPartitionValues se utiliza exclusivamente cuando se solicita el operador físico FileSourceScanExec para los llamados RDD de entrada que son en realidad un solo RDD para generar filas internas cuando WholeStageCodegenExec es ejecutar.

Dicho esto, creo que revisar lo que hace buildReaderWithPartitionValues puede acercarnos a la respuesta final.

Cuando nos fijamos en la línea usted puede estar seguro de que estamos en el camino correcto.

// Intente empujar hacia abajo los filtros cuando filtro push-down está habilitado.

Esa ruta de código depende de spark.sql.parquet.filterPushdown la propiedad Spark que está activada por defecto.

Spark.SQL.parquet.filterPushdown Permite Parquet filtrar la optimización push-down cuando se establece en true.

Que nos lleva a parquet-hadoop ParquetInputFormat.setFilterPredicate iff los filtros están definidos.

if (pushed.isDefined) {
  ParquetInputFormat.setFilterPredicate(hadoopAttemptContext.getConfiguration, pushed.get)
}

El código se vuelve más interesante un poco más tarde cuando se utilizan los filtros cuando el código vuelve a parquet-mr (en lugar de usar el llamado lector de decodificación de parquet vectorizado). Esa es la parte que realmente no entiendo (excepto lo que puedo ver en el código).

Tenga en cuenta que el lector de decodificación de parquet vectorizado está controlado por la propiedad Spark spark.sql.parquet.enableVectorizedReader que está activada de forma predeterminada.

CONSEJO: Para saber qué parte de la expresión if se usa, habilite el nivel de registro DEBUG para org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat logger.

Para ver todos los filtros empujados hacia abajo, puede activar INFO el nivel de registro de org.apache.spark.sql.execution.FileSourceScanExec logger. Deberías ver lo siguiente en los registros :

INFO Pushed Filters: [pushedDownFilters]

Espero que si no está cerca de ser una respuesta definitiva que ha ayudado a un little y alguien lo recoge donde lo dejé para hacerlo pronto. La esperanza muere el último :)

 5
Author: Jacek Laskowski,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2017-11-29 14:47:41

No, la pushdown de predicados no es totalmente compatible. Esto, por supuesto, depende de:

  • Caso de uso específico
  • Versión Spark
  • Tipo y versión del conector S3

Para verificar su caso de uso específico, puede habilitar el nivel de registro de depuración en Spark y ejecutar su consulta. Luego, puede ver si hay" seeks " durante las solicitudes S3 (HTTP), así como cuántas solicitudes se enviaron realmente. Algo como esto:

17/06/13 05:46:50 DEBUG wire: http-outgoing-1 >> "GET /test/part-00000-b8a8a1b7-0581-401f-b520-27fa9600f35e.snappy.parquet HTTP/1.1[\r][\n]" .... 17/06/13 05:46:50 DEBUG wire: http-outgoing-1 << "Content-Range: bytes 0-7472093/7472094[\r][\n]" .... 17/06/13 05:46:50 DEBUG wire: http-outgoing-1 << "Content-Length: 7472094[\r][\n]"

Aquí está el ejemplo de una informe de problemas que se abrió recientemente debido a la incapacidad de Spark 2.1 para calcular COUNT(*) de todas las filas de un conjunto de datos basado en metadatos almacenados en el archivo Parquet: https://issues.apache.org/jira/browse/SPARK-21074

 1
Author: spektom,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2017-11-29 14:17:13

Parquet reader de spark es como cualquier otro formato de entrada,

  1. Ninguno de los InputFormat tiene nada especial para S3. Los formatos de entrada pueden leer desde LocalFileSystem, Hdfs y S3 sin ninguna optimización especial hecha para eso.

  2. Parquet InputFormat dependiendo de las columnas que pregunte leerá selectivamente las columnas por usted .

  3. Si desea estar completamente seguro (aunque push down predicates funciona en la última versión de spark) seleccione manualmente columnas y escribir la transformación y las acciones, en lugar de depender de SQL

 0
Author: KrazyGautam,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2017-03-20 20:46:51