¿Cómo almacenar objetos personalizados en Dataset?
De acuerdo con La introducción de conjuntos de datos Spark :
Mientras esperamos Spark 2.0, planeamos algunas mejoras interesantes en los conjuntos de datos, específicamente: ... Codificadores personalizados - si bien actualmente generamos codificadores para una amplia variedad de tipos, nos gustaría abrir una API para objetos personalizados.
Y los intentos de almacenar el tipo personalizado en un Dataset
conducen al siguiente error como:
No se puede encontrar el codificador para el tipo almacenado en un conjunto de datos. Tipos primitivos (Int, String, etc.) y los tipos de producto (clases de casos) son compatibles al importar SQLContext.implícita._ Soporte para serializar otros tipos se agregará en futuras versiones
O:
Java.lang.UnsupportedOperationException: No se ha encontrado ningún codificador para ....
¿Existen soluciones alternativas?
Tenga en cuenta que esta pregunta solo existe como punto de entrada para una respuesta Wiki de la Comunidad. Siéntase libre de actualizar / mejorar tanto la pregunta como la respuesta.
7 answers
Actualización
Esta respuesta sigue siendo válida e informativa, aunque las cosas ahora son mejores desde 2.2 / 2.3, que agrega soporte de codificador incorporado para Set
, Seq
, Map
, Date
, Timestamp
, y BigDecimal
. Si se limita a crear tipos con solo clases case y los tipos Scala habituales, debería estar bien con solo lo implícito en SQLImplicits
.
Desafortunadamente, prácticamente no se ha añadido nada para ayudar con esto. La búsqueda de @since 2.0.0
en Encoders.scala
o SQLImplicits.scala
encuentra cosas que tienen que ver principalmente con tipos primitivos (y algunos ajustes de clases de casos). Por lo tanto, lo primero que hay que decir: actualmente no hay un buen soporte para codificadores de clase personalizados. Con eso fuera del camino, lo que sigue son algunos trucos que hacen un trabajo tan bueno como podemos esperar, teniendo en cuenta lo que actualmente tenemos a nuestra disposición. Como un descargo de responsabilidad por adelantado: esto no funcionará perfectamente y haré todo lo posible para que todas las limitaciones sean claras y por adelantado.
¿Qué es exactamente el problema
Cuando desea crear un conjunto de datos, Spark "requiere un codificador (para convertir un objeto JVM de tipo T a y desde la representación interna de Spark SQL) que generalmente se crea automáticamente a través de implicits desde a SparkSession
, o se puede crear explícitamente llamando a métodos estáticos en Encoders
" (tomado createDataset
). Un codificador tomará la forma Encoder[T]
donde T
es el tipo que está codificando. La primera sugerencia es agregar import spark.implicits._
(que le da estos codificadores implícitos) y la segunda sugerencia es pasar explícitamente el codificador implícito usando este conjunto de funciones relacionadas con el codificador.
No hay codificador disponible para las clases regulares, por lo que
import spark.implicits._
class MyObj(val i: Int)
// ...
val d = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3)))
Le dará el siguiente error implícito relacionado con el tiempo de compilación:
No se puede encontrar el codificador para el tipo almacenado en un conjunto de datos. Los tipos primitivos (Int, String, etc.) y los tipos de producto (clases de casos) son compatibles con la importación SQLContext.implícita._ Soporte para serializar otros tipos se agregará en futuras versiones
Sin embargo, si encapsula cualquier tipo que acaba de usar para obtener el error anterior en alguna clase que se extiende Product
, el error confusamente se retrasa al tiempo de ejecución, por lo que
import spark.implicits._
case class Wrap[T](unwrap: T)
class MyObj(val i: Int)
// ...
val d = spark.createDataset(Seq(Wrap(new MyObj(1)),Wrap(new MyObj(2)),Wrap(new MyObj(3))))
Compila bien, pero falla en tiempo de ejecución con
Java.lang.No soportadooperationexception: No se ha encontrado ningún codificador para myObj
La razón de esto es que los codificadores Chispa creates with the implicits are actually only made at runtime (via scala relfection). En este caso, todas las comprobaciones de Spark en tiempo de compilación es que la clase más externa extiende Product
(lo que hacen todas las clases case), y solo se da cuenta en tiempo de ejecución que todavía no sabe qué hacer con MyObj
(el mismo problema ocurre si intento hacer un Dataset[(Int,MyObj)]
- Spark espera hasta el tiempo de ejecución para barf en MyObj
). Estos son problemas centrales que están en extrema necesidad de ser solucionados:
- algunas clases que se extienden
Product
compilar a pesar de fallar siempre en tiempo de ejecución y - no hay forma de pasar codificadores personalizados para tipos anidados (no tengo forma de alimentar a Spark con un codificador solo para
MyObj
de modo que sepa cómo codificarWrap[MyObj]
o(Int,MyObj)
).
Solo use kryo
La solución que todo el mundo sugiere es utilizar el kryo
codificador.
import spark.implicits._
class MyObj(val i: Int)
implicit val myObjEncoder = org.apache.spark.sql.Encoders.kryo[MyObj]
// ...
val d = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3)))
Sin embargo, esto se vuelve bastante tedioso rápidamente. Especialmente si su código está manipulando todo tipo de conjuntos de datos, unión, agrupación, etc. Terminas acumulando un montón de implicitos extra. Entonces, ¿por qué no hacer un implícito que hace todo esto automáticamente?
import scala.reflect.ClassTag
implicit def kryoEncoder[A](implicit ct: ClassTag[A]) =
org.apache.spark.sql.Encoders.kryo[A](ct)
Y ahora, parece que puedo hacer casi todo lo que quiero (el ejemplo a continuación no funcionará en el spark-shell
donde spark.implicits._
se importa automáticamente)
class MyObj(val i: Int)
val d1 = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3)))
val d2 = d1.map(d => (d.i+1,d)).alias("d2") // mapping works fine and ..
val d3 = d1.map(d => (d.i, d)).alias("d3") // .. deals with the new type
val d4 = d2.joinWith(d3, $"d2._1" === $"d3._1") // Boom!
O casi. El problema es que usar kryo
lleva a Spark a almacenar cada fila del conjunto de datos como un objeto binario plano. Para map
, filter
, foreach
eso es suficiente, pero para operaciones como join
, Spark realmente necesita separarlas en columnas. Al inspeccionar el esquema para d2
o d3
, verá que solo hay una columna binaria:
d2.printSchema
// root
// |-- value: binary (nullable = true)
Solución Parcial para tuplas
Entonces, usando la magia de implicitos en Scala (más en 6.26.3 Resolución de sobrecarga), puedo hacerme una serie de implicitos que harán el mejor trabajo posible, al menos para tuplas, y funcionarán bien con implicitos existentes:{[66]]}
import org.apache.spark.sql.{Encoder,Encoders}
import scala.reflect.ClassTag
import spark.implicits._ // we can still take advantage of all the old implicits
implicit def single[A](implicit c: ClassTag[A]): Encoder[A] = Encoders.kryo[A](c)
implicit def tuple2[A1, A2](
implicit e1: Encoder[A1],
e2: Encoder[A2]
): Encoder[(A1,A2)] = Encoders.tuple[A1,A2](e1, e2)
implicit def tuple3[A1, A2, A3](
implicit e1: Encoder[A1],
e2: Encoder[A2],
e3: Encoder[A3]
): Encoder[(A1,A2,A3)] = Encoders.tuple[A1,A2,A3](e1, e2, e3)
// ... you can keep making these
Entonces, armados con estos implicitos, puedo hacer que mi ejemplo anterior funcione, aunque con algún cambio de nombre de columna
class MyObj(val i: Int)
val d1 = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3)))
val d2 = d1.map(d => (d.i+1,d)).toDF("_1","_2").as[(Int,MyObj)].alias("d2")
val d3 = d1.map(d => (d.i ,d)).toDF("_1","_2").as[(Int,MyObj)].alias("d3")
val d4 = d2.joinWith(d3, $"d2._1" === $"d3._1")
Todavía no he descubierto cómo obtener los nombres de tupla esperados(_1
, _2
, ...) por defecto sin renombrarlos - si alguien más quiere jugar con esto, this es donde se introduce el nombre "value"
y this es donde se suelen añadir los nombres de tupla. Sin embargo, el punto clave es que ahora tengo un buen esquema estructurado:
d4.printSchema
// root
// |-- _1: struct (nullable = false)
// | |-- _1: integer (nullable = true)
// | |-- _2: binary (nullable = true)
// |-- _2: struct (nullable = false)
// | |-- _1: integer (nullable = true)
// | |-- _2: binary (nullable = true)
So, en resumen, esta solución:
- nos permite obtener columnas separadas para tuplas (para que podamos unirnos en tuplas de nuevo, yay!)
- nuevamente podemos confiar en los implicitos (así que no hay necesidad de estar pasando
kryo
por todo el lugar) - es casi totalmente compatible con
import spark.implicits._
(con algún cambio de nombre involucrado) - does not unámonos en las
kyro
columnas binarias serializadas, y mucho menos en los campos que pueden tener - tiene el desagradable efecto secundario de cambiar el nombre de algunas de las columnas de la tupla a "valor" (si es necesario, esto se puede deshacer convirtiendo
.toDF
, especificando nuevos nombres de columna y volviendo a un conjunto de datos, y los nombres del esquema parecen conservarse a través de uniones, donde más se necesitan).
Solución parcial para las clases en general
Este es menos agradable y no tiene una buena solución. Sin embargo, ahora que tenemos la solución de tupla anterior, tengo una corazonada de la solución de conversión implícita de otra respuesta será un poco menos dolorosa también, ya que puede convertir sus clases más complejas a tuplas. Luego, después de crear el conjunto de datos, probablemente cambiaría el nombre de las columnas utilizando el enfoque dataframe. Si todo va bien, esto es realmente una mejora ya que ahora puedo realizar uniones en los campos de mis clases. Si hubiera usado un serializador binario plano kryo
eso no habría sido posible.
Aquí hay un ejemplo que hace un poco de todo: Tengo una clase MyObj
que tiene campos de tipos Int
, java.util.UUID
, y Set[String]
. El primero se cuida solo. El segundo, aunque podría serializar usando kryo
sería más útil si se almacena como un String
(ya que UUID
s son por lo general algo que voy a querer unir contra). La tercera realmente solo pertenece a una columna binaria.
class MyObj(val i: Int, val u: java.util.UUID, val s: Set[String])
// alias for the type to convert to and from
type MyObjEncoded = (Int, String, Set[String])
// implicit conversions
implicit def toEncoded(o: MyObj): MyObjEncoded = (o.i, o.u.toString, o.s)
implicit def fromEncoded(e: MyObjEncoded): MyObj =
new MyObj(e._1, java.util.UUID.fromString(e._2), e._3)
Ahora, puedo crear un conjunto de datos con un buen esquema usando esta maquinaria:
val d = spark.createDataset(Seq[MyObjEncoded](
new MyObj(1, java.util.UUID.randomUUID, Set("foo")),
new MyObj(2, java.util.UUID.randomUUID, Set("bar"))
)).toDF("i","u","s").as[MyObjEncoded]
Y el esquema me muestra I columnas con los nombres correctos y con las dos primeras ambas cosas I puede unirse en contra.
d.printSchema
// root
// |-- i: integer (nullable = false)
// |-- u: string (nullable = true)
// |-- s: binary (nullable = true)
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2017-09-13 17:19:00
-
Usando codificadores genéricos.
Hay dos codificadores genéricos disponibles por ahora
kryo
yjavaSerialization
donde este último se describe explícitamente como:Extremadamente ineficiente y solo debe utilizarse como último recurso.
Asumiendo la siguiente clase
class Bar(i: Int) { override def toString = s"bar $i" def bar = i }
Puede utilizar estos codificadores añadiendo codificador implícito:
object BarEncoders { implicit def barEncoder: org.apache.spark.sql.Encoder[Bar] = org.apache.spark.sql.Encoders.kryo[Bar] }
Que se pueden usar juntos de la siguiente manera:
object Main { def main(args: Array[String]) { val sc = new SparkContext("local", "test", new SparkConf()) val sqlContext = new SQLContext(sc) import sqlContext.implicits._ import BarEncoders._ val ds = Seq(new Bar(1)).toDS ds.show sc.stop() } }
Almacena objects as
binary
column so when converted toDataFrame
you get following schema:root |-- value: binary (nullable = true)
También es posible codificar tuplas usando
kryo
codificador para un campo específico:val longBarEncoder = Encoders.tuple(Encoders.scalaLong, Encoders.kryo[Bar]) spark.createDataset(Seq((1L, new Bar(1))))(longBarEncoder) // org.apache.spark.sql.Dataset[(Long, Bar)] = [_1: bigint, _2: binary]
Tenga en cuenta que aquí no dependemos de codificadores implícitos, sino que pasamos encoder explícitamente, por lo que es muy probable que esto no funcione con el método
toDS
. -
Usando conversiones implícitas:
Proporcionar conversiones implícitas entre la representación que se puede codificar y la clase personalizada, para ejemplo:
object BarConversions { implicit def toInt(bar: Bar): Int = bar.bar implicit def toBar(i: Int): Bar = new Bar(i) } object Main { def main(args: Array[String]) { val sc = new SparkContext("local", "test", new SparkConf()) val sqlContext = new SQLContext(sc) import sqlContext.implicits._ import BarConversions._ type EncodedBar = Int val bars: RDD[EncodedBar] = sc.parallelize(Seq(new Bar(1))) val barsDS = bars.toDS barsDS.show barsDS.map(_.bar).show sc.stop() } }
Preguntas relacionadas:
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2017-05-23 12:10:26
Los codificadores funcionan más o menos igual en Spark2.0
. Y Kryo
sigue siendo la opción recomendada serialization
.
Puedes ver el siguiente ejemplo con spark-shell
scala> import spark.implicits._
import spark.implicits._
scala> import org.apache.spark.sql.Encoders
import org.apache.spark.sql.Encoders
scala> case class NormalPerson(name: String, age: Int) {
| def aboutMe = s"I am ${name}. I am ${age} years old."
| }
defined class NormalPerson
scala> case class ReversePerson(name: Int, age: String) {
| def aboutMe = s"I am ${name}. I am ${age} years old."
| }
defined class ReversePerson
scala> val normalPersons = Seq(
| NormalPerson("Superman", 25),
| NormalPerson("Spiderman", 17),
| NormalPerson("Ironman", 29)
| )
normalPersons: Seq[NormalPerson] = List(NormalPerson(Superman,25), NormalPerson(Spiderman,17), NormalPerson(Ironman,29))
scala> val ds1 = sc.parallelize(normalPersons).toDS
ds1: org.apache.spark.sql.Dataset[NormalPerson] = [name: string, age: int]
scala> val ds2 = ds1.map(np => ReversePerson(np.age, np.name))
ds2: org.apache.spark.sql.Dataset[ReversePerson] = [name: int, age: string]
scala> ds1.show()
+---------+---+
| name|age|
+---------+---+
| Superman| 25|
|Spiderman| 17|
| Ironman| 29|
+---------+---+
scala> ds2.show()
+----+---------+
|name| age|
+----+---------+
| 25| Superman|
| 17|Spiderman|
| 29| Ironman|
+----+---------+
scala> ds1.foreach(p => println(p.aboutMe))
I am Ironman. I am 29 years old.
I am Superman. I am 25 years old.
I am Spiderman. I am 17 years old.
scala> val ds2 = ds1.map(np => ReversePerson(np.age, np.name))
ds2: org.apache.spark.sql.Dataset[ReversePerson] = [name: int, age: string]
scala> ds2.foreach(p => println(p.aboutMe))
I am 17. I am Spiderman years old.
I am 25. I am Superman years old.
I am 29. I am Ironman years old.
Hasta ahora] no había appropriate encoders
en el ámbito actual, por lo que nuestras personas no estaban codificadas como binary
valores. Pero eso cambiará una vez que proporcionemos algunos codificadores implicit
usando Kryo
serialización.
// Provide Encoders
scala> implicit val normalPersonKryoEncoder = Encoders.kryo[NormalPerson]
normalPersonKryoEncoder: org.apache.spark.sql.Encoder[NormalPerson] = class[value[0]: binary]
scala> implicit val reversePersonKryoEncoder = Encoders.kryo[ReversePerson]
reversePersonKryoEncoder: org.apache.spark.sql.Encoder[ReversePerson] = class[value[0]: binary]
// Ecoders will be used since they are now present in Scope
scala> val ds3 = sc.parallelize(normalPersons).toDS
ds3: org.apache.spark.sql.Dataset[NormalPerson] = [value: binary]
scala> val ds4 = ds3.map(np => ReversePerson(np.age, np.name))
ds4: org.apache.spark.sql.Dataset[ReversePerson] = [value: binary]
// now all our persons show up as binary values
scala> ds3.show()
+--------------------+
| value|
+--------------------+
|[01 00 24 6C 69 6...|
|[01 00 24 6C 69 6...|
|[01 00 24 6C 69 6...|
+--------------------+
scala> ds4.show()
+--------------------+
| value|
+--------------------+
|[01 00 24 6C 69 6...|
|[01 00 24 6C 69 6...|
|[01 00 24 6C 69 6...|
+--------------------+
// Our instances still work as expected
scala> ds3.foreach(p => println(p.aboutMe))
I am Ironman. I am 29 years old.
I am Spiderman. I am 17 years old.
I am Superman. I am 25 years old.
scala> ds4.foreach(p => println(p.aboutMe))
I am 25. I am Superman years old.
I am 29. I am Ironman years old.
I am 17. I am Spiderman years old.
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2016-09-09 19:48:46
En el caso de la clase Java Bean, esto puede ser útil
import spark.sqlContext.implicits._
import org.apache.spark.sql.Encoders
implicit val encoder = Encoders.bean[MyClasss](classOf[MyClass])
Ahora simplemente puede leer el DataFrame como DataFrame personalizado
dataFrame.as[MyClass]
Esto creará un codificador de clase personalizado y no uno binario.
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2017-01-05 13:42:13
Puede usar UDTRegistration y luego Clases de Casos, Tuplas, etc... todo funciona correctamente con su Tipo Definido por el Usuario!
Digamos que quieres usar una enumeración personalizada:
trait CustomEnum { def value:String }
case object Foo extends CustomEnum { val value = "F" }
case object Bar extends CustomEnum { val value = "B" }
object CustomEnum {
def fromString(str:String) = Seq(Foo, Bar).find(_.value == str).get
}
Regístralo así:
// First define a UDT class for it:
class CustomEnumUDT extends UserDefinedType[CustomEnum] {
override def sqlType: DataType = org.apache.spark.sql.types.StringType
override def serialize(obj: CustomEnum): Any = org.apache.spark.unsafe.types.UTF8String.fromString(obj.value)
// Note that this will be a UTF8String type
override def deserialize(datum: Any): CustomEnum = CustomEnum.fromString(datum.toString)
override def userClass: Class[CustomEnum] = classOf[CustomEnum]
}
// Then Register the UDT Class!
// NOTE: you have to put this file into the org.apache.spark package!
UDTRegistration.register(classOf[CustomEnum].getName, classOf[CustomEnumUDT].getName)
¡Entonces ÚSALO!
case class UsingCustomEnum(id:Int, en:CustomEnum)
val seq = Seq(
UsingCustomEnum(1, Foo),
UsingCustomEnum(2, Bar),
UsingCustomEnum(3, Foo)
).toDS()
seq.filter(_.en == Foo).show()
println(seq.collect())
Digamos que quieres usar un Registro polimórfico:
trait CustomPoly
case class FooPoly(id:Int) extends CustomPoly
case class BarPoly(value:String, secondValue:Long) extends CustomPoly
... y el uso de esta manera:
case class UsingPoly(id:Int, poly:CustomPoly)
Seq(
UsingPoly(1, new FooPoly(1)),
UsingPoly(2, new BarPoly("Blah", 123)),
UsingPoly(3, new FooPoly(1))
).toDS
polySeq.filter(_.poly match {
case FooPoly(value) => value == 1
case _ => false
}).show()
Puede escribir un UDT personalizado que codifique todo en bytes (estoy usando serialización java aquí, pero es probablemente sea mejor instrumentar el contexto Kryo de Spark).
Primero defina la clase UDT:
class CustomPolyUDT extends UserDefinedType[CustomPoly] {
val kryo = new Kryo()
override def sqlType: DataType = org.apache.spark.sql.types.BinaryType
override def serialize(obj: CustomPoly): Any = {
val bos = new ByteArrayOutputStream()
val oos = new ObjectOutputStream(bos)
oos.writeObject(obj)
bos.toByteArray
}
override def deserialize(datum: Any): CustomPoly = {
val bis = new ByteArrayInputStream(datum.asInstanceOf[Array[Byte]])
val ois = new ObjectInputStream(bis)
val obj = ois.readObject()
obj.asInstanceOf[CustomPoly]
}
override def userClass: Class[CustomPoly] = classOf[CustomPoly]
}
Entonces regístralo:
// NOTE: The file you do this in has to be inside of the org.apache.spark package!
UDTRegistration.register(classOf[CustomPoly].getName, classOf[CustomPolyUDT].getName)
¡Entonces puedes usarlo!
// As shown above:
case class UsingPoly(id:Int, poly:CustomPoly)
Seq(
UsingPoly(1, new FooPoly(1)),
UsingPoly(2, new BarPoly("Blah", 123)),
UsingPoly(3, new FooPoly(1))
).toDS
polySeq.filter(_.poly match {
case FooPoly(value) => value == 1
case _ => false
}).show()
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2018-08-21 22:44:08
Mis ejemplos estarán en Java, pero no imagino que sea difícil adaptarse a Scala.
He tenido bastante éxito convirtiendo RDD<Fruit>
a Dataset<Fruit>
usando spark.createDataset y Codificadores.bean mientras Fruit
sea un simple Java Bean.
Paso 1: Crear el simple Java Bean.
public class Fruit implements Serializable {
private String name = "default-fruit";
private String color = "default-color";
// AllArgsConstructor
public Fruit(String name, String color) {
this.name = name;
this.color = color;
}
// NoArgsConstructor
public Fruit() {
this("default-fruit", "default-color");
}
// ...create getters and setters for above fields
// you figure it out
}
Me quedaría con clases con tipos primitivos y Cadenas como campos antes de que la gente de DataBricks refuerce sus Codificadores. Si tenga una clase con objeto anidado, cree otro Java Bean simple con todos sus campos aplanados, para que pueda usar transformaciones RDD para asignar el tipo complejo al más simple. Seguro que es un poco de trabajo extra, pero me imagino que va a ayudar mucho en el rendimiento trabajando con un esquema plano.
Paso 2: Obtenga su conjunto de datos del RDD
SparkSession spark = SparkSession.builder().getOrCreate();
JavaSparkContext jsc = new JavaSparkContext();
List<Fruit> fruitList = ImmutableList.of(
new Fruit("apple", "red"),
new Fruit("orange", "orange"),
new Fruit("grape", "purple"));
JavaRDD<Fruit> fruitJavaRDD = jsc.parallelize(fruitList);
RDD<Fruit> fruitRDD = fruitJavaRDD.rdd();
Encoder<Fruit> fruitBean = Encoders.bean(Fruit.class);
Dataset<Fruit> fruitDataset = spark.createDataset(rdd, bean);
Y voila! Espuma, enjuague, repita.
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2017-06-06 20:19:09
Para aquellos que puedan en mi situación, también pongo mi respuesta aquí.
Para ser específico,
-
Estaba leyendo 'Set typed data' de SQLContext. Así que el formato de datos original es DataFrame.
val sample = spark.sqlContext.sql("select 1 as a, collect_set(1) as b limit 1") sample.show()
+---+---+ | a| b| +---+---+ | 1|[1]| +---+---+
-
Luego conviértalo en RDD usando rdd.map() con mutable.Tipo WrappedArray.
sample .rdd.map(r => (r.getInt(0), r.getAs[mutable.WrappedArray[Int]](1).toSet)) .collect() .foreach(println)
Resultado:
(1,Set(1))
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2018-05-02 01:04:16