Spark lee el archivo desde S3 usando sc.textFile ("s3n://…)


Tratando de leer un archivo ubicado en S3 usando spark-shell:

scala> val myRdd = sc.textFile("s3n://myBucket/myFile1.log")
lyrics: org.apache.spark.rdd.RDD[String] = s3n://myBucket/myFile1.log MappedRDD[55] at textFile at <console>:12

scala> myRdd.count
java.io.IOException: No FileSystem for scheme: s3n
    at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2607)
    at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2614)
    at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:91)
    ... etc ...

La IOException: No hay sistema de archivos para scheme: s3n error ocurrido con:

  • Spark 1.31 or 1.40 on dev machine (no Hadoop libs)
  • Corriendo desde el Hortonworks Sandbox HDP v2.2.4 (Hadoop 2.60) que integra Spark 1.2.1 fuera de la caja
  • Usando s3: / / o s3n: / / scheme

¿Cuál es la causa de este error? Falta de dependencia, Falta de configuración, o mal uso de sc.textFile()?

O puede ser que esto se deba a un error que afecta a la compilación específica de Spark para Hadoop 2.60, como parece sugerir este post . Voy a probar Spark para Hadoop 2.40 para ver si esto resuelve el problema.

Author: Mogsdad, 2015-06-15

12 answers

Confirmó que esto está relacionado con la compilación de Spark contra Hadoop 2.60. Acaba de instalar Spark 1.4.0 "Pre construido para Hadoop 2.4 y posteriores" (en lugar de Hadoop 2.6). Y el código ahora funciona bien.

sc.textFile("s3n://bucketname/Filename") ahora plantea otro error:

java.lang.IllegalArgumentException: AWS Access Key ID and Secret Access Key must be specified as the username or password (respectively) of a s3n URL, or by setting the fs.s3n.awsAccessKeyId or fs.s3n.awsSecretAccessKey properties (respectively).

El siguiente código utiliza el formato URL S3 para mostrar que Spark puede leer el archivo S3. Usando la máquina de desarrollo (no hay libs de Hadoop).

scala> val lyrics = sc.textFile("s3n://MyAccessKeyID:MySecretKey@zpub01/SafeAndSound_Lyrics.txt")
lyrics: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[3] at textFile at <console>:21

scala> lyrics.count
res1: Long = 9

Aún mejor: el código anterior con credenciales de AWS en línea en el URI de S3N se romperá si el AWS Secret Key tiene un forward"/". La configuración de credenciales de AWS en SparkContext lo solucionará. El código funciona tanto si el archivo S3 es público como privado.

sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", "BLABLA")
sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", "....") // can contain "/"
val myRDD = sc.textFile("s3n://myBucket/MyFilePattern")
myRDD.count
 38
Author: Polymerase,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2016-09-19 10:21:15

A pesar de que esta pregunta ya tiene una respuesta aceptada, creo que todavía faltan los detalles exactos de por qué está sucediendo esto. Así que creo que podría haber un lugar para una respuesta más.

Si agrega la dependencia requerida hadoop-aws , su código debería funcionar.

A partir de Hadoop 2.6.0, el conector s3 FS se ha movido a una biblioteca separada llamada hadoop-aws. También hay una Jira para eso: Mueva el código del conector FS relacionado con s3 a hadoop-aws .

Esto significa que cualquier versión de spark, que se haya construido contra Hadoop 2.6.0 o posterior, tendrá que usar otra dependencia externa para poder conectarse al Sistema de archivos S3.
Aquí hay un ejemplo de sbt que he probado y funciona como se esperaba usando Apache Spark 1.6.2 construido contra Hadoop 2.6.0:

LibraryDependencies + = " org.apache.hadoop " % "hadoop-aws" % "2.6.0"

En mi caso, me encontré con algunos problemas de dependencias, por lo que se resuelve añadiendo exclusión:

LibraryDependencies + = " org.apache.hadoop " % "hadoop-aws" % "2.6.0"exclude ("tomcat"," jasper-compiler") excludeAll ExclusionRule(organization = " javax.servlet")

En otra nota relacionada, todavía tengo que probarlo, pero que se recomienda usar el sistema de archivos "s3a" y no "s3n" a partir de Hadoop 2.6.0.

La tercera generación, s3a: filesystem. Diseñado para ser un interruptor en reemplazo de s3n:, esta unión del sistema de archivos admite archivos más grandes y promete un mayor rendimiento.

 30
Author: Sergey Bahchissaraitsev,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2016-08-07 20:27:34

Puede agregar el parámetro packages packages con el jar apropiado: a su presentación:

bin/spark-submit --packages com.amazonaws:aws-java-sdk-pom:1.10.34,org.apache.hadoop:hadoop-aws:2.6.0 code.py
 14
Author: Andrew K,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2015-11-20 21:25:40

Este es un código spark de ejemplo que puede leer los archivos presentes en s3

val hadoopConf = sparkContext.hadoopConfiguration
hadoopConf.set("fs.s3.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem")
hadoopConf.set("fs.s3.awsAccessKeyId", s3Key)
hadoopConf.set("fs.s3.awsSecretAccessKey", s3Secret)
var jobInput = sparkContext.textFile("s3://" + s3_location)
 9
Author: Kaab Awan,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2016-09-17 11:21:49

Se encontró con el mismo problema en Spark 2.0.2. Lo resolví dándole de comer los frascos. Esto es lo que corrí:

$ spark-shell --jars aws-java-sdk-1.7.4.jar,hadoop-aws-2.7.3.jar,jackson-annotations-2.7.0.jar,jackson-core-2.7.0.jar,jackson-databind-2.7.0.jar,joda-time-2.9.6.jar

scala> val hadoopConf = sc.hadoopConfiguration
scala> hadoopConf.set("fs.s3.impl","org.apache.hadoop.fs.s3native.NativeS3FileSystem")
scala> hadoopConf.set("fs.s3.awsAccessKeyId",awsAccessKeyId)
scala> hadoopConf.set("fs.s3.awsSecretAccessKey", awsSecretAccessKey)
scala> val sqlContext = new org.apache.spark.sql.SQLContext(sc)
scala> sqlContext.read.parquet("s3://your-s3-bucket/")

Obviamente, necesitas tener los frascos en el camino donde estás ejecutando spark-shell desde

 5
Author: user592894,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2016-12-20 18:01:24

Tuve que copiar los archivos jar de una descarga de hadoop en el directorio $SPARK_HOME/jars. Usar la bandera --jars o la bandera --packages para spark-submit no funcionó.

Detalles:

  • Spark 2.3.0
  • Hadoop descargado fue 2.7.6
  • Dos archivos jar copiados eran de (hadoop dir)/share/hadoop/tools/lib/
    • aws-java-sdk-1.7.4.jar
    • hadoop-aws-2.7.6.jar
 3
Author: James D,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2018-05-10 15:14:39

Para Spark 1.4.x "Pre construido para Hadoop 2.6 y posteriores":

Acabo de copiar los paquetes S3 y S3nativos necesarios de hadoop-aws-2.6.0.tarro para spark-assembly-1.4.1-hadoop2. 6.0.frasco.

Después de eso reinicié spark cluster y funciona. No olviden de comprobar el propietario y el régimen de la jarra de montaje.

 2
Author: Uster,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2015-08-13 09:10:01

Hay un Spark JIRA, SPARK-7481, abierto a partir de hoy, 20 de octubre de 2016, para agregar un módulo spark-cloud que incluye dependencias transitivas en todo s3a y azure wasb: need, junto con pruebas.

Y un Spark PR para que coincida. Así es como obtengo soporte s3a en mis compilaciones de spark

Si lo hace a mano, debe obtener hadoop-aws JAR de la versión exacta que tiene el resto de sus JARS de hadoop, y una versión de los JARs de AWS 100% sincronizada con lo que Hadoop aws era compilado contra. Para Hadoop 2.7.{1, 2, 3, ...}

hadoop-aws-2.7.x.jar 
aws-java-sdk-1.7.4.jar
joda-time-2.9.3.jar
+ jackson-*-2.6.5.jar

Pon todos estos en SPARK_HOME/jars. Ejecute spark con sus credenciales configuradas en Env vars o en spark-default.conf

La prueba más simple es puede hacer un recuento de líneas de un archivo CSV

val landsatCSV = "s3a://landsat-pds/scene_list.gz"
val lines = sc.textFile(landsatCSV)
val lineCount = lines.count()

Obtener un número: todo está bien. Haz un seguimiento de la pila. Malas noticias.

 2
Author: Steve Loughran,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2016-10-20 14:48:45

S3N no es un formato de archivo predeterminado. Debe compilar su versión de Spark con una versión de Hadoop que tenga las bibliotecas adicionales utilizadas para la compatibilidad con AWS. Información adicional que encontré aquí, https://www.hakkalabs.co/articles/making-your-local-hadoop-more-like-aws-elastic-mapreduce

 1
Author: Dan Ciborowski - MSFT,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2015-06-15 17:42:30

Probablemente tenga que usar s3a: / scheme en lugar de s3: / o s3n:/ Sin embargo, no está funcionando fuera de la caja (para mí) para la cáscara de la chispa. Veo el siguiente stacktrace:

java.lang.RuntimeException: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.s3a.S3AFileSystem not found
        at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2074)
        at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2578)
        at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2591)
        at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:91)
        at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2630)
        at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2612)
        at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:370)
        at org.apache.hadoop.fs.Path.getFileSystem(Path.java:296)
        at org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:256)
        at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:228)
        at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:313)
        at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:207)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
        at scala.Option.getOrElse(Option.scala:120)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
        at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
        at scala.Option.getOrElse(Option.scala:120)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1781)
        at org.apache.spark.rdd.RDD.count(RDD.scala:1099)
        at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:24)
        at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:29)
        at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:31)
        at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:33)
        at $iwC$$iwC$$iwC$$iwC.<init>(<console>:35)
        at $iwC$$iwC$$iwC.<init>(<console>:37)
        at $iwC$$iwC.<init>(<console>:39)
        at $iwC.<init>(<console>:41)
        at <init>(<console>:43)
        at .<init>(<console>:47)
        at .<clinit>(<console>)
        at .<init>(<console>:7)
        at .<clinit>(<console>)
        at $print(<console>)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:497)
        at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)
        at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1338)
        at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840)
        at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871)
        at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819)
        at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:857)
        at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:902)
        at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:814)
        at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:657)
        at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:665)
        at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$loop(SparkILoop.scala:670)
        at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply$mcZ$sp(SparkILoop.scala:997)
        at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
        at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
        at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
        at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$process(SparkILoop.scala:945)
        at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1059)
        at org.apache.spark.repl.Main$.main(Main.scala:31)
        at org.apache.spark.repl.Main.main(Main.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:497)
        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:665)
        at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:170)
        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:193)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:112)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.s3a.S3AFileSystem not found
        at org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:1980)
        at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2072)
        ... 68 more

Lo que pienso-tiene que agregar manualmente la dependencia de hadoop-aws manualmente http://search.maven.org/#artifactdetails/org.apache.hadoop/hadoop-aws/2.7.1/jar Pero no tengo idea de cómo añadirlo a spark-shell correctamente.

 1
Author: pkozlov,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2015-07-18 19:35:01

Me enfrentaba al mismo problema. Funcionó bien después de establecer el valor para fs.s3n. impl y adición de la dependencia de hadoop-aws.

sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", awsAccessKeyId)
sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", awsSecretAccessKey)
sc.hadoopConfiguration.set("fs.s3n.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem")
 1
Author: PDerp15,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2017-08-22 15:39:54

Use s3a en lugar de s3n. Tuve un problema similar en un trabajo de Hadoop. Después de cambiar de s3n a s3a funcionó.

P. Ej.

S3a://myBucket/myFile1.log

 0
Author: Gaj,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2017-03-29 14:58:26