PySpark: Cómo convertir una columna de matriz (es decir, lista) a Vector


Versión Corta de la cuestión!

Considere el siguiente fragmento (suponiendo que spark ya está establecido en algunos SparkSession):

from pyspark.sql import Row
source_data = [
    Row(city="Chicago", temperatures=[-1.0, -2.0, -3.0]),
    Row(city="New York", temperatures=[-7.0, -7.0, -5.0]), 
]
df = spark.createDataFrame(source_data)

Observe que el campo de temperaturas es una lista de flotadores. Me gustaría convertir estas listas de flotadores al tipo MLlib Vector, y me gustaría que esta conversión se expresara usando la API básica DataFrame en lugar de ir a través de RDDs (que es ineficiente porque envía todos los datos de la JVM a Python, el procesamiento se realiza en Python, no obtenga los beneficios del optimizador Catalyst de Spark, yada yada). ¿Cómo hago esto? Específicamente:

  1. ¿Hay alguna manera de conseguir que un molde recto funcione? Consulte a continuación los detalles (y un intento fallido de una solución alternativa)? O, ¿hay alguna otra operación que tenga el efecto que yo buscaba?
  2. ¿Cuál es más eficiente de las dos soluciones alternativas que sugiero a continuación (UDF vs exploding/reassembling the items in the list)? ¿O hay alguna otra alternativa casi-pero-no-del todo-correcta que son mejores que cualquiera de ellos?

Un molde recto no funciona

Esto es lo que yo esperaría que fuera la solución "adecuada". Quiero convertir el tipo de una columna de un tipo a otro, así que debería usar un cast. Como un poco de contexto, permítanme recordarles la forma normal de lanzarlo a otro tipo:

from pyspark.sql import types
df_with_strings = df.select(
    df["city"], 
    df["temperatures"].cast(types.ArrayType(types.StringType()))),
)

Ahora, por ejemplo, df_with_strings.collect()[0]["temperatures"][1] es '-7.0'. Pero si echo a un vector ml entonces las cosas no van tan bien:

from pyspark.ml.linalg import VectorUDT
df_with_vectors = df.select(df["city"], df["temperatures"].cast(VectorUDT()))

Esto da una error:

pyspark.sql.utils.AnalysisException: "cannot resolve 'CAST(`temperatures` AS STRUCT<`type`: TINYINT, `size`: INT, `indices`: ARRAY<INT>, `values`: ARRAY<DOUBLE>>)' due to data type mismatch: cannot cast ArrayType(DoubleType,true) to org.apache.spark.ml.linalg.VectorUDT@3bfc3ba7;;
'Project [city#0, unresolvedalias(cast(temperatures#1 as vector), None)]
+- LogicalRDD [city#0, temperatures#1]
"

Uy! ¿Alguna idea de cómo arreglar esto?

Posibles alternativas

Alternativa 1: Usar VectorAssembler

Hay un Transformer que parece casi ideal para este trabajo: el VectorAssembler. Toma una o más columnas y las concatena en un solo vector. Desafortunadamente solo toma Vector y Float columnas, no Array columnas, por lo que lo siguiente no funciona:

from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(inputCols=["temperatures"], outputCol="temperature_vector")
df_fail = assembler.transform(df)

Da este error:

pyspark.sql.utils.IllegalArgumentException: 'Data type ArrayType(DoubleType,true) is not supported.'

El mejor trabajo que puedo pensar en es explotar la lista en múltiples columnas y luego utilizar el VectorAssembler para recoger todos de nuevo:

from pyspark.ml.feature import VectorAssembler
TEMPERATURE_COUNT = 3
assembler_exploded = VectorAssembler(
    inputCols=["temperatures[{}]".format(i) for i in range(TEMPERATURE_COUNT)], 
    outputCol="temperature_vector"
)
df_exploded = df.select(
    df["city"], 
    *[df["temperatures"][i] for i in range(TEMPERATURE_COUNT)]
)
converted_df = assembler_exploded.transform(df_exploded)
final_df = converted_df.select("city", "temperature_vector")

Esto parece que sería ideal, excepto que TEMPERATURE_COUNT sea más de 100, y a veces más de 1000. (Otro problema es que el código sería más complicado si no sabes el tamaño de la matriz de antemano, aunque ese no es el caso de mis datos.) Spark realmente genera un conjunto de datos intermedios con tantas columnas, o simplemente considera esto un paso intermedio que los elementos individuales pasan transitoriamente (o, de hecho, optimiza este paso ausente por completo cuando ve que el único uso de estas columnas es ensamblarse en un vector)?

Alternativa 2: utilizar un UDF

Una alternativa bastante más simple es usar un UDF para hacer la conversión. Esto me permite expresar muy directamente lo que quiero hacer en una línea de código, y no requiere hacer un conjunto de datos con un número loco de columnas. Pero todos esos datos tienen que ser intercambiado entre Python y la JVM, y cada número individual tiene que ser manejado por Python (que es notoriamente lento para iterar sobre elementos de datos individuales). Así es como se ve:

from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.sql.functions import udf
list_to_vector_udf = udf(lambda l: Vectors.dense(l), VectorUDT())
df_with_vectors = df.select(
    df["city"], 
    list_to_vector_udf(df["temperatures"]).alias("temperatures")
)

Comentarios ignorables

Las secciones restantes de esta pregunta son algunas cosas adicionales que se me ocurrieron mientras intentaba encontrar una respuesta. Probablemente pueden ser omitidos por la mayoría de las personas que leen esto.

No es una solución: use Vector para comenzar con

En este trivial ejemplo es posible crear los datos usando el tipo vectorial para empezar, pero por supuesto mis datos no son realmente una lista de Python que estoy paralelizando, sino que se leen desde una fuente de datos. Pero para que conste, así es como se vería:

from pyspark.ml.linalg import Vectors
from pyspark.sql import Row
source_data = [
    Row(city="Chicago", temperatures=Vectors.dense([-1.0, -2.0, -3.0])),
    Row(city="New York", temperatures=Vectors.dense([-7.0, -7.0, -5.0])),
]
df = spark.createDataFrame(source_data)

Solución ineficiente: use map()

Una posibilidad es usar el método RDD map() para transformar la lista a un Vector. Esto es similar a la idea de UDF, excepto que es aún peor debido al costo de la serialización, etc. ser incurrido para todos los campos en cada fila, no solo el que se opera en. Para que conste, así es como se vería esa solución:

df_with_vectors = df.rdd.map(lambda row: Row(
    city=row["city"], 
    temperatures=Vectors.dense(row["temperatures"])
)).toDF()

Intento fallido de una solución para cast

En desesperación, noté que Vector está representado internamente por una estructura con cuatro campos, pero usar un molde tradicional de ese tipo de estructura tampoco funciona. Aquí hay una ilustración (donde construí la estructura usando un udf, pero el udf no es lo importante part):

from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.sql.functions import udf
list_to_almost_vector_udf = udf(lambda l: (1, None, None, l), VectorUDT.sqlType())
df_almost_vector = df.select(
    df["city"], 
    list_to_almost_vector_udf(df["temperatures"]).alias("temperatures")
)
df_with_vectors = df_almost_vector.select(
    df_almost_vector["city"], 
    df_almost_vector["temperatures"].cast(VectorUDT())
)

Esto da el error:

pyspark.sql.utils.AnalysisException: "cannot resolve 'CAST(`temperatures` AS STRUCT<`type`: TINYINT, `size`: INT, `indices`: ARRAY<INT>, `values`: ARRAY<DOUBLE>>)' due to data type mismatch: cannot cast StructType(StructField(type,ByteType,false), StructField(size,IntegerType,true), StructField(indices,ArrayType(IntegerType,false),true), StructField(values,ArrayType(DoubleType,false),true)) to org.apache.spark.ml.linalg.VectorUDT@3bfc3ba7;;
'Project [city#0, unresolvedalias(cast(temperatures#5 as vector), None)]
+- Project [city#0, <lambda>(temperatures#1) AS temperatures#5]
+- LogicalRDD [city#0, temperatures#1]
"
Author: Community, 2017-02-09

2 answers

Personalmente iría con Python UDF y no me molestaría con nada más:

  • Vectors no son tipos SQL nativos, por lo que habrá sobrecarga de rendimiento de una manera u otra. En particular, este proceso requiere dos pasos donde los datos se convierten primero de tipo externo a row, y luego de row a representación interna utilizando generic RowEncoder.
  • Cualquier ML descendente Pipeline será mucho más caro que una simple conversión. Por otra parte requiere un proceso que opuesto al descrito anteriormente

Pero si realmente quieres otras opciones aquí estás:

  • Scala UDF con Python wrapper:

    Instale sbt siguiendo las instrucciones del sitio del proyecto.

    Cree un paquete Scala con la siguiente estructura:

    .
    ├── build.sbt
    └── udfs.scala
    

    Editar build.sbt (ajustar para reflejar la versión de Scala y Spark):

    scalaVersion := "2.11.8"
    
    libraryDependencies ++= Seq(
      "org.apache.spark" %% "spark-sql" % "2.1.0",
      "org.apache.spark" %% "spark-mllib" % "2.1.0"
    )
    

    Editar udfs.scala:

    package com.example.spark.udfs
    
    import org.apache.spark.sql.functions.udf
    import org.apache.spark.ml.linalg.DenseVector
    
    object udfs {
      val as_vector = udf((xs: Seq[Double]) => new DenseVector(xs.toArray))
    }
    

    Paquete:

    sbt package
    

    E incluir (o equivalente dependiendo de Scala vers:

    $PROJECT_ROOT/target/scala-2.11/udfs_2.11-0.1-SNAPSHOT.jar
    

    Como argumento para --driver-class-path al iniciar shell / enviar la solicitud.

    En PySpark define un wrapper:

    from pyspark.sql.column import _to_java_column, _to_seq, Column
    from pyspark import SparkContext
    
    def as_vector(col):
        sc = SparkContext.getOrCreate()
        f = sc._jvm.com.example.spark.udfs.udfs.as_vector()
        return Column(f.apply(_to_seq(sc, [col], _to_java_column)))
    

    Prueba:

    with_vec = df.withColumn("vector", as_vector("temperatures"))
    with_vec.show()
    
    +--------+------------------+----------------+
    |    city|      temperatures|          vector|
    +--------+------------------+----------------+
    | Chicago|[-1.0, -2.0, -3.0]|[-1.0,-2.0,-3.0]|
    |New York|[-7.0, -7.0, -5.0]|[-7.0,-7.0,-5.0]|
    +--------+------------------+----------------+
    
    with_vec.printSchema()
    
    root
     |-- city: string (nullable = true)
     |-- temperatures: array (nullable = true)
     |    |-- element: double (containsNull = true)
     |-- vector: vector (nullable = true)
    
  • Vuelque los datos a un formato JSON que refleje el esquema DenseVector y léalo de nuevo:

    from pyspark.sql.functions import to_json, from_json, col, struct, lit
    from pyspark.sql.types import StructType, StructField
    from pyspark.ml.linalg import VectorUDT
    
    json_vec = to_json(struct(struct(
        lit(1).alias("type"),  # type 1 is dense, type 0 is sparse
        col("temperatures").alias("values")
    ).alias("v")))
    
    schema = StructType([StructField("v", VectorUDT())])
    
    with_parsed_vector = df.withColumn(
        "parsed_vector", from_json(json_vec, schema).getItem("v")
    )
    
    with_parsed_vector.show()
    
    +--------+------------------+----------------+
    |    city|      temperatures|   parsed_vector|
    +--------+------------------+----------------+
    | Chicago|[-1.0, -2.0, -3.0]|[-1.0,-2.0,-3.0]|
    |New York|[-7.0, -7.0, -5.0]|[-7.0,-7.0,-5.0]|
    +--------+------------------+----------------+
    
    with_parsed_vector.printSchema()
    
    root
     |-- city: string (nullable = true)
     |-- temperatures: array (nullable = true)
     |    |-- element: double (containsNull = true)
     |-- parsed_vector: vector (nullable = true)
    
 14
Author: user6910411,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2017-05-23 12:26:01

Tuve el mismo problema que tú y lo hice de esta manera. Esta forma incluye la transformación RDD, por lo que no es crítico el rendimiento, pero funciona.

from pyspark.sql import Row
from pyspark.ml.linalg import Vectors

source_data = [
    Row(city="Chicago", temperatures=[-1.0, -2.0, -3.0]),
    Row(city="New York", temperatures=[-7.0, -7.0, -5.0]), 
]
df = spark.createDataFrame(source_data)

city_rdd = df.rdd.map(lambda row:row[0])
temp_rdd = df.rdd.map(lambda row:row[1])
new_df = city_rdd.zip(temp_rdd.map(lambda x:Vectors.dense(x))).toDF(schema=['city','temperatures'])

new_df

El resultado es,

DataFrame[city: string, temperatures: vector]
 3
Author: GGDammy,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2018-01-19 04:43:20