Cargar archivo CSV con Spark


Soy nuevo en Spark e intento leer datos CSV de un archivo con Spark. Esto es lo que estoy haciendo :

sc.textFile('file.csv')
    .map(lambda line: (line.split(',')[0], line.split(',')[1]))
    .collect()

Esperaría que esta llamada me diera una lista de las dos primeras columnas de mi archivo, pero estoy recibiendo este error:

File "<ipython-input-60-73ea98550983>", line 1, in <lambda>
IndexError: list index out of range

Aunque mi archivo CSV como más de una columna.

Author: Kernael, 2015-02-28

10 answers

¿Estás seguro de que todas las líneas tienen al menos 2 columnas? ¿Puedes probar algo como, sólo para comprobar?:

sc.textFile("file.csv") \
    .map(lambda line: line.split(",")) \
    .filter(lambda line: len(line)>1) \
    .map(lambda line: (line[0],line[1])) \
    .collect()

Alternativamente, puede imprimir el culpable (si lo hay):

sc.textFile("file.csv") \
    .map(lambda line: line.split(",")) \
    .filter(lambda line: len(line)<=1) \
    .collect()
 45
Author: G Quintana,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2016-12-30 18:25:57

Chispa 2.0.0+

Puede utilizar directamente la fuente de datos csv integrada:

spark.read.csv(
    "some_input_file.csv", header=True, mode="DROPMALFORMED", schema=schema
)

O

(spark.read
    .schema(schema)
    .option("header", "true")
    .option("mode", "DROPMALFORMED")
    .csv("some_input_file.csv"))

Sin incluir ninguna dependencia externa.

Chispa :

En lugar del análisis manual, que está lejos de ser trivial en un caso general, recomendaría spark-csv:

Asegúrese de que Spark CSV esté incluido en la ruta (--packages, --jars, --driver-class-path)

Y cargue sus datos de la siguiente manera:

(df = sqlContext
    .read.format("com.databricks.spark.csv")
    .option("header", "true")
    .option("inferschema", "true")
    .option("mode", "DROPMALFORMED")
    .load("some_input_file.csv"))

Puede manejar carga, inferencia de esquemas, caída de líneas malformadas y no requiere pasar datos de Python a la JVM.

Nota:

Si conoce el esquema, es mejor evitar la inferencia de esquema y pasarlo a DataFrameReader. Suponiendo que tiene tres columnas-entero, doble y cadena:

from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import DoubleType, IntegerType, StringType

schema = StructType([
    StructField("A", IntegerType()),
    StructField("B", DoubleType()),
    StructField("C", StringType())
])

(sqlContext
    .read
    .format("com.databricks.spark.csv")
    .schema(schema)
    .option("header", "true")
    .option("mode", "DROPMALFORMED")
    .load("some_input_file.csv"))
 116
Author: zero323,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2017-08-30 08:33:21

Simplemente dividir por coma también dividirá las comas que están dentro de los campos (por ejemplo, a,b,"1,2,3",c), por lo que no se recomienda. la respuesta de zero323 es buena si quieres usar la API de DataFrames, pero si quieres apegarte a base Spark, puedes analizar csv en base Python con el módulo csv :

# works for both python 2 and 3
import csv
rdd = sc.textFile("file.csv")
rdd = rdd.mapPartitions(lambda x: csv.reader(x))

EDIT: Como @muon mencionó en los comentarios, esto tratará el encabezado como cualquier otra fila, por lo que necesitará extraerlo manualmente. Por ejemplo, header = rdd.first(); rdd = rdd.filter(lambda x: x != header) (asegúrese de no modificar header antes el filtro evalúa). Pero en este punto, probablemente sea mejor usar un analizador CSV integrado.

 10
Author: Galen Long,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2017-05-23 12:34:37

Y otra opción que consiste en leer el archivo CSV usando Pandas y luego importar el DataFrame de Pandas en Spark.

Por ejemplo:

from pyspark import SparkContext
from pyspark.sql import SQLContext
import pandas as pd

sc = SparkContext('local','example')  # if using locally
sql_sc = SQLContext(sc)

pandas_df = pd.read_csv('file.csv')  # assuming the file contains a header
# pandas_df = pd.read_csv('file.csv', names = ['column 1','column 2']) # if no header
s_df = sql_sc.createDataFrame(pandas_df)
 9
Author: JP Mercier,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2015-11-14 00:39:52
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

df = spark.read.csv("/home/stp/test1.csv",header=True,separator="|");

print(df.collect())
 9
Author: y durga prasad,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2016-12-30 19:09:48

Esto está en línea con lo que JP Mercier sugirió inicialmente sobre el uso de Pandas, pero con una modificación importante: Si lee datos en Pandas en trozos, debería ser más maleable. Es decir, que puedes analizar un archivo mucho más grande de lo que los Pandas realmente pueden manejar como una sola pieza y pasarlo a Spark en tamaños más pequeños. (Esto también responde al comentario sobre por qué uno querría usar Spark si pueden cargar todo en Pandas de todos modos.)

from pyspark import SparkContext
from pyspark.sql import SQLContext
import pandas as pd

sc = SparkContext('local','example')  # if using locally
sql_sc = SQLContext(sc)

Spark_Full = sc.emptyRDD()
chunk_100k = pd.read_csv("Your_Data_File.csv", chunksize=100000)
# if you have headers in your csv file:
headers = list(pd.read_csv("Your_Data_File.csv", nrows=0).columns)

for chunky in chunk_100k:
    Spark_Full +=  sc.parallelize(chunky.values.tolist())

YourSparkDataFrame = Spark_Full.toDF(headers)
# if you do not have headers, leave empty instead:
# YourSparkDataFrame = Spark_Full.toDF()
YourSparkDataFrame.show()
 3
Author: abby sobh,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2017-05-23 12:03:05

Ahora, también hay otra opción para cualquier archivo csv general: https://github.com/seahboonsiew/pyspark-csv como sigue:

Supongamos que tenemos el siguiente contexto

sc = SparkContext
sqlCtx = SQLContext or HiveContext

Primero, distribuir pyspark-csv.py a ejecutores usando SparkContext

import pyspark_csv as pycsv
sc.addPyFile('pyspark_csv.py')

Lea los datos csv a través de SparkContext y conviértalos en DataFrame

plaintext_rdd = sc.textFile('hdfs://x.x.x.x/blah.csv')
dataframe = pycsv.csvToDataFrame(sqlCtx, plaintext_rdd)
 3
Author: optimist,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2017-08-01 09:09:29

Si sus datos csv no contienen nuevas líneas en ninguno de los campos, puede cargar sus datos con textFile() y analizarlos

import csv
import StringIO

def loadRecord(line):
    input = StringIO.StringIO(line)
    reader = csv.DictReader(input, fieldnames=["name1", "name2"])
    return reader.next()

input = sc.textFile(inputFile).map(loadRecord)
 2
Author: iec2011007,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2015-11-23 05:02:48

Si desea cargar csv como un dataframe, puede hacer lo siguiente:

from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

df = sqlContext.read.format('com.databricks.spark.csv') \
    .options(header='true', inferschema='true') \
    .load('sampleFile.csv') # this is your csv file

Funcionó bien para mí.

 1
Author: Jeril,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2017-11-09 10:09:02
import pandas as pd

data1 = pd.read_csv("test1.csv")
data2 = pd.read_csv("train1.csv")
 -4
Author: hey kay,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2017-07-31 18:01:43