0


flink车联网项目:维表离线同步(第69天)

系列文章目录

3.3 维表离线同步
3.3.1 思路
3.3.2 示例
3.3.3 其他表开发
3.3.4 部署
3.3.1.1 将表提交到生成环境
3.3.1.2 添加虚拟节点
3.3.1.3 配置计算节点
3.3.1.4 添加虚拟结束节点
3.3.1.5 提交到生产环境
3.3.1.6 发布
3.3.1.7 运维中心
3.3.1.8 补数据
3.3.1.9 补数据实例
3.3.1.10 查询数据

文章目录


前言

本文为flink车联网项目:维表离线同步,后续章节为:ods层具体实现

3.3 维表离线同步

3.3.1 思路

这部分就是把实时的维表每天定时同步到离线MaxCompute表中,从而解决维表变化的问题。在离线计算的时候,就可以直接访问MaxCompute的离线表。
一般定时调度的时间是每天凌晨,这样就相当于将每天凌晨的维表快照信息做为昨天最后的状态信息。写到分区时,用 date_sub(current_date, 1) 即可计算时间。
因为阿里云Flink中虽然支持批处理,但是目前没有调度功能,所以写出到MaxCompute表的工作需要使用DataWorks来完成。以下部分在DataWorks中完成。

3.3.2 示例

(1)创建dim业务流程
打开DataWorks控制台,打开dwxxx项目,点击数据开发,进入DataStudio.

在业务流程右键,新建业务流程,名称为dim
在这里插入图片描述

(2)创建临时查询
点击临时查询,新建ODPS SQL,路径为临时查询,名称为create_table
在这里插入图片描述

同样的方法,创建名称为query的临时查询
(3)创建MaxCompute分区表
在临时查询create_table中运行。
– 按日分区维表
drop table if exists

mc_dim_car_info_i

;
create table if not exists

mc_dim_car_info_i

(

id

bigint,

car_status

bigint comment ‘车辆状态 1:启用 0:禁用’,

is_inspection

bigint comment ‘是否验车 1:是 0:否’,

audit_status

bigint comment ‘普通车辆审核状态 0:数据初始化 1:待初审 3:初审未通过 5:待终审 7:终审通过 8:终审未通过 10:信息已失效’,

net_audit_status

bigint comment ‘网约车辆审核状态 0:数据初始化 1:待初审 3:初审未通过 5:待终审 7:终审通过 8:终审未通过 10:信息已失效’,

audit_desc

string comment ‘车辆审核描述’,

audit_date

timestamp comment ‘车辆普通信息最后审核时间’,

net_audit_date

timestamp comment ‘车辆网约信息最后审核时间’,

car_level_id

bigint comment ‘车辆等级id’,

car_level_name

string comment ‘车辆等级名称’,

car_source

bigint comment ‘车辆来源: 0.数据初始化 1.自营 2.加盟’,

register_channel

bigint comment ‘注册渠道,1:app 2: 第三方渠道 3:管理系统’,

car_city_no

bigint comment ‘车辆所在地城市编号’,

car_city_name

string comment ‘城市名称’,

car_plate_no

string comment ‘车牌号’,

car_plate_color

bigint comment ‘>> 已修改 车牌颜色(1.蓝色 2.黄色 3.黑色 4.白色 5.绿色 6.其他) -->原sql 9.其他’,

car_brand_id

bigint comment ‘车辆品牌id’,

car_brand

string comment ‘车辆品牌’,

car_color

bigint comment ‘车辆颜色(1.黑色 2.白色 3.蓝色 4.金色 5.银色 6.棕色 7.灰色 8.黄色 9.粉色 10.红色 11.紫色 12.绿色)’,

car_seats

bigint comment ‘核定载客人数’,

car_owner_type

bigint comment ‘车辆归属类型(行驶证信息):1.个人私家车 2.出租车企业 3.平台自有车’,

car_producer_brand

string comment ‘车辆厂牌(行驶证信息)’,

car_producer_brand_model

string comment ‘车辆品牌型号(行驶证信息)’,

vehicle_type

string comment ‘车辆类型(行驶证信息)’,

car_owner

string comment ‘车辆所有人’,

car_character

bigint comment ‘车辆性质: 1.非营运 2.租赁 3.预约出租客运 4.出租客运 5.营转非’,

vin

string comment ‘车架号’,

engine_no

string comment ‘发动机号’,

fuel_type

bigint comment ‘车辆燃料类型(1.汽油 2.柴油 3.天然气 4.液化气 5.电动 6.其他)’,

displacement

bigint comment ‘发动机排量(ml)’,

inspection_status

bigint comment ‘车辆年检状态 1:未年检 2:已年检 3:年检未通过’,

total_distance

bigint comment ‘行驶总里程(km)’,

engine_power

bigint comment ‘发动机功率,单位kw’,

wheel_base

bigint comment ‘车辆轴距(m)’,

gps_brand

string comment ‘卫星定位装置品牌’,

gps_model

string comment ‘卫星定位装置型号’,

gps_imei

string comment ‘卫星定位imei’,

net_car_lic_no

string comment ‘网络预约汽车运输资格证号’,

net_car_lic_issue_agency

string comment ‘网络预约汽车运输资格证颁发机构’,

net_car_lic_oper_area

string comment ‘经营区域’,

net_car_lic_verified

bigint comment ‘网约车汽车运输资格证三方验证:1.通过 2.未验证 3.未通过’,

net_car_lic_verified_remark

string comment ‘网约车汽车运输资格证三方验证备注’,

is_push

bigint comment '是否向交委推送:1:未推送;2:推送 3:驳回 ',

is_push_time

timestamp comment '向交委推送时间 ',

create_time

timestamp comment ‘创建时间’,

update_time

timestamp comment ‘更新时间’,

write_time

timestamp comment ‘写入时间’,

id_card_valid_start

timestamp comment ‘身份证有效期开始’,

id_card_valid_end

timestamp comment ‘身份证失效时间’,

driver_lic_issue

timestamp comment ‘初次领取驾驶证日期’,

driver_lic_valid_start

timestamp comment ‘驾驶证生效日期’,

driver_lic_valid_end

timestamp comment ‘驾驶证失效日期’,

car_lic_register

timestamp comment ‘车辆注册日期(行驶证信息)’
)
comment ‘车辆信息维表_按日分区’
partitioned by (dt string comment ‘写入日期’)
;
(4)创建计算节点
在dim业务流程下,数据开发右键,新建节点,选择ODPS SQL
名称则为表名

(5)将paimon数据插入到maxcompute分区表
在mc_dim_car_info_i节点中插入以下代码。
每天凌晨调度,如0:05分调度,抽取到的数据就是当天的初始状态,也是昨天的最终的状态,所以需要将数据放到昨天的分区中。
set odps.sql.common.table.planner.ext.hive.bridge = true;
set odps.sql.hive.compatible = true;

insert overwrite table

mc_dim_car_info_i

partition(dt)
select

id

,

car_status

,

is_inspection

,

audit_status

,

net_audit_status

,

audit_desc

,

audit_date

,

net_audit_date

,

car_level_id

,

car_level_name

,

car_source

,

register_channel

,

car_city_no

,

car_city_name

,

car_plate_no

,

car_plate_color

,

car_brand_id

,

car_brand

,

car_color

,

car_seats

,

car_owner_type

,

car_producer_brand

,

car_producer_brand_model

,

vehicle_type

,

car_owner

,

car_character

,

vin

,

engine_no

,

fuel_type

,

displacement

,

inspection_status

,

total_distance

,

engine_power

,

wheel_base

,

gps_brand

,

gps_model

,

gps_imei

,

net_car_lic_no

,

net_car_lic_issue_agency

,

net_car_lic_oper_area

,

net_car_lic_verified

,

net_car_lic_verified_remark

,

is_push

,

is_push_time

,

create_time

,

update_time

,

write_time

,

id_card_valid_start

,

id_card_valid_end

,

driver_lic_issue

,

driver_lic_valid_start

,

driver_lic_valid_end

,

car_lic_register

, date_sub(current_date, 1) as dt
from

dwxxxx_dev

.

paimon_dim_car_info_f

;
(6)结果
通过每天定时调度,每天会产生一个历史分区,存储当天的维表状态。下图为10月24号凌晨调度,产生的9月23日的分区。
在这里插入图片描述

3.3.3 其他表开发

将表创建好之后,把每一个insert语句写入到一个单独的ODPS表中。
– 按日分区维表 mc_dim_car_vendor_i
drop table if exists

mc_dim_car_vendor_i

;
create table if not exists

mc_dim_car_vendor_i

(

vendor_id

bigint,

area_name

string,

vendor_name

string,

area_id

bigint,

login_id

bigint,

contact_mobile

string,

contact_name

string,

service_status

bigint,

create_time

timestamp,

update_time

timestamp
)
comment ‘车队信息维表_按日分区’
partitioned by (dt string comment ‘写入日期’)
;

– 按日分区维表 dim_driver_info_i
drop table if exists

mc_dim_driver_info_i

;
create table if not exists

mc_dim_driver_info_i

(

driver_id

bigint,

login_id

bigint,

leader_id

bigint,

car_id

bigint,

city_no

bigint,

city_name

string,

phone

string,

true_name

string,

nation

string,

address

string,

education

string,

id_card_no

string,

id_card_issue_org

string,

nationality

string,

registered_address

string,

driver_mark

bigint,

sex

bigint,

driver_lic_no

string,

driver_lic_class

string,

driver_lic_issue_org

string,

net_driver_lic_no

string,

net_driver_lic_type

bigint,

net_driver_lic_issue_org

string,

work_lic_no

string,

contract_id

string,

registration_time

timestamp,

driver_type

bigint,

register_channel

bigint,

audit_status

bigint,

net_audit_status

bigint,

audit_status_sum

bigint,

net_audit_status_sum

bigint,

net_driver_lic_verified

bigint,

net_driver_lic_verified_remark

string,

last_audit_date

timestamp,

net_audit_date

timestamp,

audit_desc

string,

allow_receive_type

bigint,

real_allow_change

bigint,

booking_allow_change

bigint,

is_blacklist

bigint,

is_push

bigint,

is_push_time

timestamp,

bind_car_time

timestamp,

net_audit_status_sum_date

timestamp,

audit_status_sum_date

timestamp,

id_card_valid_start

timestamp,

id_card_valid_end

timestamp,

driver_lic_issue

timestamp,

driver_lic_valid_start

timestamp,

driver_lic_valid_end

timestamp,

md5_phone

string,

md5_id_card_no

string,

driver_status_created_at

timestamp,

driver_status_updated_at

timestamp,

driver_birthday

string,

net_driver_lic_get

timestamp,

work_lic_issue

timestamp,

net_driver_lic_issue

timestamp,

work_lic_valid_start

timestamp,

net_driver_lic_report

timestamp,

net_driver_lic_valid_start

timestamp,

work_lic_valid_end

timestamp,

net_driver_lic_valid_end

timestamp,

login_type

bigint,

create_time

timestamp,

update_time

timestamp
)
comment ‘司机信息维表_按日分区’
partitioned by (dt string comment ‘写入日期’)
;

– 按日分区维表 mc_dim_area_i
drop table if exists

mc_dim_area_i

;
create table if not exists

mc_dim_area_i

(
area_id bigint comment ‘城市id’,
parent_area_id bigint comment ‘父id’,
area_name string comment ‘城市名称’,
area_type bigint comment ‘区域类型,2:国家,3:省,4:市,5:区,6:县’,
pin_yin string comment ‘城市汉语拼音’
)
comment ‘区域信息维表_按日分区’
partitioned by (dt string comment ‘写入日期’)
;

– 创建分区表 mc_dim_date_work_type_i
drop table if exists

mc_dim_date_work_type_i

;
create table if not exists

mc_dim_date_work_type_i

(
id bigint,
work_day date comment ‘日期’,
day_of_week bigint comment ‘周一到周日’,
work_type bigint comment ‘1:工作日,2:节假日’,
vacation string comment ‘节假’,
created_at timestamp comment ‘创建时间’,
updated_at timestamp comment ‘更新时间’
)
comment ‘节假日日期类型表’
partitioned by (dt string comment ‘写入日期’)
;

– 创建分区表 mc_dim_date_work_type_i
drop table if exists

mc_dim_user_info_i

;
create table if not exists

mc_dim_user_info_i

(
userid bigint comment ‘用户ID’,
mobile string comment ‘手机号’,
password string comment ‘密码’,
realname string comment ‘真实姓名’,
gender bigint comment '0未知 1男 2女 ',
email string comment ‘邮箱’,
areaid bigint comment ‘区域id’,
address string comment ‘用户地址’,
birthday timestamp comment ‘生日’,
areacode string comment ‘国家地区区号 中国 0086’,
photourl string comment ‘头像’,
enabled bigint comment ‘0不启用 1启用’,
createtime timestamp,
update_time timestamp
)
comment ‘乘客信息表’
partitioned by (dt string comment ‘写入日期’)
;

– 插入数据

mc_dim_car_vendor_i

insert overwrite table

mc_dim_car_vendor_i

partition(dt)
select

vendor_id

,

area_name

,

vendor_name

,

area_id

,

login_id

,

contact_mobile

,

contact_name

,

service_status

,

create_time

,

update_time

, date_sub(current_date, 1) as dt
from

dwxxx_dev

.

paimon_dim_car_vendor_f

;

– 插入数据

mc_dim_driver_info_i

insert overwrite table

mc_dim_driver_info_i

partition(dt)
select

driver_id

,

login_id

,

leader_id

,

car_id

,

city_no

,

city_name

,

phone

,

true_name

,

nation

,

address

,

education

,

id_card_no

,

id_card_issue_org

,

nationality

,

registered_address

,

driver_mark

,

sex

,

driver_lic_no

,

driver_lic_class

,

driver_lic_issue_org

,

net_driver_lic_no

,

net_driver_lic_type

,

net_driver_lic_issue_org

,

work_lic_no

,

contract_id

,

registration_time

,

driver_type

,

register_channel

,

audit_status

,

net_audit_status

,

audit_status_sum

,

net_audit_status_sum

,

net_driver_lic_verified

,

net_driver_lic_verified_remark

,

last_audit_date

,

net_audit_date

,

audit_desc

,

allow_receive_type

,

real_allow_change

,

booking_allow_change

,

is_blacklist

,

is_push

,

is_push_time

,

bind_car_time

,

net_audit_status_sum_date

,

audit_status_sum_date

,

id_card_valid_start

,

id_card_valid_end

,

driver_lic_issue

,

driver_lic_valid_start

,

driver_lic_valid_end

,

md5_phone

,

md5_id_card_no

,

driver_status_created_at

,

driver_status_updated_at

,

driver_birthday

,

net_driver_lic_get

,

work_lic_issue

,

net_driver_lic_issue

,

work_lic_valid_start

,

net_driver_lic_report

,

net_driver_lic_valid_start

,

work_lic_valid_end

,

net_driver_lic_valid_end

,

login_type

,

create_time

,

update_time

, date_sub(current_date, 1) as dt
from

dwxxxx_dev

.

paimon_dim_driver_info_f

;

– 插入数据

mc_dim_area_i

insert overwrite table

mc_dim_area_i

partition(dt)
select area_id
, parent_area_id
, area_name
, area_type
, pin_yin
, date_sub(current_date, 1) as dt
from

dwxxxx_dev

.

paimon_dim_area_f

;

– 插入数据 mc_dim_date_work_type_i
insert overwrite table

mc_dim_date_work_type_i

partition(dt)
select id
, work_day
, day_of_week
, work_type
, vacation
, created_at
, updated_at
, date_sub(current_date, 1) as dt
from

dwxxxxcx_dev

.

paimon_dim_date_work_type_f

;

– 插入数据 mc_dim_user_info_i
insert overwrite table mc_dim_user_info_i partition(dt)
select userid
, mobile
, password
, realname
, gender
, email
, areaid
, address
, birthday
, areacode
, photourl
, enabled
, createtime
, update_time
, date_sub(current_date, 1) as dt
from

dwxxxx_dev

.

paimon_dim_user_info_f

;
操作完成后,所有的任务如下图所示。
在这里插入图片描述

3.3.4 部署

以上的开发都是在开发环境开发的,需要提交到生产环境,才能定时进行调度。

3.3.1.1 将表提交到生成环境

在表管理中,依次点击各个新建的MaxCompute表,将中文名补全,然后点击提交到生产环境,然后在弹出的对话框中点击确定
在这里插入图片描述

3.3.1.2 添加虚拟节点

在业务流程dim处右键,点击看板
在这里插入图片描述

添加一个虚拟节点,主要是用来控制整个业务流程的启停
在看板中拖拽一个虚拟节点,名称为dim_start
在这里插入图片描述

双击dim_start,修改时间属性——
实例生成方式:T+1次日生成
调度类型:正常调度
调度日历:默认
调度日期:日
调度时间:00:05
重跑属性:运行成功或失败后皆可重跑
这里点击修改默认设置,修改成如下配置:
在这里插入图片描述

勾选失败自动重跑

修改调度依赖——
提交前自动解析设置为否
因为这个是起始节点,所以不需要依赖其他节点,在依赖的上游节点处点击使用工作空间根节点

修改完成后,点击保存

3.3.1.3 配置计算节点

按照相同的配置,修改其他6个节点。不同的地方是依赖不再依赖根节点,而是依赖dim_start即可。
添加方式是,在节点输出搜索dim_start,然后点击dwxxxx.dim_start,然后点击添加

全部配置完成后,点击刷新,可以看到如下的结果
在这里插入图片描述

3.3.1.4 添加虚拟结束节点

添加一个虚拟节点,主要是用来指示这个流程结束。
在看板中拖拽一个虚拟节点,名称为dim_end
拖拽六个计算节点到dim_end,形成依赖关系。
在这里插入图片描述

也可以双击dim_end进行修改调度依赖,在节点输出中,依次输入并添加六个dim计算节点,把他们作为依赖。
配置完成后,可以点击右上角的格式化来美化依赖图。

结果如下图所示:

在这里插入图片描述

3.3.1.5 提交到生产环境

点击左上角提交
在这里插入图片描述

勾选所有节点
变更描述为:首次提交
勾选忽略输入输出不一致的告警
点击确定
在这里插入图片描述

等待提交完成

3.3.1.6 发布

提交完成后,点击看板右上角的发布
这里显示的刚刚提交上来的任务。可以对任务进行查看和测试,如果有问题就可以取消发布,没有问题就可以发布到生产环境。
这里不再进行测试,直接勾选全部任务,点击发布选中项
等待发布完成

3.3.1.7 运维中心

发布完成后,就会自动进行调度了,可以前往运维中心查看调度情况,也可以进行手动调度
点击运维中心,进入运维界面
点击周期任务运维,周期任务,可以看到提交到集群上任务

3.3.1.8 补数据

右键dim_start,点击补数据,当前节点及下游节点,可以将任务手动运行一遍
业务日期填写10月23日
并行为否
补数据告警为否
勾选全部任务
点击确定

3.3.1.9 补数据实例

在补数据实例中可以查看补数据的运行情况

如果实例运行失败了,可以查看日志,然后根据错误对脚本进行修改。然后重新提交,重新发布,重新运行。

3.3.1.10 查询数据

可以在临时查询中查询一张维表,来检验一下生产上的数据。
注意:生产上的数据需要在表名前加上 dwxxx.
– 开启全表扫描,仅此Session有效
set odps.sql.allow.fullscan=true;
select * from dwxxx.mc_dim_car_info_i limit 100;
在这里插入图片描述

标签: flink 大数据 sql

本文转载自: https://blog.csdn.net/syhiiu/article/details/141301322
版权归原作者 大数据飞总 所有, 如有侵权,请联系我们删除。

“flink车联网项目:维表离线同步(第69天)”的评论:

还没有评论