Spark: Añadir columna al dataframe condicionalmente
Estoy tratando de tomar mis datos de entrada:
A B C
--------------
4 blah 2
2 3
56 foo 3
Y añadir una columna al final en función de si B está vacío o no:
A B C D
--------------------
4 blah 2 1
2 3 0
56 foo 3 1
Puedo hacer esto fácilmente registrando el dataframe de entrada como una tabla temporal, luego escribiendo una consulta SQL.
Pero realmente me gustaría saber cómo hacer esto con solo métodos Scala y no tener que escribir una consulta SQL dentro de Scala.
He intentado .withColumn
, pero no puedo conseguir que haga lo que quiero.
3 answers
Intenta withColumn
con la función when
de la siguiente manera:
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._ // for `toDF` and $""
import org.apache.spark.sql.functions._ // for `when`
val df = sc.parallelize(Seq((4, "blah", 2), (2, "", 3), (56, "foo", 3), (100, null, 5)))
.toDF("A", "B", "C")
val newDf = df.withColumn("D", when($"B".isNull or $"B" === "", 0).otherwise(1))
newDf.show()
muestra
+---+----+---+---+
| A| B| C| D|
+---+----+---+---+
| 4|blah| 2| 1|
| 2| | 3| 0|
| 56| foo| 3| 1|
|100|null| 5| 0|
+---+----+---+---+
He añadido la fila (100, null, 5)
para probar el caso isNull
.
Probé este código con Spark 1.6.0
pero como se comentó en el código de when
, funciona en las versiones posteriores a 1.4.0
.
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2016-01-21 06:04:51
Mi error, me había perdido una parte de la pregunta.
La mejor y más limpia manera es usar un UDF
.
Explicación dentro del código.
// create some example data...BY DataFrame
// note, third record has an empty string
case class Stuff(a:String,b:Int)
val d= sc.parallelize(Seq( ("a",1),("b",2),
("",3) ,("d",4)).map { x => Stuff(x._1,x._2) }).toDF
// now the good stuff.
import org.apache.spark.sql.functions.udf
// function that returns 0 is string empty
val func = udf( (s:String) => if(s.isEmpty) 0 else 1 )
// create new dataframe with added column named "notempty"
val r = d.select( $"a", $"b", func($"a").as("notempty") )
scala> r.show
+---+---+--------+
| a| b|notempty|
+---+---+--------+
| a| 1| 1111|
| b| 2| 1111|
| | 3| 0|
| d| 4| 1111|
+---+---+--------+
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2016-09-09 15:27:33
¿Qué tal algo como esto?
val newDF = df.filter($"B" === "").take(1) match {
case Array() => df
case _ => df.withColumn("D", $"B" === "")
}
Usar take(1)
debería tener un impacto mínimo
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2016-01-20 19:53:39