案例实践——淘宝母婴数据加速查询
随着“全面二孩”政策落地、居民可支配收入稳步增加等因素的刺激,中国的母婴消费市场正迎来黄金时代。与此同时,随着国民消费升级90后宝爸、宝妈人数剧增,消费需求与消费理念都发生了巨大的变化。据罗兰贝格最新公布的报告预计,已经经过了16个年头发展的母婴行业,到2020年,整体规模将达到3.6万亿元,2016-2020年复合增速高达17%,行业前景看起来一片光明。如此大好形势下,母婴人群在母婴消费上有什么特点?消费最高的项目是什么?
本场景将以阿里云实时计算Flink版为基础,使用Flink自带的 MySQL Connector连接RDS云数据库实例,并以一个淘宝母婴订单实时查询的例子尝试上手Connector的数据捕获、数据变更等功能。
本场景中订单和婴儿信息存储在MySQL中,对于订单表,为了方便进行分析,我们让它关联上其对应的婴儿信息,构成一张宽表。另一方面数据经过分组聚合后,计算出订单数量和婴儿出生的关系。
按步骤完成本次实验后,您将掌握的知识有:
- 使用Flink实时计算平台创建并提交作业的方法;
- 编写基于Flink Table API SQL语句的能力;
- 使用MySQL Connector对数据库进行读取的方法;
一、创建数据库表并导入数据
在这个例子中,我们将创建三张数据表,其中一张orders_dataset_tmp是导入数据的临时表,其他两张作为源表,体验淘宝母婴订单实时查询。
进入mysql管理平台DMS,单击数据库实例,在已登录实例中找到test数据库,并双击数据库。
在SQLConsole页签中,输入如下SQL建表语句,然后单击执行。
create table orders_dataset_tmp(
user_id bigint comment '用户身份信息',
auction_id bigint comment '购买行为编号',
cat_id bigint comment '商品种类序列号',
cat1 bigint comment '商品序列号(根类别)',
property TEXT comment '商品属性',
buy_mount int comment '购买数量',
day TEXT comment '购买时间'
);
create table orders_dataset(
order_id BIGINT NOT NULL AUTO_INCREMENT PRIMARY KEY comment '订单id',
user_id bigint comment '用户身份信息',
auction_id bigint comment '购买行为编号',
cat_id bigint comment '商品种类序列号',
cat1 bigint comment '商品序列号(根类别)',
property TEXT comment '商品属性',
buy_mount int comment '购买数量',
day TEXT comment '购买时间'
);
--
create table baby_dataset(
user_id bigint NOT NULL PRIMARY KEY,
birthday text comment '婴儿生日',
gender int comment '0 denotes female, 1 denotes male, 2 denotes unknown'
);
在DMS数据管理平台,选择左侧的常用功能>数据导入。
配置如下信息后单击提交申请,将 (sample)sam_tianchi_mum_baby_trade_history.csv 导入 orders_dataset_tmp 表。
点击提交申请后,等待审批完成,点击执行变更,返回如下结果,数据导入完成。
重复上述步骤,将(sample)sam_tianchi_mum_baby.csv 导入 baby_dataset 表。
导入完成之后,在SQLConsole页签中,输入如下SQL,然后单击执行,将订单数据导入到订单源表orders_dataset 中。
insert into orders_dataset(user_id,auction_id,cat_id,cat1,property,buy_mount,day)
select * from orders_dataset_tmp;
可以看到几张表中都有了数据。
SELECT * FROM `baby_dataset` ;
SELECT * FROM `orders_dataset` ;
查询表数据条数
SELECT count(1) FROM `baby_dataset` ;
二、创建session集群
使用之前的flink-sql-test-session集群即可。如若没有,安装下面步骤创建。
- 登录实时计算控制台。
- 在Flink全托管页签,单击目标工作空间名称对应应操作列下的控制台。
- 在左侧导航栏,单击Session集群。
- 单击创建Session集群。
表格中未提及的参数保持默认值即可,需要配置的参数说明请参见下表。
配置项
说明
配置示例
名称
Session集群名称。
flink-sql-test-session
状态
设置当前集群的期望运行状态:
- STOPPED:当集群配置完成后保持停止状态,同样会停止所有在运行中的作业。
- RUNNING:当集群配置完成后保持运行状态。
RUNNING
引擎版本
Session集群引擎版本号。
vvr-6.0.7-flink-1.15
Task Managers数量
默认与并行度保持一致。
4
- 单击创建Session集群。
当Session集群状态(页面上方集群名称旁边)从启动中变为运行中后,可以进入后续步骤。
三、源表查询
- 进入Flink开发平台,点击作业开发,在demo文件夹下创建monther-baby-test流作业草稿,版本选择vvr-6.0.7-flink-1.15。创建源表,代码如下
CREATE TABLE orders_dataset (
order_id BIGINT,
`user_id` bigint,
auction_id bigint,
cat_id bigint,
cat1 bigint,
property varchar,
buy_mount int,
`day` varchar ,
PRIMARY KEY(order_id) NOT ENFORCED
) WITH (
'connector' = 'mysql',
'hostname' = 'rm-cn-g4t3gzb9789789ca.rwlb.rds.aliyuncs.com',
'port' = '3306',
'username' = 'itlanson',
'password' = 'It123',
'database-name' = 'test',
'table-name' = 'orders_dataset'
);
CREATE TABLE baby_dataset (
`user_id` bigint,
birthday varchar,
gender int,
PRIMARY KEY(user_id) NOT ENFORCED
) WITH (
'connector' = 'mysql',
'hostname' = 'rm-cn-g4t3gzb9789789ca.rwlb.rds.aliyuncs.com',
'port' = '3306',
'username' = 'itlanson',
'password' = 'It123',
'database-name' = 'test',
'table-name' = 'baby_dataset'
);
选中代码,点击左上角运行,完成表的创建。创建完之后,可以在元数据中的vvp.default下看到表。
- 查询表数据
select * from baby_dataset;
选中代码,点击调试,提交到flink-sql-test-session集群。结果如下
select * from orders_dataset;
选中代码,点击调试,查询结果如下
- 查询数据条数,代码如下
select count(1) from baby_dataset;
选择代码后,点击调试。
可以看到控制台的结果在不断增大,达到500会暂停。这是因为默认查询500条,此时需要点击左侧的绿色箭头,恢复查询。
最后结果如下,可以看到,与mysql中的对应表数据条数相同。
此时,在mysql中向baby_dataset表插入一条数据
insert into baby_dataset values (99999999,'20130101',1);
回到flink控制台,可以看到,计数结果也增加了。
点击红色按钮停止查询。然后查询刚才插入的数据。
SELECT * FROM `baby_dataset`
where user_id=99999999;
此时,在mysql将此条数据的生日进行更改
UPDATE baby_dataset SET birthday = '20140101' WHERE user_id = 99999999;
执行成功后,观察flink控制台的变化,发现数据也完成了更改。
四、指标计算
我们希望对原始数据按照 user_id 进行 JOIN,构成一张宽表。查询orders_dataset和baby_dataset表的关联结果,代码如下:
SELECT o.*,
b.birthday,
b.gender
FROM orders_dataset /*+ OPTIONS('server-id'='123450-123452') */ o
LEFT JOIN baby_dataset /*+ OPTIONS('server-id'='123453-123455') */ as b
ON o.user_id = b.user_id;
选中代码,点击调试,结果如下
接下来,我们希望对原始数据按照 user_id 进行 JOIN,构成一张宽表。然后对宽表数据的订单时间取到月份进行分组 GROUP BY,并统计每个分组中订单的购买数量SUM和出生婴儿的数量COUNT。代码如下
SELECT
SUBSTRING(tmp1.`day` FROM 1 FOR 6) as year_mon,
SUM(tmp1.buy_mount) as buy_num,
COUNT(birthday) as baby_num
FROM(
SELECT o.*,
b.birthday,
b.gender
FROM orders_dataset /*+ OPTIONS('server-id'='123456-123457') */ o
LEFT JOIN baby_dataset /*+ OPTIONS('server-id'='123458-123459') */ as b
ON o.user_id = b.user_id
) tmp1
GROUP BY SUBSTRING(tmp1.`day` FROM 1 FOR 6);
选中代码,点击调试,结果如下
- 📢博客主页:https://lansonli.blog.csdn.net
- 📢欢迎点赞 👍 收藏 ⭐留言 📝 如有错误敬请指正!
- 📢本文由 Lansonli 原创,首发于 CSDN博客🙉
- 📢停下休息的时候不要忘了别人还在奔跑,希望大家抓紧时间学习,全力奔赴更美好的生活✨
版权归原作者 Lansonli 所有, 如有侵权,请联系我们删除。