Uso de Scalaz Stream para tareas de análisis (sustitución de Iterados de Scalaz)


Introducción

Utilizo iterados de Scalaz 7 en varios proyectos, principalmente para procesar archivos ish grandes. Me gustaría comenzar a cambiar a Scalaz streams , que están diseñados para reemplazar el paquete iteratee (que francamente le falta un montón de piezas y es una especie de dolor de usar).

Los flujos se basan en máquinas (otra variación de la idea iterada), que también se han implementado en Haskell. He usado la biblioteca de máquinas Haskell un poco, pero la relación entre máquinas y flujos no es completamente obvia (para mí, al menos), y la documentación para la biblioteca de flujos es todavía un poco escasa.

Esta pregunta se trata de una tarea de análisis simple que me gustaría ver implementada usando flujos en lugar de iterados. Responderé la pregunta yo mismo si nadie más me gana, pero estoy seguro de que no soy el único que está haciendo (o al menos considerando) esta transición, y ya que necesito trabajar con este ejercicio de todos modos, pensé que también podría hacerlo en público.

Tarea

Se supone que tengo un archivo que contiene oraciones que han sido tokenizadas y etiquetadas con partes del discurso:

no UH
, ,
it PRP
was VBD
n't RB
monday NNP
. .

the DT
equity NN
market NN
was VBD
illiquid JJ
. .

Hay un símbolo por línea, las palabras y partes del discurso están separadas por un solo espacio, y las líneas en blanco representan los límites de las oraciones. Quiero analizar este archivo y devolver una lista de oraciones, que también podríamos representar como listas de tuplas de cadenas:

List((no,UH), (,,,), (it,PRP), (was,VBD), (n't,RB), (monday,NNP), (.,.))
List((the,DT), (equity,NN), (market,NN), (was,VBD), (illiquid,JJ), (.,.)

Como de costumbre, queremos fallar correctamente si pulsamos la entrada no válida o excepciones de lectura de archivos, no queremos tener que preocuparnos por cerrar los recursos manualmente, etc.

Una solución iterada

Primero para algunas cosas generales de lectura de archivos (que realmente deberían ser parte del paquete iteratee, que actualmente no proporciona nada remotamente de este alto nivel):

import java.io.{ BufferedReader, File, FileReader }
import scalaz._, Scalaz._, effect.IO
import iteratee.{ Iteratee => I, _ }

type ErrorOr[A] = EitherT[IO, Throwable, A]

def tryIO[A, B](action: IO[B]) = I.iterateeT[A, ErrorOr, B](
  EitherT(action.catchLeft).map(I.sdone(_, I.emptyInput))
)

def enumBuffered(r: => BufferedReader) = new EnumeratorT[String, ErrorOr] {
  lazy val reader = r
  def apply[A] = (s: StepT[String, ErrorOr, A]) => s.mapCont(k =>
    tryIO(IO(Option(reader.readLine))).flatMap {
      case None       => s.pointI
      case Some(line) => k(I.elInput(line)) >>== apply[A]
    }
  )
}

def enumFile(f: File) = new EnumeratorT[String, ErrorOr] {
  def apply[A] = (s: StepT[String, ErrorOr, A]) => tryIO(
    IO(new BufferedReader(new FileReader(f)))
  ).flatMap(reader => I.iterateeT[String, ErrorOr, A](
    EitherT(
      enumBuffered(reader).apply(s).value.run.ensuring(IO(reader.close()))
    )
  ))
}

Y luego nuestro lector de frases:

def sentence: IterateeT[String, ErrorOr, List[(String, String)]] = {
  import I._

  def loop(acc: List[(String, String)])(s: Input[String]):
    IterateeT[String, ErrorOr, List[(String, String)]] = s(
    el = _.trim.split(" ") match {
      case Array(form, pos) => cont(loop(acc :+ (form, pos)))
      case Array("")        => cont(done(acc, _))
      case pieces           =>
        val throwable: Throwable = new Exception(
          "Invalid line: %s!".format(pieces.mkString(" "))
        )

        val error: ErrorOr[List[(String, String)]] = EitherT.left(
          throwable.point[IO]
        )

        IterateeT.IterateeTMonadTrans[String].liftM(error)
    },
    empty = cont(loop(acc)),
    eof = done(acc, eofInput)
  )
  cont(loop(Nil))
}

Y finalmente nuestro análisis medidas:

val action =
  I.consume[List[(String, String)], ErrorOr, List] %=
  sentence.sequenceI &=
  enumFile(new File("example.txt"))

Podemos demostrar que funciona:

scala> action.run.run.unsafePerformIO().foreach(_.foreach(println))
List((no,UH), (,,,), (it,PRP), (was,VBD), (n't,RB), (monday,NNP), (.,.))
List((the,DT), (equity,NN), (market,NN), (was,VBD), (illiquid,JJ), (.,.))

Y hemos terminado.

Lo que quiero

Más o menos el mismo programa implementado usando flujos de Scalaz en lugar de iterados.

Author: Jacek Laskowski, 2013-08-07

1 answers

Una solución de scalaz-stream:

import scalaz.std.vector._
import scalaz.syntax.traverse._
import scalaz.std.string._

val action = linesR("example.txt").map(_.trim).
  splitOn("").flatMap(_.traverseU { s => s.split(" ") match {
    case Array(form, pos) => emit(form -> pos)
    case _ => fail(new Exception(s"Invalid input $s"))
  }})

Podemos demostrar que funciona:

scala> action.collect.attempt.run.foreach(_.foreach(println))
Vector((no,UH), (,,,), (it,PRP), (was,VBD), (n't,RB), (monday,NNP), (.,.))
Vector((the,DT), (equity,NN), (market,NN), (was,VBD), (illiquid,JJ), (.,.))

Y hemos terminado.

La función traverseU es un combinador común de Scalaz. En este caso se utiliza para atravesar, en la mónada Process, la oración Vector generada por splitOn. Es equivalente a map seguido de sequence.

 49
Author: Apocalisp,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2014-12-18 15:48:22