¿Cómo usar la fuente JDBC para escribir y leer datos en (Py)Spark?
El objetivo de esta pregunta es documentar:
-
Pasos necesarios para leer y escribir datos usando conexiones JDBC en PySpark
-
Posibles problemas con las fuentes JDBC y know solutions
Con pequeños cambios, estos métodos deberían funcionar con otros lenguajes compatibles, incluidos Scala y R.
3 answers
Escribiendo datos
-
Incluya el controlador JDBC aplicable cuando envíe la aplicación o inicie el shell. Puedes usar por ejemplo
--packages
:bin/pyspark --packages group:name:version
O combinando
driver-class-path
yjars
bin/pyspark --driver-class-path $PATH_TO_DRIVER_JAR --jars $PATH_TO_DRIVER_JAR
Estas propiedades también se pueden establecer usando la variable de entorno
PYSPARK_SUBMIT_ARGS
antes de que se inicie la instancia JVM o usandoconf/spark-defaults.conf
para establecerspark.jars.packages
ospark.jars
/spark.driver.extraClassPath
. -
Elija el modo deseado. Spark JDBC writer soporta los siguientes modos:
-
append
: Añade el contenido de this :class:DataFrame
a los datos existentes. -
overwrite
: Sobrescribir los datos existentes. -
ignore
: Ignore silenciosamente esta operación si los datos ya existen. -
error
(caso predeterminado): Lanza una excepción si los datos ya existen.
No se admiten Upserts u otras modificaciones de grano fino
mode = ...
-
-
Prepare JDBC URI, por ejemplo: {[47]]}
# You can encode credentials in URI or pass # separately using properties argument # of jdbc method or options url = "jdbc:postgresql://localhost/foobar"
-
(Opcional) dictionary of JDBC arguments (en inglés).
properties = { "user": "foo", "password": "bar" }
-
Use
DataFrame.write.jdbc
df.write.jdbc(url=url, table="baz", mode=mode, properties=properties)
Para guardar los datos (ver
pyspark.sql.DataFrameWriter
para más detalles).
Problemas Conocidos:
-
No se puede encontrar el controlador adecuado cuando el controlador se ha incluido utilizando
--packages
(java.sql.SQLException: No suitable driver found for jdbc: ...
)Asumiendo que no hay desajuste de versión del controlador para resolver esto, puede agregar
driver
clase aproperties
. Por ejemplo:properties = { ... "driver": "org.postgresql.Driver" }
-
Usando
df.write.format("jdbc").options(...).save()
puede resultar en:Java.lang.RuntimeException: org.apache.chispa.SQL.ejecución.fuentes de datos.jdbc.DefaultSource no permite crear tabla como select.
Solución desconocida.
-
En Pyspark 1.3 puedes intentar llamar al método Java directamente:
df._jdf.insertIntoJDBC(url, "baz", True)
Lectura de datos
- Siga los pasos 1-4 de Escribiendo datos
-
Use
sqlContext.read.jdbc
:sqlContext.read.jdbc(url=url, table="baz", properties=properties)
O
sqlContext.read.format("jdbc")
:(sqlContext.read.format("jdbc") .options(url=url, dbtable="baz", **properties) .load())
Conocido problemas y trampas:
- No se puede encontrar el controlador adecuado-consulte: Escritura de datos
- Spark SQL soporta pushdown de predicados con fuentes JDBC aunque no todos los predicados pueden ser empujados hacia abajo. Tampoco delega límites ni agregaciones. Posible solución es reemplazar
dbtable
/table
argumento con una subconsulta válida. Ver por ejemplo: -
De forma predeterminada, las fuentes de datos JDBC cargan los datos secuencialmente utilizando un único subproceso ejecutor. Para garantizar la carga de datos distribuidos, puede:
- Proporcionar partición
column
(debe serIntegeType
),lowerBound
,upperBound
,numPartitions
. - Proporcione una lista de predicados mutuamente excluyentes
predicates
, uno para cada partición deseada.
- Proporcionar partición
En un modo distribuido (con columna de partición o predicados) cada ejecutor opera en su propia transacción. Si la base de datos de origen se modifica al mismo tiempo, no hay garantía de que la vista final sea coherente.
Dónde encontrar conductores adecuados:
-
Repositorio Maven (para obtener las coordenadas requeridas para
--packages
seleccione la versión deseada y copie los datos de una pestaña Gradle en un formulariocompile-group:name:version
sustituyendo los campos respectivos) o Maven Central Repositorio:
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2017-05-23 10:30:57
Descargar mysql-connector-java controlador y mantener en la carpeta spark jar, observar el código python abajo aquí escribiendo datos en "acotr1", tenemos que crear la estructura de tabla acotr1 en la base de datos mysql
spark = SparkSession.builder.appName("prasadad").master('local').config('spark.driver.extraClassPath','D:\spark-2.1.0-bin-hadoop2.7\jars\mysql-connector-java-5.1.41-bin.jar').getOrCreate()
sc = spark.sparkContext
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)
df = sqlContext.read.format("jdbc").options(url="jdbc:mysql://localhost:3306/sakila",driver="com.mysql.jdbc.Driver",dbtable="actor",user="root",password="Ramyam01").load()
mysql_url="jdbc:mysql://localhost:3306/sakila?user=root&password=Ramyam01"
df.write.jdbc(mysql_url,table="actor1",mode="append")
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2018-09-05 05:06:45
Consulte este enlace para descargar el jdbc para postgres y siga los pasos para descargar el archivo jar
Https://jaceklaskowski.gitbooks.io/mastering-apache-spark/exercises/spark-exercise-dataframe-jdbc-postgresql.html el archivo jar se descargará en la ruta de acceso de esta manera. "/home / anand/.ivy2 / jars / org.postgresql_postgresql-42.1.1.jar "
Si su versión de spark es 2
from pyspark.sql import SparkSession
spark = SparkSession.builder
.appName("sparkanalysis")
.config("spark.driver.extraClassPath",
"/home/anand/.ivy2/jars/org.postgresql_postgresql42.1.1.jar")
.getOrCreate()
//for localhost database//
pgDF = spark.read \
.format("jdbc") \
.option("url", "jdbc:postgresql:postgres") \
.option("dbtable", "public.user_emp_tab") \
.option("user", "postgres") \
.option("password", "Jonsnow@100") \
.load()
print(pgDF)
pgDF.filter(pgDF["user_id"]>5).show()
Guarde el archivo como python y ejecute "python respectivefilename.py"
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2018-02-22 07:11:59