cómo filtrar un valor nulo de spark dataframe

He creado un dataframe en spark con el siguiente esquema:

 |-- user_id: long (nullable = false)
 |-- event_id: long (nullable = false)
 |-- invited: integer (nullable = false)
 |-- day_diff: long (nullable = true)
 |-- interested: integer (nullable = false)
 |-- event_owner: long (nullable = false)
 |-- friend_id: long (nullable = false)

Y los datos se muestran a continuación:

|   user_id|  event_id|invited|day_diff|interested|event_owner|friend_id|
|   4236494| 110357109|      0|      -1|         0|  937597069|     null|
|  78065188| 498404626|      0|       0|         0| 2904922087|     null|
| 282487230|2520855981|      0|      28|         0| 3749735525|     null|
| 335269852|1641491432|      0|       2|         0| 1490350911|     null|
| 437050836|1238456614|      0|       2|         0|  991277599|     null|
| 447244169|2095085551|      0|      -1|         0| 1579858878|     null|
| 516353916|1076364848|      0|       3|         1| 3597645735|     null|
| 528218683|1151525474|      0|       1|         0| 3433080956|     null|
| 531967718|3632072502|      0|       1|         0| 3863085861|     null|
| 627948360|2823119321|      0|       0|         0| 4092665803|     null|
| 811791433|3513954032|      0|       2|         0|  415464198|     null|
| 830686203|  99027353|      0|       0|         0| 3549822604|     null|
|1008893291|1115453150|      0|       2|         0| 2245155244|     null|
|1239364869|2824096896|      0|       2|         1| 2579294650|     null|
|1287950172|1076364848|      0|       0|         0| 3597645735|     null|
|1345896548|2658555390|      0|       1|         0| 2025118823|     null|
|1354205322|2564682277|      0|       3|         0| 2563033185|     null|
|1408344828|1255629030|      0|      -1|         1|  804901063|     null|
|1452633375|1334001859|      0|       4|         0| 1488588320|     null|
|1625052108|3297535757|      0|       3|         0| 1972598895|     null|

Quiero filtrar las filas que tienen valores nulos en el campo "friend_id".

scala> val aaa = test.filter("friend_id is null")

scala> aaa.count

Tengo :res52: Long = 0 que es obvio que no está bien. ¿Cuál es la forma correcta de conseguirlo?

Una pregunta más, quiero reemplazar los valores en el campo friend_id. Quiero reemplazar null con 0 y 1 para cualquier otro valor, excepto null. El código que puedo averiguar is:

val aaa = train_friend_join.select($"user_id", $"event_id", $"invited", $"day_diff", $"interested", $"event_owner", ($"friend_id" != null)?1:0)

Este código tampoco funciona. ¿Alguien puede decirme cómo puedo arreglarlo? Gracias

Author: Steven Li, 2016-09-27

8 answers

Digamos que tienes esta configuración de datos (para que los resultados sean reproducibles):

// declaring data types
case class Company(cName: String, cId: String, details: String)
case class Employee(name: String, id: String, email: String, company: Company)

// setting up example data
val e1 = Employee("n1", null, "[email protected]", Company("c1", "1", "d1"))
val e2 = Employee("n2", "2", "[email protected]", Company("c1", "1", "d1"))
val e3 = Employee("n3", "3", "[email protected]", Company("c1", "1", "d1"))
val e4 = Employee("n4", "4", "[email protected]", Company("c2", "2", "d2"))
val e5 = Employee("n5", null, "[email protected]", Company("c2", "2", "d2"))
val e6 = Employee("n6", "6", "[email protected]", Company("c2", "2", "d2"))
val e7 = Employee("n7", "7", "[email protected]", Company("c3", "3", "d3"))
val e8 = Employee("n8", "8", "[email protected]", Company("c3", "3", "d3"))
val employees = Seq(e1, e2, e3, e4, e5, e6, e7, e8)
val df = sc.parallelize(employees).toDF

Los datos son:

|name|  id|    email|  company|
|  n1|null|[email protected]|[c1,1,d1]|
|  n2|   2|[email protected]|[c1,1,d1]|
|  n3|   3|[email protected]|[c1,1,d1]|
|  n4|   4|[email protected]|[c2,2,d2]|
|  n5|null|[email protected]|[c2,2,d2]|
|  n6|   6|[email protected]|[c2,2,d2]|
|  n7|   7|[email protected]|[c3,3,d3]|
|  n8|   8|[email protected]|[c3,3,d3]|

Ahora para filtrar empleados con null ids, hará --

df.filter("id is null").show

Que le mostrará correctamente lo siguiente:

|name|  id|    email|  company|
|  n1|null|[email protected]|[c1,1,d1]|
|  n5|null|[email protected]|[c2,2,d2]|

Llegando a la segunda parte de su pregunta, puede reemplazar los identificadores null por 0 y otros valores por 1 con esto {

df.withColumn("id", when($"id".isNull, 0).otherwise(1)).show

Esto resulta en:

|name| id|    email|  company|
|  n1|  0|[email protected]|[c1,1,d1]|
|  n2|  1|[email protected]|[c1,1,d1]|
|  n3|  1|[email protected]|[c1,1,d1]|
|  n4|  1|[email protected]|[c2,2,d2]|
|  n5|  0|[email protected]|[c2,2,d2]|
|  n6|  1|[email protected]|[c2,2,d2]|
|  n7|  1|[email protected]|[c3,3,d3]|
|  n8|  1|[email protected]|[c3,3,d3]|
Author: Sachin Tyagi,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2017-01-19 15:48:50

O como df.filter($"friend_id".isNotNull)

Author: Adriana Lazar,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2016-12-08 18:45:30
Author: Michael Kopaniov,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2016-09-27 14:53:13

Encontré una buena solución para eliminar las filas con cualquier valor nulo:

Dataset<Row> filtered = df.filter(row -> !row.anyNull());

En caso de que uno esté interesado en el otro caso, simplemente llame a row.anyNull(). (Spark 2.1.0 usando Java API)

Author: chAlexey,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2017-03-20 10:35:35

Hay dos formas de hacerlo: crear la condición de filtro 1) Manualmente 2) Dinámicamente.

Ejemplo de DataFrame:

val df = spark.createDataFrame(Seq(
  (0, "a1", "b1", "c1", "d1"),
  (1, "a2", "b2", "c2", "d2"),
  (2, "a3", "b3", null, "d3"),
  (3, "a4", null, "c4", "d4"),
  (4, null, "b5", "c5", "d5")
)).toDF("id", "col1", "col2", "col3", "col4")

| id|col1|col2|col3|col4|
|  0|  a1|  b1|  c1|  d1|
|  1|  a2|  b2|  c2|  d2|
|  2|  a3|  b3|null|  d3|
|  3|  a4|null|  c4|  d4|
|  4|null|  b5|  c5|  d5|

1) Crear condición de filtro manualmente es decir, usando DataFrame where o filter función

df.filter(col("col1").isNotNull && col("col2").isNotNull).show


df.where("col1 is not null and col2 is not null").show


| id|col1|col2|col3|col4|
|  0|  a1|  b1|  c1|  d1|
|  1|  a2|  b2|  c2|  d2|
|  2|  a3|  b3|null|  d3|

2) Crear condición de filtro dinámicamente: Esto es útil cuando no queremos que ninguna columna tenga valor nulo y hay un gran número de columnas, que es sobre todo el caso.

Crear la condición de filtro manualmente en estos casos perderá mucho tiempo. En el siguiente código estamos incluyendo todas las columnas dinámicamente usando la función map y reduce en las columnas DataFrame:

val filterCond = df.columns.map(x=>col(x).isNotNull).reduce(_ && _)

Cómo se ve filterCond:

filterCond: org.apache.spark.sql.Column = (((((id IS NOT NULL) AND (col1 IS NOT NULL)) AND (col2 IS NOT NULL)) AND (col3 IS NOT NULL)) AND (col4 IS NOT NULL))


val filteredDf = df.filter(filterCond)


| id|col1|col2|col3|col4|
|  0|  a1|  b1|  c1|  d1|
|  1|  a2|  b2|  c2|  d2|
Author: Ayush Vatsyayan,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2018-02-27 07:59:09

De la pista de Michael Kopaniov, a continuación obras

Author: Robin Wang,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2016-12-07 08:15:51

Aquí hay una solución para spark en Java. Para seleccionar filas de datos que contengan nulos. Cuando tiene datos del conjunto de datos, lo hace:

Dataset<Row> containingNulls =  data.where(data.col("COLUMN_NAME").isNull())

Para filtrar los datos sin nulos que hacer:

Dataset<Row> withoutNulls = data.where(data.col("COLUMN_NAME").isNotNull())

A menudo los dataframes contienen columnas de tipo String donde en lugar de null tenemos strings vacíos como "". Para filtrar tales datos también lo hacemos:

Dataset<Row> withoutNullsAndEmpty = data.where(data.col("COLUMN_NAME").isNotNull().and(data.col("COLUMN_NAME").notEqual("")))
Author: Andrushenko Alexander,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2017-12-29 14:55:31

Utilizo el siguiente código para resolver mi pregunta. Funciona. Pero como todos sabemos, trabajo alrededor de la milla de un país para resolverlo. Entonces, ¿hay un atajo para eso? Gracias

def filter_null(field : Any) : Int = field match {
    case null => 0
    case _    => 1

val test = train_event_join.join(
    train_event_join("user_id") === user_friends_pair("user_id") &&
    train_event_join("event_owner") === user_friends_pair("friend_id"),
    line => (
    }.toDF("user_id", "event_id", "invited", "day_diff", "interested", "event_owner", "creator_is_friend")
Author: Steven Li,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2016-09-27 15:27:44