

Flink Internet of Vehicles Project: Business Implementation 2 (Dimension Table Development) (Day 68)

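Series Article Index

3.2 Dimension Table Development
3.2.1 Creating the Databases
3.2.2 Example
3.2.2.1 Type Conversion
3.2.2.2 Creating the MySQL Mapping Table
3.2.2.3 Creating the Paimon Mapping Table
3.2.2.4 Inserting from MySQL into the Paimon Table
3.2.2.5 Viewing the Results
3.2.2.6 Testing
3.2.3 Developing the Other Tables
3.2.4 Deployment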



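Preface

This article covers business implementation 2 (dimension table development) of the Flink Internet of Vehicles project. The next chapter covers offline synchronization of the dimension tables.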

3.2 Dimension Table Development

3.2.1 Creating the Databases

create database if not exists vvp.mysql;
create database if not exists xxxpm.dim;

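3.2.2 Example

3.2.2.1 Type Conversion

Here we use Flink CDC to capture the data. The field types are converted between MySQL and Flink SQL according to the mapping in the figure.
[Figure: MySQL to Flink SQL field type mapping table]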
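As a small illustration of where the type mapping matters in this project: the MySQL mapping table in 3.2.2.2 declares a few status columns as INT, while the Paimon table in 3.2.2.3 declares them as BIGINT. The following is only a sketch, assuming the source columns really are narrower integer types; it makes the widening explicit with CAST rather than relying on implicit conversion during the insert.

```sql
-- Sketch only: explicit widening of INT status columns to BIGINT.
-- Assumes the mysqlcdc catalog from this article is registered and that
-- these columns are integer types on the MySQL side.
select
    `id`,
    cast(`car_status` as BIGINT)       as `car_status`,
    cast(`is_inspection` as BIGINT)    as `is_inspection`,
    cast(`net_audit_status` as BIGINT) as `net_audit_status`
from `mysqlcdc`.`dim`.`dim_car_info`;
```

Whether the explicit cast is needed depends on the planner's implicit conversion rules in the Flink version in use, so treat it as a defensive option rather than a requirement.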

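3.2.2.2 Creating the MySQL Mapping Table

The traditional approach is shown below. However, because a mysqlcdc catalog has been configured, mysqlcdc.dim.dim_car_info can be used directly without creating a separate mapping table, which improves development efficiency.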
drop table if exists `mysql`.`mysql_dim_car_info`;
create table if not exists `mysql`.`mysql_dim_car_info` (
    `id` BIGINT,
    `car_status` INT,
    `is_inspection` INT,
    `audit_status` BIGINT,
    `net_audit_status` INT,
    `audit_desc` STRING,
    `audit_date` TIMESTAMP,
    `net_audit_date` TIMESTAMP,
    `car_level_id` BIGINT,
    `car_level_name` STRING,
    `car_source` BIGINT,
    `register_channel` BIGINT,
    `car_city_no` BIGINT,
    `car_city_name` STRING,
    `car_plate_no` STRING,
    `car_plate_color` BIGINT,
    `car_brand_id` BIGINT,
    `car_brand` STRING,
    `car_color` BIGINT,
    `car_seats` BIGINT,
    `car_owner_type` BIGINT,
    `car_producer_brand` STRING,
    `car_producer_brand_model` STRING,
    `vehicle_type` STRING,
    `car_owner` STRING,
    `car_character` BIGINT,
    `vin` STRING,
    `engine_no` STRING,
    `fuel_type` BIGINT,
    `displacement` BIGINT,
    `inspection_status` BIGINT,
    `total_distance` BIGINT,
    `engine_power` BIGINT,
    `wheel_base` BIGINT,
    `gps_brand` STRING,
    `gps_model` STRING,
    `gps_imei` STRING,
    `net_car_lic_no` STRING,
    `net_car_lic_issue_agency` STRING,
    `net_car_lic_oper_area` STRING,
    `net_car_lic_verified` BIGINT,
    `net_car_lic_verified_remark` STRING,
    `is_push` BIGINT,
    `is_push_time` TIMESTAMP,
    `create_time` TIMESTAMP,
    `update_time` TIMESTAMP,
    `write_time` TIMESTAMP,
    `id_card_valid_start` TIMESTAMP,
    `id_card_valid_end` TIMESTAMP,
    `driver_lic_issue` TIMESTAMP,
    `driver_lic_valid_start` TIMESTAMP,
    `driver_lic_valid_end` TIMESTAMP,
    `car_lic_register` TIMESTAMP,
    primary key (`id`) not enforced
) with (
    'connector' = 'mysql'
    ,'hostname' = 'rm-cn-x0r3fp1lj000qa.rwlb.rds.aliyuncs.com'
    ,'port' = '3306'
    ,'username' = 'xxxx'
    ,'password' = 'xxxxx'
    ,'database-name' = 'dim'
    ,'table-name' = 'dim_car_info'
)
;
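For comparison with the mapping table above, this is roughly what the catalog-based route looks like. It is a sketch that assumes a catalog named mysqlcdc is already registered, as the text states, and that it exposes the dim database.

```sql
-- Sketch only: browse and read the source table through the catalog,
-- with no mapping table DDL at all.
use catalog `mysqlcdc`;
use `dim`;
show tables;

-- Inspect the column types the catalog derived from MySQL.
describe `mysqlcdc`.`dim`.`dim_car_info`;

-- Read a few columns directly.
select `id`, `car_status`, `update_time`
from `mysqlcdc`.`dim`.`dim_car_info`;
```

The trade-off is that the column types come from the catalog's own mapping instead of being declared by hand, so it is worth checking the describe output before writing into a typed sink table.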

3.2.2.3 Creating the Paimon Mapping Table

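drop table if exists `xxxpm`.`dim`.`paimon_dim_car_info_f`;
create table if not exists `xxxpm`.`dim`.`paimon_dim_car_info_f` (
    `id` BIGINT,
    `car_status` BIGINT comment 'Vehicle status 1: enabled 0: disabled',
    `is_inspection` BIGINT comment 'Vehicle inspected 1: yes 0: no',
    `audit_status` BIGINT comment 'Regular vehicle audit status 0: data initialized 1: pending initial review 3: initial review failed 5: pending final review 7: final review passed 8: final review failed 10: information expired',
    `net_audit_status` BIGINT comment 'Ride-hailing vehicle audit status 0: data initialized 1: pending initial review 3: initial review failed 5: pending final review 7: final review passed 8: final review failed 10: information expired',
    `audit_desc` STRING comment 'Vehicle audit description',
    `audit_date` TIMESTAMP comment 'Last audit time of the regular vehicle information',
    `net_audit_date` TIMESTAMP comment 'Last audit time of the ride-hailing vehicle information',
    `car_level_id` BIGINT comment 'Vehicle level ID',
    `car_level_name` STRING comment 'Vehicle level name',
    `car_source` BIGINT comment 'Vehicle source: 0. data initialized 1. self-operated 2. franchise',
    `register_channel` BIGINT comment 'Registration channel, 1: app 2: third-party channel 3: management system',
    `car_city_no` BIGINT comment 'City code of the vehicle location',
    `car_city_name` STRING comment 'City name',
    `car_plate_no` STRING comment 'License plate number',
    `car_plate_color` BIGINT comment 'License plate color (1. blue 2. yellow 3. black 4. white 5. green 6. other)',
    `car_brand_id` BIGINT comment 'Vehicle brand ID',
    `car_brand` STRING comment 'Vehicle brand',
    `car_color` BIGINT comment 'Vehicle color (1. black 2. white 3. blue 4. gold 5. silver 6. brown 7. gray 8. yellow 9. pink 10. red 11. purple 12. green)',
    `car_seats` BIGINT comment 'Approved passenger capacity',
    `car_owner_type` BIGINT comment 'Vehicle ownership type (vehicle license info): 1. private car 2. taxi company 3. platform-owned car',
    `car_producer_brand` STRING comment 'Vehicle make (vehicle license info)',
    `car_producer_brand_model` STRING comment 'Vehicle brand and model (vehicle license info)',
    `vehicle_type` STRING comment 'Vehicle type (vehicle license info)',
    `car_owner` STRING comment 'Vehicle owner',
    `car_character` BIGINT comment 'Vehicle usage: 1. non-commercial 2. rental 3. pre-booked passenger transport 4. taxi passenger transport 5. commercial converted to non-commercial',
    `vin` STRING comment 'Vehicle identification number',
    `engine_no` STRING comment 'Engine number',
    `fuel_type` BIGINT comment 'Fuel type (1. gasoline 2. diesel 3. natural gas 4. liquefied gas 5. electric 6. other)',
    `displacement` BIGINT comment 'Engine displacement (mL)',
    `inspection_status` BIGINT comment 'Annual inspection status 1: not inspected 2: inspected 3: inspection failed',
    `total_distance` BIGINT comment 'Total mileage (KM)',
    `engine_power` BIGINT comment 'Engine power in kW',
    `wheel_base` BIGINT comment 'Wheelbase (M)',
    `gps_brand` STRING comment 'Satellite positioning device brand',
    `gps_model` STRING comment 'Satellite positioning device model',
    `gps_imei` STRING comment 'Satellite positioning IMEI',
    `net_car_lic_no` STRING comment 'Ride-hailing vehicle transport permit number',
    `net_car_lic_issue_agency` STRING comment 'Issuing agency of the ride-hailing vehicle transport permit',
    `net_car_lic_oper_area` STRING comment 'Operating area',
    `net_car_lic_verified` BIGINT comment 'Third-party verification of the ride-hailing vehicle transport permit: 1. passed 2. not verified 3. failed',
    `net_car_lic_verified_remark` STRING comment 'Remark on the third-party verification of the ride-hailing vehicle transport permit',
    `is_push` BIGINT comment 'Pushed to the transport commission: 1: not pushed 2: pushed 3: rejected',
    `is_push_time` TIMESTAMP comment 'Time of the push to the transport commission',
    `create_time` TIMESTAMP comment 'Creation time',
    `update_time` TIMESTAMP comment 'Update time',
    `write_time` TIMESTAMP comment 'Write time',
    `id_card_valid_start` TIMESTAMP comment 'ID card validity start',
    `id_card_valid_end` TIMESTAMP comment 'ID card expiry time',
    `driver_lic_issue` TIMESTAMP comment 'Date the driver license was first issued',
    `driver_lic_valid_start` TIMESTAMP comment 'Driver license effective date',
    `driver_lic_valid_end` TIMESTAMP comment 'Driver license expiry date',
    `car_lic_register` TIMESTAMP comment 'Vehicle registration date (vehicle license info)',
    primary key (`id`) not enforced
) with (
    'bucket' = '4',
    'bucket-key' = 'id',
    'changelog-producer' = 'input',
    'compaction.max.file-num' = '20',
    'compaction.min.file-num' = '4',
    'continuous.discovery-interval' = '5s',
    'log.changelog-mode' = 'auto',
    'merge-engine' = 'deduplicate',
    'write-mode' = 'auto',
    'scan.mode' = 'default',
    'lookup.async' = 'true',
    'lookup.async-thread-number' = '1'
)
;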

3.2.2.4 Inserting from MySQL into the Paimon Table

Deploy and run the following statement in a streaming job draft:
insert into `xxxpm`.`dim`.`paimon_dim_car_info_f`
select
    `id`, `car_status`, `is_inspection`, `audit_status`, `net_audit_status`,
    `audit_desc`, `audit_date`, `net_audit_date`, `car_level_id`, `car_level_name`,
    `car_source`, `register_channel`, `car_city_no`, `car_city_name`, `car_plate_no`,
    `car_plate_color`, `car_brand_id`, `car_brand`, `car_color`, `car_seats`,
    `car_owner_type`, `car_producer_brand`, `car_producer_brand_model`, `vehicle_type`, `car_owner`,
    `car_character`, `vin`, `engine_no`, `fuel_type`, `displacement`,
    `inspection_status`, `total_distance`, `engine_power`, `wheel_base`, `gps_brand`,
    `gps_model`, `gps_imei`, `net_car_lic_no`, `net_car_lic_issue_agency`, `net_car_lic_oper_area`,
    `net_car_lic_verified`, `net_car_lic_verified_remark`, `is_push`, `is_push_time`, `create_time`,
    `update_time`, `write_time`, `id_card_valid_start`, `id_card_valid_end`, `driver_lic_issue`,
    `driver_lic_valid_start`, `driver_lic_valid_end`, `car_lic_register`
from mysqlcdc.dim.dim_car_info;

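3.2.2.5 Viewing the Results

At this point the job has been submitted successfully. First, check the Flink web UI to confirm that the job is running normally.

Second, because the data is sunk to Paimon, it can be queried from the Paimon table, and the file updates are also visible under the corresponding path on OSS.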
select * from `xxxpm`.`dim`.`paimon_dim_car_info_f`;

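In addition, because Paimon is integrated with MaxCompute, a corresponding table is also generated in MaxCompute, and the data can be queried there as well.
Open the DataWorks console and open DataStudio.
In table management, the newly created paimon_dim_car_info_f table is visible.

Double-click the table name to view the table's metadata.
To query the table data during development, two parameters must be set:
odps.sql.common.table.planner.ext.hive.bridge is the dual-signature switch. Set it to true to turn the switch on, so that the Paimon external table is created in dual-signature mode.
odps.sql.hive.compatible, when set to true, enables Hive-compatible mode.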
SET odps.sql.common.table.planner.ext.hive.bridge = true;
SET odps.sql.hive.compatible = true;
select * from dwxxx_dev.paimon_dim_car_info_f ;

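3.2.2.6 Testing

1) Check the Paimon table data before the MySQL data changes
select car_status from dwxxx_dev.paimon_dim_car_info_f
where id = 1462;

2) Update the data in the MySQL table
Log in to DMS, select the dim database, and run the following statement
update dim.dim_car_info set car_status=0 where id = 1462;

3) After a short wait, check the Paimon external table data again
select car_status from dwxxx_dev.paimon_dim_car_info_f
where id = 1462;

As shown, the data in the Paimon table is updated automatically and changes in real time.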
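The same check can also be made from the Flink side instead of through the MaxCompute external table. The following is a sketch, assuming it is run as a streaming query (for example in a debug session) against the xxxpm catalog used in 3.2.2.4. It uses a dynamic table options hint to override scan.mode so that only changes arriving after the query starts are read.

```sql
-- Sketch only: watch changes for one vehicle as they arrive in Paimon.
-- 'latest' skips the existing snapshot and reads only new changes.
select `id`, `car_status`, `update_time`
from `xxxpm`.`dim`.`paimon_dim_car_info_f`
    /*+ OPTIONS('scan.mode' = 'latest') */
where `id` = 1462;
```

With this query running, the UPDATE from step 2 should surface as a change record for id 1462 once the next checkpoint commits the write to Paimon, so the delay is bounded by the checkpoint interval of the sync job.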

3.2.3 Developing the Other Tables

-- Dimension table dim_car_vendor
drop table if exists `xxxxpm`.`dim`.`paimon_dim_car_vendor_f`;
create table if not exists `xxxxpm`.`dim`.`paimon_dim_car_vendor_f` (
    `vendor_id` BIGINT,
    `area_name` STRING,
    `vendor_name` STRING,
    `area_id` BIGINT,
    `login_id` BIGINT,
    `contact_mobile` STRING,
    `contact_name` STRING,
    `service_status` BIGINT,
    `create_time` TIMESTAMP,
    `update_time` TIMESTAMP,
    primary key (`vendor_id`) not enforced
) with (
    'bucket' = '4',
    'bucket-key' = 'vendor_id',
    'changelog-producer' = 'input',
    'compaction.max.file-num' = '20',
    'compaction.min.file-num' = '4',
    'continuous.discovery-interval' = '5s',
    'log.changelog-mode' = 'auto',
    'merge-engine' = 'deduplicate',
    'write-mode' = 'auto',
    'scan.mode' = 'default',
    'lookup.async' = 'true',
    'lookup.async-thread-number' = '1'
)
;

insert into `xxxxpm`.`dim`.`paimon_dim_car_vendor_f`
select
    `vendor_id`, `area_name`, `vendor_name`, `area_id`, `login_id`,
    `contact_mobile`, `contact_name`, `service_status`, `create_time`, `update_time`
from `mysqlcdc`.`dim`.`dim_car_vendor`;

-- Dimension table dim_driver_info
drop table if exists `xxxxpm`.`dim`.`paimon_dim_driver_info_f`;
create table if not exists `xxxxpm`.`dim`.`paimon_dim_driver_info_f` (
    `driver_id` BIGINT,
    `login_id` BIGINT,
    `leader_id` BIGINT,
    `car_id` BIGINT,
    `city_no` BIGINT,
    `city_name` STRING,
    `phone` STRING,
    `true_name` STRING,
    `nation` STRING,
    `address` STRING,
    `education` STRING,
    `id_card_no` STRING,
    `id_card_issue_org` STRING,
    `nationality` STRING,
    `registered_address` STRING,
    `driver_mark` BIGINT,
    `sex` BIGINT,
    `driver_lic_no` STRING,
    `driver_lic_class` STRING,
    `driver_lic_issue_org` STRING,
    `net_driver_lic_no` STRING,
    `net_driver_lic_type` BIGINT,
    `net_driver_lic_issue_org` STRING,
    `work_lic_no` STRING,
    `contract_id` STRING,
    `registration_time` TIMESTAMP,
    `driver_type` BIGINT,
    `register_channel` BIGINT,
    `audit_status` BIGINT,
    `net_audit_status` BIGINT,
    `audit_status_sum` BIGINT,
    `net_audit_status_sum` BIGINT,
    `net_driver_lic_verified` BIGINT,
    `net_driver_lic_verified_remark` STRING,
    `last_audit_date` TIMESTAMP,
    `net_audit_date` TIMESTAMP,
    `audit_desc` STRING,
    `allow_receive_type` BIGINT,
    `real_allow_change` BIGINT,
    `booking_allow_change` BIGINT,
    `is_blacklist` BIGINT,
    `is_push` BIGINT,
    `is_push_time` TIMESTAMP,
    `bind_car_time` TIMESTAMP,
    `net_audit_status_sum_date` TIMESTAMP,
    `audit_status_sum_date` TIMESTAMP,
    `id_card_valid_start` TIMESTAMP,
    `id_card_valid_end` TIMESTAMP,
    `driver_lic_issue` TIMESTAMP,
    `driver_lic_valid_start` TIMESTAMP,
    `driver_lic_valid_end` TIMESTAMP,
    `md5_phone` STRING,
    `md5_id_card_no` STRING,
    `driver_status_created_at` TIMESTAMP,
    `driver_status_updated_at` TIMESTAMP,
    `driver_birthday` STRING,
    `net_driver_lic_get` TIMESTAMP,
    `work_lic_issue` TIMESTAMP,
    `net_driver_lic_issue` TIMESTAMP,
    `work_lic_valid_start` TIMESTAMP,
    `net_driver_lic_report` TIMESTAMP,
    `net_driver_lic_valid_start` TIMESTAMP,
    `work_lic_valid_end` TIMESTAMP,
    `net_driver_lic_valid_end` TIMESTAMP,
    `login_type` BIGINT,
    `create_time` TIMESTAMP,
    `update_time` TIMESTAMP,
    primary key (`driver_id`) not enforced
) with (
    'bucket' = '4',
    'bucket-key' = 'driver_id',
    'changelog-producer' = 'input',
    'compaction.max.file-num' = '20',
    'compaction.min.file-num' = '4',
    'continuous.discovery-interval' = '5s',
    'log.changelog-mode' = 'auto',
    'merge-engine' = 'deduplicate',
    'write-mode' = 'auto',
    'scan.mode' = 'default',
    'lookup.async' = 'true',
    'lookup.async-thread-number' = '1'
)
;

insert into `xxxxpm`.`dim`.`paimon_dim_driver_info_f`
select
    `driver_id`, `login_id`, `leader_id`, `car_id`, `city_no`,
    `city_name`, `phone`, `true_name`, `nation`, `address`,
    `education`, `id_card_no`, `id_card_issue_org`, `nationality`, `registered_address`,
    `driver_mark`, `sex`, `driver_lic_no`, `driver_lic_class`, `driver_lic_issue_org`,
    `net_driver_lic_no`, `net_driver_lic_type`, `net_driver_lic_issue_org`, `work_lic_no`, `contract_id`,
    `registration_time`, `driver_type`, `register_channel`, `audit_status`, `net_audit_status`,
    `audit_status_sum`, `net_audit_status_sum`, `net_driver_lic_verified`, `net_driver_lic_verified_remark`, `last_audit_date`,
    `net_audit_date`, `audit_desc`, `allow_receive_type`, `real_allow_change`, `booking_allow_change`,
    `is_blacklist`, `is_push`, `is_push_time`, `bind_car_time`, `net_audit_status_sum_date`,
    `audit_status_sum_date`, `id_card_valid_start`, `id_card_valid_end`, `driver_lic_issue`, `driver_lic_valid_start`,
    `driver_lic_valid_end`, `md5_phone`, `md5_id_card_no`, `driver_status_created_at`, `driver_status_updated_at`,
    `driver_birthday`, `net_driver_lic_get`, `work_lic_issue`, `net_driver_lic_issue`, `work_lic_valid_start`,
    `net_driver_lic_report`, `net_driver_lic_valid_start`, `work_lic_valid_end`, `net_driver_lic_valid_end`, `login_type`,
    `create_time`, `update_time`
from `mysqlcdc`.`dim`.`dim_driver_info`;

-- Dimension table dim_area
drop table if exists `xxxxpm`.`dim`.`paimon_dim_area_f`;
create table if not exists `xxxxpm`.`dim`.`paimon_dim_area_f` (
    area_id BIGINT comment 'City ID',
    parent_area_id BIGINT comment 'Parent ID',
    area_name STRING comment 'City name',
    area_type BIGINT comment 'Area type, 2: country, 3: province, 4: city, 5: district, 6: county',
    pin_yin STRING comment 'City name in Pinyin',
    primary key (`area_id`) not enforced
) with (
    'bucket' = '4',
    'bucket-key' = 'area_id',
    'changelog-producer' = 'input',
    'compaction.max.file-num' = '20',
    'compaction.min.file-num' = '4',
    'continuous.discovery-interval' = '5s',
    'log.changelog-mode' = 'auto',
    'merge-engine' = 'deduplicate',
    'write-mode' = 'auto',
    'scan.mode' = 'default',
    'lookup.async' = 'true',
    'lookup.async-thread-number' = '1'
)
;

insert into `xxxxpm`.`dim`.`paimon_dim_area_f`
select area_id
, parent_area_id
, area_name
, area_type
, pin_yin
from `mysqlcdc`.`dim`.`dim_area`;

-- Dimension table dim_date_work_type
drop table if exists `xxxxpm`.`dim`.`paimon_dim_date_work_type_f`;
create table `xxxxpm`.`dim`.`paimon_dim_date_work_type_f` (
    id BIGINT,
    work_day DATE comment 'Date',
    day_of_week BIGINT comment 'Monday through Sunday',
    work_type BIGINT comment '1: working day, 2: holiday',
    vacation STRING comment 'Holiday',
    created_at TIMESTAMP comment 'Creation time',
    updated_at TIMESTAMP comment 'Update time',
    primary key (`id`) not enforced
) with (
    'bucket' = '4',
    'bucket-key' = 'id',
    'changelog-producer' = 'input',
    'compaction.max.file-num' = '20',
    'compaction.min.file-num' = '4',
    'continuous.discovery-interval' = '5s',
    'log.changelog-mode' = 'auto',
    'merge-engine' = 'deduplicate',
    'write-mode' = 'auto',
    'scan.mode' = 'default',
    'lookup.async' = 'true',
    'lookup.async-thread-number' = '1'
)
;

insert into `xxxxpm`.`dim`.`paimon_dim_date_work_type_f`
select id
, work_day
, day_of_week
, work_type
, vacation
, created_at
, updated_at
from `mysqlcdc`.`dim`.`dim_date_work_type`;

-- Dimension table dim_user_info
drop table if exists `xxxxpm`.`dim`.`paimon_dim_user_info_f`;
create table if not exists `xxxxpm`.`dim`.`paimon_dim_user_info_f` (
    userid BIGINT comment 'User ID',
    mobile STRING comment 'Mobile number',
    password STRING comment 'Password',
    realname STRING comment 'Real name',
    gender BIGINT comment '0 unknown 1 male 2 female',
    email STRING comment 'Email',
    areaid BIGINT comment 'Area ID',
    address STRING comment 'User address',
    birthday TIMESTAMP comment 'Birthday',
    areacode STRING comment 'Country or region calling code, China 0086',
    photourl STRING comment 'Avatar',
    enabled BIGINT comment '0 disabled 1 enabled',
    createtime TIMESTAMP,
    update_time TIMESTAMP,
    primary key (`userid`) not enforced
) with (
    'bucket' = '4',
    'bucket-key' = 'userid',
    'changelog-producer' = 'input',
    'compaction.max.file-num' = '20',
    'compaction.min.file-num' = '4',
    'continuous.discovery-interval' = '5s',
    'log.changelog-mode' = 'auto',
    'merge-engine' = 'deduplicate',
    'write-mode' = 'auto',
    'scan.mode' = 'default',
    'lookup.async' = 'true',
    'lookup.async-thread-number' = '1'
)
;

insert into `xxxxpm`.`dim`.`paimon_dim_user_info_f`
select userid
, mobile
, password
, realname
, gender
, email
, areaid
, address
, birthday
, areacode
, photourl
, enabled
, createtime
, updatetime
from `mysqlcdc`.`dim`.`dim_user_info`;

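3.2.4 Deployment

(1) Run all of the create table statements.
(2) Create a folder named xxxx, create a streaming job draft named dim under it, and put all of the insert statements into the dim job.
When there are multiple insert into statements, they must be wrapped between BEGIN STATEMENT SET; and END;.
BEGIN STATEMENT SET;
INSERT INTO xxx ...;
INSERT INTO xxx ...;
END;
(3) Click Deploy.
(4) After the job is submitted to the platform, click the job and then click Edit to change its resources, runtime parameters, and so on.

Set Job Manager CPU to 1 and Memory to 2
Set Task Manager CPU to 1 and Memory to 4
Set the system checkpoint interval to 60s
Set the minimum pause between two system checkpoints to 60s
After making the changes, click Stateless Start.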
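
To make the wrapping from step (2) concrete, here is a sketch of what the dim job draft could look like with two of the insert statements from 3.2.3. The remaining inserts would be added the same way, and the catalog names are the masked ones used in this article.

```sql
-- Sketch only: several INSERT statements submitted as a single job.
BEGIN STATEMENT SET;

insert into `xxxxpm`.`dim`.`paimon_dim_car_vendor_f`
select `vendor_id`, `area_name`, `vendor_name`, `area_id`, `login_id`,
       `contact_mobile`, `contact_name`, `service_status`, `create_time`, `update_time`
from `mysqlcdc`.`dim`.`dim_car_vendor`;

insert into `xxxxpm`.`dim`.`paimon_dim_area_f`
select `area_id`, `parent_area_id`, `area_name`, `area_type`, `pin_yin`
from `mysqlcdc`.`dim`.`dim_area`;

END;
```

Each insert keeps its own terminating semicolon inside the set. Grouping them this way means all the dimension table syncs share one deployment and one set of resource and checkpoint settings.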


This article is reposted from: https://blog.csdn.net/syhiiu/article/details/141300624
Copyright belongs to the original author, 大数据飞总. In case of infringement, please contact us for removal.
