¿Cómo almacenar objetos personalizados en Dataset?


De acuerdo con La introducción de conjuntos de datos Spark :

Mientras esperamos Spark 2.0, planeamos algunas mejoras interesantes en los conjuntos de datos, específicamente: ... Codificadores personalizados - si bien actualmente generamos codificadores para una amplia variedad de tipos, nos gustaría abrir una API para objetos personalizados.

Y los intentos de almacenar el tipo personalizado en un Dataset conducen al siguiente error como:

No se puede encontrar el codificador para el tipo almacenado en un conjunto de datos. Tipos primitivos (Int, String, etc.) y los tipos de producto (clases de casos) son compatibles al importar SQLContext.implícita._ Soporte para serializar otros tipos se agregará en futuras versiones

O:

Java.lang.UnsupportedOperationException: No se ha encontrado ningún codificador para ....

¿Existen soluciones alternativas?


Tenga en cuenta que esta pregunta solo existe como punto de entrada para una respuesta Wiki de la Comunidad. Siéntase libre de actualizar / mejorar tanto la pregunta como la respuesta.

Author: zero323, 2016-04-15

7 answers

Actualización

Esta respuesta sigue siendo válida e informativa, aunque las cosas ahora son mejores desde 2.2 / 2.3, que agrega soporte de codificador incorporado para Set, Seq, Map, Date, Timestamp, y BigDecimal. Si se limita a crear tipos con solo clases case y los tipos Scala habituales, debería estar bien con solo lo implícito en SQLImplicits.


Desafortunadamente, prácticamente no se ha añadido nada para ayudar con esto. La búsqueda de @since 2.0.0 en Encoders.scala o SQLImplicits.scala encuentra cosas que tienen que ver principalmente con tipos primitivos (y algunos ajustes de clases de casos). Por lo tanto, lo primero que hay que decir: actualmente no hay un buen soporte para codificadores de clase personalizados. Con eso fuera del camino, lo que sigue son algunos trucos que hacen un trabajo tan bueno como podemos esperar, teniendo en cuenta lo que actualmente tenemos a nuestra disposición. Como un descargo de responsabilidad por adelantado: esto no funcionará perfectamente y haré todo lo posible para que todas las limitaciones sean claras y por adelantado.

¿Qué es exactamente el problema

Cuando desea crear un conjunto de datos, Spark "requiere un codificador (para convertir un objeto JVM de tipo T a y desde la representación interna de Spark SQL) que generalmente se crea automáticamente a través de implicits desde a SparkSession, o se puede crear explícitamente llamando a métodos estáticos en Encoders" (tomado createDataset). Un codificador tomará la forma Encoder[T] donde T es el tipo que está codificando. La primera sugerencia es agregar import spark.implicits._ (que le da estos codificadores implícitos) y la segunda sugerencia es pasar explícitamente el codificador implícito usando este conjunto de funciones relacionadas con el codificador.

No hay codificador disponible para las clases regulares, por lo que

import spark.implicits._
class MyObj(val i: Int)
// ...
val d = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3)))

Le dará el siguiente error implícito relacionado con el tiempo de compilación:

No se puede encontrar el codificador para el tipo almacenado en un conjunto de datos. Los tipos primitivos (Int, String, etc.) y los tipos de producto (clases de casos) son compatibles con la importación SQLContext.implícita._ Soporte para serializar otros tipos se agregará en futuras versiones

Sin embargo, si encapsula cualquier tipo que acaba de usar para obtener el error anterior en alguna clase que se extiende Product, el error confusamente se retrasa al tiempo de ejecución, por lo que

import spark.implicits._
case class Wrap[T](unwrap: T)
class MyObj(val i: Int)
// ...
val d = spark.createDataset(Seq(Wrap(new MyObj(1)),Wrap(new MyObj(2)),Wrap(new MyObj(3))))

Compila bien, pero falla en tiempo de ejecución con

Java.lang.No soportadooperationexception: No se ha encontrado ningún codificador para myObj

La razón de esto es que los codificadores Chispa creates with the implicits are actually only made at runtime (via scala relfection). En este caso, todas las comprobaciones de Spark en tiempo de compilación es que la clase más externa extiende Product (lo que hacen todas las clases case), y solo se da cuenta en tiempo de ejecución que todavía no sabe qué hacer con MyObj (el mismo problema ocurre si intento hacer un Dataset[(Int,MyObj)] - Spark espera hasta el tiempo de ejecución para barf en MyObj). Estos son problemas centrales que están en extrema necesidad de ser solucionados:

  • algunas clases que se extienden Product compilar a pesar de fallar siempre en tiempo de ejecución y
  • no hay forma de pasar codificadores personalizados para tipos anidados (no tengo forma de alimentar a Spark con un codificador solo para MyObj de modo que sepa cómo codificar Wrap[MyObj] o (Int,MyObj)).

Solo use kryo

La solución que todo el mundo sugiere es utilizar el kryo codificador.

import spark.implicits._
class MyObj(val i: Int)
implicit val myObjEncoder = org.apache.spark.sql.Encoders.kryo[MyObj]
// ...
val d = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3)))

Sin embargo, esto se vuelve bastante tedioso rápidamente. Especialmente si su código está manipulando todo tipo de conjuntos de datos, unión, agrupación, etc. Terminas acumulando un montón de implicitos extra. Entonces, ¿por qué no hacer un implícito que hace todo esto automáticamente?

import scala.reflect.ClassTag
implicit def kryoEncoder[A](implicit ct: ClassTag[A]) = 
  org.apache.spark.sql.Encoders.kryo[A](ct)

Y ahora, parece que puedo hacer casi todo lo que quiero (el ejemplo a continuación no funcionará en el spark-shell donde spark.implicits._ se importa automáticamente)

class MyObj(val i: Int)

val d1 = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3)))
val d2 = d1.map(d => (d.i+1,d)).alias("d2") // mapping works fine and ..
val d3 = d1.map(d => (d.i,  d)).alias("d3") // .. deals with the new type
val d4 = d2.joinWith(d3, $"d2._1" === $"d3._1") // Boom!

O casi. El problema es que usar kryo lleva a Spark a almacenar cada fila del conjunto de datos como un objeto binario plano. Para map, filter, foreach eso es suficiente, pero para operaciones como join, Spark realmente necesita separarlas en columnas. Al inspeccionar el esquema para d2 o d3, verá que solo hay una columna binaria:

d2.printSchema
// root
//  |-- value: binary (nullable = true)

Solución Parcial para tuplas

Entonces, usando la magia de implicitos en Scala (más en 6.26.3 Resolución de sobrecarga), puedo hacerme una serie de implicitos que harán el mejor trabajo posible, al menos para tuplas, y funcionarán bien con implicitos existentes:{[66]]}

import org.apache.spark.sql.{Encoder,Encoders}
import scala.reflect.ClassTag
import spark.implicits._  // we can still take advantage of all the old implicits

implicit def single[A](implicit c: ClassTag[A]): Encoder[A] = Encoders.kryo[A](c)

implicit def tuple2[A1, A2](
  implicit e1: Encoder[A1],
           e2: Encoder[A2]
): Encoder[(A1,A2)] = Encoders.tuple[A1,A2](e1, e2)

implicit def tuple3[A1, A2, A3](
  implicit e1: Encoder[A1],
           e2: Encoder[A2],
           e3: Encoder[A3]
): Encoder[(A1,A2,A3)] = Encoders.tuple[A1,A2,A3](e1, e2, e3)

// ... you can keep making these

Entonces, armados con estos implicitos, puedo hacer que mi ejemplo anterior funcione, aunque con algún cambio de nombre de columna

class MyObj(val i: Int)

val d1 = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3)))
val d2 = d1.map(d => (d.i+1,d)).toDF("_1","_2").as[(Int,MyObj)].alias("d2")
val d3 = d1.map(d => (d.i  ,d)).toDF("_1","_2").as[(Int,MyObj)].alias("d3")
val d4 = d2.joinWith(d3, $"d2._1" === $"d3._1")

Todavía no he descubierto cómo obtener los nombres de tupla esperados(_1, _2, ...) por defecto sin renombrarlos - si alguien más quiere jugar con esto, this es donde se introduce el nombre "value" y this es donde se suelen añadir los nombres de tupla. Sin embargo, el punto clave es que ahora tengo un buen esquema estructurado:

d4.printSchema
// root
//  |-- _1: struct (nullable = false)
//  |    |-- _1: integer (nullable = true)
//  |    |-- _2: binary (nullable = true)
//  |-- _2: struct (nullable = false)
//  |    |-- _1: integer (nullable = true)
//  |    |-- _2: binary (nullable = true)

So, en resumen, esta solución:

  • nos permite obtener columnas separadas para tuplas (para que podamos unirnos en tuplas de nuevo, yay!)
  • nuevamente podemos confiar en los implicitos (así que no hay necesidad de estar pasando kryo por todo el lugar)
  • es casi totalmente compatible con import spark.implicits._ (con algún cambio de nombre involucrado)
  • does not unámonos en las kyro columnas binarias serializadas, y mucho menos en los campos que pueden tener
  • tiene el desagradable efecto secundario de cambiar el nombre de algunas de las columnas de la tupla a "valor" (si es necesario, esto se puede deshacer convirtiendo .toDF, especificando nuevos nombres de columna y volviendo a un conjunto de datos, y los nombres del esquema parecen conservarse a través de uniones, donde más se necesitan).

Solución parcial para las clases en general

Este es menos agradable y no tiene una buena solución. Sin embargo, ahora que tenemos la solución de tupla anterior, tengo una corazonada de la solución de conversión implícita de otra respuesta será un poco menos dolorosa también, ya que puede convertir sus clases más complejas a tuplas. Luego, después de crear el conjunto de datos, probablemente cambiaría el nombre de las columnas utilizando el enfoque dataframe. Si todo va bien, esto es realmente una mejora ya que ahora puedo realizar uniones en los campos de mis clases. Si hubiera usado un serializador binario plano kryo eso no habría sido posible.

Aquí hay un ejemplo que hace un poco de todo: Tengo una clase MyObj que tiene campos de tipos Int, java.util.UUID, y Set[String]. El primero se cuida solo. El segundo, aunque podría serializar usando kryo sería más útil si se almacena como un String (ya que UUID s son por lo general algo que voy a querer unir contra). La tercera realmente solo pertenece a una columna binaria.

class MyObj(val i: Int, val u: java.util.UUID, val s: Set[String])

// alias for the type to convert to and from
type MyObjEncoded = (Int, String, Set[String])

// implicit conversions
implicit def toEncoded(o: MyObj): MyObjEncoded = (o.i, o.u.toString, o.s)
implicit def fromEncoded(e: MyObjEncoded): MyObj =
  new MyObj(e._1, java.util.UUID.fromString(e._2), e._3)

Ahora, puedo crear un conjunto de datos con un buen esquema usando esta maquinaria:

val d = spark.createDataset(Seq[MyObjEncoded](
  new MyObj(1, java.util.UUID.randomUUID, Set("foo")),
  new MyObj(2, java.util.UUID.randomUUID, Set("bar"))
)).toDF("i","u","s").as[MyObjEncoded]

Y el esquema me muestra I columnas con los nombres correctos y con las dos primeras ambas cosas I puede unirse en contra.

d.printSchema
// root
//  |-- i: integer (nullable = false)
//  |-- u: string (nullable = true)
//  |-- s: binary (nullable = true)
 170
Author: Alec,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2017-09-13 17:19:00
  1. Usando codificadores genéricos.

    Hay dos codificadores genéricos disponibles por ahora kryo y javaSerialization donde este último se describe explícitamente como:

    Extremadamente ineficiente y solo debe utilizarse como último recurso.

    Asumiendo la siguiente clase

    class Bar(i: Int) {
      override def toString = s"bar $i"
      def bar = i
    }
    

    Puede utilizar estos codificadores añadiendo codificador implícito:

    object BarEncoders {
      implicit def barEncoder: org.apache.spark.sql.Encoder[Bar] = 
      org.apache.spark.sql.Encoders.kryo[Bar]
    }
    

    Que se pueden usar juntos de la siguiente manera:

    object Main {
      def main(args: Array[String]) {
        val sc = new SparkContext("local",  "test", new SparkConf())
        val sqlContext = new SQLContext(sc)
        import sqlContext.implicits._
        import BarEncoders._
    
        val ds = Seq(new Bar(1)).toDS
        ds.show
    
        sc.stop()
      }
    }
    

    Almacena objects as binary column so when converted to DataFrame you get following schema:

    root
     |-- value: binary (nullable = true)
    

    También es posible codificar tuplas usando kryo codificador para un campo específico:

    val longBarEncoder = Encoders.tuple(Encoders.scalaLong, Encoders.kryo[Bar])
    
    spark.createDataset(Seq((1L, new Bar(1))))(longBarEncoder)
    // org.apache.spark.sql.Dataset[(Long, Bar)] = [_1: bigint, _2: binary]
    

    Tenga en cuenta que aquí no dependemos de codificadores implícitos, sino que pasamos encoder explícitamente, por lo que es muy probable que esto no funcione con el método toDS.

  2. Usando conversiones implícitas:

    Proporcionar conversiones implícitas entre la representación que se puede codificar y la clase personalizada, para ejemplo:

    object BarConversions {
      implicit def toInt(bar: Bar): Int = bar.bar
      implicit def toBar(i: Int): Bar = new Bar(i)
    }
    
    object Main {
      def main(args: Array[String]) {
        val sc = new SparkContext("local",  "test", new SparkConf())
        val sqlContext = new SQLContext(sc)
        import sqlContext.implicits._
        import BarConversions._
    
        type EncodedBar = Int
    
        val bars: RDD[EncodedBar]  = sc.parallelize(Seq(new Bar(1)))
        val barsDS = bars.toDS
    
        barsDS.show
        barsDS.map(_.bar).show
    
        sc.stop()
      }
    }
    

Preguntas relacionadas:

 28
Author: zero323,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2017-05-23 12:10:26

Los codificadores funcionan más o menos igual en Spark2.0. Y Kryo sigue siendo la opción recomendada serialization.

Puedes ver el siguiente ejemplo con spark-shell

scala> import spark.implicits._
import spark.implicits._

scala> import org.apache.spark.sql.Encoders
import org.apache.spark.sql.Encoders

scala> case class NormalPerson(name: String, age: Int) {
 |   def aboutMe = s"I am ${name}. I am ${age} years old."
 | }
defined class NormalPerson

scala> case class ReversePerson(name: Int, age: String) {
 |   def aboutMe = s"I am ${name}. I am ${age} years old."
 | }
defined class ReversePerson

scala> val normalPersons = Seq(
 |   NormalPerson("Superman", 25),
 |   NormalPerson("Spiderman", 17),
 |   NormalPerson("Ironman", 29)
 | )
normalPersons: Seq[NormalPerson] = List(NormalPerson(Superman,25), NormalPerson(Spiderman,17), NormalPerson(Ironman,29))

scala> val ds1 = sc.parallelize(normalPersons).toDS
ds1: org.apache.spark.sql.Dataset[NormalPerson] = [name: string, age: int]

scala> val ds2 = ds1.map(np => ReversePerson(np.age, np.name))
ds2: org.apache.spark.sql.Dataset[ReversePerson] = [name: int, age: string]

scala> ds1.show()
+---------+---+
|     name|age|
+---------+---+
| Superman| 25|
|Spiderman| 17|
|  Ironman| 29|
+---------+---+

scala> ds2.show()
+----+---------+
|name|      age|
+----+---------+
|  25| Superman|
|  17|Spiderman|
|  29|  Ironman|
+----+---------+

scala> ds1.foreach(p => println(p.aboutMe))
I am Ironman. I am 29 years old.
I am Superman. I am 25 years old.
I am Spiderman. I am 17 years old.

scala> val ds2 = ds1.map(np => ReversePerson(np.age, np.name))
ds2: org.apache.spark.sql.Dataset[ReversePerson] = [name: int, age: string]

scala> ds2.foreach(p => println(p.aboutMe))
I am 17. I am Spiderman years old.
I am 25. I am Superman years old.
I am 29. I am Ironman years old.

Hasta ahora] no había appropriate encoders en el ámbito actual, por lo que nuestras personas no estaban codificadas como binary valores. Pero eso cambiará una vez que proporcionemos algunos codificadores implicit usando Kryo serialización.

// Provide Encoders

scala> implicit val normalPersonKryoEncoder = Encoders.kryo[NormalPerson]
normalPersonKryoEncoder: org.apache.spark.sql.Encoder[NormalPerson] = class[value[0]: binary]

scala> implicit val reversePersonKryoEncoder = Encoders.kryo[ReversePerson]
reversePersonKryoEncoder: org.apache.spark.sql.Encoder[ReversePerson] = class[value[0]: binary]

// Ecoders will be used since they are now present in Scope

scala> val ds3 = sc.parallelize(normalPersons).toDS
ds3: org.apache.spark.sql.Dataset[NormalPerson] = [value: binary]

scala> val ds4 = ds3.map(np => ReversePerson(np.age, np.name))
ds4: org.apache.spark.sql.Dataset[ReversePerson] = [value: binary]

// now all our persons show up as binary values
scala> ds3.show()
+--------------------+
|               value|
+--------------------+
|[01 00 24 6C 69 6...|
|[01 00 24 6C 69 6...|
|[01 00 24 6C 69 6...|
+--------------------+

scala> ds4.show()
+--------------------+
|               value|
+--------------------+
|[01 00 24 6C 69 6...|
|[01 00 24 6C 69 6...|
|[01 00 24 6C 69 6...|
+--------------------+

// Our instances still work as expected    

scala> ds3.foreach(p => println(p.aboutMe))
I am Ironman. I am 29 years old.
I am Spiderman. I am 17 years old.
I am Superman. I am 25 years old.

scala> ds4.foreach(p => println(p.aboutMe))
I am 25. I am Superman years old.
I am 29. I am Ironman years old.
I am 17. I am Spiderman years old.
 5
Author: Sarvesh Kumar Singh,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2016-09-09 19:48:46

En el caso de la clase Java Bean, esto puede ser útil

import spark.sqlContext.implicits._
import org.apache.spark.sql.Encoders
implicit val encoder = Encoders.bean[MyClasss](classOf[MyClass])

Ahora simplemente puede leer el DataFrame como DataFrame personalizado

dataFrame.as[MyClass]

Esto creará un codificador de clase personalizado y no uno binario.

 3
Author: Akash Mahajan,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2017-01-05 13:42:13

Puede usar UDTRegistration y luego Clases de Casos, Tuplas, etc... todo funciona correctamente con su Tipo Definido por el Usuario!

Digamos que quieres usar una enumeración personalizada:

trait CustomEnum { def value:String }
case object Foo extends CustomEnum  { val value = "F" }
case object Bar extends CustomEnum  { val value = "B" }
object CustomEnum {
  def fromString(str:String) = Seq(Foo, Bar).find(_.value == str).get
}

Regístralo así:

// First define a UDT class for it:
class CustomEnumUDT extends UserDefinedType[CustomEnum] {
  override def sqlType: DataType = org.apache.spark.sql.types.StringType
  override def serialize(obj: CustomEnum): Any = org.apache.spark.unsafe.types.UTF8String.fromString(obj.value)
  // Note that this will be a UTF8String type
  override def deserialize(datum: Any): CustomEnum = CustomEnum.fromString(datum.toString)
  override def userClass: Class[CustomEnum] = classOf[CustomEnum]
}

// Then Register the UDT Class!
// NOTE: you have to put this file into the org.apache.spark package!
UDTRegistration.register(classOf[CustomEnum].getName, classOf[CustomEnumUDT].getName)

¡Entonces ÚSALO!

case class UsingCustomEnum(id:Int, en:CustomEnum)

val seq = Seq(
  UsingCustomEnum(1, Foo),
  UsingCustomEnum(2, Bar),
  UsingCustomEnum(3, Foo)
).toDS()
seq.filter(_.en == Foo).show()
println(seq.collect())

Digamos que quieres usar un Registro polimórfico:

trait CustomPoly
case class FooPoly(id:Int) extends CustomPoly
case class BarPoly(value:String, secondValue:Long) extends CustomPoly

... y el uso de esta manera:

case class UsingPoly(id:Int, poly:CustomPoly)

Seq(
  UsingPoly(1, new FooPoly(1)),
  UsingPoly(2, new BarPoly("Blah", 123)),
  UsingPoly(3, new FooPoly(1))
).toDS

polySeq.filter(_.poly match {
  case FooPoly(value) => value == 1
  case _ => false
}).show()

Puede escribir un UDT personalizado que codifique todo en bytes (estoy usando serialización java aquí, pero es probablemente sea mejor instrumentar el contexto Kryo de Spark).

Primero defina la clase UDT:

class CustomPolyUDT extends UserDefinedType[CustomPoly] {
  val kryo = new Kryo()

  override def sqlType: DataType = org.apache.spark.sql.types.BinaryType
  override def serialize(obj: CustomPoly): Any = {
    val bos = new ByteArrayOutputStream()
    val oos = new ObjectOutputStream(bos)
    oos.writeObject(obj)

    bos.toByteArray
  }
  override def deserialize(datum: Any): CustomPoly = {
    val bis = new ByteArrayInputStream(datum.asInstanceOf[Array[Byte]])
    val ois = new ObjectInputStream(bis)
    val obj = ois.readObject()
    obj.asInstanceOf[CustomPoly]
  }

  override def userClass: Class[CustomPoly] = classOf[CustomPoly]
}

Entonces regístralo:

// NOTE: The file you do this in has to be inside of the org.apache.spark package!
UDTRegistration.register(classOf[CustomPoly].getName, classOf[CustomPolyUDT].getName)

¡Entonces puedes usarlo!

// As shown above:
case class UsingPoly(id:Int, poly:CustomPoly)

Seq(
  UsingPoly(1, new FooPoly(1)),
  UsingPoly(2, new BarPoly("Blah", 123)),
  UsingPoly(3, new FooPoly(1))
).toDS

polySeq.filter(_.poly match {
  case FooPoly(value) => value == 1
  case _ => false
}).show()
 3
Author: ChoppyTheLumberjack,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2018-08-21 22:44:08

Mis ejemplos estarán en Java, pero no imagino que sea difícil adaptarse a Scala.

He tenido bastante éxito convirtiendo RDD<Fruit> a Dataset<Fruit> usando spark.createDataset y Codificadores.bean mientras Fruit sea un simple Java Bean.

Paso 1: Crear el simple Java Bean.

public class Fruit implements Serializable {
    private String name  = "default-fruit";
    private String color = "default-color";

    // AllArgsConstructor
    public Fruit(String name, String color) {
        this.name  = name;
        this.color = color;
    }

    // NoArgsConstructor
    public Fruit() {
        this("default-fruit", "default-color");
    }

    // ...create getters and setters for above fields
    // you figure it out
}

Me quedaría con clases con tipos primitivos y Cadenas como campos antes de que la gente de DataBricks refuerce sus Codificadores. Si tenga una clase con objeto anidado, cree otro Java Bean simple con todos sus campos aplanados, para que pueda usar transformaciones RDD para asignar el tipo complejo al más simple. Seguro que es un poco de trabajo extra, pero me imagino que va a ayudar mucho en el rendimiento trabajando con un esquema plano.

Paso 2: Obtenga su conjunto de datos del RDD

SparkSession spark = SparkSession.builder().getOrCreate();
JavaSparkContext jsc = new JavaSparkContext();

List<Fruit> fruitList = ImmutableList.of(
    new Fruit("apple", "red"),
    new Fruit("orange", "orange"),
    new Fruit("grape", "purple"));
JavaRDD<Fruit> fruitJavaRDD = jsc.parallelize(fruitList);


RDD<Fruit> fruitRDD = fruitJavaRDD.rdd();
Encoder<Fruit> fruitBean = Encoders.bean(Fruit.class);
Dataset<Fruit> fruitDataset = spark.createDataset(rdd, bean);

Y voila! Espuma, enjuague, repita.

 1
Author: Jimmy Da,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2017-06-06 20:19:09

Para aquellos que puedan en mi situación, también pongo mi respuesta aquí.

Para ser específico,

  1. Estaba leyendo 'Set typed data' de SQLContext. Así que el formato de datos original es DataFrame.

    val sample = spark.sqlContext.sql("select 1 as a, collect_set(1) as b limit 1") sample.show()

    +---+---+ | a| b| +---+---+ | 1|[1]| +---+---+

  2. Luego conviértalo en RDD usando rdd.map() con mutable.Tipo WrappedArray.

    sample .rdd.map(r => (r.getInt(0), r.getAs[mutable.WrappedArray[Int]](1).toSet)) .collect() .foreach(println)

    Resultado:

    (1,Set(1))

 1
Author: Taeheon Kwon,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2018-05-02 01:04:16