Spark: Añadir columna al dataframe condicionalmente


Estoy tratando de tomar mis datos de entrada:

A    B       C
--------------
4    blah    2
2            3
56   foo     3

Y añadir una columna al final en función de si B está vacío o no:

A    B       C     D
--------------------
4    blah    2     1
2            3     0
56   foo     3     1

Puedo hacer esto fácilmente registrando el dataframe de entrada como una tabla temporal, luego escribiendo una consulta SQL.

Pero realmente me gustaría saber cómo hacer esto con solo métodos Scala y no tener que escribir una consulta SQL dentro de Scala.

He intentado .withColumn, pero no puedo conseguir que haga lo que quiero.

Author: emeth, 2016-01-20

3 answers

Intenta withColumn con la función when de la siguiente manera:

val sqlContext = new SQLContext(sc)
import sqlContext.implicits._ // for `toDF` and $""
import org.apache.spark.sql.functions._ // for `when`

val df = sc.parallelize(Seq((4, "blah", 2), (2, "", 3), (56, "foo", 3), (100, null, 5)))
    .toDF("A", "B", "C")

val newDf = df.withColumn("D", when($"B".isNull or $"B" === "", 0).otherwise(1))

newDf.show() muestra

+---+----+---+---+
|  A|   B|  C|  D|
+---+----+---+---+
|  4|blah|  2|  1|
|  2|    |  3|  0|
| 56| foo|  3|  1|
|100|null|  5|  0|
+---+----+---+---+

He añadido la fila (100, null, 5) para probar el caso isNull.

Probé este código con Spark 1.6.0 pero como se comentó en el código de when, funciona en las versiones posteriores a 1.4.0.

 64
Author: emeth,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2016-01-21 06:04:51

Mi error, me había perdido una parte de la pregunta.

La mejor y más limpia manera es usar un UDF. Explicación dentro del código.

// create some example data...BY DataFrame
// note, third record has an empty string
case class Stuff(a:String,b:Int)
val d= sc.parallelize(Seq( ("a",1),("b",2),
     ("",3) ,("d",4)).map { x => Stuff(x._1,x._2)  }).toDF

// now the good stuff.
import org.apache.spark.sql.functions.udf
// function that returns 0 is string empty 
val func = udf( (s:String) => if(s.isEmpty) 0 else 1 )
// create new dataframe with added column named "notempty"
val r = d.select( $"a", $"b", func($"a").as("notempty") )

    scala> r.show
+---+---+--------+
|  a|  b|notempty|
+---+---+--------+
|  a|  1|    1111|
|  b|  2|    1111|
|   |  3|       0|
|  d|  4|    1111|
+---+---+--------+
 3
Author: Roberto Congiu,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2016-09-09 15:27:33

¿Qué tal algo como esto?

val newDF = df.filter($"B" === "").take(1) match {
  case Array() => df
  case _ => df.withColumn("D", $"B" === "")
}

Usar take(1) debería tener un impacto mínimo

 1
Author: Justin Pihony,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2016-01-20 19:53:39