0


Spark——JDBC操作MySQL

文章目录

JDBC操作MySQL

在实际的企业级开发环境中,如果数据规模特S别大,此时采用传统的SQL语句去处理的话一般需要分成很多批次处理,而且很容易造成数据库服务宕机,且实际的处理过程可能会非常复杂,通过传统的Java EE等技术可能很难或者不方便实现处理算法,此时采用SparkSQL进行分布式分析处理就可以非常好的解决该问题,在生产环境下,一般会在Spark SQL和具体要操作的DB之间加上一个缓冲层次,例如中间使用Redis或者Kafka。

Spark SQL可以通过JDBC从传统的关系型数据库中读写数据,读取数据后直接生成的是DataFrame,然后再加上借助于Spark SQL丰富的API来进行各种操作。从计算数据规模的角度去讲,集群并行访问数据库数据,调用Data Frame Reader的Format(“JDBC”)的方式说明Spark SQL操作的数据来源是通过JDBC获得,JDBC后端一般都是数据库,例如MySQL、Oracle等。

JDBC读取数据方式

  1. 单Partition(无并发)调用函数格式:def jdbc(url: String, table: String, properties: Properties): DataFrame- url:代表数据库的JDBC链接地址;- table:具体要链接的数据库;这种方法是将所有的数据放在一个Partition中进行操作(即并发度为1),意味着无论给的资源有多少,只有一个Task会执行任务,执行效率比较慢,并且容易出现OOM。使用如下,在spark-shell中执行:/*此为代码格式,实际中使用应替换相应字段中的内容*/val url ="jdbc:mysql://localhost:/database"val tableName ="table"// 设置连接用户&密码val prop =new java.util.Propertiesprop.setProperty("user","username")//实际使用中替换username为相应的用户名prop.setProperty("password","pwd")//实际使用中替换pwd为相应的密码
  2. 根据Long类型字段分区/*此为代码格式,实际中使用应替换相应字段中的内容*/def jdbc(url:String,table:String,columnName:String,// 根据该字段分区,需要为整型,比如 id 等lowerBound:Long,// 分区的下界upperBound:Long,// 分区的上界numPartitions:Int,//分区的个数connectionProperties: Properties): DataFrame根据字段将数据进行分区,放进不同的Partition中,执行效率较快,但是只能根据数据字段作为分区关键字。使用如下:/*此为代码格式,实际中使用应替换相应字段中的内容*/val url ="jdbc:mysql://mysqlHost:3306/database"val tableName ="table"val columnName ="colName"val lowerBound =1,val upperBound =10000000,val numPartitions =10,// 设置连接用户&密码val prop =new java.util.Propertiesprop.setProperty("user","username")prop.setProperty("password","pwd")将字段 colName 中发 1~10000000 条数据分区到 10 个 Partition 中。
  3. 根据任意类型字段分区/*此为代码格式,实际中使用应替换相应字段中的内容*/jdbc(url:String,table:String,predicates: Array[String],connectionProperties: Properties): DataFrame以下使用时间字段进行分区:/*此为代码格式,实际中使用应替换相应字段中的内容*/val url ="jdbc:mysql://mysqlHost:3306/database"val tableName ="table"// 设置连接用户&密码val prop =new java.util.Propertiesprop.setProperty("user","username")prop.setProperty("password","pwd")/*** 将 9 月 16-12 月 15 三个月的数据取出,按时间分为 6 个 partition* 为了减少事例代码,这里的时间都是写死的* modified_time 为时间字段*/val predicates =Array("2015-09-16"->"2015-09-30","2015-10-01"->"2015-10-15","2015-10-16"->"2015-10-31","2015-11-01"->"2015-11-14","2015-11-15"->"2015-11-30","2015-12-01"->"2015-12-15").map {case(start, end)=>s"cast(modified_time as date) >= date '$start' "+ s"AND cast(modified_timeas date)<= date '$end'"}这种方法可以使用任意字段进行分区,比较灵活,适用于各种场景。以MySQL 3000W数据量为例,如果单分区count,若干分钟就会报OOM;如果分成5~20个分区后,count操作只需要2s,效率会明显提高,这里就凸显出JDBC高并发的优势。Spark高并发度可以大幅度提高读取以及处理数据的速度,但是如果设置过高(大量的Partition同时读取)也可能会将数据源数据库宕掉。

JDBC读取MySQL数据

下面来进行实际操作,首先需要配置MySQL

  • 免密登陆:mysql -uroot
  • 查看数据库:show databases;
  • 使用MySQL数据库:use mysql;

修改表格的权限,目的是为了使其他主机可以远程连接 MySQL,通过此命令可以查看访问用户允许的主机名。

  • 查看所有用户及其host:select host, user from user;
  • 将相应用户数据表中的host字段改成’%':update user set host="%" where user="root";
  • 刷新修改权限flush privileges;

通过命令修改host为%,表示任意IP地址都可以登录。出现

ERROR 1062 (23000): Duplicate entry '%-root' for key 'PRIMARY'

,是因为 user+host 是主键,不能重复,可以不用理会。也可通过以下命令删除user 为空的内容来解决:

delete from user where user='';

在MySQL创建数据库和表格,插入数据,查看

create database test;//创建数据库test
use test;//进入数据库test
create table people( name varchar(12), age int);//创建表格people并构建结构
insert into people values ("Andy",30),("Justin",19),("Dela",25),("Magi",20),("Pule",21),("Mike",12);//向people表中插入数据
select * from people;//输出people表中全部数据

编写代码读取MySQL表中数据

//导入依赖环境importorg.apache.spark.{SparkConf, SparkContext}importorg.apache.spark.sql.{DataFrame, SQLContext}importjava.util.Properties
val url ="jdbc:mysql://localhost/test"//MySQL地址及数据库val username ="root"//用户名val sqlContext =new SQLContext(sc)       
sc.setLogLevel("WARN")val uri = url +"?user="+ username +"&useUnicode=true&characterEncoding=UTF-8"//设置读取路径及用户名val properties =new Properties()//创建JDBC连接信息
properties.put("user","root")
properties.put("driver","com.mysql.jdbc.Driver")val df_test: DataFrame = spark.sqlContext.read.jdbc(uri,"people", properties)//读取数据
df_test.select("name","age").collect().foreach(row =>{//输出数据
    println("name "+ row(0)+", age"+ row(1))})
df_test.write.mode("append").jdbc(uri,"people",properties)//向people表中写入读出的数据,相当于people表中有两份一样的数据
标签: spark scala 大数据

本文转载自: https://blog.csdn.net/weixin_44018458/article/details/128800342
版权归原作者 码上行舟 所有, 如有侵权,请联系我们删除。

“Spark——JDBC操作MySQL”的评论:

还没有评论