

Learning Big Data DAY59: Full Extraction and Incremental Extraction in Practice

**Requirement workflow**

Full extraction vs. incremental extraction - DataX, Kettle, Sqoop ...

**Scenario:** a colleague from the business department, or someone on the client side, brings a new requirement to your department manager and to you.

Process: contact => discussion meeting => confirm the requirement => implement

**Requirements document** (what exactly is needed)

Prototype document (mock-ups of the reports, done with pen and paper or a drawing tool)

Metrics for the first report, a summary report - a decision report where each run produces a single day's record - for the big boss:

  • total member count, new member count

  • active member count, active member ratio

  • churned member count: members with no purchase record in the past year, counting back one year inclusive

  • net increase in active members

  • member count per consumption level (A >= 2000, B >= 1000 and < 2000, C >= 500 and < 1000, D >= 100 and < 500, E < 100); see the sketch after this list

  • total member consumption amount

  • total number of member purchases

  • average member spend per purchase

  • churned member count

  • newly churned members

  • 60-day member repurchase rate

  • 180-day member repurchase rate

  • 365-day member repurchase rate
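As a quick illustration of the level thresholds above, here is a minimal Python sketch that maps a member's consumption total to a level; the function name and the sample amounts are only for illustration and are not part of the original requirement:

#!/bin/python3
# Minimal sketch: map a member's total consumption amount to a level A-E,
# using the thresholds from the requirement (A >=2000, B >=1000, C >=500, D >=100, E <100).
def consumption_level(total_amount: float) -> str:
    if total_amount >= 2000:
        return "A"
    if total_amount >= 1000:
        return "B"
    if total_amount >= 500:
        return "C"
    if total_amount >= 100:
        return "D"
    return "E"

if __name__ == "__main__":
    # hypothetical totals, only to show the mapping
    for amount in (2500, 1200, 80):
        print(amount, consumption_level(amount))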

The second report is for marketing - a detail report (an ordinary report) - for colleagues in the marketing department:

  • target users for telemarketing: members with more than 30 orders, or with a total consumption amount above 1500

  • fields: name, phone number, total consumption amount, purchase count, city, store, payment preference (mobile, card, cash ...), diseases of interest

  • the member's monthly order count and amount for the last 3 months: m1_total, m1_sale, m2_total, m2_sale, ...

The third report is also for marketing - the top-20 members by consumption for each month from 2022.01 to 2023.12, 24 x 20 = 480 rows - for the marketing manager:

  • T+1 (monthly), month in yyyy-mm format

  • member name, contact details ..., consumption level, order count and total amount for the last 30 days

  • the member's consumption over the 30, 60 and 90 days before that month (the tricky part; see the sketch after this list)

  • report ordering: by total consumption amount descending by default, or by purchase count descending

  • the report shows January 2021 by default, and any month within the last 2 years can be selected
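For the 30/60/90-day windows flagged as the tricky part above, one possible approach is to compute them per member against the first day of the report month. The sketch below assumes the order data has already been pulled into a pandas DataFrame with member, starttime and netsum columns (names borrowed from erp.u_sale_m); the DataFrame and the function name are hypothetical:

#!/bin/python3
# Rough sketch: for one member and one report month (yyyy-mm), sum the consumption
# in the 30/60/90 days before the first day of that month.
# orders_df is a hypothetical DataFrame built from the extracted erp.u_sale_m data,
# with columns member, starttime (datetime) and netsum.
import pandas as pd

def trailing_consumption(orders_df: pd.DataFrame, member: str, month: str) -> dict:
    month_start = pd.Timestamp(month + "-01")
    member_orders = orders_df[orders_df["member"] == member]
    result = {}
    for days in (30, 60, 90):
        window_start = month_start - pd.Timedelta(days=days)
        mask = (member_orders["starttime"] >= window_start) & (member_orders["starttime"] < month_start)
        # netsum is stored as a string in the ODS layer, so convert before summing
        result[f"last_{days}d_sale"] = float(pd.to_numeric(member_orders.loc[mask, "netsum"], errors="coerce").sum())
    return result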

**Requirement analysis and specification**

The manager puts together something like a wide-table document - it makes the later detail queries and metric calculations easier.

**It determines which tables we need to extract**

crm.user_base_info_his - customer information table

erp.u_memcard_reg - membership card table

erp.u_sale_m - order table (about 19,000,000 rows)

erp.u_sale_pay - order payment table (about 12,000,000 rows)

erp.c_memcard_class_group - member group table

erp.u_memcard_reg_c - disease-of-interest table

his.chronic_patient_info_new - examination/test table

erp.c_org_busi - store table

Plus a code-value table processed separately from a file: erp.c_code_value

7 tables are extracted in full.

Target table naming: <system prefix>_<table name>_(full|inc)

crm.user_base_info_his - full => ods_lijinquan.crm_user_base_info_his_full

erp.u_memcard_reg - full => ods_lijinquan.erp_u_memcard_reg_full

erp.c_memcard_class_group - full => ods_lijinquan.erp_c_memcard_class_group_full

erp.u_memcard_reg_c - full => ods_lijinquan.erp_u_memcard_reg_c_full

his.chronic_patient_info_new - full => ods_lijinquan.his_chronic_patient_info_new_full

erp.c_org_busi - full => ods_lijinquan.erp_c_org_busi_full

erp.c_code_value - full, processed from a file => ods_lijinquan.c_code_value_full

Incremental:

erp.u_sale_m - full load first (one-off), then daily incremental loads => ods_lijinquan.erp_u_sale_m_inc

erp.u_sale_pay - same as above, incremental => ods_lijinquan.erp_u_sale_pay_inc

**Assignment**

Extract the 7 full tables and deploy them to the scheduling platform as 7 scheduled jobs.

Every table's final row count must be compared with the source table and must match; one way to automate the check is sketched below.
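A minimal sketch of that comparison, assuming the pymysql connection settings used in full.py and beeline on the local node; the function names and the parsing of the beeline output are assumptions for illustration:

#!/bin/python3
# Minimal sketch: compare a source MySQL table's row count with the row count
# of the corresponding ODS table in Hive.
import subprocess
import pymysql

def source_count(db: str, table: str) -> int:
    conn = pymysql.connect(host="zhiyun.pub", port=23306,
                           user="zhiyun", password="zhiyun", database=db)
    with conn.cursor() as cursor:
        cursor.execute(f"select count(1) from {table}")
        (count,) = cursor.fetchone()
    conn.close()
    return int(count)

def ods_count(table: str) -> int:
    cmd = ["beeline", "-u", "jdbc:hive2://localhost:10000", "-n", "root", "-p", "123",
           "--outputformat=tsv2", "--showHeader=false",
           "-e", f"select count(1) from ods_shihaihong.{table}"]
    out = subprocess.run(cmd, capture_output=True, text=True, check=True).stdout
    return int(out.strip().splitlines()[-1])

if __name__ == "__main__":
    src = source_count("crm", "user_base_info_his")
    ods = ods_count("user_base_info_his_full")
    print("source:", src, "ods:", ods, "match:", src == ods)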

**Full-table processing**

**Upgraded helper script**

  • automatically reads a table's column metadata and generates both the DataX JSON job file and the Hive DDL file

full.py:

#!/bin/python3
# Automatically generate the DataX json job file and the Hive DDL file for a table.
import pymysql
import sys

if len(sys.argv) != 3:
    print("使用方法为:python3 full.py 数据库名 表名")
    sys.exit()

sys_name = sys.argv[1]
table_name = sys.argv[2]

# read the table's column names and types from information_schema
db = pymysql.connect(
    host='zhiyun.pub',
    port=23306,
    user='zhiyun',
    password='zhiyun',
    database='information_schema'
)
cursor = db.cursor()
cursor.execute(f"select column_name,data_type from information_schema.columns "
               f"where table_schema='{sys_name}' and table_name='{table_name}'")
data = cursor.fetchall()

fields = []
for field in data:
    field_name = field[0]
    field_type = field[1]
    # map the MySQL type to a Hive type
    field_hive_type = "string"
    if field_type == "int" or field_type == "tinyint" or field_type == "bigint":
        field_hive_type = "int"
    if field_type == "float" or field_type == "double":
        field_hive_type = "float"
    fields.append([field_name, field_hive_type])
db.close()

print("=============== 配置 datax ===============")
file_path = f"/zhiyun/shihaihong/jobs/{sys_name}_{table_name}_full.json"
template_path = "/zhiyun/shihaihong/jobs/template.json"
with open(template_path, "r", encoding="utf-8") as f:
    template_content = f.read()
new_content = template_content.replace("#sys_name#", sys_name)
new_content = new_content.replace("#table_name#", table_name)
# replace the column placeholder
lines = []
for field in fields:
    lines.append('{"name":"' + field[0] + '","type":"' + field[1] + '"},')
columns = "\n".join(lines)
columns = columns.strip(",")
new_content = new_content.replace("\"#columns#\"", columns)
# write the new job file
with open(file_path, "w", encoding="utf-8") as ff:
    ff.write(new_content)
print("datax 文件配置成功")

print("=============== 配置 hive ===============")
file_path = f"/zhiyun/shihaihong/sql/{sys_name}_{table_name}_full.sql"
template_path = "/zhiyun/shihaihong/sql/template.sql"
with open(template_path, "r", encoding="utf-8") as f:
    template_content = f.read()
new_content = template_content.replace("#sys_name#", sys_name)
new_content = new_content.replace("#table_name#", table_name)
# replace the column placeholder
lines = []
for field in fields:
    lines.append(f"{field[0]} {field[1]},")
columns = "\n".join(lines)
columns = columns.strip(",")
new_content = new_content.replace("#columns#", columns)
# write the new DDL file
with open(file_path, "w", encoding="utf-8") as ff:
    ff.write(new_content)
print("hive 建表文件生成成功")

JSON template:

template.json:

{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "column": ["*"],
                        "connection": [
                            {
                                "jdbcUrl": [
                                    "jdbc:mysql://zhiyun.pub:23306/#sys_name#?useSSL=false"
                                ],
                                "table": [
                                    "#table_name#"
                                ]
                            }
                        ],
                        "password": "zhiyun",
                        "username": "zhiyun"
                    }
                },
                "writer": {
                    "name": "hdfswriter",
                    "parameter": {
                        "column": [
                            "#columns#"
                        ],
                        "defaultFS": "hdfs://cdh02:8020",
                        "fieldDelimiter": "\t",
                        "fileName": "#sys_name#_#table_name#_full.data",
                        "fileType": "orc",
                        "path": "/zhiyun/shihaihong/ods/#sys_name#_#table_name#_full",
                        "writeMode": "truncate"
                    }
                }
            }
        ],
        "setting": {
            "speed": {
                "channel": "3"
            }
        }
    }
}

SQL template file:

create database if not exists ods_shihaihong location "/zhiyun/shihaihong/ods";
-- 增量表
create external table if not exists ods_shihaihong.#table_name#_full(
#columns#
)
row format delimited fields terminated by "\t"
lines terminated by "\n"
stored as orc
location "/zhiyun/shihaihong/ods/#sys_name#_#table_name#_full";

Test:

python3 python/full.py crm user_base_info_his

The JSON and SQL files are generated:

Then, based on the generated JSON and SQL files, write the shell script that actually pulls the data:

crm_user_base_info_his_full.sh:

#!/bin/bash
echo "生成全量配置文件"
mkdir -p /zhiyun/shihaihong/jobs
echo '{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "column": ["*"],
                        "connection": [
                            {
                                "jdbcUrl": [
                                    "jdbc:mysql://zhiyun.pub:23306/crm?useSSL=false"
                                ],
                                "table": [
                                    "user_base_info_his"
                                ]
                            }
                        ],
                        "password": "zhiyun",
                        "username": "zhiyun"
                    }
                },
                "writer": {
                    "name": "hdfswriter",
                    "parameter": {
                        "column": [
                            {"name":"id","type":"int"},
                            {"name":"user_id","type":"string"},
                            {"name":"user_type","type":"string"},
                            {"name":"source","type":"string"},
                            {"name":"erp_code","type":"string"},
                            {"name":"active_time","type":"string"},
                            {"name":"name","type":"string"},
                            {"name":"sex","type":"string"},
                            {"name":"education","type":"string"},
                            {"name":"job","type":"string"},
                            {"name":"email","type":"string"},
                            {"name":"wechat","type":"string"},
                            {"name":"webo","type":"string"},
                            {"name":"birthday","type":"string"},
                            {"name":"age","type":"int"},
                            {"name":"id_card_no","type":"string"},
                            {"name":"social_insurance_no","type":"string"},
                            {"name":"address","type":"string"},
                            {"name":"last_subscribe_time","type":"int"}
                        ],
                        "defaultFS": "hdfs://cdh02:8020",
                        "fieldDelimiter": "\t",
                        "fileName": "crm_user_base_info_his_full.data",
                        "fileType": "orc",
                        "path": "/zhiyun/shihaihong/ods/crm_user_base_info_his_full",
                        "writeMode": "truncate"
                    }
                }
            }
        ],
        "setting": {
            "speed": {
                "channel": "3"
            }
        }
    }
}' > /zhiyun/shihaihong/jobs/crm_user_base_info_his_full.json
echo "开始抽取"
hadoop fs -mkdir -p /zhiyun/shihaihong/ods/crm_user_base_info_his_full
python /opt/datax/bin/datax.py /zhiyun/shihaihong/jobs/crm_user_base_info_his_full.json
echo "hive 建表"
beeline -u jdbc:hive2://localhost:10000 -n root -p 123 -e '
create database if not exists ods_shihaihong location "/zhiyun/shihaihong/ods";
-- 增量表
create external table if not exists ods_shihaihong.user_base_info_his_full(
    id int,
    user_id string,
    user_type string,
    source string,
    erp_code string,
    active_time string,
    name string,
    sex string,
    education string,
    job string,
    email string,
    wechat string,
    webo string,
    birthday string,
    age int,
    id_card_no string,
    social_insurance_no string,
    address string,
    last_subscribe_time int
)
row format delimited fields terminated by "\t"
lines terminated by "\n"
stored as orc
location "/zhiyun/shihaihong/ods/crm_user_base_info_his_full";
'
# echo "加载数据"
# beeline -u jdbc:hive2://localhost:10000 -n root -p 123 -e "
# load data inpath \"/zhiyun/shihaihong/tmp/crm_user_base_info_his_full/*\" overwrite into table ods_shihaihong.crm_user_base_info_his_full partition(createtime='$day');
# "
echo "验证数据"
beeline -u jdbc:hive2://localhost:10000 -n root -p 123 -e "
select count(1) from ods_shihaihong.user_base_info_his_full;
"
echo "抽取完成"

Run a test:

Check against my own database:

Create the job in the production scheduling center:

After pasting the shell script into the GLUE IDE, execute it once:

It runs successfully.

The other six tables are handled in exactly the same way; a batch-generation sketch follows below.
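Since the remaining tables only differ in database and table name, the DataX and Hive files for them can be batch-generated with full.py. The driver loop below is a sketch added for illustration (the table list comes from the requirement section, the call format from the test command above); it is not part of the original workflow:

#!/bin/python3
# Sketch: generate the DataX JSON and Hive DDL for the remaining full-extraction
# tables by calling full.py once per (database, table) pair.
import os

FULL_TABLES = [
    ("erp", "u_memcard_reg"),
    ("erp", "c_memcard_class_group"),
    ("erp", "u_memcard_reg_c"),
    ("his", "chronic_patient_info_new"),
    ("erp", "c_org_busi"),
    ("erp", "c_code_value"),
]

for sys_name, table_name in FULL_TABLES:
    print(f"generating config for {sys_name}.{table_name}")
    os.system(f"python3 python/full.py {sys_name} {table_name}")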

Shell scripts for the other six tables - erp_u_memcard_reg_full.sh:

#!/bin/bash
echo "生成全量配置文件"
mkdir -p /zhiyun/shihaihong/jobs
echo '{
"job": {
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"column": ["*"],
"connection": [
{
"jdbcUrl": [
"jdbc:mysql://zhiyun.pub:233
06/erp?useSSL=false"
],
"table": [
"u_memcard_reg"
]
}
],
"password": "zhiyun",
"username": "zhiyun"
}
},
"writer": {
"name": "hdfswriter",
"parameter": {
"column": [
{"name":
"id","type":"int"},
{"name":"memcardno","type":"string"}
,
{"name":"busno","type":"string"},
{"name":"introducer","type":"string
"},
{"name":"cardtype","type":"int"},
{"name":"cardlevel","type":"int"},{"name":"cardpass","type":"string"}
,
{"name":"cardstatus","type":"int"},
{"name":"saleamount","type":"string
"},
{"name":"realamount","type":"string
"},
{"name":"puramount","type":"string"}
,
{"name":"integral","type":"string"}
,
{"name":"integrala","type":"string"}
,
{"name":"integralflag","type":"int"}
,
{"name":"cardholder","type":"string
"},
{"name":"cardaddress","type":"strin
g"},
{"name":"sex","type":"string"},
{"name":"tel","type":"string"},
{"name":"handset","type":"string"},
{"name":"fax","type":"string"},
{"name":"createuser","type":"string
"},
{"name":"createtime","type":"string
"},
{"name":"tstatus","type":"int"},
{"name":"notes","type":"string"},
{"name":"stamp","type":"string"},
{"name":"idcard","type":"string"},
{"name":"birthday","type":"string"}
,
{"name":"allowintegral","type":"int
"},
{"name":"apptype","type":"string"},
{"name":"applytime","type":"string"}
,
{"name":"invalidate","type":"string
"},
{"name":"lastdate","type":"string"}
,
{"name":"bak1","type":"string"},{"name":"scrm_userid","type":"strin
g"}
],
"defaultFS": "hdfs://cdh02:8020",
"fieldDelimiter": "\t",
"fileName":
"erp_u_memcard_reg_full.data",
"fileType": "orc",
"path":
"/zhiyun/shihaihong/ods/erp_u_memcard_reg_full",
"writeMode": "truncate"
}
}
}
],
"setting": {
"speed": {
"channel": "3"
}
}
}
}' > /zhiyun/shihaihong/jobs/erp_u_memcard_reg_full.json
echo "开始抽取"
hadoop fs -mkdir -p /zhiyun/shihaihong/ods/erp_u_memcard_reg_full
python /opt/datax/bin/datax.py
/zhiyun/shihaihong/jobs/erp_u_memcard_reg_full.json
echo "hive 建表"
beeline -u jdbc:hive2://localhost:10000 -n root -p 123 -e '
create database if not exists ods_shihaihong location
"/zhiyun/shihaihong/ods";
-- 增量表
create external table if not exists
ods_shihaihong.u_memcard_reg_full(
id int,
memcardno string,
busno string,
introducer string,
cardtype int,
cardlevel int,
cardpass string,
cardstatus int,saleamount string,
realamount string,
puramount string,
integral string,
integrala string,
integralflag int,
cardholder string,
cardaddress string,
sex string,
tel string,
handset string,
fax string,
createuser string,
createtime string,
tstatus int,
notes string,
stamp string,
idcard string,
birthday string,
allowintegral int,
apptype string,
applytime string,
invalidate string,
lastdate string,
bak1 string,
scrm_userid string
)
row format delimited fields terminated by "\t"
lines terminated by "\n"
stored as orc
location "/zhiyun/shihaihong/ods/erp_u_memcard_reg_full";
'
# echo "加载数据"
# beeline -u jdbc:hive2://localhost:10000 -n root -p 123 -e "
# load data inpath
\"/zhiyun/shihaihong/tmp/erp_u_memcard_reg_full/*\" overwrite
into table ods_shihaihong.erp_u_memcard_reg_full
partition(createtime='$day');
# "
echo "验证数据"
beeline -u jdbc:hive2://localhost:10000 -n root -p 123 -e "select count(1) from ods_shihaihong.u_memcard_reg_full;
"
echo "抽取完成"

erp.c_memcard_class_group:

#!/bin/bash
echo "生成全量配置文件"
mkdir -p /zhiyun/shihaihong/jobs
echo '{
"job": {
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"column": ["*"],
"connection": [
{
"jdbcUrl": [
"jdbc:mysql://zhiyun.pub:233
06/erp?useSSL=false"
],
"table": [
"c_memcard_class_group"
]
}
],
"password": "zhiyun",
"username": "zhiyun"
}
},
"writer": {
"name": "hdfswriter",
"parameter": {
"column": [
{"name":
"createtime","type":"string"},{"name":"createuser","type":"string
"},
{"name":"groupid","type":"int"},
{"name":"groupname","type":"string"}
,
{"name":"notes","type":"string"},
{"name":"stamp","type":"int"}
],
"defaultFS": "hdfs://cdh02:8020",
"fieldDelimiter": "\t",
"fileName":
"erp_c_memcard_class_group_full.data",
"fileType": "orc",
"path":
"/zhiyun/shihaihong/ods/erp_c_memcard_class_group_full",
"writeMode": "truncate"
}
}
}
],
"setting": {
"speed": {
"channel": "3"
}
}
}
}' > /zhiyun/shihaihong/jobs/erp_c_memcard_class_group_full.json
echo "开始抽取"
hadoop fs -mkdir -p
/zhiyun/shihaihong/ods/erp_c_memcard_class_group_full
python /opt/datax/bin/datax.py
/zhiyun/shihaihong/jobs/erp_c_memcard_class_group_full.json
echo "hive 建表"
beeline -u jdbc:hive2://localhost:10000 -n root -p 123 -e '
create database if not exists ods_shihaihong location
"/zhiyun/shihaihong/ods";
-- 增量表
create external table if not exists
ods_shihaihong.c_memcard_class_group_full(
createtime string,
createuser string,groupid int,
groupname string,
notes string,
stamp int
)
row format delimited fields terminated by "\t"
lines terminated by "\n"
stored as orc
location
"/zhiyun/shihaihong/ods/erp_c_memcard_class_group_full";
'
# echo "加载数据"
# beeline -u jdbc:hive2://localhost:10000 -n root -p 123 -e "
# load data inpath
\"/zhiyun/shihaihong/tmp/erp_c_memcard_class_group_full/*\"
overwrite into table
ods_shihaihong.erp_c_memcard_class_group_full
partition(createtime='$day');
# "
echo "验证数据"
beeline -u jdbc:hive2://localhost:10000 -n root -p 123 -e "
select count(1) from ods_shihaihong.c_memcard_class_group_full;
"
echo "抽取完成"
erp.u_memcard_reg_c:
#!/bin/bash
echo "生成全量配置文件"
mkdir -p /zhiyun/shihaihong/jobs
echo '{
"job": {
"content": [
{
"reader": {
"name": "mysqlreader","parameter": {
"column": ["*"],
"connection": [
{
"jdbcUrl": [
"jdbc:mysql://zhiyun.pub:233
06/erp?useSSL=false"
],
"table": [
"u_memcard_reg_c"
]
}
],
"password": "zhiyun",
"username": "zhiyun"
}
},
"writer": {
"name": "hdfswriter",
"parameter": {
"column": [
{"name":
"id","type":"int"},
{"name":"memcardno","type":"string"}
,
{"name":"sickness","type":"string"}
,
{"name":"status","type":"string"}
],
"defaultFS": "hdfs://cdh02:8020",
"fieldDelimiter": "\t",
"fileName":
"erp_u_memcard_reg_c_full.data",
"fileType": "orc",
"path":
"/zhiyun/shihaihong/ods/erp_u_memcard_reg_c_full",
"writeMode": "truncate"
}
}
}
],
"setting": {
"speed": {"channel": "3"
}
}
}
}' > /zhiyun/shihaihong/jobs/erp_u_memcard_reg_c_full.json
echo "开始抽取"
hadoop fs -mkdir -p
/zhiyun/shihaihong/ods/erp_u_memcard_reg_c_full
python /opt/datax/bin/datax.py
/zhiyun/shihaihong/jobs/erp_u_memcard_reg_c_full.json
echo "hive 建表"
beeline -u jdbc:hive2://localhost:10000 -n root -p 123 -e '
create database if not exists ods_shihaihong location
"/zhiyun/shihaihong/ods";
-- 增量表
create external table if not exists
ods_shihaihong.u_memcard_reg_c_full(
id int,
memcardno string,
sickness string,
status string
)
row format delimited fields terminated by "\t"
lines terminated by "\n"
stored as orc
location "/zhiyun/shihaihong/ods/erp_u_memcard_reg_c_full";
'
# echo "加载数据"
# beeline -u jdbc:hive2://localhost:10000 -n root -p 123 -e "
# load data inpath
\"/zhiyun/shihaihong/tmp/erp_u_memcard_reg_c_full/*\" overwrite
into table ods_shihaihong.erp_u_memcard_reg_c_full
partition(createtime='$day');
# "
echo "验证数据"
beeline -u jdbc:hive2://localhost:10000 -n root -p 123 -e "
select count(1) from ods_shihaihong.u_memcard_reg_c_full;
"echo "抽取完成"

his.chronic_patient_info_new:

#!/bin/bash
echo "生成全量配置文件"
mkdir -p /zhiyun/shihaihong/jobs
echo '{
"job": {
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"column": ["*"],
"connection": [
{
"jdbcUrl": [
"jdbc:mysql://zhiyun.pub:233
06/his?useSSL=false"
],
"table": [
"chronic_patient_info_new"
]
}
],
"password": "zhiyun",
"username": "zhiyun"
}
},
"writer": {
"name": "hdfswriter",
"parameter": {
"column": [
{"name":
"id","type":"int"},{"name":"member_id","type":"string"}
,
{"name":"erp_code","type":"string"}
,
{"name":"extend","type":"string"},
{"name":"detect_time","type":"strin
g"},
{"name":"bec_chr_mbr_date","type":"
string"}
],
"defaultFS": "hdfs://cdh02:8020",
"fieldDelimiter": "\t",
"fileName":
"his_chronic_patient_info_new_full.data",
"fileType": "orc",
"path":
"/zhiyun/shihaihong/ods/his_chronic_patient_info_new_full",
"writeMode": "truncate"
}
}
}
],
"setting": {
"speed": {
"channel": "3"
}
}
}
}' >
/zhiyun/shihaihong/jobs/his_chronic_patient_info_new_full.json
echo "开始抽取"
hadoop fs -mkdir -p
/zhiyun/shihaihong/ods/his_chronic_patient_info_new_full
python /opt/datax/bin/datax.py
/zhiyun/shihaihong/jobs/his_chronic_patient_info_new_full.json
echo "hive 建表"
beeline -u jdbc:hive2://localhost:10000 -n root -p 123 -e '
create database if not exists ods_shihaihong location
"/zhiyun/shihaihong/ods";
-- 增量表
create external table if not exists
ods_shihaihong.chronic_patient_info_new_full(
id int,
member_id string,
erp_code string,
extend string,
detect_time string,
bec_chr_mbr_date string
)
row format delimited fields terminated by "\t"
lines terminated by "\n"
stored as orc
location
"/zhiyun/shihaihong/ods/his_chronic_patient_info_new_full";
'
# echo "加载数据"
# beeline -u jdbc:hive2://localhost:10000 -n root -p 123 -e "
# load data inpath
\"/zhiyun/shihaihong/tmp/his_chronic_patient_info_new_full/*\"
overwrite into table
ods_shihaihong.his_chronic_patient_info_new_full
partition(createtime='$day');
# "
echo "验证数据"
beeline -u jdbc:hive2://localhost:10000 -n root -p 123 -e "
select count(1) from
ods_shihaihong.chronic_patient_info_new_full;
"
echo "抽取完成"

erp.c_org_busi :

#!/bin/bash
echo "生成全量配置文件"
mkdir -p /zhiyun/shihaihong/jobs
echo '{
"job": {
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"column": ["*"],
"connection": [
{
"jdbcUrl": [
"jdbc:mysql://zhiyun.pub:233
06/erp?useSSL=false"
],
"table": [
"c_org_busi"
]
}
],
"password": "zhiyun",
"username": "zhiyun"
}
},
"writer": {
"name": "hdfswriter",
"parameter": {
"column": [
{"name":
"id","type":"int"},
{"name":"busno","type":"string"},
{"name":"orgname","type":"string"},
{"name":"orgsubno","type":"string"}
,
{"name":"orgtype","type":"string"},
{"name":"salegroup","type":"string"}
,
{"name":"org_tran_code","type":"str
ing"},
{"name":"accno","type":"string"},
{"name":"sendtype","type":"string"}
,
{"name":"sendday","type":"string"},{"name":"maxday","type":"string"},
{"name":"minday","type":"string"},
{"name":"notes","type":"string"},
{"name":"stamp","type":"string"},
{"name":"status","type":"string"},
{"name":"customid","type":"string"}
,
{"name":"whl_vendorno","type":"stri
ng"},
{"name":"whlgroup","type":"string"}
,
{"name":"rate","type":"string"},
{"name":"creditamt","type":"string"}
,
{"name":"creditday","type":"string"}
,
{"name":"peoples","type":"string"},
{"name":"area","type":"string"},
{"name":"abc","type":"string"},
{"name":"address","type":"string"},
{"name":"tel","type":"string"},
{"name":"principal","type":"string"}
,
{"name":"identity_card","type":"str
ing"},
{"name":"mobil","type":"string"},
{"name":"corporation","type":"strin
g"},
{"name":"saler","type":"string"},
{"name":"createtime","type":"string
"},
{"name":"bank","type":"string"},
{"name":"bankno","type":"string"},
{"name":"bak1","type":"string"},
{"name":"bak2","type":"string"},
{"name":"a_bak1","type":"string"},
{"name":"aa_bak1","type":"string"},
{"name":"b_bak1","type":"string"},
{"name":"bb_bak1","type":"string"},
{"name":"y_bak1","type":"string"},
{"name":"t_bak1","type":"string"},
{"name":"ym_bak1","type":"string"},
{"name":"tm_bak1","type":"string"},{"name":"supervise_code","type":"st
ring"},
{"name":"monthrent","type":"string"}
,
{"name":"wms_warehid","type":"strin
g"},
{"name":"settlement_cycle","type":"
string"},
{"name":"apply_cycle","type":"strin
g"},
{"name":"applydate","type":"string"}
,
{"name":"accounttype","type":"strin
g"},
{"name":"applydate_last","type":"st
ring"},
{"name":"paymode","type":"string"},
{"name":"yaolian_flag","type":"stri
ng"},
{"name":"org_longitude","type":"str
ing"},
{"name":"org_latitude","type":"stri
ng"},
{"name":"org_province","type":"stri
ng"},
{"name":"org_city","type":"string"}
,
{"name":"org_area","type":"string"}
,
{"name":"business_time","type":"str
ing"},
{"name":"yaolian_group","type":"str
ing"},
{"name":"pacard_storeid","type":"st
ring"},
{"name":"opening_time","type":"stri
ng"},
{"name":"ret_ent_id","type":"string
"},
{"name":"ent_id","type":"string"}
],
"defaultFS": "hdfs://cdh02:8020",
"fieldDelimiter": "\t","fileName": "erp_c_org_busi_full.data",
"fileType": "orc",
"path":
"/zhiyun/shihaihong/ods/erp_c_org_busi_full",
"writeMode": "truncate"
}
}
}
],
"setting": {
"speed": {
"channel": "3"
}
}
}
}' > /zhiyun/shihaihong/jobs/erp_c_org_busi_full.json
echo "开始抽取"
hadoop fs -mkdir -p /zhiyun/shihaihong/ods/erp_c_org_busi_full
python /opt/datax/bin/datax.py
/zhiyun/shihaihong/jobs/erp_c_org_busi_full.json
echo "hive 建表"
beeline -u jdbc:hive2://localhost:10000 -n root -p 123 -e '
create database if not exists ods_shihaihong location
"/zhiyun/shihaihong/ods";
-- 增量表
create external table if not exists
ods_shihaihong.c_org_busi_full(
id int,
busno string,
orgname string,
orgsubno string,
orgtype string,
salegroup string,
org_tran_code string,
accno string,
sendtype string,
sendday string,
maxday string,
minday string,
notes string,
stamp string,status string,
customid string,
whl_vendorno string,
whlgroup string,
rate string,
creditamt string,
creditday string,
peoples string,
area string,
abc string,
address string,
tel string,
principal string,
identity_card string,
mobil string,
corporation string,
saler string,
createtime string,
bank string,
bankno string,
bak1 string,
bak2 string,
a_bak1 string,
aa_bak1 string,
b_bak1 string,
bb_bak1 string,
y_bak1 string,
t_bak1 string,
ym_bak1 string,
tm_bak1 string,
supervise_code string,
monthrent string,
wms_warehid string,
settlement_cycle string,
apply_cycle string,
applydate string,
accounttype string,
applydate_last string,
paymode string,
yaolian_flag string,
org_longitude string,
org_latitude string,
org_province string,org_city string,
org_area string,
business_time string,
yaolian_group string,
pacard_storeid string,
opening_time string,
ret_ent_id string,
ent_id string
)
row format delimited fields terminated by "\t"
lines terminated by "\n"
stored as orc
location "/zhiyun/shihaihong/ods/erp_c_org_busi_full";
'
# echo "加载数据"
# beeline -u jdbc:hive2://localhost:10000 -n root -p 123 -e "
# load data inpath
\"/zhiyun/shihaihong/tmp/erp_c_org_busi_full/*\" overwrite into
table ods_shihaihong.erp_c_org_busi_full
partition(createtime='$day');
# "
echo "验证数据"
beeline -u jdbc:hive2://localhost:10000 -n root -p 123 -e "
select count(1) from ods_shihaihong.c_org_busi_full;
"
echo "抽取完成"

erp.c_code_value:

#!/bin/bash
echo "生成全量配置文件"
mkdir -p /zhiyun/shihaihong/jobs
echo '{
"job": {"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"column": ["*"],
"connection": [
{
"jdbcUrl": [
"jdbc:mysql://zhiyun.pub:233
06/erp?useSSL=false"
],
"table": [
"c_code_value"
]
}
],
"password": "zhiyun",
"username": "zhiyun"
}
},
"writer": {
"name": "hdfswriter",
"parameter": {
"column": [
{"name":
"id","type":"int"},
{"name":"cat_name","type":"string"}
,
{"name":"cat_code","type":"string"}
,
{"name":"val_name","type":"string"}
,
{"name":"var_desc","type":"string"}
],
"defaultFS": "hdfs://cdh02:8020",
"fieldDelimiter": "\t",
"fileName":
"erp_c_code_value_full.data",
"fileType": "orc",
"path":
"/zhiyun/shihaihong/ods/erp_c_code_value_full",
"writeMode": "truncate"}
}
}
],
"setting": {
"speed": {
"channel": "3"
}
}
}
}' > /zhiyun/shihaihong/jobs/erp_c_code_value_full.json
echo "开始抽取"
hadoop fs -mkdir -p /zhiyun/shihaihong/ods/erp_c_code_value_full
python /opt/datax/bin/datax.py
/zhiyun/shihaihong/jobs/erp_c_code_value_full.json
echo "hive 建表"
beeline -u jdbc:hive2://localhost:10000 -n root -p 123 -e '
create database if not exists ods_shihaihong location
"/zhiyun/shihaihong/ods";
-- 增量表
create external table if not exists
ods_shihaihong.c_code_value_full(
id int,
cat_name string,
cat_code string,
val_name string,
var_desc string
)
row format delimited fields terminated by "\t"
lines terminated by "\n"
stored as orc
location "/zhiyun/shihaihong/ods/erp_c_code_value_full";
'
# echo "加载数据"
# beeline -u jdbc:hive2://localhost:10000 -n root -p 123 -e "
# load data inpath
\"/zhiyun/shihaihong/tmp/erp_c_code_value_full/*\" overwrite
into table ods_shihaihong.erp_c_code_value_full
partition(createtime='$day');
# "echo "验证数据"
beeline -u jdbc:hive2://localhost:10000 -n root -p 123 -e "
select count(1) from ods_shihaihong.c_code_value_full;
"
echo "抽取完成"

Job scheduling:

erp.u_memcard_reg:

erp.c_memcard_class_group:

erp.u_memcard_reg_c:

his.chronic_patient_info_new:

erp.c_org_busi :

erp.c_code_value :

Upload the code-value spreadsheet into the data directory and write a Python script to clean it:

#!/bin/python3
# Clean the code-value spreadsheet and upload the result to HDFS.
import os
import pandas as pd
from openpyxl import load_workbook

lst = []
# Read every sheet of the Excel file with pandas: sheet_name=None loads all
# sheets, and header=2 means the data header is on the 3rd row (0-based index).
dfs = pd.read_excel('/zhiyun/shihaihong/data/12.码值表.xlsx', sheet_name=None, header=2)
dir = list(dfs.keys())
# load the workbook once to read cell values directly
wb = load_workbook(filename='/zhiyun/shihaihong/data/12.码值表.xlsx')
# collect the data from the xlsx file
for i in range(len(dir)):
    if i > 1:
        # read cell A2 of the sheet (its value is split on '-' into the first two output fields)
        str_head = wb[dir[i]]['A2'].value
        data = dfs[dir[i]]
        # collect the remaining rows
        lst1 = []
        for col in data.columns:
            for j in range(len(data)):
                if pd.notna(data[col][j]):  # skip empty cells
                    lst1.append(str(data[col][j]))
        n = int(len(lst1) / 2)
        for k in range(n):
            ss = f"{str_head.split('-')[0]}|{str_head.split('-')[1]}|{lst1[k]}|{lst1[k+n]}"
            lst.append(ss)

print("写入数据到 data")
template_path = "/zhiyun/shihaihong/data/code_value.txt"
with open(template_path, "w", encoding="utf-8") as f:
    content = "\n".join(lst)
    f.write(content)

print("上传 data 文件 到 hdfs")
os.system("hdfs dfs -mkdir -p /zhiyun/shihaihong/filetxt/")
os.system(f"hdfs dfs -put {template_path} /zhiyun/shihaihong/filetxt/")
The matching shell script then creates the Hive external table over the uploaded file and validates the row count:

#!/bin/bash
# Purpose: run the whole process from configuration to data validation.
# Must be runnable on any node.
# create my personal directories
mkdir -p /zhiyun/shihaihong/data /zhiyun/shihaihong/jobs /zhiyun/shihaihong/python /zhiyun/shihaihong/shell /zhiyun/shihaihong/sql
echo "hive 建表"
beeline -u jdbc:hive2://localhost:10000 -n root -p 123 -e '
create database if not exists ods_shihaihong location "/zhiyun/shihaihong/ods";
create external table if not exists ods_shihaihong.c_code_value_full(
    cat_name string,
    cat_code string,
    val_name string,
    var_desc string
)
row format delimited fields terminated by "|"
lines terminated by "\n"
stored as textfile
location "/zhiyun/shihaihong/filetxt";'
echo "hive 建表完成"
echo "验证数据"
beeline -u jdbc:hive2://localhost:10000 -n root -p 123 -e "
select count(1) from ods_shihaihong.c_code_value_full;
"
echo "验证完成"

After running the Python script, run the shell extraction script:

Job scheduling:

Assignment 2

Process the 2 incremental tables: a historical-data (one-off full) job plus a daily incremental job for each - 4 scheduled jobs in total.

As before, every table's final row count must be compared with the source table and must match; a day-level check sketch follows below.
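One way to automate that day-level check for the incremental tables (a sketch only, assuming the pymysql connection settings from full.py and beeline on the local node; it reuses the stamp-based filter from the incremental job below, and the output parsing is illustrative):

#!/bin/python3
# Sketch: for a given day, compare the source row count of erp.u_sale_m with the
# row count of the matching partition of ods_shihaihong.erp_u_sale_m_inc.
import subprocess
import sys
import pymysql

day = sys.argv[1]  # e.g. 2023-05-01

conn = pymysql.connect(host="zhiyun.pub", port=23306,
                       user="zhiyun", password="zhiyun", database="erp")
with conn.cursor() as cursor:
    cursor.execute("select count(1) from u_sale_m where stamp between %s and %s",
                   (f"{day} 00:00:00", f"{day} 23:59:59"))
    (src_count,) = cursor.fetchone()
conn.close()

hql = f"select count(1) from ods_shihaihong.erp_u_sale_m_inc where stamp='{day}'"
out = subprocess.run(["beeline", "-u", "jdbc:hive2://localhost:10000", "-n", "root",
                      "-p", "123", "--outputformat=tsv2", "--showHeader=false", "-e", hql],
                     capture_output=True, text=True, check=True).stdout
ods_count = int(out.strip().splitlines()[-1])
print("source:", src_count, "ods partition:", ods_count, "match:", int(src_count) == ods_count)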

Incremental tables to extract:

erp.u_sale_m - full load first (one-off), then daily incremental loads => ods_lijinquan.erp_u_sale_m_inc

erp.u_sale_pay - same as above, incremental => ods_lijinquan.erp_u_sale_pay_inc

The first extraction is a one-off full extraction:

erp_u_sale_m_full.sh:

#!/bin/bash
echo "生成全量配置文件"
mkdir -p /zhiyun/shihaihong/jobs
echo '{
"job": {
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"column": ["*"],
"connection": [
{
"jdbcUrl": [
"jdbc:mysql://zhiyun.pub:233
06/erp?useSSL=false"
],
"table": [
"u_sale_m"]
}
],
"password": "zhiyun",
"username": "zhiyun"
}
},
"writer": {
"name": "hdfswriter",
"parameter": {
"column": [
{"name":"id","type":"int"},
{"name":"saleno","type":"string"},
{"name":"busno","type":"string"},
{"name":"posno","type":"string"},
{"name":"extno","type":"string"},
{"name":"extsource","type":"string"}
,
{"name":"o2o_trade_from","type":"st
ring"},
{"name":"channel","type":"int"},
{"name":"starttime","type":"string"}
,
{"name":"finaltime","type":"string"}
,
{"name":"payee","type":"string"},
{"name":"discounter","type":"string
"},
{"name":"crediter","type":"string"}
,
{"name":"returner","type":"string"}
,
{"name":"warranter1","type":"string
"},
{"name":"warranter2","type":"string
"},
{"name":"stdsum","type":"string"},
{"name":"netsum","type":"string"},
{"name":"loss","type":"string"},
{"name":"discount","type":"float"},
{"name":"member","type":"string"},
{"name":"precash","type":"string"},
{"name":"stamp","type":"string"},{"name":"shiftid","type":"string"},
{"name":"shiftdate","type":"string"}
,
{"name":"yb_saleno","type":"string"
}
],
"defaultFS": "hdfs://cdh02:8020",
"fieldDelimiter": "\t",
"fileName": "erp_u_sale_m_full.data",
"fileType": "orc",
"path":
"/zhiyun/shihaihong/ods/erp_u_sale_m_full",
"writeMode": "truncate"
}
}
}
],
"setting": {
"speed": {
"channel": "3"
}
}
}
}' > /zhiyun/shihaihong/jobs/erp_u_sale_m_full.json
echo "开始抽取"
hadoop fs -mkdir -p /zhiyun/shihaihong/ods/erp_u_sale_m_full
python /opt/datax/bin/datax.py
/zhiyun/shihaihong/jobs/erp_u_sale_m_full.json
echo "hive 建表"
beeline -u jdbc:hive2://localhost:10000 -n root -p 123 -e '
create database if not exists ods_shihaihong location
"/zhiyun/shihaihong/ods";
-- 增量表
create external table if not exists ods_shihaihong.u_sale_m_full(
id int,
saleno string,
busno string,
posno string,
extno string,
extsource string,
o2o_trade_from string,channel int,
starttime string,
finaltime string,
payee string,
discounter string,
crediter string,
returner string,
warranter1 string,
warranter2 string,
stdsum string,
netsum string,
loss string,
discount float,
member string,
precash string,
stamp string,
shiftid string,
shiftdate string,
yb_saleno string
)
row format delimited fields terminated by "\t"
lines terminated by "\n"
stored as orc
location "/zhiyun/shihaihong/ods/erp_u_sale_m_full";
'
# echo "加载数据"
# beeline -u jdbc:hive2://localhost:10000 -n root -p 123 -e "
# load data inpath \"/zhiyun/shihaihong/tmp/erp_u_sale_m_full/*\"
overwrite into table ods_shihaihong.erp_u_sale_m_full
partition(createtime='$day');
# "
echo "验证数据"
beeline -u jdbc:hive2://localhost:10000 -n root -p 123 -e "
select count(1) from ods_shihaihong.u_sale_m_full;
"
echo "抽取完成"

Register the job on the scheduling platform:

Copy the contents of the shell script into the GLUE IDE and execute it once:

Afterwards, switch to incremental extraction:

#!/bin/bash
day=$(date -d "yesterday" +%Y-%m-%d)
if [ -n "$1" ]; then
    day=$(date -d "$1 -1 day" +%Y-%m-%d);
fi;
echo "抽取的日期为 $day"
echo "生成增量配置文件"
mkdir -p /zhiyun/shihaihong/jobs
echo '
{
"job": {
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"connection": [
{
"jdbcUrl": [
"jdbc:mysql://zhiyun.pub:233
06/erp?useSSL=false"
],
"querySql": [
"select * from u_sale_m where
stamp between '\'"$day" 00:00:00\'' and '\'"$day" 23:59:59\'' and
id>0"
]
}
],
"password": "zhiyun",
"username": "zhiyun"
}
},
"writer": {
"name": "hdfswriter",
"parameter": {
"column": [
{"name":"id","type":"int"},
{"name":"saleno","type":"string"},
{"name":"busno","type":"string"},
{"name":"posno","type":"string"},
{"name":"extno","type":"string"},{"name":"extsource","type":"string"}
,
{"name":"o2o_trade_from","type":"str
ing"},
{"name":"channel","type":"int"},
{"name":"starttime","type":"string"}
,
{"name":"finaltime","type":"string"}
,
{"name":"payee","type":"string"},
{"name":"discounter","type":"string"}
,
{"name":"crediter","type":"string"},
{"name":"returner","type":"string"},
{"name":"warranter1","type":"string"}
,
{"name":"warranter2","type":"string"}
,
{"name":"stdsum","type":"string"},
{"name":"netsum","type":"string"},
{"name":"loss","type":"string"},
{"name":"discount","type":"float"},
{"name":"member","type":"string"},
{"name":"precash","type":"string"},
{"name":"stamp","type":"string"},
{"name":"shiftid","type":"string"},
{"name":"shiftdate","type":"string"}
,
{"name":"yb_saleno","type":"string"}
],
"defaultFS": "hdfs://cdh02:8020",
"fieldDelimiter": "\t",
"fileName": "erp_u_sale_m_inc.data",
"fileType": "orc",
"path":
"/zhiyun/shihaihong/tmp/erp_u_sale_m_inc",
"writeMode": "truncate"
}
}
}
],
"setting": {
"speed": {"channel": 2
}
}
}
}' > /zhiyun/shihaihong/jobs/erp_u_sale_m_inc.json
echo "开始抽取"
hadoop fs -mkdir -p /zhiyun/shihaihong/tmp/erp_u_sale_m_inc
python /opt/datax/bin/datax.py
/zhiyun/shihaihong/jobs/erp_u_sale_m_inc.json
echo "hive 建表"
beeline -u jdbc:hive2://localhost:10000 -n root -p 123 -e '
create database if not exists ods_shihaihong location
"/zhiyun/shihaihong/ods";
create external table if not exists
ods_shihaihong.erp_u_sale_m_inc(
id int,
saleno string,
busno string,
posno string,
extno string,
extsource string,
o2o_trade_from string,
channel int,
starttime string,
finaltime string,
payee string,
discounter string,
crediter string,
returner string,
warranter1 string,
warranter2 string,
stdsum string,
netsum string,
loss string,
discount float,
member string,
precash string,
shiftid string,
shiftdate string,
yb_saleno string
) partitioned by (stamp string)
row format delimited fields terminated by "\t"
lines terminated by "\n"
stored as orc
location "/zhiyun/shihaihong/ods/erp_u_sale_m_inc";
'
echo "加载数据"
beeline -u jdbc:hive2://localhost:10000 -n root -p 123 -e "
load data inpath '/zhiyun/shihaihong/tmp/erp_u_sale_m_inc/*'
overwrite into table ods_shihaihong.erp_u_sale_m_inc
partition(stamp='"$day"');
"
echo "验证数据"
beeline -u jdbc:hive2://localhost:10000 -n root -p 123 -e "
show partitions ods_shihaihong.erp_u_sale_m_inc;
select count(1) from ods_shihaihong.erp_u_sale_m_inc where stamp
= '"$day"';
select * from ods_shihaihong.erp_u_sale_m_inc where stamp =
'"$day"' limit 5;
"
echo "抽取完成"

Job scheduling:

Execute it once, passing the date parameter:

erp_u_sale_pay_inc:

Full extraction:

#!/bin/bash
echo "生成全量配置文件"
mkdir -p /zhiyun/shihaihong/jobs
echo '{
"job": {
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"column": ["*"],
"connection": [
{
"jdbcUrl": [
"jdbc:mysql://zhiyun.pub:233
06/erp?useSSL=false"
],
"table": [
"u_sale_pay"
]
}
],
"password": "zhiyun",
"username": "zhiyun"
}
},
"writer": {
"name": "hdfswriter",
"parameter": {
"column": [
{"name":
"id","type":"int"},
{"name":"saleno","type":"string"},
{"name":"cardno","type":"string"},
{"name":"netsum","type":"string"},
{"name":"paytype","type":"string"},
{"name":"bak1","type":"string"}
],
"defaultFS": "hdfs://cdh02:8020",
"fieldDelimiter": "\t",
"fileName": "erp_u_sale_pay_full.data",
"fileType": "orc",
"path":
"/zhiyun/shihaihong/ods/erp_u_sale_pay_full",
"writeMode": "truncate"}
}
}
],
"setting": {
"speed": {
"channel": "3"
}
}
}
}' > /zhiyun/shihaihong/jobs/erp_u_sale_pay_full.json
echo "开始抽取"
hadoop fs -mkdir -p /zhiyun/shihaihong/ods/erp_u_sale_pay_full
python /opt/datax/bin/datax.py
/zhiyun/shihaihong/jobs/erp_u_sale_pay_full.json
echo "hive 建表"
beeline -u jdbc:hive2://localhost:10000 -n root -p 123 -e '
create database if not exists ods_shihaihong location
"/zhiyun/shihaihong/ods";
-- 增量表
create external table if not exists
ods_shihaihong.u_sale_pay_full(
id int,
saleno string,
cardno string,
netsum string,
paytype string,
bak1 string
)
row format delimited fields terminated by "\t"
lines terminated by "\n"
stored as orc
location "/zhiyun/shihaihong/ods/erp_u_sale_pay_full";
'
# echo "加载数据"
# beeline -u jdbc:hive2://localhost:10000 -n root -p 123 -e "
# load data inpath
\"/zhiyun/shihaihong/tmp/erp_u_sale_pay_full/*\" overwrite into
table ods_shihaihong.erp_u_sale_pay_full
partition(createtime='$day');
# "
echo "验证数据"
beeline -u jdbc:hive2://localhost:10000 -n root -p 123 -e "
select count(1) from ods_shihaihong.u_sale_pay_full;
"
echo "抽取完成"

Job scheduling:

Edit the script in the GLUE IDE and execute it once:

Incremental extraction:

#!/bin/bash
day=$(date -d "yesterday" +%Y-%m-%d)
if [ -n "$1" ]; then
day=$(date -d "$1 -1 day" +%Y-%m-%d);
fi;
echo "抽取的日期为 $day"
echo "生成增量配置文件"
mkdir -p /zhiyun/shihaihong/jobs
echo '
{
"job": {
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"column": ["*"],
"connection": [
{
"jdbcUrl": [
"jdbc:mysql://zhiyun.pub:233
06/erp?useSSL=false"
],
"table": [
"select u_sale_pay.*,stamp
from u_sale_pay left join u_sale_m on
u_sale_pay.saleno=u_sale_m.saleno where stamp between '\'"$day"
00:00:00\'' and '\'"$day" 23:59:59\'' and id>0 "
]
}
],
"password": "zhiyun",
"username": "zhiyun"
}},
"writer": {
"name": "hdfswriter",
"parameter": {
"column": [
{"name":
"id","type":"int"},
{"name":"saleno","type":"string"},
{"name":"cardno","type":"string"},
{"name":"netsum","type":"string"},
{"name":"paytype","type":"string"},
{"name":"bak1","type":"string"},
{"name":"stamp","type":"string"}
],
"defaultFS": "hdfs://cdh02:8020",
"fieldDelimiter": "\t",
"fileName": "erp_u_sale_pay_inc.data",
"fileType": "orc",
"path":
"/zhiyun/shihaihong/tmp/erp_u_sale_pay_inc",
"writeMode": "truncate"
}
}
}
],
"setting": {
"speed": {
"channel": "3"
}
}
}
}
' > /zhiyun/shihaihong/jobs/erp_u_sale_play_inc.json
echo "开始抽取"
hadoop fs -mkdir -p /zhiyun/shihaihong/tmp/erp_u_sale_play_inc
python /opt/datax/bin/datax.py
/zhiyun/shihaihong/jobs/erp_u_sale_play_inc.json
echo "hive 建表"
beeline -u jdbc:hive2://localhost:10000 -n root -p 123 -e 'create database if not exists ods_shihaihong location
"/zhiyun/shihaihong/ods";
create external table if not exists
ods_shihaihong.erp_u_sale_play_inc(
id int,
saleno string,
cardno string,
netsum string,
paytype string,
bak1 string
) partitioned by (stamp string)
row format delimited fields terminated by "\t"
lines terminated by "\n"
stored as orc
location "/zhiyun/shihaihong/ods/erp_u_sale_play_inc";
'
echo "加载数据"
beeline -u jdbc:hive2://localhost:10000 -n root -p 123 -e "
load data inpath '/zhiyun/shihaihong/tmp/erp_u_sale_play_inc/*'
overwrite into table ods_shihaihong.erp_u_sale_play_inc
partition(stamp='"$day"');
"
echo "验证数据"
beeline -u jdbc:hive2://localhost:10000 -n root -p 123 -e "
show partitions ods_shihaihong.erp_u_sale_play_inc;
select count(1) from ods_shihaihong.erp_u_sale_play_inc where
stamp = '"$day"';
select * from ods_shihaihong.erp_u_sale_play_inc where stamp =
'"$day"' limit 5;
"
echo "抽取完成"

Job scheduling:

Execute it once:


Source: reposted from https://blog.csdn.net/shh2000424/article/details/142375636; copyright belongs to the original author 工科小石头.
