Cómo asignar números contiguos únicos a elementos en un RDD de Spark


Tengo un conjunto de datos de (user, product, review), y quiero introducirlo en el algoritmo ALS de mllib.

El algoritmo necesita que los usuarios y productos sean números, mientras que los míos son nombres de usuario de cadenas y SKU de cadenas.

Ahora mismo, obtengo los distintos usuarios y SKU, luego les asigno ID numéricos fuera de Spark.

Me preguntaba si había una mejor manera de hacer esto. El único enfoque que he pensado es escribir un RDD personalizado que esencialmente enumere 1 a n, luego llame a zip on los dos RDDs.

Author: zero323, 2014-05-29

5 answers

Comenzando con Spark 1.0 hay dos métodos que puedes usar para resolver esto fácilmente:

  • RDD.zipWithIndex es igual que Seq.zipWithIndex, agrega números contiguos (Long). Esto necesita contar los elementos en cada partición primero, por lo que su entrada será evaluada dos veces. Almacene en caché su RDD de entrada si desea usar esto.
  • RDD.zipWithUniqueId también te da identificadores Long únicos, pero no se garantiza que sean contiguos. (Solo serán contiguos si cada partición tiene el mismo número de elemento.) La ventaja es que esto no necesita saber nada sobre la entrada, por lo que no causará una doble evaluación.
 38
Author: Daniel Darabos,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2014-07-12 23:27:16

Para un caso de uso de ejemplo similar, acabo de hash los valores de cadena. Véase http://blog.cloudera.com/blog/2014/03/why-apache-spark-is-a-crossover-hit-for-data-scientists/

def nnHash(tag: String) = tag.hashCode & 0x7FFFFF
var tagHashes = postIDTags.map(_._2).distinct.map(tag =>(nnHash(tag),tag))

Parece que ya estás haciendo algo como esto, aunque el hash puede ser más fácil de manejar.

Matei sugirió aquí un enfoque para emular zipWithIndex en un RDD, lo que equivale a asignar ID dentro de cada partición que van a ser globalmente únicos: https://groups.google.com/forum/#! topic / spark-users / WxXvcn2gl1E

 14
Author: Sean Owen,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2014-05-29 20:38:07

Otra opción fácil, si se usan DataFrames y solo se preocupa por la unicidad es usar function MonotonicallyIncreasingID

import org.apache.spark.sql.functions.monotonicallyIncreasingId 
val newDf = df.withColumn("uniqueIdColumn", monotonicallyIncreasingId)

Editar: MonotonicallyIncreasingIDfue obsoleto y eliminado desde Spark 2.0; ahora se conoce como monotonically_increasing_id.

 7
Author: radek1st,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2017-09-12 04:36:23

Monótonamente_increasing_id() parece ser la respuesta, pero desafortunadamente no funcionará para ALS ya que produce números de 64 bits y ALS espera números de 32 bits (ver mi comentario debajo de la respuesta de radek1st para deets).

La solución que encontré es usar zipWithIndex(), como se menciona en la respuesta de Darabos. He aquí cómo implementarlo:

Si ya tiene un DataFrame de una sola columna con sus distintos usuarios llamados userids, puede crear una tabla de búsqueda (LUT) como sigue:

# PySpark code
user_als_id_LUT = sqlContext.createDataFrame(userids.rdd.map(lambda x: x[0]).zipWithIndex(), StructType([StructField("userid", StringType(), True),StructField("user_als_id", IntegerType(), True)]))

Ahora puedes:

  • Use esta LUT para obtener ID de enteros compatibles con ALS para proporcionar a ALS
  • Use esta LUT para hacer una búsqueda inversa cuando necesite volver de ALS ID al ID original

Haga lo mismo para los elementos, obviamente.

 2
Author: xenocyon,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2016-09-01 01:45:31

La gente ya ha recomendado monotonically_increasing_id(), y mencionó el problema de que crea Longs, no Ints.

Sin embargo, en mi experiencia (caveat - Spark 1.6) - si lo utiliza en un solo ejecutor (repartición a 1 antes), no hay prefijo ejecutor utilizado, y el número puede ser lanzado con seguridad a Int. Obviamente, usted necesita tener menos que Entero.MAX_VALUE filas.

 2
Author: Eyal,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2016-11-29 12:21:49