Eliminar duplicados de filas basados en columnas específicas en un DataFrame RDD/Spark


Digamos que tengo un conjunto de datos bastante grande en la siguiente forma:

data = sc.parallelize([('Foo',41,'US',3),
                       ('Foo',39,'UK',1),
                       ('Bar',57,'CA',2),
                       ('Bar',72,'CA',2),
                       ('Baz',22,'US',6),
                       ('Baz',36,'US',6)])

Lo que me gustaría hacer es eliminar filas duplicadas basadas en los valores de la primera,tercera y cuarta columnas solamente.

Eliminar filas completamente duplicadas es sencillo:

data = data.distinct()

Y se eliminará la fila 5 o la fila 6

Pero, ¿cómo elimino solo las filas duplicadas basadas en las columnas 1, 3 y 4? es decir, eliminar uno de estos:

('Baz',22,'US',6)
('Baz',36,'US',6)

En Python, esto podría hacerse especificando columnas con .drop_duplicates(). ¿Cómo puedo lograr lo mismo en Spark/Pyspark?

Author: Jason, 2015-05-15

7 answers

Pyspark incluye un método dropDuplicates(). https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.dropDuplicates

>>> from pyspark.sql import Row
>>> df = sc.parallelize([ \
...     Row(name='Alice', age=5, height=80), \
...     Row(name='Alice', age=5, height=80), \
...     Row(name='Alice', age=10, height=80)]).toDF()
>>> df.dropDuplicates().show()
+---+------+-----+
|age|height| name|
+---+------+-----+
|  5|    80|Alice|
| 10|    80|Alice|
+---+------+-----+

>>> df.dropDuplicates(['name', 'height']).show()
+---+------+-----+
|age|height| name|
+---+------+-----+
|  5|    80|Alice|
+---+------+-----+

Tal vez se introdujo en una versión posterior a lo que @Jason (OP) estaba utilizando?

Editar: sí, se introdujo en 1.4

 30
Author: vaer-k,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2016-10-05 18:21:57

De su pregunta, no está claro cómo-a qué columnas desea utilizar para determinar los duplicados. La idea general detrás de la solución es crear una clave basada en los valores de las columnas que identifican duplicados. Luego, puede usar las operaciones reduceByKey o reducir para eliminar duplicados.

Aquí hay un código para comenzar:

def get_key(x):
    return "{0}{1}{2}".format(x[0],x[2],x[3])

m = data.map(lambda x: (get_key(x),x))

Ahora, tiene un valor clave RDD que está marcado por las columnas 1, 3 y 4. El siguiente paso sería un reduceByKey o groupByKey y filter. Este eliminaría duplicados.

r = m.reduceByKey(lambda x,y: (x))
 19
Author: Mike,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2016-04-18 12:40:06

De acuerdo con David. Para agregar, puede que no sea el caso de que queramos agrupar todas las columnas que no sean las columnas en la función aggregate, es decir, si queremos eliminar duplicados puramente basados en un subconjunto de columnas y conservar todas las columnas en el dataframe original. Así que la mejor manera de hacer esto podría ser usando dropDuplicates Dataframe api disponible en Spark 1.4.0

Para referencia, ver: https://spark.apache.org/docs/1.4.0/api/scala/index.html#org.apache.spark.sql.DataFrame

 10
Author: technotring,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2015-09-10 13:04:06

Sé que ya aceptaste la otra respuesta, pero si quieres hacer esto como un DataFrame, solo usa groupBy y agg. Suponiendo que ya tiene un DF creado (con columnas llamadas "col1", "col2", etc.) podría hacer:

myDF.groupBy($"col1", $"col3", $"col4").agg($"col1", max($"col2"), $"col3", $"col4")

Tenga en cuenta que en este caso, elegí el máximo de col2, pero podría hacer avg, min, etc.

 9
Author: David Griffin,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2015-05-15 10:54:27

Usé la función incorporada dropDuplicates(). Código Scala que figura a continuación

val data = sc.parallelize(List(("Foo",41,"US",3),
("Foo",39,"UK",1),
("Bar",57,"CA",2),
("Bar",72,"CA",2),
("Baz",22,"US",6),
("Baz",36,"US",6))).toDF("x","y","z","count")

data.dropDuplicates(Array("x","count")).show()

Salida:

+---+---+---+-----+
|  x|  y|  z|count|
+---+---+---+-----+
|Baz| 22| US|    6|
|Foo| 39| UK|    1|
|Foo| 41| US|    3|
|Bar| 57| CA|    2|
+---+---+---+-----+
 7
Author: Aravind Krishnakumar,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2016-11-18 03:11:29

Este es mi Df contiene 4 se repite dos veces por lo que aquí se eliminarán los valores repetidos.

scala> df.show
+-----+
|value|
+-----+
|    1|
|    4|
|    3|
|    5|
|    4|
|   18|
+-----+

scala> val newdf=df.dropDuplicates

scala> newdf.show
+-----+
|value|
+-----+
|    1|
|    3|
|    5|
|    4|
|   18|
+-----+
 0
Author: Nilesh Shinde,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2017-11-10 07:30:38

El siguiente programa le ayudará a soltar duplicados en su totalidad, o si desea soltar duplicados basados en ciertas columnas , incluso puede hacerlo:

import org.apache.spark.sql.SparkSession

object DropDuplicates {
def main(args: Array[String]) {
val spark =
  SparkSession.builder()
    .appName("DataFrame-DropDuplicates")
    .master("local[4]")
    .getOrCreate()

import spark.implicits._

// create an RDD of tuples with some data
val custs = Seq(
  (1, "Widget Co", 120000.00, 0.00, "AZ"),
  (2, "Acme Widgets", 410500.00, 500.00, "CA"),
  (3, "Widgetry", 410500.00, 200.00, "CA"),
  (4, "Widgets R Us", 410500.00, 0.0, "CA"),
  (3, "Widgetry", 410500.00, 200.00, "CA"),
  (5, "Ye Olde Widgete", 500.00, 0.0, "MA"),
  (6, "Widget Co", 12000.00, 10.00, "AZ")
)
val customerRows = spark.sparkContext.parallelize(custs, 4)

// convert RDD of tuples to DataFrame by supplying column names
val customerDF = customerRows.toDF("id", "name", "sales", "discount", "state")

println("*** Here's the whole DataFrame with duplicates")

customerDF.printSchema()

customerDF.show()

// drop fully identical rows
val withoutDuplicates = customerDF.dropDuplicates()

println("*** Now without duplicates")

withoutDuplicates.show()

// drop fully identical rows
val withoutPartials = customerDF.dropDuplicates(Seq("name", "state"))

println("*** Now without partial duplicates too")

withoutPartials.show()

 }
 }
 0
Author: Sampat Kumar,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2018-05-21 14:37:24