Preface
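Flink and Pulsar are widely used big-data components, so their strengths and features are not repeated here; see the official Flink and Pulsar websites for details.
This article uses Flink 1.12 and Pulsar 2.9.0.
The scenario: Canal captures the MySQL binlog and writes it to Pulsar as JSON; Flink then parses the JSON messages from Pulsar and writes the result to storage.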
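I. Determine the structure of the data written to Pulsar
Run the following commands from the Pulsar installation directory.
View the earliest message: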
bin/pulsar-client consume --subscription-position Earliest persistent://public/default/yourtopicname -s "first-subscription"
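View the latest data. A new subscription starts at Latest by default, so this command waits for and prints the next message published to the topic. Use a different subscription name from the one above: an existing durable subscription resumes from its stored position, so reusing "first-subscription" would just return the second-oldest message.
bin/pulsar-client consume persistent://public/default/yourtopicname -s "second-subscription"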
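II. Analyze the data structure in Pulsar
Parse the message obtained in Part I. A JSON formatter such as https://www.json.cn/ can pretty-print it.
Output:
`data` is an array holding the actual row values (all encoded as JSON strings), and `mysqlType` gives each column's MySQL type. `sqlType` gives the corresponding java.sql.Types code (4 = INTEGER, 1 = CHAR, 12 = VARCHAR, 91 = DATE), `pkNames` lists the primary key, and `old` holds the previous values of an UPDATE (null for this INSERT). `es` is the time the change was executed in MySQL and `ts` is the time Canal produced the message, both in epoch milliseconds.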
{
  "data": [{"wp_web_page_sk":"60","wp_web_page_id":"AAAAAAAAKDAAAAAA","wp_rec_start_date":"2001-09-03","wp_rec_end_date":"0000-00-00","wp_creation_date_sk":"2450813","wp_access_date_sk":"2452566","wp_autogen_flag":"Y","wp_customer_sk":"80555","wp_url":"http://www.foo.com","wp_type":"welcome","wp_char_count":"6577","wp_link_count":"24","wp_image_count":"2","wp_max_ad_count":"3"}],
  "database": "tpcds_01",
  "es": 1656468147000,
  "id": 152155,
  "isDdl": false,
  "mysqlType": {"wp_web_page_sk":"int(11)","wp_web_page_id":"char(16)","wp_rec_start_date":"date","wp_rec_end_date":"date","wp_creation_date_sk":"int(11)","wp_access_date_sk":"int(11)","wp_autogen_flag":"char(1)","wp_customer_sk":"int(11)","wp_url":"varchar(100)","wp_type":"char(50)","wp_char_count":"int(11)","wp_link_count":"int(11)","wp_image_count":"int(11)","wp_max_ad_count":"int(11)"},
  "old": null,
  "pkNames": ["wp_web_page_sk"],
  "sql": "",
  "sqlType": {"wp_web_page_sk":4,"wp_web_page_id":1,"wp_rec_start_date":91,"wp_rec_end_date":91,"wp_creation_date_sk":4,"wp_access_date_sk":4,"wp_autogen_flag":1,"wp_customer_sk":4,"wp_url":12,"wp_type":1,"wp_char_count":4,"wp_link_count":4,"wp_image_count":4,"wp_max_ad_count":4},
  "table": "web_page",
  "ts": 1656468638479,
  "type": "INSERT"
}
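To double-check the structure before writing any DDL, the message can also be inspected programmatically. The Python sketch below (standard library only) loads a copy of the sample message above, trimmed to three columns for brevity, and pairs each column's first-row value with its mysqlType and the name of its sqlType code. The helper `describe_columns` and the `JDBC_TYPE_NAMES` table are hypothetical illustrations, not part of Canal or Flink.

```python
import json

# The sample Canal message from above, trimmed to three columns for brevity.
SAMPLE = """
{"data":[{"wp_web_page_sk":"60","wp_web_page_id":"AAAAAAAAKDAAAAAA","wp_rec_start_date":"2001-09-03"}],
 "database":"tpcds_01","es":1656468147000,"id":152155,"isDdl":false,
 "mysqlType":{"wp_web_page_sk":"int(11)","wp_web_page_id":"char(16)","wp_rec_start_date":"date"},
 "old":null,"pkNames":["wp_web_page_sk"],"sql":"",
 "sqlType":{"wp_web_page_sk":4,"wp_web_page_id":1,"wp_rec_start_date":91},
 "table":"web_page","ts":1656468638479,"type":"INSERT"}
"""

# The java.sql.Types codes that appear in this message's sqlType map.
JDBC_TYPE_NAMES = {1: "CHAR", 4: "INTEGER", 12: "VARCHAR", 91: "DATE"}


def describe_columns(message: dict) -> list:
    """Return (column, first-row value, mysqlType, JDBC type name) for every column."""
    first_row = message["data"][0]
    return [
        (
            column,
            first_row[column],
            mysql_type,
            JDBC_TYPE_NAMES.get(message["sqlType"][column], "OTHER"),
        )
        for column, mysql_type in message["mysqlType"].items()
    ]


message = json.loads(SAMPLE)
print(message["database"], message["table"], message["type"])
for column, value, mysql_type, jdbc_type in describe_columns(message):
    # Every value inside "data" is a JSON string, even for int(11) columns.
    print(f"{column}: {value!r} ({mysql_type}, {jdbc_type})")
```

The output shows that the numeric columns arrive as quoted strings, which is worth keeping in mind when choosing the column types in the Flink DDL.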
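III. Write Flink SQL to parse the JSON data
1. Create the source table
Pay attention to how `data` is declared. It is a JSON array of row objects, so it is mapped to ARRAY<ROW<...>> whose field names must match the JSON keys exactly. The INSERT statement later expands this array with UNNEST, so each array element becomes one row and each ROW field becomes a column; in essence, the nested array is turned into ordinary rows and columns.
The start-position option scan.startup.sub-start-offset is not documented on the Flink website at the time of writing; refer to the Pulsar connector's source code on GitHub for its possible values.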
CREATE TABLE ods_tpcds_01_web_page_rt_source
(
  -- `data` is a JSON array of row objects; ROW field names must match the JSON keys
  `data` ARRAY<ROW<wp_web_page_sk BIGINT,
    wp_web_page_id STRING,
    wp_rec_start_date STRING,
    wp_rec_end_date STRING,
    wp_creation_date_sk BIGINT,
    wp_access_date_sk BIGINT,
    wp_autogen_flag STRING,
    wp_customer_sk BIGINT,
    wp_url STRING,
    wp_type STRING,
    wp_char_count BIGINT,
    wp_link_count BIGINT,
    wp_image_count BIGINT,
    wp_max_ad_count BIGINT>>,
  -- message-level fields of the Canal JSON
  `database` STRING, `isDdl` STRING, `table` STRING, `type` STRING,
  `es` BIGINT, `ts` BIGINT
) WITH (
  'connector' = 'pulsar',
  'topic' = 'persistent://public/default/tpcds_01_web_page',
  'service-url' = 'pulsar://xx.xx.xx.xx:6650',
  'admin-url' = 'http://xx.xx.xx.xx:8080',
  -- consume through the named subscription below
  'scan.startup.mode' = 'external-subscription',
  'scan.startup.sub-name' = 'ods_tpcds_01_web_page_rt_v1',
  'scan.startup.sub-start-offset' = 'earliest',
  'format' = 'json'
);
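Because the ROW field names must match the JSON keys exactly, one way to avoid typos in a wide table is to generate the `data` column declaration from the message's `mysqlType` map. The Python sketch below is hypothetical (`flink_type` and `data_column_ddl` are illustrative names, not a Flink or Canal API). It follows the mapping used in the DDL above: MySQL integer types become BIGINT, and everything else, including date, stays STRING. Treating tinyint, smallint, mediumint and bigint the same way as int is an assumption beyond this article's sample.

```python
import re

# Assumption: all MySQL integer types are mapped to BIGINT, as int(11) is above.
INTEGER_TYPES = {"tinyint", "smallint", "mediumint", "int", "integer", "bigint"}


def flink_type(mysql_type: str) -> str:
    """Map a Canal mysqlType string such as 'int(11)' to the Flink type used here."""
    base = re.match(r"[a-z]+", mysql_type.lower()).group(0)  # 'int(11)' -> 'int'
    return "BIGINT" if base in INTEGER_TYPES else "STRING"


def data_column_ddl(mysql_types: dict) -> str:
    """Build the `data` ARRAY<ROW<...>> column declaration, one field per line."""
    fields = ",\n".join(f"    {name} {flink_type(t)}" for name, t in mysql_types.items())
    return f"`data` ARRAY<ROW<\n{fields}>>"


# Three columns taken from the sample message's mysqlType map.
mysql_types = {
    "wp_web_page_sk": "int(11)",
    "wp_web_page_id": "char(16)",
    "wp_rec_start_date": "date",
}
print(data_column_ddl(mysql_types))
```

Storing dates as STRING, as the article does, also tolerates values such as "0000-00-00" that appear in the sample and would not be valid SQL dates.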
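2. Create the sink table
For parameters such as sink.rolling-policy.rollover-interval and sink.rolling-policy.file-size, see the explanations in the official Flink documentation for the filesystem connector and adjust them to your needs. The filesystem sink only finalizes files when a checkpoint completes, so checkpointing must be enabled (for example, through execution.checkpointing.interval); otherwise the written files are never finalized.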
CREATE TABLE ods_tpcds_01_web_page_rt_sink
(
  wp_web_page_sk BIGINT,
  wp_web_page_id STRING,
  wp_rec_start_date STRING,
  wp_rec_end_date STRING,
  wp_creation_date_sk BIGINT,
  wp_access_date_sk BIGINT,
  wp_autogen_flag STRING,
  wp_customer_sk BIGINT,
  wp_url STRING,
  wp_type STRING,
  wp_char_count BIGINT,
  wp_link_count BIGINT,
  wp_image_count BIGINT,
  wp_max_ad_count BIGINT,
  `database` STRING, `isDdl` STRING, `type` STRING, `table` STRING,
  `es` BIGINT, `ts` BIGINT,
  -- partition column: a yyyy-MM-dd date string computed in the INSERT
  `pt` STRING
) PARTITIONED BY (pt) WITH (
  'connector' = 'filesystem',
  -- HDFS or an S3-compatible store such as MinIO, for example
  -- s3a://bucket1/test/ods_poc_tpcds_01_web_page_rt_v1
  'path' = 'hdfs://test/ods_poc_tpcds_01_web_page_rt_v1',
  'sink.rolling-policy.rollover-interval' = '1min',
  'sink.rolling-policy.file-size' = '128M',
  'format' = 'parquet'
);
3. Write the data
INSERT INTO ods_tpcds_01_web_page_rt_sink
SELECT
  wp_web_page_sk,
  wp_web_page_id,
  wp_rec_start_date,
  wp_rec_end_date,
  wp_creation_date_sk,
  wp_access_date_sk,
  wp_autogen_flag,
  wp_customer_sk,
  wp_url,
  wp_type,
  wp_char_count,
  wp_link_count,
  wp_image_count,
  wp_max_ad_count,
  `database`, `isDdl`, `type`, `table`, `es`, `ts`,
  -- pt: the date of `ts` (epoch milliseconds) in UTC+8; the 8-hour shift assumes
  -- FROM_UNIXTIME runs in a UTC session time zone
  FROM_UNIXTIME((`ts` / 1000) + 60 * 60 * 8, 'yyyy-MM-dd') AS pt
FROM ods_tpcds_01_web_page_rt_source
-- expand each element of the `data` array into its own row
CROSS JOIN UNNEST(`data`) AS t (
  wp_web_page_sk,
  wp_web_page_id,
  wp_rec_start_date,
  wp_rec_end_date,
  wp_creation_date_sk,
  wp_access_date_sk,
  wp_autogen_flag,
  wp_customer_sk,
  wp_url,
  wp_type,
  wp_char_count,
  wp_link_count,
  wp_image_count,
  wp_max_ad_count
);
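To make the semantics of CROSS JOIN UNNEST and the pt expression concrete, here is a Python sketch of what the INSERT statement produces for one Canal message: each element of `data` becomes one output row that also carries the message-level fields, and pt is the date of `ts` in UTC+8. The function names are hypothetical, the second row in the input is made up purely to show the fan-out, and the sketch assumes FROM_UNIXTIME is evaluated in a UTC session time zone (if the session is already UTC+8, the extra 8 hours would shift late-evening records into the next day).

```python
from datetime import datetime, timezone

EIGHT_HOURS = 60 * 60 * 8


def partition_date(ts_ms: int) -> str:
    """Mimic FROM_UNIXTIME((ts/1000) + 60*60*8, 'yyyy-MM-dd') in a UTC session."""
    shifted_seconds = ts_ms // 1000 + EIGHT_HOURS
    return datetime.fromtimestamp(shifted_seconds, tz=timezone.utc).strftime("%Y-%m-%d")


def unnest(message: dict) -> list:
    """Emit one output row per element of `data`, like CROSS JOIN UNNEST(`data`).

    Values stay as strings here, whereas Flink converts them to the declared types.
    """
    meta = {key: message[key] for key in ("database", "isDdl", "type", "table", "es", "ts")}
    pt = partition_date(message["ts"])
    return [{**row, **meta, "pt": pt} for row in message["data"]]


message = {
    "data": [
        {"wp_web_page_sk": "60", "wp_web_page_id": "AAAAAAAAKDAAAAAA"},
        # Hypothetical second row, made up to show that one message can yield several rows.
        {"wp_web_page_sk": "61", "wp_web_page_id": "AAAAAAAALDAAAAAA"},
    ],
    "database": "tpcds_01", "isDdl": False, "type": "INSERT",
    "table": "web_page", "es": 1656468147000, "ts": 1656468638479,
}
rows = unnest(message)
print(len(rows))       # 2
print(rows[0]["pt"])   # 2022-06-29
```

Note that pt is derived from `ts` (when Canal produced the message) rather than `es` (when MySQL executed the change); replacing `ts` with `es` in the expression would partition by the MySQL execution time instead.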
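Summary
Parsing JSON with Flink SQL follows much the same steps every time: the main work is analyzing the structure of the JSON in Pulsar and creating matching Flink tables. Corrections are welcome if anything here is inaccurate.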
Copyright belongs to the original author 酱紫很帅. If this infringes any rights, please contact us for removal.