¿Cómo leo un archivo CSV grande con la clase Scala Stream?


¿Cómo leo un archivo CSV grande (> 1 Gb) con una secuencia de Scala? ¿Tienes un ejemplo de código? ¿O usaría una forma diferente de leer un archivo CSV grande sin cargarlo en la memoria primero?

Author: Jan Willem Tulp, 2010-11-23

3 answers

Simplemente use Source.fromFile(...).getLines como ya ha indicado.

Que devuelve un Iterador, que ya es perezoso (Usarías stream como una colección perezosa donde querías que los valores recuperados anteriormente se memorizaran, para que puedas leerlos de nuevo)

Si tienes problemas de memoria, entonces el problema estará en lo que estás haciendo después de getLines. Cualquier operación como toList, que fuerza una colección estricta, causará el problema.

 65
Author: Kevin Wright,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2010-11-23 11:05:33

Espero que no significa Scala collection.immutable.Stream con la Corriente. Esto es no lo que quieres. Stream es perezoso, pero hace memoización.

No se lo que planeas hacer, pero solo leer el archivo línea por línea debería funcionar muy bien sin usar grandes cantidades de memoria.

getLines debe evaluar perezosamente y no debe bloquearse (siempre y cuando su archivo no tenga más de 232 líneas, afaik). Si lo hace, pregunte en #scala o envíe un ticket de error (o haga ambas cosas).

 11
Author: soc,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2010-11-23 11:03:23

Si está buscando procesar el archivo grande línea por línea mientras evita requerir que todo el contenido del archivo se cargue en la memoria de una vez, entonces puede usar el Iterator devuelto por scala.io.Source.

Tengo una pequeña función, tryProcessSource, (que contiene dos subfunciones) que uso exactamente para este tipo de casos de uso. La función toma hasta cuatro parámetros, de los cuales solo se requiere el primero. Los otros parámetros tienen valores predeterminados sanos proporcionados.

Aquí está la función perfil (la implementación de la función completa está en la parte inferior):

def tryProcessSource(
  file: File,
  parseLine: (Int, String) => Option[List[String]] =
    (index, unparsedLine) => Some(List(unparsedLine)),
  filterLine: (Int, List[String]) => Option[Boolean] =
    (index, parsedValues) => Some(true),
  retainValues: (Int, List[String]) => Option[List[String]] =
    (index, parsedValues) => Some(parsedValues),
): Try[List[List[String]]] = {
  ???
}

El primer parámetro, file: File, es necesario. Y es cualquier instancia válida de java.io.File que apunta a un archivo de texto orientado a líneas, como un CSV.

El segundo parámetro, parseLine: (Int, String) => Option[List[String]], es opcional. Y si se proporciona, debe ser una función que espera recibir dos parámetros de entrada; index: Int, unparsedLine: String. Y luego devuelve un Option[List[String]]. La función puede devolver un Some wrapped List[String] que consiste en los valores de columna válidos. O puede volver a None que indica que todo el proceso de transmisión es abortar temprano. Si no se proporciona este parámetro, se proporciona un valor predeterminado de (index, line) => Some(List(line)). Este valor predeterminado hace que toda la línea se devuelva como un único valor String.

El tercer parámetro, filterLine: (Int, List[String]) => Option[Boolean], es opcional. Y si se proporciona, debe ser una función que espera recibir dos parámetros de entrada; index: Int, parsedValues: List[String]. Y luego devuelve un Option[Boolean]. La función puede devolver un Some envuelto Boolean indicando si este particular la línea debe incluirse en la salida. O puede devolver un None que indica que todo el proceso de streaming está abortando temprano. Si no se proporciona este parámetro, se proporciona un valor predeterminado de (index, values) => Some(true). Este valor predeterminado hace que se incluyan todas las líneas.

El cuarto y último parámetro, retainValues: (Int, List[String]) => Option[List[String]], es opcional. Y si se proporciona, debe ser una función que espera recibir dos parámetros de entrada; index: Int, parsedValues: List[String]. Y luego devuelve un Option[List[String]]. La función puede devolver un Some envuelto List[String] consisting of some subset and / or alteration of the existing column values. O puede devolver un None que indica que todo el proceso de streaming está abortando temprano. Si no se proporciona este parámetro, se proporciona un valor predeterminado de (index, values) => Some(values). Este valor predeterminado da como resultado los valores analizados por el segundo parámetro, parseLine.

Considere un archivo con el siguiente contenido (4 líneas):

street,street2,city,state,zip
100 Main Str,,Irving,TX,75039
231 Park Ave,,Irving,TX,75039
1400 Beltline Rd,Apt 312,Dallas,Tx,75240

El siguiente perfil de llamada...

val tryLinesDefaults =
  tryProcessSource(new File("path/to/file.csv"))

...resultados en esta salida para tryLinesDefaults (el contenido inalterado del archivo):

Success(
  List(
    List("street,street2,city,state,zip"),
    List("100 Main Str,,Irving,TX,75039"),
    List("231 Park Ave,,Irving,TX,75039"),
    List("1400 Beltline Rd,Apt 312,Dallas,Tx,75240")
  )
)

El siguiente perfil de llamada...

val tryLinesParseOnly =
  tryProcessSource(
      new File("path/to/file.csv")
    , parseLine =
        (index, unparsedLine) => Some(unparsedLine.split(",").toList)
  )

...resulta en esta salida para tryLinesParseOnly (cada línea se analiza en los valores de columna individuales):

Success(
  List(
    List("street","street2","city","state","zip"),
    List("100 Main Str","","Irving,TX","75039"),
    List("231 Park Ave","","Irving","TX","75039"),
    List("1400 Beltline Rd","Apt 312","Dallas","Tx","75240")
  )
)

El siguiente perfil de llamada...

val tryLinesIrvingTxNoHeader =
  tryProcessSource(
      new File("C:/Users/Jim/Desktop/test.csv")
    , parseLine =
        (index, unparsedLine) => Some(unparsedLine.split(",").toList)
    , filterLine =
        (index, parsedValues) =>
          Some(
            (index != 0) && //skip header line
            (parsedValues(2).toLowerCase == "Irving".toLowerCase) && //only Irving
            (parsedValues(3).toLowerCase == "Tx".toLowerCase)
          )
  )

...resulta en esta salida para tryLinesIrvingTxNoHeader (cada línea se analiza en los valores de columna individuales, sin encabezado y solo las dos filas en Irving, Tx):

Success(
  List(
    List("100 Main Str","","Irving,TX","75039"),
    List("231 Park Ave","","Irving","TX","75039"),
  )
)

Aquí está toda la función tryProcessSource aplicación:

import scala.io.Source
import scala.util.Try

import java.io.File

def tryProcessSource(
  file: File,
  parseLine: (Int, String) => Option[List[String]] =
    (index, unparsedLine) => Some(List(unparsedLine)),
  filterLine: (Int, List[String]) => Option[Boolean] =
    (index, parsedValues) => Some(true),
  retainValues: (Int, List[String]) => Option[List[String]] =
    (index, parsedValues) => Some(parsedValues)
): Try[List[List[String]]] = {
  def usingSource[S <: Source, R](source: S)(transfer: S => R): Try[R] =
    try {Try(transfer(source))} finally {source.close()}
  def recursive(
    remaining: Iterator[(String, Int)],
    accumulator: List[List[String]],
    isEarlyAbort: Boolean =
      false
  ): List[List[String]] = {
    if (isEarlyAbort || !remaining.hasNext)
      accumulator
    else {
      val (line, index) =
        remaining.next
      parseLine(index, line) match {
        case Some(values) =>
          filterLine(index, values) match {
            case Some(keep) =>
              if (keep)
                retainValues(index, values) match {
                  case Some(valuesNew) =>
                    recursive(remaining, valuesNew :: accumulator) //capture values
                  case None =>
                    recursive(remaining, accumulator, isEarlyAbort = true) //early abort
                }
              else
                recursive(remaining, accumulator) //discard row
            case None =>
              recursive(remaining, accumulator, isEarlyAbort = true) //early abort
          }
        case None =>
          recursive(remaining, accumulator, isEarlyAbort = true) //early abort
      }
    }
  }
  Try(Source.fromFile(file)).flatMap(
    bufferedSource =>
      usingSource(bufferedSource) {
        source =>
          recursive(source.getLines().buffered.zipWithIndex, Nil).reverse
      }
  )
}

Si bien esta solución es relativamente sucinta, me llevó un tiempo considerable y muchos pases de refactorización antes de que finalmente pudiera llegar hasta aquí. Por favor, hágamelo saber si ve alguna forma de mejorarlo.


ACTUALIZACIÓN: Acabo de preguntar el problema a continuación como es propia pregunta de StackOverflow. Y ahora tiene una respuesta que corrige el error mencionado a continuación.

Tuve la idea de intentar hacer este cambio aún más genérico el parámetro retainValues a transformLine con la nueva definición de función genérica a continuación. Sin embargo, sigo recibiendo el error de resaltado en IntelliJ "Expresión de tipo Some[List[String]] doesn't conform to expected type Option[A]" y no pude averiguar cómo cambiar el valor predeterminado para que el error desaparezca.

def tryProcessSource2[A <: AnyRef](
  file: File,
  parseLine: (Int, String) => Option[List[String]] =
    (index, unparsedLine) => Some(List(unparsedLine)),
  filterLine: (Int, List[String]) => Option[Boolean] =
    (index, parsedValues) => Some(true),
  transformLine: (Int, List[String]) => Option[A] =
    (index, parsedValues) => Some(parsedValues)
): Try[List[A]] = {
  ???
}

Cualquier ayuda sobre cómo hacer este trabajo sería muy apreciada.

 3
Author: chaotic3quilibrium,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2017-05-23 12:02:56