0


【Spark数仓项目】需求八:MySQL的DataX全量导入和增量导入Hive

【Spark数仓项目】需求八:MySQL的DataX全量导入和增量导入Hive

文章目录

一、mysql全量导入hive[分区表]

需求介绍:

本需求将模拟从MySQL中向Hive数仓中导入数据,数据以时间分区。测试两种导入场景,一种是将数据全量导入,即包含所有时间分区;另一种是每天运行调度,仅导入当天时间分区中的用户数据。


  • mysql表建表语句:
createtable t_order(
    id            intprimarykeyauto_increment,
    amt           decimal(10,2),`status`intdefault0,
    user_id      int,
    create_time timestampDEFAULTCURRENT_TIMESTAMP,
    modify_time timestampDEFAULTCURRENT_TIMESTAMPONUPDATECURRENT_TIMESTAMP)
  • hive
createtable t_order(
    id            int,
    amt           decimal(10,2),`status`int,
    user_id      int,
    create_time date,
    modify_time date)partitioned by(dt string)row format delimited 
fieldsterminatedby'\t'

注意字段时间戳,我们将从以上MySQL向Hive导入数据。

  • 编写datax的json脚本
{"job":{"content":[{"reader":{"name":"mysqlreader","parameter":{"connection":[{"jdbcUrl":["jdbc:mysql://hadoop10:3306/spark-dw"],"querySql":["select id,amt,status,user_id,create_time,modify_time from t_order"]}],"password":"0000","username":"root",}},"writer":{"name":"hdfswriter","parameter":{"column":[{"name":"id","type":"int"},{"name":"amt","type":"double"},{"name":"status","type":"int"},{"name":"user_id","type":"int"},{"name":"create_time","type":"string"},{"name":"modify_time","type":"string"}],"defaultFS":"hdfs://hadoop10:8020","fieldDelimiter":"\t","fileName":"t_order","fileType":"text","path":"/user/hive/warehouse/test_hive.db/t_order/dt=$dt","writeMode":"append"}}}],"setting":{"speed":{"channel":"1"}}}}
  • 执行导入操作

在mysql中添加测试数据 导入mysql中7-11的数据到hive下7-11分区

insertinto t_order(amt,user_id)values(100,1001)insertinto t_order values(null,100,0,1001,'2023-07-11 10:18:39','2023-07-11 10:18:39')insertinto t_order values(null,120,0,1001,'2023-07-11 10:18:39','2023-07-11 10:18:39')

在hive下创建分区

altertable t_order addpartition(dt='2023-07-11')

运行dataX脚本

python /opt/installs/datax/bin/datax.py -p "-Ddt=2023-07-11" /opt/installs/datax/job/mysql2hive.json

此部分的操作是将先插入mysql的三条数据导入到hive。


在mysql中添加测试数据 导入mysql中7-12的数据到hive下7-12分区

insertinto t_order values(null,200,0,1001,'2023-07-12 10:18:39','2023-07-12 10:18:39');insertinto t_order values(null,220,0,1001,'2023-07-12 10:18:39','2023-07-12 10:18:39');

在hive下创建分区

altertable t_order addpartition(dt='2023-07-12')

运行datax脚本

python /opt/installs/datax/bin/datax.py -p "-Ddt=2023-07-12" /opt/installs/datax/job/mysql2hive.json

此部分的操作是将先插入mysql的三条数据和本次插入mysql的数据都导入到hive。
根据查询结果可以看到,此时我们重复导入了第一部分的数据,这就是全量导入。

二、mysql增量导入hive

大方向

:事实表用增量[订单表] 维度表用全量[商品表]

绝大部分公司采用的方案:全量为主、增量为辅

要想采用增量导入还有一个问题是你的业务库表能够支持增量导入

1. 增量导入的第一种实现方法

根据 id主键,查询hive表中最大的id值,然后去mysql中查询大于上述id值的数据。
如果有些使用uuid的,则不能用id,这种方案不适用于对修改的数据进行同步。

2. 另一种方法是 时间字段

在表中增加一个modify_time字段,如果数据新增或者修改,可以根据这个字段查询数据抽取到hive

3. dataX脚本

{"job":{"content":[{"reader":{"name":"mysqlreader","parameter":{"connection":[{"jdbcUrl":["jdbc:mysql://hadoop10:3306/spark-dw"],"querySql":["select id,amt,status,user_id,create_time,modify_time from t_order where date_format(modify_time,'%Y-%m-%d') = '$dt'"]}],"password":"0000","username":"root",}},"writer":{"name":"hdfswriter","parameter":{"column":[{"name":"id","type":"int"},{"name":"amt","type":"double"},{"name":"status","type":"int"},{"name":"user_id","type":"int"},{"name":"create_time","type":"string"},{"name":"modify_time","type":"string"}],"defaultFS":"hdfs://hadoop10:8020","fieldDelimiter":"\t","fileName":"t_order","fileType":"text","path":"/user/hive/warehouse/test_hive.db/t_order/dt=$dt","writeMode":"append"}}}],"setting":{"speed":{"channel":"1"}}}}

运行该增量脚本,即可按照分区的日期,每次导入需要的mysql数据到hive。

三、利用Python自动生成Datax的json脚本

1. 创建mysql和hive数据库

createtable t_student(
    id        intPRIMARYkey,
    name           varchar(50),`age`int);createtable t_person(
    id                intPRIMARYkey,
    name           varchar(50),
    parentid         int);INSERTinto t_student values(1,'zhanmusi',15),(2,'lisi',55),(3,'lisi',66);INSERTinto t_person values(1,'miky',06),(2,'tom',16),(3,'jakcon',26);
createtable ods_t_student(
    id            int,
    name           string,`age`int)partitioned by(dt string)row format delimited 
fieldsterminatedby'\t'createtable ods_t_person(
    id                int,
    name               string,
    parentid         int)partitioned by(dt string)row format delimited 
fieldsterminatedby'\t'

2. 修改python脚本里面的密码(2处)和hdfs端口

import json
import sys
import pymysql

defgen_json(dbname, tablename):
 s1 ={"job":{"content":[{"reader":{"name":"mysqlreader","parameter":{"connection":[{"jdbcUrl":["jdbc:mysql://hadoop10:3306/"+ dbname +"?useSSL=false"],"table":[tablename]}],"password":"0000",# 密码"username":"root","column": getColumn(dbname, tablename)}},"writer":{"name":"hdfswriter","parameter":{"column": getColumnAndType(dbname, tablename),"defaultFS":"hdfs://hadoop10:8020",# hdfs端口"fileType":"text","path":"/user/hive/warehouse/ods_"+ tablename +"/dt=$dt","fieldDelimiter":"\t","fileName": tablename,"writeMode":"append"}}}],"setting":{"speed":{"channel":"1"}}}}withopen('d:/test/'+ tablename +'.json','w')as f:
     json.dump(s1, f)defqueryDataBase(dbname, tablename):
 conn = pymysql.connect(user='root', password='0000', host='hadoop10')# 密码
 cursor = conn.cursor()
 cursor.execute("select column_name ,data_type from information_schema.`COLUMNS` where TABLE_SCHEMA = %s and table_name = %s order by ordinal_position",[dbname, tablename])
 fetchall = cursor.fetchall()
 cursor.close()
 conn.close()return fetchall

defgetColumn(dbname, tablename):
 k1 = queryDataBase(dbname, tablename)
 k2 =list(map(lambda x: x[0], k1))return k2

defgetColumnAndType(dbname, tablename):
 k1 = queryDataBase(dbname, tablename)
 mappings ={'bigint':'bigint','varchar':'string','int':'int','datetime':'string','text':'string'}
 k2 =list(map(lambda x:{"name": x[0],"type": mappings[x[1].lower()]}, k1))return k2

if __name__ =='__main__':
 l = sys.argv[1:]
 dbname = l[0]# mysql数据库名
 tablename = l[1]# 表名
 gen_json(dbname, tablename)

3. 运行python脚本

(untitled0606) C:\Users\Lenovo\PycharmProjects\untitled0606>python .\test0606\test_gen.py spark-dw t_student

(untitled0606) C:\Users\Lenovo\PycharmProjects\untitled0606>python .\test0606\test_gen.py spark-dw t_person

4. 将生成的json文件上传到linux

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-oB30wKR6-1689299346463)(上课笔记-day13.assets\1689068747698.png)]

5. 编写shell脚本 b.sh

#! /bin/bashdt=$1if[''$1=='']thendt=$(date-d yesterday +%Y-%m-%d)fiecho$dts=$(hive -e"show partitions ods_t_student partition(dt='$dt')")echo===$s====if["$s"=="partition"]then
 hive -e"alter table ods_t_student add partition(dt='$dt')"elseecho"$dt分区已经存在"fi

python /opt/installs/datax/bin/datax.py -p"-Ddt=$dt" /opt/installs/datax/job/t_student.json

s=$(hive -e"show partitions ods_t_person partition(dt='$dt')")echo===$s====if["$s"=="partition"]then
 hive -e"alter table ods_t_person add partition(dt='$dt')"elseecho"$dt分区已经存在"fi

python /opt/installs/datax/bin/datax.py -p"-Ddt=$dt" /opt/installs/datax/job/t_person.json

6. 运行shell

root@hadoop10 app]# sh b.sh 2023-07-13
任务启动时刻                    : 2023-07-13 02:31:38
任务结束时刻                    : 2023-07-13 02:31:50
任务总计耗时                    :                 12s
任务平均流量                    :                2B/s
记录写入速度                    :              0rec/s
读出记录总数                    :                   3
读写失败总数                    :                   0
  • hive
id|name    |age|dt        |
--|--------|---|----------|
 1|zhanmusi| 15|2023-07-13|
 2|lisi    | 55|2023-07-13|
 3|lisi    | 66|2023-07-13|
标签: spark mysql hive

本文转载自: https://blog.csdn.net/qq_31412425/article/details/131716906
版权归原作者 程序终结者 所有, 如有侵权,请联系我们删除。

“【Spark数仓项目】需求八:MySQL的DataX全量导入和增量导入Hive”的评论:

还没有评论