【Spark数仓项目】需求八:MySQL的DataX全量导入和增量导入Hive
文章目录
一、mysql全量导入hive[分区表]
需求介绍:
本需求将模拟从MySQL中向Hive数仓中导入数据,数据以时间分区。测试两种导入场景,一种是将数据全量导入,即包含所有时间分区;另一种是每天运行调度,仅导入当天时间分区中的用户数据。
- mysql表建表语句:
createtable t_order(
id intprimarykeyauto_increment,
amt decimal(10,2),`status`intdefault0,
user_id int,
create_time timestampDEFAULTCURRENT_TIMESTAMP,
modify_time timestampDEFAULTCURRENT_TIMESTAMPONUPDATECURRENT_TIMESTAMP)
- hive
createtable t_order(
id int,
amt decimal(10,2),`status`int,
user_id int,
create_time date,
modify_time date)partitioned by(dt string)row format delimited
fieldsterminatedby'\t'
注意字段时间戳,我们将从以上MySQL向Hive导入数据。
- 编写datax的json脚本
{"job":{"content":[{"reader":{"name":"mysqlreader","parameter":{"connection":[{"jdbcUrl":["jdbc:mysql://hadoop10:3306/spark-dw"],"querySql":["select id,amt,status,user_id,create_time,modify_time from t_order"]}],"password":"0000","username":"root",}},"writer":{"name":"hdfswriter","parameter":{"column":[{"name":"id","type":"int"},{"name":"amt","type":"double"},{"name":"status","type":"int"},{"name":"user_id","type":"int"},{"name":"create_time","type":"string"},{"name":"modify_time","type":"string"}],"defaultFS":"hdfs://hadoop10:8020","fieldDelimiter":"\t","fileName":"t_order","fileType":"text","path":"/user/hive/warehouse/test_hive.db/t_order/dt=$dt","writeMode":"append"}}}],"setting":{"speed":{"channel":"1"}}}}
- 执行导入操作
在mysql中添加测试数据 导入mysql中7-11的数据到hive下7-11分区
insertinto t_order(amt,user_id)values(100,1001)insertinto t_order values(null,100,0,1001,'2023-07-11 10:18:39','2023-07-11 10:18:39')insertinto t_order values(null,120,0,1001,'2023-07-11 10:18:39','2023-07-11 10:18:39')
在hive下创建分区
altertable t_order addpartition(dt='2023-07-11')
运行dataX脚本
python /opt/installs/datax/bin/datax.py -p "-Ddt=2023-07-11" /opt/installs/datax/job/mysql2hive.json
此部分的操作是将先插入mysql的三条数据导入到hive。
在mysql中添加测试数据 导入mysql中7-12的数据到hive下7-12分区
insertinto t_order values(null,200,0,1001,'2023-07-12 10:18:39','2023-07-12 10:18:39');insertinto t_order values(null,220,0,1001,'2023-07-12 10:18:39','2023-07-12 10:18:39');
在hive下创建分区
altertable t_order addpartition(dt='2023-07-12')
运行datax脚本
python /opt/installs/datax/bin/datax.py -p "-Ddt=2023-07-12" /opt/installs/datax/job/mysql2hive.json
此部分的操作是将先插入mysql的三条数据和本次插入mysql的数据都导入到hive。
根据查询结果可以看到,此时我们重复导入了第一部分的数据,这就是全量导入。
二、mysql增量导入hive
大方向
:事实表用增量[订单表] 维度表用全量[商品表]
绝大部分公司采用的方案:全量为主、增量为辅
要想采用增量导入还有一个问题是你的业务库表能够支持增量导入
1. 增量导入的第一种实现方法
根据 id主键,查询hive表中最大的id值,然后去mysql中查询大于上述id值的数据。
如果有些使用uuid的,则不能用id,这种方案不适用于对修改的数据进行同步。
2. 另一种方法是 时间字段
在表中增加一个modify_time字段,如果数据新增或者修改,可以根据这个字段查询数据抽取到hive
3. dataX脚本
{"job":{"content":[{"reader":{"name":"mysqlreader","parameter":{"connection":[{"jdbcUrl":["jdbc:mysql://hadoop10:3306/spark-dw"],"querySql":["select id,amt,status,user_id,create_time,modify_time from t_order where date_format(modify_time,'%Y-%m-%d') = '$dt'"]}],"password":"0000","username":"root",}},"writer":{"name":"hdfswriter","parameter":{"column":[{"name":"id","type":"int"},{"name":"amt","type":"double"},{"name":"status","type":"int"},{"name":"user_id","type":"int"},{"name":"create_time","type":"string"},{"name":"modify_time","type":"string"}],"defaultFS":"hdfs://hadoop10:8020","fieldDelimiter":"\t","fileName":"t_order","fileType":"text","path":"/user/hive/warehouse/test_hive.db/t_order/dt=$dt","writeMode":"append"}}}],"setting":{"speed":{"channel":"1"}}}}
运行该增量脚本,即可按照分区的日期,每次导入需要的mysql数据到hive。
三、利用Python自动生成Datax的json脚本
1. 创建mysql和hive数据库
createtable t_student(
id intPRIMARYkey,
name varchar(50),`age`int);createtable t_person(
id intPRIMARYkey,
name varchar(50),
parentid int);INSERTinto t_student values(1,'zhanmusi',15),(2,'lisi',55),(3,'lisi',66);INSERTinto t_person values(1,'miky',06),(2,'tom',16),(3,'jakcon',26);
createtable ods_t_student(
id int,
name string,`age`int)partitioned by(dt string)row format delimited
fieldsterminatedby'\t'createtable ods_t_person(
id int,
name string,
parentid int)partitioned by(dt string)row format delimited
fieldsterminatedby'\t'
2. 修改python脚本里面的密码(2处)和hdfs端口
import json
import sys
import pymysql
defgen_json(dbname, tablename):
s1 ={"job":{"content":[{"reader":{"name":"mysqlreader","parameter":{"connection":[{"jdbcUrl":["jdbc:mysql://hadoop10:3306/"+ dbname +"?useSSL=false"],"table":[tablename]}],"password":"0000",# 密码"username":"root","column": getColumn(dbname, tablename)}},"writer":{"name":"hdfswriter","parameter":{"column": getColumnAndType(dbname, tablename),"defaultFS":"hdfs://hadoop10:8020",# hdfs端口"fileType":"text","path":"/user/hive/warehouse/ods_"+ tablename +"/dt=$dt","fieldDelimiter":"\t","fileName": tablename,"writeMode":"append"}}}],"setting":{"speed":{"channel":"1"}}}}withopen('d:/test/'+ tablename +'.json','w')as f:
json.dump(s1, f)defqueryDataBase(dbname, tablename):
conn = pymysql.connect(user='root', password='0000', host='hadoop10')# 密码
cursor = conn.cursor()
cursor.execute("select column_name ,data_type from information_schema.`COLUMNS` where TABLE_SCHEMA = %s and table_name = %s order by ordinal_position",[dbname, tablename])
fetchall = cursor.fetchall()
cursor.close()
conn.close()return fetchall
defgetColumn(dbname, tablename):
k1 = queryDataBase(dbname, tablename)
k2 =list(map(lambda x: x[0], k1))return k2
defgetColumnAndType(dbname, tablename):
k1 = queryDataBase(dbname, tablename)
mappings ={'bigint':'bigint','varchar':'string','int':'int','datetime':'string','text':'string'}
k2 =list(map(lambda x:{"name": x[0],"type": mappings[x[1].lower()]}, k1))return k2
if __name__ =='__main__':
l = sys.argv[1:]
dbname = l[0]# mysql数据库名
tablename = l[1]# 表名
gen_json(dbname, tablename)
3. 运行python脚本
(untitled0606) C:\Users\Lenovo\PycharmProjects\untitled0606>python .\test0606\test_gen.py spark-dw t_student
(untitled0606) C:\Users\Lenovo\PycharmProjects\untitled0606>python .\test0606\test_gen.py spark-dw t_person
4. 将生成的json文件上传到linux
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-oB30wKR6-1689299346463)(上课笔记-day13.assets\1689068747698.png)]
5. 编写shell脚本 b.sh
#! /bin/bashdt=$1if[''$1=='']thendt=$(date-d yesterday +%Y-%m-%d)fiecho$dts=$(hive -e"show partitions ods_t_student partition(dt='$dt')")echo===$s====if["$s"=="partition"]then
hive -e"alter table ods_t_student add partition(dt='$dt')"elseecho"$dt分区已经存在"fi
python /opt/installs/datax/bin/datax.py -p"-Ddt=$dt" /opt/installs/datax/job/t_student.json
s=$(hive -e"show partitions ods_t_person partition(dt='$dt')")echo===$s====if["$s"=="partition"]then
hive -e"alter table ods_t_person add partition(dt='$dt')"elseecho"$dt分区已经存在"fi
python /opt/installs/datax/bin/datax.py -p"-Ddt=$dt" /opt/installs/datax/job/t_person.json
6. 运行shell
root@hadoop10 app]# sh b.sh 2023-07-13
任务启动时刻 : 2023-07-13 02:31:38
任务结束时刻 : 2023-07-13 02:31:50
任务总计耗时 : 12s
任务平均流量 : 2B/s
记录写入速度 : 0rec/s
读出记录总数 : 3
读写失败总数 : 0
- hive
id|name |age|dt |
--|--------|---|----------|
1|zhanmusi| 15|2023-07-13|
2|lisi | 55|2023-07-13|
3|lisi | 66|2023-07-13|
版权归原作者 程序终结者 所有, 如有侵权,请联系我们删除。