¿Cómo usar la fuente JDBC para escribir y leer datos en (Py)Spark?


El objetivo de esta pregunta es documentar:

  • Pasos necesarios para leer y escribir datos usando conexiones JDBC en PySpark

  • Posibles problemas con las fuentes JDBC y know solutions

Con pequeños cambios, estos métodos deberían funcionar con otros lenguajes compatibles, incluidos Scala y R.

Author: zero323, 2015-06-22

3 answers

Escribiendo datos

  1. Incluya el controlador JDBC aplicable cuando envíe la aplicación o inicie el shell. Puedes usar por ejemplo --packages:

    bin/pyspark --packages group:name:version  
    

    O combinando driver-class-path y jars

    bin/pyspark --driver-class-path $PATH_TO_DRIVER_JAR --jars $PATH_TO_DRIVER_JAR
    

    Estas propiedades también se pueden establecer usando la variable de entorno PYSPARK_SUBMIT_ARGS antes de que se inicie la instancia JVM o usando conf/spark-defaults.conf para establecer spark.jars.packages o spark.jars / spark.driver.extraClassPath.

  2. Elija el modo deseado. Spark JDBC writer soporta los siguientes modos:

    • append: Añade el contenido de this :class:DataFrame a los datos existentes.
    • overwrite: Sobrescribir los datos existentes.
    • ignore: Ignore silenciosamente esta operación si los datos ya existen.
    • error (caso predeterminado): Lanza una excepción si los datos ya existen.

    No se admiten Upserts u otras modificaciones de grano fino

    mode = ...
    
  3. Prepare JDBC URI, por ejemplo: {[47]]}

    # You can encode credentials in URI or pass
    # separately using properties argument
    # of jdbc method or options
    
    url = "jdbc:postgresql://localhost/foobar"
    
  4. (Opcional) dictionary of JDBC arguments (en inglés).

    properties = {
        "user": "foo",
        "password": "bar"
    }
    
  5. Use DataFrame.write.jdbc

    df.write.jdbc(url=url, table="baz", mode=mode, properties=properties)
    

    Para guardar los datos (ver pyspark.sql.DataFrameWriter para más detalles).

Problemas Conocidos:

  • No se puede encontrar el controlador adecuado cuando el controlador se ha incluido utilizando --packages (java.sql.SQLException: No suitable driver found for jdbc: ...)

    Asumiendo que no hay desajuste de versión del controlador para resolver esto, puede agregar driver clase a properties. Por ejemplo:

    properties = {
        ...
        "driver": "org.postgresql.Driver"
    }
    
  • Usando df.write.format("jdbc").options(...).save() puede resultar en:

    Java.lang.RuntimeException: org.apache.chispa.SQL.ejecución.fuentes de datos.jdbc.DefaultSource no permite crear tabla como select.

    Solución desconocida.

  • En Pyspark 1.3 puedes intentar llamar al método Java directamente:

    df._jdf.insertIntoJDBC(url, "baz", True)
    

Lectura de datos

  1. Siga los pasos 1-4 de Escribiendo datos
  2. Use sqlContext.read.jdbc:

    sqlContext.read.jdbc(url=url, table="baz", properties=properties)
    

    O sqlContext.read.format("jdbc"):

    (sqlContext.read.format("jdbc")
        .options(url=url, dbtable="baz", **properties)
        .load())
    

Conocido problemas y trampas:

  • No se puede encontrar el controlador adecuado-consulte: Escritura de datos
  • Spark SQL soporta pushdown de predicados con fuentes JDBC aunque no todos los predicados pueden ser empujados hacia abajo. Tampoco delega límites ni agregaciones. Posible solución es reemplazar dbtable / table argumento con una subconsulta válida. Ver por ejemplo:
  • De forma predeterminada, las fuentes de datos JDBC cargan los datos secuencialmente utilizando un único subproceso ejecutor. Para garantizar la carga de datos distribuidos, puede:

    • Proporcionar partición column (debe ser IntegeType), lowerBound, upperBound, numPartitions.
    • Proporcione una lista de predicados mutuamente excluyentes predicates, uno para cada partición deseada.
  • En un modo distribuido (con columna de partición o predicados) cada ejecutor opera en su propia transacción. Si la base de datos de origen se modifica al mismo tiempo, no hay garantía de que la vista final sea coherente.

Dónde encontrar conductores adecuados:

 68
Author: zero323,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2017-05-23 10:30:57

Descargar mysql-connector-java controlador y mantener en la carpeta spark jar, observar el código python abajo aquí escribiendo datos en "acotr1", tenemos que crear la estructura de tabla acotr1 en la base de datos mysql

    spark = SparkSession.builder.appName("prasadad").master('local').config('spark.driver.extraClassPath','D:\spark-2.1.0-bin-hadoop2.7\jars\mysql-connector-java-5.1.41-bin.jar').getOrCreate()

    sc = spark.sparkContext

    from pyspark.sql import SQLContext

    sqlContext = SQLContext(sc)

    df = sqlContext.read.format("jdbc").options(url="jdbc:mysql://localhost:3306/sakila",driver="com.mysql.jdbc.Driver",dbtable="actor",user="root",password="Ramyam01").load()

    mysql_url="jdbc:mysql://localhost:3306/sakila?user=root&password=Ramyam01"

    df.write.jdbc(mysql_url,table="actor1",mode="append")
 -1
Author: y durga prasad,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2018-09-05 05:06:45

Consulte este enlace para descargar el jdbc para postgres y siga los pasos para descargar el archivo jar

Https://jaceklaskowski.gitbooks.io/mastering-apache-spark/exercises/spark-exercise-dataframe-jdbc-postgresql.html el archivo jar se descargará en la ruta de acceso de esta manera. "/home / anand/.ivy2 / jars / org.postgresql_postgresql-42.1.1.jar "

Si su versión de spark es 2

from pyspark.sql import SparkSession

spark = SparkSession.builder
        .appName("sparkanalysis")
        .config("spark.driver.extraClassPath",
         "/home/anand/.ivy2/jars/org.postgresql_postgresql42.1.1.jar")
        .getOrCreate()

//for localhost database//

pgDF = spark.read \
.format("jdbc") \
.option("url", "jdbc:postgresql:postgres") \
.option("dbtable", "public.user_emp_tab") \
.option("user", "postgres") \
.option("password", "Jonsnow@100") \
.load()


print(pgDF)

pgDF.filter(pgDF["user_id"]>5).show()

Guarde el archivo como python y ejecute "python respectivefilename.py"

 -2
Author: anand ml,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2018-02-22 07:11:59