

Spark 2.4.0 + Scala 2.11.12 + sbt: Reading and Writing MySQL Data with DataFrames

1. Requirements

(1) In the MySQL database, create a new database sparktest and a table employee containing the following two rows of data:

id   name   gender   age
1    Alice  F        22
2    John   M        25

  • Table 1: Existing data in the employee table

  mysql> create database sparktest;
  mysql> use sparktest;
  mysql> create table employee (id int(4), name char(20), gender char(4), age int(4));
  mysql> insert into employee values(1,'Alice','F',22);
  mysql> insert into employee values(2,'John','M',25);

(2) Configure Spark to connect to the MySQL database via JDBC, write a program that uses a DataFrame to insert the following rows into MySQL, and finally print the maximum value of age and the sum of age.

id   name  gender   age
3    Mary  F        26
4    Tom   M        23

  • Table 2: New rows to add to the employee table

2. Code

import java.util.Properties
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
import org.apache.spark.sql.SparkSession

object TestMySQL {
  def main(args: Array[String]) {
    val spark = SparkSession.builder.getOrCreate()
    import spark.implicits._

    // Build a DataFrame holding the two new rows
    val employeeRDD = spark.sparkContext
      .parallelize(Array("3 Mary F 26", "4 Tom M 23"))
      .map(_.split(" "))
    val schema = StructType(List(
      StructField("id", IntegerType, true),
      StructField("name", StringType, true),
      StructField("gender", StringType, true),
      StructField("age", IntegerType, true)))
    val rowRDD = employeeRDD.map(p => Row(p(0).toInt, p(1).trim, p(2).trim, p(3).toInt))
    val employeeDF = spark.createDataFrame(rowRDD, schema)

    // Append the new rows to sparktest.employee via JDBC
    val prop = new Properties()
    prop.put("user", "root")
    prop.put("password", "root")
    prop.put("driver", "com.mysql.jdbc.Driver")
    employeeDF.write.mode("append")
      .jdbc("jdbc:mysql://localhost:3306/sparktest", "sparktest.employee", prop)

    // Read the whole table back and print max(age) and sum(age)
    val jdbcDF = spark.read.format("jdbc")
      .option("url", "jdbc:mysql://localhost:3306/sparktest")
      .option("driver", "com.mysql.jdbc.Driver")
      .option("dbtable", "employee")
      .option("user", "root")
      .option("password", "root")
      .load()
    jdbcDF.agg("age" -> "max", "age" -> "sum").show()
  }
}
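For reference, after the append the table holds the ages 22, 25, 26 and 23, so the final aggregation should print something roughly like the following (the column names are Spark's default aggregate names):

  +--------+--------+
  |max(age)|sum(age)|
  +--------+--------+
  |      26|      96|
  +--------+--------+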

3. Error

Exception in thread "main" java.lang.ClassNotFoundException: com.mysql.jdbc.Driver
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    at org.apache.spark.sql.execution.datasources.jdbc.DriverRegistry$.register(DriverRegistry.scala:45)
    at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions$$anonfun$5.apply(JDBCOptions.scala:99)
    at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions$$anonfun$5.apply(JDBCOptions.scala:99)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.<init>(JDBCOptions.scala:99)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcOptionsInWrite.<init>(JDBCOptions.scala:193)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcOptionsInWrite.<init>(JDBCOptions.scala:197)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:45)
    at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:86)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
    at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:668)
    at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:668)
    at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
    at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:668)
    at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:276)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:270)
    at org.apache.spark.sql.DataFrameWriter.jdbc(DataFrameWriter.scala:506)
    at TestMySQL$.main(testmysql.scala:18)
    at TestMySQL.main(testmysql.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
    at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:849)
    at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:167)
    at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:195)
    at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)
    at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:924)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:933)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

So I removed the driver key from the options, and the error changed to:

Exception in thread "main" java.sql.SQLException: No suitable driver
    at java.sql.DriverManager.getDriver(DriverManager.java:315)
    at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions$$anonfun$6.apply(JDBCOptions.scala:105)
    at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions$$anonfun$6.apply(JDBCOptions.scala:105)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.<init>(JDBCOptions.scala:104)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcOptionsInWrite.<init>(JDBCOptions.scala:193)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcOptionsInWrite.<init>(JDBCOptions.scala:197)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:45)
    at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)

4. Error Analysis and Solution

Looking at the source of the corresponding java.sql.DriverManager class, we find that it is JDBC's driver management class; reading the source shows that it obtains the driver via its getDriver call.
(Screenshot in the original post: source of java.sql.DriverManager.getDriver)
Tracing the driver variable further shows that drivers are loaded by the loadInitialDrivers method.
(Screenshot in the original post: source of DriverManager.loadInitialDrivers)
So the driver has to be added to the system environment before it can be used. A simple way to do this is to put the MySQL connector jar into the jars directory under the Spark root directory; Spark automatically loads everything there into the runtime environment at startup, as shown in the screenshot below. Alternatively, a new classpath entry can also be added to the System properties in code.
(Screenshot in the original post: MySQL connector jar placed under Spark's jars directory)
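Since the project in the title is built with sbt, another common way to pull the connector in is to declare it as a dependency in build.sbt. The sketch below is my own assumption (the artifact coordinates and the 5.1.47 version are not from the original post), not the author's actual build file:

  // build.sbt -- a minimal sketch; Spark/Scala versions follow the title, the connector version is an assumption
  name := "TestMySQL"
  scalaVersion := "2.11.12"
  libraryDependencies ++= Seq(
    "org.apache.spark" %% "spark-sql" % "2.4.0" % "provided",
    "mysql" % "mysql-connector-java" % "5.1.47"  // provides com.mysql.jdbc.Driver
  )

Note that the connector jar still has to be visible to the driver JVM when the job is launched with spark-submit, which is exactly what copying it into Spark's jars directory (or passing it with --jars / --driver-class-path) achieves.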

Tags: mysql, spark, database

This article is reposted from: https://blog.csdn.net/m0_46529816/article/details/124775818
Copyright belongs to the original author, ys L. If there is any infringement, please contact us for removal.
