0


Spark读写MySQL数据库

Spark读写MySQL数据库

文章目录

一、读取数据库

(一)通过RDD的方式读取MySQL数据库

四要素:驱动、连接地址、账号密码

importorg.apache.spark.rdd.JdbcRDD
importorg.apache.spark.sql.SparkSession

importjava.sql.DriverManager

/**
 * 使用RDD读取MySQL数据库
 */object spark_read_mysql {def main(args: Array[String]):Unit={//创建SparkSession,作用:连接Sparkval spark = SparkSession
      .builder().master("local[*]")//指定运行的方式.appName("spark_read_mysql")//程序的名字.getOrCreate()//创建SparkContextval sc = spark.sparkContext

    //驱动名称val driver ="com.mysql.cj.jdbc.Driver"//连接信息val url ="jdbc:mysql://192.168.80.145:3306/test"//用户名val username ="root"//密码val password ="123456"//具体的SQL查询语句val sql ="select * from t_user where id>=? and id<=?"//查询val rsRDD =new JdbcRDD(
      sc,()=>{//加载驱动
        Class.forName(driver)//创建和MySQL数据库的连接
        DriverManager.getConnection(url,username,password)},//需要执行的SQL语句
      sql,//查询的开始行1,//查询的结束行20,//运行几个分区执行2,//返回值的处理(将返回值变为RDD的元素),数字从1开始,表示字段的编号
      rs =>(rs.getInt(1),rs.getString(2),rs.getInt(3)))//将RDD的元素打印在终端
    rsRDD.collect().foreach(println)

    sc.stop()}}
(二)通过DataFrame的方式读取MySQL数据库
importorg.apache.spark.sql.SparkSession

/**
 * 使用DataFrame读取MySQL数据库
 */object spark_read_mysql2 {def main(args: Array[String]):Unit={//创建SparkSession,作用:连接Sparkval spark = SparkSession
      .builder().master("local[*]")//指定运行的方式.appName("spark_read_mysql2")//程序的名字.getOrCreate()//创建DataFrameval jdbcDF = spark.read.format("jdbc").option("url","jdbc:mysql://192.168.80.145:3306/test")//指定连接.option("driver","com.mysql.cj.jdbc.Driver")//指定驱动.option("user","root")//指定连接的用户.option("password","123456")//指定连接的用户的密码.option("dbtable","t_user")//查询的表.load()//加载数据库表//在终端显示DataFrame的内容
    jdbcDF.show()}}

二、添加数据到MySQL

(一)通过RDD的方式插入数据到MySQL

每个分区执行一次创建连接和关闭连接

importorg.apache.spark.sql.SparkSession

importjava.sql.DriverManager

/**
 * 使用RDD插入数据到MySQL,RDD的每个元素都会执行一次创建连接和关闭连接
 */object spark_write_mysql {def main(args: Array[String]):Unit={//创建SparkSession,作用:连接Sparkval spark = SparkSession
      .builder().master("local[*]")//指定运行的方式.appName("spark_write_mysql")//程序的名字.getOrCreate()//创建SparkContextval sc = spark.sparkContext

    //驱动名称val driver ="com.mysql.cj.jdbc.Driver"//连接信息//?useUnicode=true&characterEncoding=UTF-8 指定连接的参数;字符集为utf8,防止插入的数据中文乱码val url ="jdbc:mysql://192.168.80.145:3306/test?useUnicode=true&characterEncoding=UTF-8"//用户名val username ="root"//密码val password ="123456"//创建RDDval rdd = sc.makeRDD(List(("zhaoba",20),("孙七",19)))//打印RDD的元素//rdd.collect().foreach(println)//通过循环的方式读取RDD的每条元素,将元素插入MySQL;一个元素执行一次创建连接和插入和关闭连接
    rdd.foreach {case(name,age)=>{//加载驱动
        Class.forName(driver)//创建和MySQL的链接val conn = DriverManager.getConnection(url,username,password)//添加的SQL语句val sql ="insert into t_user(name,age) values(?,?)"//给SQL语句配置参数val ps = conn.prepareStatement(sql)//根据参数的类型配置参数
        ps.setString(1,name)
        ps.setInt(2,age)//执行SQL语句
        ps.executeUpdate()//关闭连接
        ps.close()
        conn.close()}}

    sc.stop()}}
(二)通过RDD的方式插入数据到MySQL 2

每个分区执行一次创建连接和关闭连接

importorg.apache.spark.sql.SparkSession

importjava.sql.DriverManager

/**
 * 使用RDD插入数据到MySQL,RDD的每个分区执行一次创建连接和关闭连接;推荐
 */object spark_write_mysql2 {def main(args: Array[String]):Unit={//创建SparkSession,作用:连接Sparkval spark = SparkSession
      .builder().master("local[*]")//指定运行的方式.appName("spark_write_mysql2")//程序的名字.getOrCreate()//创建SparkContextval sc = spark.sparkContext

    //驱动名称val driver ="com.mysql.cj.jdbc.Driver"//连接信息//?useUnicode=true&characterEncoding=UTF-8 指定连接的参数;字符集为utf8,防止插入的数据中文乱码val url ="jdbc:mysql://192.168.80.145:3306/test?useUnicode=true&characterEncoding=UTF-8"//用户名val username ="root"//密码val password ="123456"//创建RDDval rdd = sc.makeRDD(List(("zhaoba",20),("孙七",19)))//打印RDD的元素//rdd.collect().foreach(println)//通过循环的方式读取RDD的每个分区,将元素插入MySQL;一个分区执行一次创建连接和关闭连接
    rdd.foreachPartition {
      datas =>{//加载驱动
        Class.forName(driver)//创建和MySQL的链接val conn = DriverManager.getConnection(url,username,password)//添加的SQL语句val sql ="insert into t_user(name,age) values(?,?)"//给SQL语句配置参数val ps = conn.prepareStatement(sql)//根据参数的类型配置参数
        datas.foreach{case(name,age)=>{
            ps.setString(1,name)
            ps.setInt(2,age)//执行SQL语句
            ps.executeUpdate()}}//关闭连接
        ps.close()
        conn.close()}}

    sc.stop()}}
(三)使用DataFrame插入数据到MySQL
importorg.apache.spark.sql.{Row, SparkSession}importorg.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}/**
 * 使用DataFrame插入数据到MySQL
 */object spark_write_mysql3 {def main(args: Array[String]):Unit={//创建SparkSession,作用:连接Sparkval spark = SparkSession
      .builder().master("local[*]")//指定运行的方式.appName("spark_write_mysql3")//程序的名字.getOrCreate()//1.创建DataFrame//1.1 schemaval schema = StructType(List(StructField("name", StringType,true),StructField("age",IntegerType,true)))//1.2 行rows//1.2.1 创建RDDval dataRDD = spark.sparkContext.parallelize(Array(Array("李四",20),Array("王五",20)))//1.2.2 创建rowsval rows = dataRDD.map(x=>Row(x(0),x(1)))//1.3 拼接表头(schema)和行内容(rows)val df = spark.createDataFrame(rows,schema)//2.通过DataFrame插入数据到MySQL//如果直接使用df.write则会将整个DataFrame的表写入MySQL形成一个新表,需要注意表不能存在//df.write.mode("append"),是以追加的方式将数据写入到已经存在的表中
    df.write
      .format("jdbc").option("url","jdbc:mysql://192.168.80.145:3306/test?useUnicode=true&characterEncoding=UTF-8")//指定连接.option("driver","com.mysql.cj.jdbc.Driver")//指定驱动.option("user","root")//指定连接的用户.option("password","123456")//指定连接的用户的密码.option("dbtable","t_user2")//查询的表.save()//保存数据}}
标签: 数据库 spark mysql

本文转载自: https://blog.csdn.net/qq_42260493/article/details/134975739
版权归原作者 叶子上的考拉 所有, 如有侵权,请联系我们删除。

“Spark读写MySQL数据库”的评论:

还没有评论