Forma óptima de crear una canalización de ml en Apache Spark para conjuntos de datos con un gran número de columnas


Estoy trabajando con Spark 2.1.1 en un conjunto de datos con ~2000 características y tratando de crear una tubería de ML básica, que consiste en algunos Transformadores y un Clasificador.

Supongamos, en aras de la simplicidad, que la tubería con la que estoy trabajando consiste en un VectorAssembler, StringIndexer y un Clasificador, que sería un caso de uso bastante común.

// Pipeline elements
val assmbleFeatures: VectorAssembler = new VectorAssembler()
  .setInputCols(featureColumns)
  .setOutputCol("featuresRaw")

val labelIndexer: StringIndexer = new StringIndexer()
  .setInputCol("TARGET")
  .setOutputCol("indexedLabel")

// Train a RandomForest model.
val rf: RandomForestClassifier = new RandomForestClassifier()
  .setLabelCol("indexedLabel")
  .setFeaturesCol("featuresRaw")
  .setMaxBins(30)

// add the params, unique to this classifier
val paramGrid = new ParamGridBuilder()
  .addGrid(rf.numTrees, Array(5))
  .addGrid(rf.maxDepth, Array(5))
  .build()

// Treat the Pipeline as an Estimator, to jointly choose parameters for all Pipeline stages.
val evaluator = new BinaryClassificationEvaluator()
  .setMetricName("areaUnderROC")
  .setLabelCol("indexedLabel")

Si los pasos de la tubería están separados en una tubería de transformador (VectorAssembler + StringIndexer) y un segundo clasificador canalización, y si las columnas innecesarias se dejan caer entre ambas canalizaciones, el entrenamiento tiene éxito. Esto significa que para reutilizar los modelos, dos modelos de Pipelin tienen que ser guardados después de la formación y un paso intermedio de preprocesamiento tiene que ser introducido.

// Split indexers and forest in two Pipelines.
val prePipeline = new Pipeline().setStages(Array(labelIndexer, assmbleFeatures)).fit(dfTrain)
// Transform data and drop all columns, except those needed for training 
val dfTrainT = prePipeline.transform(dfTrain)
val columnsToDrop = dfTrainT.columns.filter(col => !Array("featuresRaw", "indexedLabel").contains(col))
val dfTrainRdy = dfTrainT.drop(columnsToDrop:_*)

val mainPipeline = new Pipeline().setStages(Array(rf))

val cv = new CrossValidator()
  .setEstimator(mainPipeline)
  .setEvaluator(evaluator)
  .setEstimatorParamMaps(paramGrid)
  .setNumFolds(2)

val bestModel = cv.fit(dfTrainRdy).bestModel.asInstanceOf[PipelineModel]

La solución (en mi humilde opinión) mucho más limpia sería fusionar todas las etapas de la tubería en una sola tubería.

val pipeline = new Pipeline()
  .setStages(Array(labelIndexer, assmbleFeatures, rf))

val cv = new CrossValidator()
  .setEstimator(pipeline)
  .setEvaluator(evaluator)
  .setEstimatorParamMaps(paramGrid)
  .setNumFolds(2)

// This will fail! 
val bestModel = cv.fit(dfTrain).bestModel.asInstanceOf[PipelineModel]

Sin embargo, poner todas las PipelineStages en una tubería conduce a la siguiente excepción, probablemente debido al problema este PR finalmente resolverá:

ERROR CodeGenerator: failed to compile: org.codehaus.janino.JaninoRuntimeException: Grupo constante para la organización de clases.apache.chispa.SQL.catalizador.expresiones.GeneratedClass Specif SpecificUnsafeProjection has grown past JVM limit of 0xFFFF

La razón de esto es que el VectorAssembler duplica efectivamente (en este ejemplo) la cantidad de datos en el DataFrame, ya que no hay ningún transformador que pueda eliminar las columnas innecesarias. (Ver spark pipeline vector assembler drop otras columnas )

Para el ejemplo funciona en el conjunto de datos golub y los siguientes pasos de preprocesamiento son necesarios:

import org.apache.spark.sql.types.DoubleType
import org.apache.spark.ml.classification.RandomForestClassifier
import org.apache.spark.ml.{Pipeline, PipelineModel, PipelineStage}
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
import org.apache.spark.ml.feature._
import org.apache.spark.sql._
import org.apache.spark.ml.tuning.{CrossValidator, ParamGridBuilder}

val df = spark.read.option("header", true).option("inferSchema", true).csv("/path/to/dataset/golub_merged.csv").drop("_c0").repartition(100)

// Those steps are necessary, otherwise training would fail either way
val colsToDrop = df.columns.take(5000)
val dfValid = df.withColumn("TARGET", df("TARGET_REAL").cast(DoubleType)).drop("TARGET_REAL").drop(colsToDrop:_*)

// Split df in train and test sets
val Array(dfTrain, dfTest) = dfValid.randomSplit(Array(0.7, 0.3))

// Feature columns are columns except "TARGET"
val featureColumns = dfTrain.columns.filter(col => col != "TARGET")

Como soy nuevo en Spark, no estoy seguro de cuál sería la mejor manera de resolver este problema. ¿Podría sugerirlo?..

  1. para crear un nuevo transformador, que cae columnas y que se puede incorporar en la tubería?
  2. dividir ambas tuberías e introducir el paso intermedio
  3. ¿algo más? :)

O me estoy perdiendo algo importante (pasos de la tubería, relaciones públicas, etc.) que resolvería este problema?


Editar:

Implementé un nuevo Transformador DroppingVectorAssembler, que elimina columnas innecesarias, sin embargo, se lanza la misma excepción.

Además de eso, establecer spark.sql.codegen.wholeStage a false no resuelve el problema.

Author: Community, 2017-05-11

2 answers

El error janino se debe al número de variables constantes creadas durante el proceso del optimizador. El límite máximo de variables constantes permitidas en la JVM es ((2^16) -1). Si se excede este límite, entonces se obtiene el Constant pool for class ... has grown past JVM limit of 0xFFFF

La JIRA que solucionará este problema es SPARK-18016, pero todavía está en progreso en este momento.

Es muy probable que su código falle durante la etapa VectorAssembler, cuando tiene que funcionar contra miles de columnas durante una sola tarea de optimización.

La solución que desarrollé para este problema es crear un "vector de vectores" trabajando contra subconjuntos de las columnas y luego juntando los resultados al final para crear un vector de entidad singular. Esto evita que cualquier tarea de optimización supere el límite constante de JVM. No es elegante, pero lo he usado en conjuntos de datos que alcanzan el rango de columnas de 10k.

Este método también le permite mantener una sola canalización, aunque requiere algunos pasos adicionales para que funcione (creación de los sub-vectores). Después de crear el vector de entidad a partir de los subvectoriales, puede soltar las columnas de origen originales si lo desea.

Código de ejemplo:

// IMPORT DEPENDENCIES
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.{SQLContext, Row, DataFrame, Column}
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.{Pipeline, PipelineModel}

// Create first example dataframe
val exampleDF = spark.createDataFrame(Seq(
  (1, 1, 2, 3, 8, 4, 5, 1, 3, 2, 0, 4, 2, 8, 1, 1, 2, 3, 8, 4, 5),
  (2, 4, 3, 8, 7, 9, 8, 2, 3, 3, 2, 6, 5, 4, 2, 4, 3, 8, 7, 9, 8),
  (3, 6, 1, 9, 2, 3, 6, 3, 8, 5, 1, 2, 3, 5, 3, 6, 1, 9, 2, 3, 6),
  (4, 7, 8, 6, 9, 4, 5, 4, 9, 8, 2, 4, 9, 2, 4, 7, 8, 6, 9, 4, 5),
  (5, 9, 2, 7, 8, 7, 3, 5, 3, 4, 8, 0, 6, 2, 5, 9, 2, 7, 8, 7, 3),
  (6, 1, 1, 4, 2, 8, 4, 6, 3, 9, 8, 8, 9, 3, 6, 1, 1, 4, 2, 8, 4)
)).toDF("uid", "col1", "col2", "col3", "col4", "col5", 
        "col6", "col7", "col8", "col9", "colA", "colB", 
        "colC", "colD", "colE", "colF", "colG", "colH", 
        "colI", "colJ", "colK")

// Create multiple column lists using the sliding method
val Array(colList1, colList2, colList3, colList4) = exampleDF.columns.filter(_ != "uid").sliding(5,5).toArray

// Create a vector assembler for each column list
val colList1_assembler = new VectorAssembler().setInputCols(colList1).setOutputCol("colList1_vec")
val colList2_assembler = new VectorAssembler().setInputCols(colList2).setOutputCol("colList2_vec")
val colList3_assembler = new VectorAssembler().setInputCols(colList3).setOutputCol("colList3_vec")
val colList4_assembler = new VectorAssembler().setInputCols(colList4).setOutputCol("colList4_vec")

// Create a vector assembler using column list vectors as input
val features_assembler = new VectorAssembler().setInputCols(Array("colList1_vec","colList2_vec","colList3_vec","colList4_vec")).setOutputCol("features")

// Create the pipeline with column list vector assemblers first, then the final vector of vectors assembler last
val pipeline = new Pipeline().setStages(Array(colList1_assembler,colList2_assembler,colList3_assembler,colList4_assembler,features_assembler))

// Fit and transform the data
val featuresDF = pipeline.fit(exampleDF).transform(exampleDF)

// Get the number of features in "features" vector
val featureLength = (featuresDF.schema(featuresDF.schema.fieldIndex("features")).metadata.getMetadata("ml_attr").getLong("num_attrs"))

// Print number of features in "features vector"
print(featureLength)

(Nota: El método de crear las listas de columnas realmente debería hacerse programáticamente, pero he mantenido este ejemplo simple por el bien de entender el concepto.)

 2
Author: JamCon,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2017-10-22 17:03:22

El error janino que está obteniendo se debe a que, dependiendo del conjunto de características, el código generado se vuelve más grande.

Separaría los pasos en diferentes canalizaciones y eliminaría las características innecesarias, guardaría los modelos intermedios como StringIndexer y OneHotEncoder y los cargaría mientras se realiza la predicción, lo que también es útil porque las transformaciones serían más rápidas para los datos que se deben predecir.

Finalmente, no es necesario mantener las columnas de entidades después de ejecutar VectorAssembler stage como transforma las entidades en una columna feature vector y label y eso es todo lo que necesita para ejecutar predicciones.

Ejemplo de Canalización en Scala con ahorro de pasos intermedios - (Antigua API de spark)

Además, si está utilizando una versión anterior de spark como 1.6.0, debe verificar la versión parcheada, es decir, 2.1.1 o 2.2.0 o 1.6.4 o de lo contrario, golpearía el error Janino incluso con alrededor de 400 columnas de características.

 2
Author: Ramandeep Nanda,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2017-10-15 17:12:07