

Hudi Data Lake in Action: A Data Hub Case Study



1 Case Architecture

This case study integrates Flink SQL with Hudi: business data in a MySQL database is ingested and stored into Hudi tables in real time; Presto and Flink SQL then perform offline analysis and streaming queries respectively; finally the report results are stored back into MySQL and visualized with FineBI.


1. MySQL database:
Stores the education customers' business data as well as the offline and real-time report results, and feeds the FineBI visualization tool.

2. Flink SQL engine
Flink SQL CDC ingests MySQL table data into Hudi tables in real time; in addition, the Flink SQL connectors integrate Hudi and MySQL for data storage and querying.

3. Apache Hudi: the data lake framework
The education business data ultimately lands in Hudi tables (underlying storage: HDFS), which manage the data files uniformly; they can later be integrated with Spark and Hive for business metric analysis.

4. Presto analysis engine
A distributed SQL query engine open-sourced by Facebook, suited to interactive analytical queries over data volumes from GB to PB.
In this case it loads data directly from the Hudi tables, relying on the Hive Metastore for metadata. Presto can integrate multiple data sources, which makes cross-source processing convenient.

2 Business Data

The business data for this hands-on case comes from real customer activity (consultations, visits, enrollments, page views, and so on), stored in the MySQL database oldlu_nev, using the following business tables.

Start the MySQL database, log in from the command line, create the database first, then the tables, and finally import the data.

[root@node1 ~]# mysql -uroot -p123456

CREATE DATABASE IF NOT EXISTS oldlu_nev;
USE oldlu_nev;

2.1 Customer Information Table

Customer information table: customer. DDL statement to create the table:

CREATE TABLE IF NOT EXISTS oldlu_nev.customer (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `customer_relationship_id` int(11) DEFAULT NULL COMMENT '当前意向id',
  `create_date_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
  `update_date_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '最后更新时间',
  `deleted` bit(1) NOT NULL DEFAULT b'0' COMMENT '是否被删除(禁用)',
  `name` varchar(128) CHARACTER SET utf8 COLLATE utf8_bin NOT NULL DEFAULT '' COMMENT '姓名',
  `idcard` varchar(24) CHARACTER SET utf8 COLLATE utf8_bin DEFAULT '' COMMENT '身份证号',
  `birth_year` int(5) DEFAULT NULL COMMENT '出生年份',
  `gender` varchar(8) CHARACTER SET utf8 COLLATE utf8_bin DEFAULT 'MAN' COMMENT '性别',
  `phone` varchar(24) CHARACTER SET utf8 COLLATE utf8_bin NOT NULL DEFAULT '' COMMENT '手机号',
  `wechat` varchar(32) CHARACTER SET utf8 COLLATE utf8_bin DEFAULT '' COMMENT '微信',
  `qq` varchar(32) CHARACTER SET utf8 COLLATE utf8_bin DEFAULT '' COMMENT 'qq号',
  `email` varchar(56) CHARACTER SET utf8 COLLATE utf8_bin DEFAULT '' COMMENT '邮箱',
  `area` varchar(128) COLLATE utf8mb4_unicode_ci DEFAULT '' COMMENT '所在区域',
  `leave_school_date` date DEFAULT NULL COMMENT '离校时间',
  `graduation_date` date DEFAULT NULL COMMENT '毕业时间',
  `bxg_student_id` varchar(64) COLLATE utf8mb4_unicode_ci DEFAULT NULL COMMENT '博学谷学员ID,可能未关联到,不存在',
  `creator` int(11) DEFAULT NULL COMMENT '创建人ID',
  `origin_type` varchar(32) CHARACTER SET utf8 COLLATE utf8_bin DEFAULT NULL COMMENT '数据来源',
  `origin_channel` varchar(32) CHARACTER SET utf8 COLLATE utf8_bin DEFAULT NULL COMMENT '来源渠道',
  `tenant` int(11) NOT NULL DEFAULT '0',
  `md_id` int(11) DEFAULT '0' COMMENT '中台id',
  PRIMARY KEY (`id`)) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;

Pre-load the customer information data into the table with the source command:

mysql> source /root/1-customer.sql ;
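After the import you can sanity-check that the rows arrived; the count itself depends on your data file, so treat this as an illustrative check rather than an expected value:

mysql> SELECT COUNT(*) FROM oldlu_nev.customer;

The same check applies to the other four tables after their imports.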

2.2 Customer Intention Table

Customer intention table: customer_relationship. DDL statement to create the table:

CREATE TABLE IF NOT EXISTS oldlu_nev.customer_relationship(
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `create_date_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP,
  `update_date_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '最后更新时间',
  `deleted` bit(1) NOT NULL DEFAULT b'0' COMMENT '是否被删除(禁用)',
  `customer_id` int(11) NOT NULL DEFAULT '0' COMMENT '所属客户id',
  `first_id` int(11) DEFAULT NULL COMMENT '第一条客户关系id',
  `belonger` int(11) DEFAULT NULL COMMENT '归属人',
  `belonger_name` varchar(10) DEFAULT NULL COMMENT '归属人姓名',
  `initial_belonger` int(11) DEFAULT NULL COMMENT '初始归属人',
  `distribution_handler` int(11) DEFAULT NULL COMMENT '分配处理人',
  `business_scrm_department_id` int(11) DEFAULT '0' COMMENT '归属部门',
  `last_visit_time` datetime DEFAULT NULL COMMENT '最后回访时间',
  `next_visit_time` datetime DEFAULT NULL COMMENT '下次回访时间',
  `origin_type` varchar(32) CHARACTER SET utf8 COLLATE utf8_bin DEFAULT NULL COMMENT '数据来源',
  `oldlu_school_id` int(11) DEFAULT NULL COMMENT '校区Id',
  `oldlu_subject_id` int(11) DEFAULT NULL COMMENT '学科Id',
  `intention_study_type` varchar(32) CHARACTER SET utf8 COLLATE utf8_bin DEFAULT NULL COMMENT '意向学习方式',
  `anticipat_signup_date` date DEFAULT NULL COMMENT '预计报名时间',
  `level` varchar(8) DEFAULT NULL COMMENT '客户级别',
  `creator` int(11) DEFAULT NULL COMMENT '创建人',
  `current_creator` int(11) DEFAULT NULL COMMENT '当前创建人:初始==创建人,当在公海拉回时为 拉回人',
  `creator_name` varchar(32) DEFAULT '' COMMENT '创建者姓名',
  `origin_channel` varchar(32) CHARACTER SET utf8 COLLATE utf8_bin DEFAULT NULL COMMENT '来源渠道',
  `comment` varchar(255) CHARACTER SET utf8 COLLATE utf8_bin DEFAULT '' COMMENT '备注',
  `first_customer_clue_id` int(11) DEFAULT '0' COMMENT '第一条线索id',
  `last_customer_clue_id` int(11) DEFAULT '0' COMMENT '最后一条线索id',
  `process_state` varchar(32) DEFAULT NULL COMMENT '处理状态',
  `process_time` datetime DEFAULT NULL COMMENT '处理状态变动时间',
  `payment_state` varchar(32) DEFAULT NULL COMMENT '支付状态',
  `payment_time` datetime DEFAULT NULL COMMENT '支付状态变动时间',
  `signup_state` varchar(32) CHARACTER SET utf8 COLLATE utf8_bin DEFAULT NULL COMMENT '报名状态',
  `signup_time` datetime DEFAULT NULL COMMENT '报名时间',
  `notice_state` varchar(32) DEFAULT NULL COMMENT '通知状态',
  `notice_time` datetime DEFAULT NULL COMMENT '通知状态变动时间',
  `lock_state` bit(1) DEFAULT b'0' COMMENT '锁定状态',
  `lock_time` datetime DEFAULT NULL COMMENT '锁定状态修改时间',
  `oldlu_clazz_id` int(11) DEFAULT NULL COMMENT '所属ems班级id',
  `oldlu_clazz_time` datetime DEFAULT NULL COMMENT '报班时间',
  `payment_url` varchar(1024) DEFAULT '' COMMENT '付款链接',
  `payment_url_time` datetime DEFAULT NULL COMMENT '支付链接生成时间',
  `ems_student_id` int(11) DEFAULT NULL COMMENT 'ems的学生id',
  `delete_reason` varchar(64) DEFAULT NULL COMMENT '删除原因',
  `deleter` int(11) DEFAULT NULL COMMENT '删除人',
  `deleter_name` varchar(32) DEFAULT NULL COMMENT '删除人姓名',
  `delete_time` datetime DEFAULT NULL COMMENT '删除时间',
  `course_id` int(11) DEFAULT NULL COMMENT '课程ID',
  `course_name` varchar(64) DEFAULT NULL COMMENT '课程名称',
  `delete_comment` varchar(255) DEFAULT '' COMMENT '删除原因说明',
  `close_state` varchar(32) DEFAULT NULL COMMENT '关闭状态',
  `close_time` datetime DEFAULT NULL COMMENT '关闭状态变动时间',
  `appeal_id` int(11) DEFAULT NULL COMMENT '申诉id',
  `tenant` int(11) NOT NULL DEFAULT '0' COMMENT '租户',
  `total_fee` decimal(19,0) DEFAULT NULL COMMENT '报名费总金额',
  `belonged` int(11) DEFAULT NULL COMMENT '小周期归属人',
  `belonged_time` datetime DEFAULT NULL COMMENT '归属时间',
  `belonger_time` datetime DEFAULT NULL COMMENT '归属时间',
  `transfer` int(11) DEFAULT NULL COMMENT '转移人',
  `transfer_time` datetime DEFAULT NULL COMMENT '转移时间',
  `follow_type` int(4) DEFAULT '0' COMMENT '分配类型,0-自动分配,1-手动分配,2-自动转移,3-手动单个转移,4-手动批量转移,5-公海领取',
  `transfer_bxg_oa_account` varchar(64) DEFAULT NULL COMMENT '转移到博学谷归属人OA账号',
  `transfer_bxg_belonger_name` varchar(64) DEFAULT NULL COMMENT '转移到博学谷归属人OA姓名',
  PRIMARY KEY (`id`)) ENGINE=InnoDB  DEFAULT CHARSET=utf8;

Pre-load the customer intention data into the table with the source command:

mysql> source /root/2-customer_relationship.sql ;

2.3 Customer Clue Table

Customer clue table: customer_clue. DDL statement to create the table:

CREATE TABLE IF NOT EXISTS oldlu_nev.customer_clue (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `create_date_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
  `update_date_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '最后更新时间',
  `deleted` bit(1) NOT NULL DEFAULT b'0' COMMENT '是否被删除(禁用)',
  `customer_id` int(11) DEFAULT NULL COMMENT '客户id',
  `customer_relationship_id` int(11) DEFAULT NULL COMMENT '客户关系id',
  `session_id` varchar(48) COLLATE utf8_bin DEFAULT '' COMMENT '七陌会话id',
  `sid` varchar(48) COLLATE utf8_bin DEFAULT '' COMMENT '访客id',
  `status` varchar(16) COLLATE utf8_bin DEFAULT '' COMMENT '状态(undeal待领取 deal 已领取 finish 已关闭 changePeer 已流转)',
  `user` varchar(16) COLLATE utf8_bin DEFAULT '' COMMENT '所属坐席',
  `create_time` datetime DEFAULT NULL COMMENT '七陌创建时间',
  `platform` varchar(16) COLLATE utf8_bin DEFAULT '' COMMENT '平台来源 (pc-网站咨询|wap-wap咨询|sdk-app咨询|weixin-微信咨询)',
  `s_name` varchar(32) COLLATE utf8_bin DEFAULT '' COMMENT '用户名称',
  `seo_source` varchar(255) COLLATE utf8_bin DEFAULT '' COMMENT '搜索来源',
  `seo_keywords` varchar(255) COLLATE utf8_bin DEFAULT '' COMMENT '关键字',
  `ip` varchar(48) COLLATE utf8_bin DEFAULT '' COMMENT 'IP地址',
  `referrer` text COLLATE utf8_bin COMMENT '上级来源页面',
  `from_url` text COLLATE utf8_bin COMMENT '会话来源页面',
  `landing_page_url` text COLLATE utf8_bin COMMENT '访客着陆页面',
  `url_title` varchar(1024) COLLATE utf8_bin DEFAULT '' COMMENT '咨询页面title',
  `to_peer` varchar(255) COLLATE utf8_bin DEFAULT '' COMMENT '所属技能组',
  `manual_time` datetime DEFAULT NULL COMMENT '人工开始时间',
  `begin_time` datetime DEFAULT NULL COMMENT '坐席领取时间',
  `reply_msg_count` int(11) DEFAULT '0' COMMENT '客服回复消息数',
  `total_msg_count` int(11) DEFAULT '0' COMMENT '消息总数',
  `msg_count` int(11) DEFAULT '0' COMMENT '客户发送消息数',
  `comment` varchar(1024) COLLATE utf8_bin DEFAULT '' COMMENT '备注',
  `finish_reason` varchar(255) COLLATE utf8_bin DEFAULT '' COMMENT '结束类型',
  `finish_user` varchar(32) COLLATE utf8_bin DEFAULT '' COMMENT '结束坐席',
  `end_time` datetime DEFAULT NULL COMMENT '会话结束时间',
  `platform_description` varchar(255) COLLATE utf8_bin DEFAULT '' COMMENT '客户平台信息',
  `browser_name` varchar(255) COLLATE utf8_bin DEFAULT '' COMMENT '浏览器名称',
  `os_info` varchar(255) COLLATE utf8_bin DEFAULT '' COMMENT '系统名称',
  `area` varchar(255) COLLATE utf8_bin DEFAULT NULL COMMENT '区域',
  `country` varchar(16) COLLATE utf8_bin DEFAULT '' COMMENT '所在国家',
  `province` varchar(16) COLLATE utf8_bin DEFAULT '' COMMENT '省',
  `city` varchar(255) COLLATE utf8_bin DEFAULT '' COMMENT '城市',
  `creator` int(11) DEFAULT '0' COMMENT '创建人',
  `name` varchar(64) COLLATE utf8_bin DEFAULT '' COMMENT '客户姓名',
  `idcard` varchar(24) COLLATE utf8_bin DEFAULT '' COMMENT '身份证号',
  `phone` varchar(24) COLLATE utf8_bin DEFAULT '' COMMENT '手机号',
  `oldlu_school_id` int(11) DEFAULT NULL COMMENT '校区Id',
  `oldlu_school` varchar(128) COLLATE utf8_bin DEFAULT '' COMMENT '校区',
  `oldlu_subject_id` int(11) DEFAULT NULL COMMENT '学科Id',
  `oldlu_subject` varchar(128) COLLATE utf8_bin DEFAULT '' COMMENT '学科',
  `wechat` varchar(32) COLLATE utf8_bin DEFAULT '' COMMENT '微信',
  `qq` varchar(32) COLLATE utf8_bin DEFAULT '' COMMENT 'qq号',
  `email` varchar(56) COLLATE utf8_bin DEFAULT '' COMMENT '邮箱',
  `gender` varchar(8) COLLATE utf8_bin DEFAULT 'MAN' COMMENT '性别',
  `level` varchar(8) COLLATE utf8_bin DEFAULT NULL COMMENT '客户级别',
  `origin_type` varchar(32) COLLATE utf8_bin DEFAULT '' COMMENT '数据来源渠道',
  `information_way` varchar(32) COLLATE utf8_bin DEFAULT NULL COMMENT '资讯方式',
  `working_years` date DEFAULT NULL COMMENT '开始工作时间',
  `technical_directions` varchar(255) COLLATE utf8_bin DEFAULT '' COMMENT '技术方向',
  `customer_state` varchar(32) COLLATE utf8_bin DEFAULT '' COMMENT '当前客户状态',
  `valid` bit(1) DEFAULT b'0' COMMENT '该线索是否是网资有效线索',
  `anticipat_signup_date` date DEFAULT NULL COMMENT '预计报名时间',
  `clue_state` varchar(32) COLLATE utf8_bin DEFAULT 'NOT_SUBMIT' COMMENT '线索状态',
  `scrm_department_id` int(11) DEFAULT NULL COMMENT 'SCRM内部部门id',
  `superior_url` text COLLATE utf8_bin COMMENT '诸葛获取上级页面URL',
  `superior_source` varchar(1024) COLLATE utf8_bin DEFAULT NULL COMMENT '诸葛获取上级页面URL标题',
  `landing_url` text COLLATE utf8_bin COMMENT '诸葛获取着陆页面URL',
  `landing_source` varchar(1024) COLLATE utf8_bin DEFAULT NULL COMMENT '诸葛获取着陆页面URL来源',
  `info_url` text COLLATE utf8_bin COMMENT '诸葛获取留咨页URL',
  `info_source` varchar(255) COLLATE utf8_bin DEFAULT NULL COMMENT '诸葛获取留咨页URL标题',
  `origin_channel` varchar(32) COLLATE utf8_bin DEFAULT '' COMMENT '投放渠道',
  `course_id` int(32) DEFAULT NULL,
  `course_name` varchar(255) COLLATE utf8_bin DEFAULT NULL,
  `zhuge_session_id` varchar(500) COLLATE utf8_bin DEFAULT NULL,
  `is_repeat` int(4) NOT NULL DEFAULT '0' COMMENT '是否重复线索(手机号维度) 0:正常 1:重复',
  `tenant` int(11) NOT NULL DEFAULT '0' COMMENT '租户id',
  `activity_id` varchar(16) COLLATE utf8_bin DEFAULT NULL COMMENT '活动id',
  `activity_name` varchar(64) COLLATE utf8_bin DEFAULT NULL COMMENT '活动名称',
  `follow_type` int(4) DEFAULT '0' COMMENT '分配类型,0-自动分配,1-手动分配,2-自动转移,3-手动单个转移,4-手动批量转移,5-公海领取',
  `shunt_mode_id` int(11) DEFAULT NULL COMMENT '匹配到的技能组id',
  `shunt_employee_group_id` int(11) DEFAULT NULL COMMENT '所属分流员工组',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin;

Pre-load the customer clue data into the table with the source command:

mysql> source /root/3-customer_clue.sql;

2.4 Clue Appeal Table

Clue appeal table: customer_appeal. DDL statement to create the table:

CREATE TABLE IF NOT EXISTS oldlu_nev.customer_appeal
(
  id int AUTO_INCREMENT PRIMARY KEY COMMENT '主键',
  customer_relationship_first_id int NOT NULL COMMENT '第一条客户关系id',
  employee_id int NULL COMMENT '申诉人',
  employee_name varchar(64) NULL COMMENT '申诉人姓名',
  employee_department_id int NULL COMMENT '申诉人部门',
  employee_tdepart_id int NULL COMMENT '申诉人所属部门',
  appeal_status int(1) NOT NULL COMMENT '申诉状态,0:待稽核 1:无效 2:有效',
  audit_id int NULL COMMENT '稽核人id',
  audit_name varchar(255) NULL COMMENT '稽核人姓名',
  audit_department_id int NULL COMMENT '稽核人所在部门',
  audit_department_name varchar(255) NULL COMMENT '稽核人部门名称',
  audit_date_time datetime NULL COMMENT '稽核时间',
  create_date_time datetime DEFAULT CURRENT_TIMESTAMP NULL COMMENT '创建时间(申诉时间)',
  update_date_time timestamp DEFAULT CURRENT_TIMESTAMP NULL ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
  deleted bit DEFAULT b'0' NOT NULL COMMENT '删除标志位',
  tenant int DEFAULT 0 NOT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;

Pre-load the clue appeal data into the table with the source command:

mysql> source /root/4-customer_appeal.sql;

2.5 Customer Visit and Consultation Record Table

Customer visit and consultation record table: web_chat_ems. DDL statement to create the table:

CREATE TABLE IF NOT EXISTS oldlu_nev.web_chat_ems (
  id int AUTO_INCREMENT PRIMARY KEY COMMENT '主键',
  create_date_time timestamp NULL COMMENT '数据创建时间',
  session_id varchar(48) DEFAULT '' NOT NULL COMMENT '七陌sessionId',
  sid varchar(48) COLLATE utf8_bin DEFAULT '' NOT NULL COMMENT '访客id',
  create_time datetime NULL COMMENT '会话创建时间',
  seo_source varchar(255) COLLATE utf8_bin DEFAULT '' NULL COMMENT '搜索来源',
  seo_keywords varchar(512) COLLATE utf8_bin DEFAULT '' NULL COMMENT '关键字',
  ip varchar(48) COLLATE utf8_bin DEFAULT '' NULL COMMENT 'IP地址',
  area varchar(255) COLLATE utf8_bin DEFAULT '' NULL COMMENT '地域',
  country varchar(16) COLLATE utf8_bin DEFAULT '' NULL COMMENT '所在国家',
  province varchar(16) COLLATE utf8_bin DEFAULT '' NULL COMMENT '省',
  city varchar(255) COLLATE utf8_bin DEFAULT '' NULL COMMENT '城市',
  origin_channel varchar(32) COLLATE utf8_bin DEFAULT '' NULL COMMENT '投放渠道',
  `user` varchar(255) COLLATE utf8_bin DEFAULT '' NULL COMMENT '所属坐席',
  manual_time datetime NULL COMMENT '人工开始时间',
  begin_time datetime NULL COMMENT '坐席领取时间',
  end_time datetime NULL COMMENT '会话结束时间',
  last_customer_msg_time_stamp datetime NULL COMMENT '客户最后一条消息的时间',
  last_agent_msg_time_stamp datetime NULL COMMENT '坐席最后一下回复的时间',
  reply_msg_count int(12) DEFAULT 0 NULL COMMENT '客服回复消息数',
  msg_count int(12) DEFAULT 0 NULL COMMENT '客户发送消息数',
  browser_name varchar(255) COLLATE utf8_bin DEFAULT '' NULL COMMENT '浏览器名称',
  os_info varchar(255) COLLATE utf8_bin DEFAULT '' NULL COMMENT '系统名称'
);

Pre-load the visit and consultation records into the table with the source command:

mysql> source /root/5-web_chat_ems.sql;

3 Real-Time Data Ingestion with Flink CDC

Flink 1.11 introduced Flink SQL CDC, which makes it easy to ingest RDBMS table data in real time into storage systems such as Hudi tables. The MySQL CDC connector reads both a snapshot of the MySQL database and its incremental changes.

3.1 Enabling the MySQL binlog

For MySQL CDC, first enable the binlog in the MySQL database, then restart the MySQL service.
Step 1: enable the MySQL binlog

[root@node1 ~]# vim /etc/my.cnf
Add the following under the [mysqld] section:
server-id=2
log-bin=mysql-bin
binlog_format=row
expire_logs_days=15
binlog_row_image=full

Step 2: restart the MySQL server

service mysqld restart

Log in with the MySQL client and check that the settings took effect.
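For example, the following statements (a minimal check against standard MySQL system variables) confirm the configuration took effect:

mysql> SHOW VARIABLES LIKE 'log_bin';        -- expect ON
mysql> SHOW VARIABLES LIKE 'binlog_format';  -- expect ROW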

Step 3: download the Flink CDC MySQL jar
Since Flink 1.12.2 is used, the matching Flink CDC version is 1.3.0. Maven dependency:

<!-- https://mvnrepository.com/artifact/com.alibaba.ververica/flink-connector-mysql-cdc -->
<dependency>
  <groupId>com.alibaba.ververica</groupId>
  <artifactId>flink-connector-mysql-cdc</artifactId>
  <version>1.3.0</version>
</dependency>

If you use the Flink SQL Client, put the jar into the $FLINK_HOME/lib directory.

3.2 Environment Setup

Real-time ingestion can be implemented either as a Java program or by executing DDL statements directly.
Option 1: start the Flink SQL Client, write and execute the DDL statements, and submit the Flink job to the Standalone cluster.
-- Start HDFS services

hadoop-daemon.sh start namenode
hadoop-daemon.sh start datanode

-- Start the Flink Standalone cluster

export HADOOP_CLASSPATH=`/export/server/hadoop/bin/hadoop classpath`
/export/server/flink/bin/start-cluster.sh

-- Start the SQL Client

/export/server/flink/bin/sql-client.sh embedded
-j /export/server/flink/lib/hudi-flink-bundle_2.12-0.9.0.jar shell

-- Set properties

set execution.result-mode=tableau;
set execution.checkpointing.interval=3sec;
SET execution.runtime-mode = streaming;

Option 2: create a Maven project in IDEA, add the dependencies below, write the program, and execute the DDL statements from it.
pom.xml dependencies:

<repositories>
  <repository>
    <id>nexus-aliyun</id>
    <name>Nexus aliyun</name>
    <url>http://maven.aliyun.com/nexus/content/groups/public</url>
  </repository>
  <repository>
    <id>central_maven</id>
    <name>central maven</name>
    <url>https://repo1.maven.org/maven2</url>
  </repository>
  <repository>
    <id>cloudera</id>
    <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
  </repository>
  <repository>
    <id>apache.snapshots</id>
    <name>Apache Development Snapshot Repository</name>
    <url>https://repository.apache.org/content/repositories/snapshots/</url>
    <releases><enabled>false</enabled></releases>
    <snapshots><enabled>true</enabled></snapshots>
  </repository>
</repositories>

<properties>
  <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  <maven.compiler.source>${java.version}</maven.compiler.source>
  <maven.compiler.target>${java.version}</maven.compiler.target>
  <java.version>1.8</java.version>
  <scala.binary.version>2.12</scala.binary.version>
  <flink.version>1.12.2</flink.version>
  <hadoop.version>2.7.3</hadoop.version>
  <mysql.version>8.0.16</mysql.version>
</properties>

<dependencies>
  <!-- Flink Client -->
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>${flink.version}</version>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-runtime-web_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
  </dependency>
  <!-- Flink Table API & SQL -->
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-common</artifactId>
    <version>${flink.version}</version>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-json</artifactId>
    <version>${flink.version}</version>
  </dependency>
  <dependency>
    <groupId>org.apache.hudi</groupId>
    <artifactId>hudi-flink-bundle_${scala.binary.version}</artifactId>
    <version>0.9.0</version>
  </dependency>
  <dependency>
    <groupId>com.alibaba.ververica</groupId>
    <artifactId>flink-connector-mysql-cdc</artifactId>
    <version>1.3.0</version>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-shaded-hadoop-2-uber</artifactId>
    <version>2.7.5-10.0</version>
  </dependency>
  <!-- MySQL -->
  <dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>${mysql.version}</version>
  </dependency>
  <!-- slf4j and log4j -->
  <dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-log4j12</artifactId>
    <version>1.7.7</version>
    <scope>runtime</scope>
  </dependency>
  <dependency>
    <groupId>log4j</groupId>
    <artifactId>log4j</artifactId>
    <version>1.2.17</version>
    <scope>runtime</scope>
  </dependency>
</dependencies>

<build>
  <sourceDirectory>src/main/java</sourceDirectory>
  <testSourceDirectory>src/test/java</testSourceDirectory>
  <plugins>
    <!-- Compiler plugin -->
    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-compiler-plugin</artifactId>
      <version>3.5.1</version>
      <configuration>
        <source>1.8</source>
        <target>1.8</target>
        <!--<encoding>${project.build.sourceEncoding}</encoding>-->
      </configuration>
    </plugin>
    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-surefire-plugin</artifactId>
      <version>2.18.1</version>
      <configuration>
        <useFile>false</useFile>
        <disableXmlReport>true</disableXmlReport>
        <includes>
          <include>**/*Test.*</include>
          <include>**/*Suite.*</include>
        </includes>
      </configuration>
    </plugin>
    <!-- Shade plugin (builds a fat jar containing all dependencies) -->
    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-shade-plugin</artifactId>
      <version>2.3</version>
      <executions>
        <execution>
          <phase>package</phase>
          <goals><goal>shade</goal></goals>
          <configuration>
            <filters>
              <filter>
                <artifact>*:*</artifact>
                <excludes>
                  <exclude>META-INF/*.SF</exclude>
                  <exclude>META-INF/*.DSA</exclude>
                  <exclude>META-INF/*.RSA</exclude>
                </excludes>
              </filter>
            </filters>
            <transformers>
              <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"/>
            </transformers>
          </configuration>
        </execution>
      </executions>
    </plugin>
  </plugins>
</build>

Writing the ingestion program takes three main steps: an input table (InputTable), an output table (OutputTable), and an INSERT ... SELECT statement connecting them.

In this case, to make the effect easier to observe, we start the Flink SQL Client and execute the DDL and DML statements directly. A minimal sketch of the pattern follows.
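The sketch below shows the shape of the pattern with a hypothetical one-column table named demo; the real statements in section 3.3 follow exactly this structure with full column lists:

-- Input: a CDC source table reading the MySQL snapshot plus binlog changes
CREATE TABLE src_mysql (
  id STRING PRIMARY KEY NOT ENFORCED
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = 'node1.oldlu.cn',
  'port' = '3306',
  'username' = 'root',
  'password' = '123456',
  'database-name' = 'oldlu_nev',
  'table-name' = 'demo'
);

-- Output: a Hudi sink table on HDFS
CREATE TABLE dst_hudi (
  id STRING PRIMARY KEY NOT ENFORCED,
  part STRING
) PARTITIONED BY (part) WITH (
  'connector' = 'hudi',
  'path' = 'hdfs://node1.oldlu.cn:8020/ehualu/hudi-warehouse/demo_hudi',
  'table.type' = 'MERGE_ON_READ'
);

-- Continuously copy changes, deriving the partition value
INSERT INTO dst_hudi
SELECT *, CAST(CURRENT_DATE AS STRING) AS part FROM src_mysql;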

3.3 Ingesting Data in Real Time

Ingesting data with Flink CDC requires creating an input table and an output table, then writing an INSERT ... SELECT statement.

Next, the five business tables in the MySQL database are ingested and synced in real time into Hudi tables (stored on HDFS).

3.3.1 Customer Information Table

Sync the customer information table (customer) to a Hudi table by writing and executing the DDL and DML statements as described above.
Step 1: the input table (InputTable)

CREATE TABLE tbl_customer_mysql (
  id STRING PRIMARY KEY NOT ENFORCED,
  customer_relationship_id STRING,
  create_date_time STRING,
  update_date_time STRING,
  deleted STRING,
  name STRING,
  idcard STRING,
  birth_year STRING,
  gender STRING,
  phone STRING,
  wechat STRING,
  qq STRING,
  email STRING,
  area STRING,
  leave_school_date STRING,
  graduation_date STRING,
  bxg_student_id STRING,
  creator STRING,
  origin_type STRING,
  origin_channel STRING,
  tenant STRING,
  md_id STRING
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = 'node1.oldlu.cn',
  'port' = '3306',
  'username' = 'root',
  'password' = '123456',
  'server-time-zone' = 'Asia/Shanghai',
  'debezium.snapshot.mode' = 'initial',
  'database-name' = 'oldlu_nev',
  'table-name' = 'customer'
);

Step 2: the output table (OutputTable)

CREATE TABLE edu_customer_hudi (
  id STRING PRIMARY KEY NOT ENFORCED,
  customer_relationship_id STRING,
  create_date_time STRING,
  update_date_time STRING,
  deleted STRING,
  name STRING,
  idcard STRING,
  birth_year STRING,
  gender STRING,
  phone STRING,
  wechat STRING,
  qq STRING,
  email STRING,
  area STRING,
  leave_school_date STRING,
  graduation_date STRING,
  bxg_student_id STRING,
  creator STRING,
  origin_type STRING,
  origin_channel STRING,
  tenant STRING,
  md_id STRING,
  part STRING
)
PARTITIONED BY (part) WITH (
  'connector' = 'hudi',
  'path' = 'hdfs://node1.oldlu.cn:8020/ehualu/hudi-warehouse/edu_customer_hudi',
  'table.type' = 'MERGE_ON_READ',
  'hoodie.datasource.write.recordkey.field' = 'id',
  'write.precombine.field' = 'create_date_time',
  'write.tasks' = '1',
  'read.tasks' = '1',
  'write.rate.limit' = '2000',
  'compaction.tasks' = '1',
  'compaction.async.enabled' = 'true',
  'compaction.trigger.strategy' = 'num_commits',
  'compaction.delta_commits' = '1',
  'changelog.enabled' = 'true'
);

Step 3: the INSERT ... SELECT statement

INSERT INTO edu_customer_hudi
SELECT *, CAST(CURRENT_DATE AS STRING) AS part FROM tbl_customer_mysql;

This creates a Flink job submitted to the Standalone cluster; it first syncs the table's historical data into the Hudi table and then keeps syncing incremental changes in real time. A quick end-to-end check follows.
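A simple way to verify both phases (hypothetical test values; the remaining customer columns are filled by their defaults) is to produce one change on the MySQL side and read the Hudi table back from the SQL Client:

-- In MySQL: produce one incremental change
INSERT INTO oldlu_nev.customer (name, phone) VALUES ('cdc_test', '13800000000');

-- In the Flink SQL Client: the new row should appear shortly
SELECT id, name, phone, part FROM edu_customer_hudi;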

3.3.2 Customer Intention Table

Sync the customer intention table (customer_relationship) to a Hudi table by writing and executing the DDL and DML statements as described above.
Step 1: the input table (InputTable)

CREATE TABLE tbl_customer_relationship_mysql (
  id string PRIMARY KEY NOT ENFORCED,
  create_date_time string,
  update_date_time string,
  deleted string,
  customer_id string,
  first_id string,
  belonger string,
  belonger_name string,
  initial_belonger string,
  distribution_handler string,
  business_scrm_department_id string,
  last_visit_time string,
  next_visit_time string,
  origin_type string,
  oldlu_school_id string,
  oldlu_subject_id string,
  intention_study_type string,
  anticipat_signup_date string,`level` string,
  creator string,
  current_creator string,
  creator_name string,
  origin_channel string,`comment` string,
  first_customer_clue_id string,
  last_customer_clue_id string,
  process_state string,
  process_time string,
  payment_state string,
  payment_time string,
  signup_state string,
  signup_time string,
  notice_state string,
  notice_time string,
  lock_state string,
  lock_time string,
  oldlu_clazz_id string,
  oldlu_clazz_time string,
  payment_url string,
  payment_url_time string,
  ems_student_id string,
  delete_reason string,
  deleter string,
  deleter_name string,
  delete_time string,
  course_id string,
  course_name string,
  delete_comment string,
  close_state string,
  close_time string,
  appeal_id string,
  tenant string,
  total_fee string,
  belonged string,
  belonged_time string,
  belonger_time string,
  transfer string,
  transfer_time string,
  follow_type string,
  transfer_bxg_oa_account string,
  transfer_bxg_belonger_name string
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = 'node1.oldlu.cn',
  'port' = '3306',
  'username' = 'root',
  'password' = '123456',
  'server-time-zone' = 'Asia/Shanghai',
  'debezium.snapshot.mode' = 'initial',
  'database-name' = 'oldlu_nev',
  'table-name' = 'customer_relationship'
);

Step 2: the output table (OutputTable)

CREATE TABLE edu_customer_relationship_hudi (
  id string PRIMARY KEY NOT ENFORCED,
  create_date_time string,
  update_date_time string,
  deleted string,
  customer_id string,
  first_id string,
  belonger string,
  belonger_name string,
  initial_belonger string,
  distribution_handler string,
  business_scrm_department_id string,
  last_visit_time string,
  next_visit_time string,
  origin_type string,
  oldlu_school_id string,
  oldlu_subject_id string,
  intention_study_type string,
  anticipat_signup_date string,`level` string,
  creator string,
  current_creator string,
  creator_name string,
  origin_channel string,`comment` string,
  first_customer_clue_id string,
  last_customer_clue_id string,
  process_state string,
  process_time string,
  payment_state string,
  payment_time string,
  signup_state string,
  signup_time string,
  notice_state string,
  notice_time string,
  lock_state string,
  lock_time string,
  oldlu_clazz_id string,
  oldlu_clazz_time string,
  payment_url string,
  payment_url_time string,
  ems_student_id string,
  delete_reason string,
  deleter string,
  deleter_name string,
  delete_time string,
  course_id string,
  course_name string,
  delete_comment string,
  close_state string,
  close_time string,
  appeal_id string,
  tenant string,
  total_fee string,
  belonged string,
  belonged_time string,
  belonger_time string,
  transfer string,
  transfer_time string,
  follow_type string,
  transfer_bxg_oa_account string,
  transfer_bxg_belonger_name string,
  part STRING
)
PARTITIONED BY (part) WITH (
  'connector' = 'hudi',
  'path' = 'hdfs://node1.oldlu.cn:8020/ehualu/hudi-warehouse/edu_customer_relationship_hudi',
  'table.type' = 'MERGE_ON_READ',
  'hoodie.datasource.write.recordkey.field' = 'id',
  'write.precombine.field' = 'create_date_time',
  'write.tasks' = '1',
  'write.rate.limit' = '2000',
  'compaction.tasks' = '1',
  'compaction.async.enabled' = 'true',
  'compaction.trigger.strategy' = 'num_commits',
  'compaction.delta_commits' = '1',
  'changelog.enabled' = 'true'
);

Step 3: the INSERT ... SELECT statement

INSERT INTO edu_customer_relationship_hudi
SELECT *, CAST(CURRENT_DATE AS STRING) AS part FROM tbl_customer_relationship_mysql;

Check the HDFS file system; the fully synced data is stored under the Hudi table directory.

3.3.3 Customer Clue Table

Sync the customer clue table (customer_clue) to a Hudi table by writing and executing the DDL and DML statements as described above.
Step 1: the input table (InputTable)

CREATE TABLE tbl_customer_clue_mysql (
  id string PRIMARY KEY NOT ENFORCED,
  create_date_time string,
  update_date_time string,
  deleted string,
  customer_id string,
  customer_relationship_id string,
  session_id string,
  sid string,status string,`user` string,
  create_time string,
  platform string,
  s_name string,
  seo_source string,
  seo_keywords string,
  ip string,
  referrer string,
  from_url string,
  landing_page_url string,
  url_title string,
  to_peer string,
  manual_time string,
  begin_time string,
  reply_msg_count string,
  total_msg_count string,
  msg_count string,`comment` string,
  finish_reason string,
  finish_user string,
  end_time string,
  platform_description string,
  browser_name string,
  os_info string,
  area string,
  country string,
  province string,
  city string,
  creator string,
  name string,
  idcard string,
  phone string,
  oldlu_school_id string,
  oldlu_school string,
  oldlu_subject_id string,
  oldlu_subject string,
  wechat string,
  qq string,
  email string,
  gender string,`level` string,
  origin_type string,
  information_way string,
  working_years string,
  technical_directions string,
  customer_state string,
  valid string,
  anticipat_signup_date string,
  clue_state string,
  scrm_department_id string,
  superior_url string,
  superior_source string,
  landing_url string,
  landing_source string,
  info_url string,
  info_source string,
  origin_channel string,
  course_id string,
  course_name string,
  zhuge_session_id string,
  is_repeat string,
  tenant string,
  activity_id string,
  activity_name string,
  follow_type string,
  shunt_mode_id string,
  shunt_employee_group_id string
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = 'node1.oldlu.cn',
  'port' = '3306',
  'username' = 'root',
  'password' = '123456',
  'server-time-zone' = 'Asia/Shanghai',
  'debezium.snapshot.mode' = 'initial',
  'database-name' = 'oldlu_nev',
  'table-name' = 'customer_clue'
);

Step 2: the output table (OutputTable)

CREATE TABLE edu_customer_clue_hudi (
  id string PRIMARY KEY NOT ENFORCED,
  create_date_time string,
  update_date_time string,
  deleted string,
  customer_id string,
  customer_relationship_id string,
  session_id string,
  sid string,status string,`user` string,
  create_time string,
  platform string,
  s_name string,
  seo_source string,
  seo_keywords string,
  ip string,
  referrer string,
  from_url string,
  landing_page_url string,
  url_title string,
  to_peer string,
  manual_time string,
  begin_time string,
  reply_msg_count string,
  total_msg_count string,
  msg_count string,`comment` string,
  finish_reason string,
  finish_user string,
  end_time string,
  platform_description string,
  browser_name string,
  os_info string,
  area string,
  country string,
  province string,
  city string,
  creator string,
  name string,
  idcard string,
  phone string,
  oldlu_school_id string,
  oldlu_school string,
  oldlu_subject_id string,
  oldlu_subject string,
  wechat string,
  qq string,
  email string,
  gender string,`level` string,
  origin_type string,
  information_way string,
  working_years string,
  technical_directions string,
  customer_state string,
  valid string,
  anticipat_signup_date string,
  clue_state string,
  scrm_department_id string,
  superior_url string,
  superior_source string,
  landing_url string,
  landing_source string,
  info_url string,
  info_source string,
  origin_channel string,
  course_id string,
  course_name string,
  zhuge_session_id string,
  is_repeat string,
  tenant string,
  activity_id string,
  activity_name string,
  follow_type string,
  shunt_mode_id string,
  shunt_employee_group_id string,
  part STRING
)
PARTITIONED BY (part) WITH (
  'connector' = 'hudi',
  'path' = 'hdfs://node1.oldlu.cn:8020/ehualu/hudi-warehouse/edu_customer_clue_hudi',
  'table.type' = 'MERGE_ON_READ',
  'hoodie.datasource.write.recordkey.field' = 'id',
  'write.precombine.field' = 'create_date_time',
  'write.tasks' = '1',
  'write.rate.limit' = '2000',
  'compaction.tasks' = '1',
  'compaction.async.enabled' = 'true',
  'compaction.trigger.strategy' = 'num_commits',
  'compaction.delta_commits' = '1',
  'changelog.enabled' = 'true'
);

Step 3: the INSERT ... SELECT statement

INSERT INTO edu_customer_clue_hudi
SELECT *, CAST(CURRENT_DATE AS STRING) AS part FROM tbl_customer_clue_mysql;

Check the HDFS file system; the fully synced data is stored under the Hudi table directory.

3.3.4 Customer Appeal Table

Sync the customer appeal table (customer_appeal) to a Hudi table by writing and executing the DDL and DML statements as described above.
Step 1: the input table (InputTable)

CREATE TABLE tbl_customer_appeal_mysql (
  id string PRIMARY KEY NOT ENFORCED,
  customer_relationship_first_id string,
  employee_id string,
  employee_name string,
  employee_department_id string,
  employee_tdepart_id string,
  appeal_status string,
  audit_id string,
  audit_name string,
  audit_department_id string,
  audit_department_name string,
  audit_date_time string,
  create_date_time string,
  update_date_time string,
  deleted string,
  tenant string
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = 'node1.oldlu.cn',
  'port' = '3306',
  'username' = 'root',
  'password' = '123456',
  'server-time-zone' = 'Asia/Shanghai',
  'debezium.snapshot.mode' = 'initial',
  'database-name' = 'oldlu_nev',
  'table-name' = 'customer_appeal'
);

Step 2: the output table (OutputTable)

CREATE TABLE edu_customer_appeal_hudi (
  id string PRIMARY KEY NOT ENFORCED,
  customer_relationship_first_id STRING,
  employee_id STRING,
  employee_name STRING,
  employee_department_id STRING,
  employee_tdepart_id STRING,
  appeal_status STRING,
  audit_id STRING,
  audit_name STRING,
  audit_department_id STRING,
  audit_department_name STRING,
  audit_date_time STRING,
  create_date_time STRING,
  update_date_time STRING,
  deleted STRING,
  tenant STRING,
  part STRING
)
PARTITIONED BY (part) WITH (
  'connector' = 'hudi',
  'path' = 'hdfs://node1.oldlu.cn:8020/ehualu/hudi-warehouse/edu_customer_appeal_hudi',
  'table.type' = 'MERGE_ON_READ',
  'hoodie.datasource.write.recordkey.field' = 'id',
  'write.precombine.field' = 'create_date_time',
  'write.tasks' = '1',
  'write.rate.limit' = '2000',
  'compaction.tasks' = '1',
  'compaction.async.enabled' = 'true',
  'compaction.trigger.strategy' = 'num_commits',
  'compaction.delta_commits' = '1',
  'changelog.enabled' = 'true'
);

Step 3: the INSERT ... SELECT statement

INSERT INTO edu_customer_appeal_hudi
SELECT *, CAST(CURRENT_DATE AS STRING) AS part FROM tbl_customer_appeal_mysql;

Check the HDFS file system; the fully synced data is stored under the Hudi table directory.

3.3.5 Customer Visit and Consultation Record Table

Sync the customer visit and consultation record table (web_chat_ems) to a Hudi table by writing and executing the DDL and DML statements as described above.
Step 1: the input table (InputTable)

CREATE TABLE tbl_web_chat_ems_mysql (
  id string PRIMARY KEY NOT ENFORCED,
  create_date_time string,
  session_id string,
  sid string,
  create_time string,
  seo_source string,
  seo_keywords string,
  ip string,
  area string,
  country string,
  province string,
  city string,
  origin_channel string,`user` string,
  manual_time string,
  begin_time string,
  end_time string,
  last_customer_msg_time_stamp string,
  last_agent_msg_time_stamp string,
  reply_msg_count string,
  msg_count string,
  browser_name string,
  os_info string
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = 'node1.oldlu.cn',
  'port' = '3306',
  'username' = 'root',
  'password' = '123456',
  'server-time-zone' = 'Asia/Shanghai',
  'debezium.snapshot.mode' = 'initial',
  'database-name' = 'oldlu_nev',
  'table-name' = 'web_chat_ems'
);

Step 2: the output table (OutputTable)

CREATE TABLE edu_web_chat_ems_hudi (
  id string PRIMARY KEY NOT ENFORCED,
  create_date_time string,
  session_id string,
  sid string,
  create_time string,
  seo_source string,
  seo_keywords string,
  ip string,
  area string,
  country string,
  province string,
  city string,
  origin_channel string,`user` string,
  manual_time string,
  begin_time string,
  end_time string,
  last_customer_msg_time_stamp string,
  last_agent_msg_time_stamp string,
  reply_msg_count string,
  msg_count string,
  browser_name string,
  os_info string,
  part STRING
)
PARTITIONED BY (part) WITH (
  'connector' = 'hudi',
  'path' = 'hdfs://node1.oldlu.cn:8020/ehualu/hudi-warehouse/edu_web_chat_ems_hudi',
  'table.type' = 'MERGE_ON_READ',
  'hoodie.datasource.write.recordkey.field' = 'id',
  'write.precombine.field' = 'create_date_time',
  'write.tasks' = '1',
  'write.rate.limit' = '2000',
  'compaction.tasks' = '1',
  'compaction.async.enabled' = 'true',
  'compaction.trigger.strategy' = 'num_commits',
  'compaction.delta_commits' = '1',
  'changelog.enabled' = 'true'
);

Step 3: the INSERT ... SELECT statement

INSERT INTO edu_web_chat_ems_hudi
SELECT *, CAST(CURRENT_DATE AS STRING) AS part FROM tbl_web_chat_ems_mysql;

Check the HDFS file system; the fully synced data is stored under the Hudi table directory.

After ingestion, the five Flink jobs keep running on the Standalone cluster; as new business data is produced in the source tables, it is likewise captured in real time and stored into the Hudi tables.

4 Ad-Hoc Analysis with Presto

Presto analyzes the Hudi table data and stores the results directly into MySQL tables, as outlined below.

First: create tables in Hive that map to the Hudi tables.
Second: integrate Presto with Hive to load the Hive table data.
Third: integrate Presto with MySQL to read and save data.

4.1 What Is Presto

Presto is an MPP OLAP query engine open-sourced by Facebook: a distributed SQL execution engine that can run large-scale queries against different data sources. It suits interactive analytical queries over data volumes from GB to PB.
1. Clean architecture: it is a self-contained system that does not depend on any external system; for example, Presto monitors its own cluster and can schedule work based on that monitoring information.
2. Simple data model: columnar storage with logical rows; most data can easily be converted into the structure Presto needs.
3. Rich plugin interface: it integrates smoothly with external storage systems and supports custom functions.

Presto uses a typical master-slave model: one Coordinator node, one Discovery Server node, and multiple Worker nodes; the Discovery Server is usually embedded in the Coordinator node.

1. The coordinator (master) manages metadata and workers, and parses and schedules queries.
2. The workers handle computation and reads/writes.
3. The discovery server is usually embedded in the coordinator but can be deployed separately; it is used for node heartbeats. Below, discovery and coordinator are assumed to share one machine.

Presto's data model is a three-level structure:

1. A catalog corresponds to one class of data source, e.g. Hive data or MySQL data.
2. A schema corresponds to a database in MySQL.
3. A table corresponds to a table in MySQL.
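With this model every table is addressed as catalog.schema.table. Once the catalogs from section 4.2 are configured, exploration looks like the following (illustrative commands; the output depends on the deployment):

presto> SHOW CATALOGS;                       -- e.g. hive, mysql, system
presto> SHOW SCHEMAS FROM hive;              -- Hive databases
presto> SHOW TABLES FROM hive.edu_hudi;      -- tables in one schema
presto> SELECT * FROM mysql.oldlu_nev.customer LIMIT 5;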

4.2 Presto Installation and Deployment

Presto is deployed here on a single node: host node1.oldlu.cn, IP address 192.168.88.100.
1. Install JDK 8

java -version

2. Upload and extract the Presto package
Create the installation directory:

mkdir -p /export/server

Install the lrzsz upload utility via yum:

yum install -y lrzsz

Upload the package to the /export/server directory on node1:

presto-server-0.245.1.tar.gz

Extract and create a symlink:

tar -xzvf presto-server-0.245.1.tar.gz -C /export/server
ln -s presto-server-0.245.1 presto

Create the configuration directory:

mkdir -p /export/server/presto/etc

3. Configure Presto

etc/config.properties

vim /export/server/presto/etc/config.properties

Contents:
coordinator=true
node-scheduler.include-coordinator=true
http-server.http.port=8090
query.max-memory=6GB
query.max-memory-per-node=2GB
query.max-total-memory-per-node=2GB
discovery-server.enabled=true
discovery.uri=http://192.168.88.100:8090

etc/jvm.config

vim /export/server/presto/etc/jvm.config
Contents:
-server
-Xmx3G
-XX:+UseG1GC
-XX:G1HeapRegionSize=32M
-XX:+UseGCOverheadLimit
-XX:+ExplicitGCInvokesConcurrent
-XX:+HeapDumpOnOutOfMemoryError
-XX:+ExitOnOutOfMemoryError

etc/node.properties

vim /export/server/presto/etc/node.properties
Contents:
node.environment=hudipresto
node.id=presto-node1
node.data-dir=/export/server/presto/data

etc/catalog/hive.properties

mkdir -p /export/server/presto/etc/catalog
vim /export/server/presto/etc/catalog/hive.properties

Contents:
connector.name=hive-hadoop2
hive.metastore.uri=thrift://192.168.88.100:9083
hive.parquet.use-column-names=true
hive.config.resources=/export/server/presto/etc/catalog/core-site.xml,/export/server/presto/etc/catalog/hdfs-site.xml

etc/catalog/mysql.properties

vim /export/server/presto/etc/catalog/mysql.properties

Contents:
connector.name=mysql
connection-url=jdbc:mysql://node1.oldlu.cn:3306
connection-user=root
connection-password=123456

4. Start the service
Go to the Presto installation directory and run the launcher script in $PRESTO_HOME/bin:

/export/server/presto/bin/launcher start

Check with jps that the process exists; the process name is PrestoServer.
The web UI is also available at:

http://192.168.88.100:8090/ui/

Presto CLI command-line client
Download the CLI client:

presto-cli-0.245.1-executable.jar

Upload presto-cli-0.245.1-executable.jar to /export/server/presto/bin, rename it, and make it executable:

mv presto-cli-0.245.1-executable.jar presto
chmod +x presto

Start the CLI client:

/export/server/presto/bin/presto --server 192.168.88.100:8090

4.3 Creating Tables in Hive

For Presto to analyze the data in the Hudi tables, the Hudi tables must be mapped to Hive tables. Next, create the five education business tables in Hive, each mapped to its corresponding Hudi table.

Start the HDFS services, the Hive Metastore, and HiveServer2, then run the Beeline command line:

-- Start HDFS services
hadoop-daemon.sh start namenode
hadoop-daemon.sh start datanode

-- Start Hive services
/export/server/hive/bin/start-metastore.sh
/export/server/hive/bin/start-hiveserver2.sh

-- Start the Beeline client
/export/server/hive/bin/beeline -u jdbc:hive2://node1.oldlu.cn:10000 -n root -p 123456

Enable Hive local mode for convenient testing:

-- Enable Hive local mode
set hive.exec.mode.local.auto=true;
set hive.exec.mode.local.auto.tasks.max=10;
set hive.exec.mode.local.auto.inputbytes.max=50000000;

4.3.1 Creating the Database

-- Create the database
CREATE DATABASE IF NOT EXISTS edu_hudi;
-- Use the database
USE edu_hudi;

4.3.2 Customer Information Table

DDL statement to create the table:

CREATE EXTERNAL TABLE edu_hudi.tbl_customer(
  id string,
  customer_relationship_id string,
  create_date_time string,
  update_date_time string,
  deleted string,
  name string,
  idcard string,
  birth_year string,
  gender string,
  phone string,
  wechat string,
  qq string,
  email string,
  area string,
  leave_school_date string,
  graduation_date string,
  bxg_student_id string,
  creator string,
  origin_type string,
  origin_channel string,
  tenant string,
  md_id string
) PARTITIONED BY (day_str string) ROW FORMAT SERDE
  'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' 
STORED AS INPUTFORMAT 
  'org.apache.hudi.hadoop.HoodieParquetInputFormat' 
OUTPUTFORMAT 
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION 
  '/ehualu/hudi-warehouse/edu_customer_hudi';

Since this is a partitioned table, add the partition:

ALTER TABLE edu_hudi.tbl_customer ADD IF NOT EXISTS PARTITION (day_str = '2022-09-23')
LOCATION '/ehualu/hudi-warehouse/edu_customer_hudi/2022-09-23';
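Assuming the partition was added successfully, it can be verified from Beeline with standard Hive syntax:

SHOW PARTITIONS edu_hudi.tbl_customer;
-- expected output: day_str=2022-09-23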

4.3.3 Customer Intention Table

DDL statement to create the table:

CREATE EXTERNAL TABLE edu_hudi.tbl_customer_relationship(
  id string,
  create_date_time string,
  update_date_time string,
  deleted string,
  customer_id string,
  first_id string,
  belonger string,
  belonger_name string,
  initial_belonger string,
  distribution_handler string,
  business_scrm_department_id string,
  last_visit_time string,
  next_visit_time string,
  origin_type string,
  oldlu_school_id string,
  oldlu_subject_id string,
  intention_study_type string,
  anticipat_signup_date string,`level` string,
  creator string,
  current_creator string,
  creator_name string,
  origin_channel string,`comment` string,
  first_customer_clue_id string,
  last_customer_clue_id string,
  process_state string,
  process_time string,
  payment_state string,
  payment_time string,
  signup_state string,
  signup_time string,
  notice_state string,
  notice_time string,
  lock_state string,
  lock_time string,
  oldlu_clazz_id string,
  oldlu_clazz_time string,
  payment_url string,
  payment_url_time string,
  ems_student_id string,
  delete_reason string,
  deleter string,
  deleter_name string,
  delete_time string,
  course_id string,
  course_name string,
  delete_comment string,
  close_state string,
  close_time string,
  appeal_id string,
  tenant string,
  total_fee string,
  belonged string,
  belonged_time string,
  belonger_time string,
  transfer string,
  transfer_time string,
  follow_type string,
  transfer_bxg_oa_account string,
  transfer_bxg_belonger_name string
) PARTITIONED BY (day_str string) ROW FORMAT SERDE
  'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' 
STORED AS INPUTFORMAT 
  'org.apache.hudi.hadoop.HoodieParquetInputFormat' 
OUTPUTFORMAT 
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION 
  '/ehualu/hudi-warehouse/edu_customer_relationship_hudi';

Since this is a partitioned table, add the partition:

ALTER TABLE edu_hudi.tbl_customer_relationship ADD IF NOT EXISTS PARTITION (day_str = '2022-09-23')
LOCATION '/ehualu/hudi-warehouse/edu_customer_relationship_hudi/2022-09-23';

4.3.4 Customer Clue Table

DDL statement to create the table:

CREATE EXTERNAL TABLE edu_hudi.tbl_customer_clue(
  id string,
  create_date_time string,
  update_date_time string,
  deleted string,
  customer_id string,
  customer_relationship_id string,
  session_id string,
  sid string,status string,`user` string,
  create_time string,
  platform string,
  s_name string,
  seo_source string,
  seo_keywords string,
  ip string,
  referrer string,
  from_url string,
  landing_page_url string,
  url_title string,
  to_peer string,
  manual_time string,
  begin_time string,
  reply_msg_count string,
  total_msg_count string,
  msg_count string,`comment` string,
  finish_reason string,
  finish_user string,
  end_time string,
  platform_description string,
  browser_name string,
  os_info string,
  area string,
  country string,
  province string,
  city string,
  creator string,
  name string,
  idcard string,
  phone string,
  oldlu_school_id string,
  oldlu_school string,
  oldlu_subject_id string,
  oldlu_subject string,
  wechat string,
  qq string,
  email string,
  gender string,`level` string,
  origin_type string,
  information_way string,
  working_years string,
  technical_directions string,
  customer_state string,
  valid string,
  anticipat_signup_date string,
  clue_state string,
  scrm_department_id string,
  superior_url string,
  superior_source string,
  landing_url string,
  landing_source string,
  info_url string,
  info_source string,
  origin_channel string,
  course_id string,
  course_name string,
  zhuge_session_id string,
  is_repeat string,
  tenant string,
  activity_id string,
  activity_name string,
  follow_type string,
  shunt_mode_id string,
  shunt_employee_group_id string
)
PARTITIONED BY (day_str string) ROW FORMAT SERDE
  'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' 
STORED AS INPUTFORMAT 
  'org.apache.hudi.hadoop.HoodieParquetInputFormat' 
OUTPUTFORMAT 
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION 
  '/ehualu/hudi-warehouse/edu_customer_clue_hudi';

Since this is a partitioned table, add the partition:

ALTER TABLE edu_hudi.tbl_customer_clue ADD IF NOT EXISTS PARTITION (day_str = '2022-09-23')
LOCATION '/ehualu/hudi-warehouse/edu_customer_clue_hudi/2022-09-23';

4.3.5 Customer Appeal Table

DDL statement to create the table:

CREATE EXTERNAL TABLE edu_hudi.tbl_customer_appeal(
  id string,
  customer_relationship_first_id STRING,
  employee_id STRING,
  employee_name STRING,
  employee_department_id STRING,
  employee_tdepart_id STRING,
  appeal_status STRING,
  audit_id STRING,
  audit_name STRING,
  audit_department_id STRING,
  audit_department_name STRING,
  audit_date_time STRING,
  create_date_time STRING,
  update_date_time STRING,
  deleted STRING,
  tenant STRING
)
PARTITIONED BY (day_str string) ROW FORMAT SERDE
  'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' 
STORED AS INPUTFORMAT 
  'org.apache.hudi.hadoop.HoodieParquetInputFormat' 
OUTPUTFORMAT 
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION 
  '/ehualu/hudi-warehouse/edu_customer_appeal_hudi';

Since this is a partitioned table, add the partition:

ALTER TABLE edu_hudi.tbl_customer_appeal ADD IF NOT EXISTS PARTITION (day_str = '2022-09-23')
LOCATION '/ehualu/hudi-warehouse/edu_customer_appeal_hudi/2022-09-23';

4.3.6 Customer Visit and Consultation Record Table

DDL statement to create the table:

CREATE EXTERNAL TABLE edu_hudi.tbl_web_chat_ems (
  id string,
  create_date_time string,
  session_id string,
  sid string,
  create_time string,
  seo_source string,
  seo_keywords string,
  ip string,
  area string,
  country string,
  province string,
  city string,
  origin_channel string,`user` string,
  manual_time string,
  begin_time string,
  end_time string,
  last_customer_msg_time_stamp string,
  last_agent_msg_time_stamp string,
  reply_msg_count string,
  msg_count string,
  browser_name string,
  os_info string
)
PARTITIONED BY (day_str string) ROW FORMAT SERDE
  'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' 
STORED AS INPUTFORMAT 
  'org.apache.hudi.hadoop.HoodieParquetInputFormat' 
OUTPUTFORMAT 
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION 
  '/ehualu/hudi-warehouse/edu_web_chat_ems_hudi';

Since this is a partitioned table, add the partition:

ALTER TABLE edu_hudi.tbl_web_chat_ems ADD IF NOT EXISTS PARTITION (day_str = '2022-09-23')
LOCATION '/ehualu/hudi-warehouse/edu_web_chat_ems_hudi/2022-09-23';

4.4 Offline Metric Analysis

To let Presto analyze Hudi table data, place the integration jar hudi-presto-bundle-0.9.0.jar into the Presto plugin directory /export/server/presto/plugin/hive-hadoop2.

Start the Presto CLI and list the databases created in Hive.

Switch to the edu_hudi database and list its tables.

Next, per the business metric requirements, analyze the Hudi table data with Presto and save the metrics directly into the MySQL database.

First, create a database in MySQL dedicated to storing the metric tables:

-- Create the database
CREATE DATABASE `oldlu_rpt` /*!40100 DEFAULT CHARACTER SET utf8 */;

4.4.1 Daily Enrollments

Statistical analysis over the customer intention table: daily customer enrollments. First create the MySQL table, then write the SQL, and finally save the data.
MySQL table: oldlu_rpt.stu_apply

CREATE TABLE IF NOT EXISTS `oldlu_rpt`.`stu_apply` (
  `report_date` longtext,
  `report_total` bigint(20) NOT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

Metric SQL statement:

WITH tmp AS (
  SELECT
    format_datetime(from_unixtime(cast(payment_time as bigint) / 1000), 'yyyy-MM-dd') AS day_value,
    customer_id
  FROM hive.edu_hudi.tbl_customer_relationship
  WHERE day_str = '2022-09-23'
    AND payment_time IS NOT NULL
    AND payment_state = 'PAID'
    AND deleted = 'false'
)
SELECT day_value, COUNT(customer_id) AS total FROM tmp GROUP BY day_value;

Save the analysis result into the MySQL table:

INSERT INTO mysql.oldlu_rpt.stu_apply (report_date, report_total)
SELECT day_value, total FROM (
  SELECT day_value, COUNT(customer_id) AS total FROM (
    SELECT
      format_datetime(from_unixtime(cast(payment_time as bigint) / 1000), 'yyyy-MM-dd') AS day_value,
      customer_id
    FROM hive.edu_hudi.tbl_customer_relationship
    WHERE day_str = '2022-09-23'
      AND payment_time IS NOT NULL
      AND payment_state = 'PAID'
      AND deleted = 'false'
  )
  GROUP BY day_value
);

Check the data in the MySQL table.
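For instance, from the MySQL command line (an illustrative check; the totals depend on the imported data):

mysql> SELECT report_date, report_total FROM oldlu_rpt.stu_apply ORDER BY report_date;

The same kind of check applies to the metric tables in the following subsections.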

4.4.2 Daily Visits

Statistical analysis over the customer visit record table: daily customer visits. First create the MySQL table, then write the SQL, and finally save the data.
MySQL table: oldlu_rpt.web_pv

CREATE TABLE IF NOT EXISTS `oldlu_rpt`.`web_pv` (
  `report_date` longtext,
  `report_total` bigint(20) NOT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

Metric SQL statement:

WITH tmp AS (
  SELECT
    id,
    format_datetime(from_unixtime(cast(create_time as bigint) / 1000), 'yyyy-MM-dd') AS day_value
  FROM hive.edu_hudi.tbl_web_chat_ems
  WHERE day_str = '2022-09-23'
)
SELECT day_value, COUNT(id) AS total FROM tmp GROUP BY day_value;

Save the analysis result into the MySQL table:

INSERT INTO mysql.oldlu_rpt.web_pv (report_date, report_total)
SELECT day_value, COUNT(id) AS total FROM (
  SELECT
    id,
    format_datetime(from_unixtime(cast(create_time as bigint) / 1000), 'yyyy-MM-dd') AS day_value
  FROM hive.edu_hudi.tbl_web_chat_ems
  WHERE day_str = '2022-09-23'
)
GROUP BY day_value;

Check the data in the MySQL table.

4.4.3 Daily Intentions

Statistical analysis over the customer intention table: daily customer intentions. First create the MySQL table, then write the SQL, and finally save the data.
MySQL table: oldlu_rpt.stu_intention

CREATE TABLE IF NOT EXISTS `oldlu_rpt`.`stu_intention` (
  `report_date` longtext,
  `report_total` bigint(20) NOT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

Metric SQL statement:

WITH tmp AS (
  SELECT
    id,
    format_datetime(from_unixtime(cast(create_date_time as bigint) / 1000), 'yyyy-MM-dd') AS day_value
  FROM hive.edu_hudi.tbl_customer_relationship
  WHERE day_str = '2022-09-23'
    AND create_date_time IS NOT NULL
    AND deleted = 'false'
)
SELECT day_value, COUNT(id) AS total FROM tmp GROUP BY day_value;

Save the analysis result into the MySQL table:

INSERT INTO mysql.oldlu_rpt.stu_intention (report_date, report_total)
SELECT day_value, COUNT(id) AS total FROM (
  SELECT
    id,
    format_datetime(from_unixtime(cast(create_date_time as bigint) / 1000), 'yyyy-MM-dd') AS day_value
  FROM hive.edu_hudi.tbl_customer_relationship
  WHERE day_str = '2022-09-23'
    AND create_date_time IS NOT NULL
    AND deleted = 'false'
)
GROUP BY day_value;

Check the data in the MySQL table.

4.4.4 Daily Clues

Statistical analysis over the customer clue table: daily customer clues. First create the MySQL table, then write the SQL, and finally save the data.
MySQL table: oldlu_rpt.stu_clue

CREATE TABLE IF NOT EXISTS `oldlu_rpt`.`stu_clue` (
  `report_date` longtext,
  `report_total` bigint(20) NOT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

Metric SQL statement:

WITH tmp AS (
  SELECT
    id,
    format_datetime(from_unixtime(cast(create_date_time as bigint) / 1000), 'yyyy-MM-dd') AS day_value
  FROM hive.edu_hudi.tbl_customer_clue
  WHERE day_str = '2022-09-23'
    AND clue_state IS NOT NULL
    AND deleted = 'false'
)
SELECT day_value, COUNT(id) AS total FROM tmp GROUP BY day_value;

Save the analysis result into the MySQL table:

INSERT INTO mysql.oldlu_rpt.stu_clue (report_date, report_total)
SELECT day_value, COUNT(id) AS total FROM (
  SELECT
    id,
    format_datetime(from_unixtime(cast(create_date_time as bigint) / 1000), 'yyyy-MM-dd') AS day_value
  FROM hive.edu_hudi.tbl_customer_clue
  WHERE day_str = '2022-09-23'
    AND clue_state IS NOT NULL
    AND deleted = 'false'
)
GROUP BY day_value;

Check the data in the MySQL table.

5 Streaming Analysis with Flink SQL

Flink SQL streaming queries read today's live data from the Hudi tables and compute the real-time counterparts of the offline metrics; the results feed a FineBI real-time dashboard.

Using the Flink SQL connectors for Hudi and MySQL, write the streaming queries and execute the DDL and SELECT statements in the SQL Client command line.

5.1 Business Requirements

There are five metrics in total, involving three business tables: the customer visit record table, the customer clue table, and the customer intention table. Each metric's real-time result is stored in its own MySQL table.

Each real-time metric is computed in three steps (a minimal streaming sketch follows this list):
Step 1: create an input table that streams the Hudi table data;
Step 2: create an output table that saves results to the MySQL table in real time;
Step 3: write the business SQL that queries the input table and inserts the result into the output table.
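A minimal sketch of the streaming side, again with a hypothetical one-column table: compared with the ingestion DDL in section 3.3, the only difference is the read options, which turn the Hudi table into an unbounded changelog source:

CREATE TABLE demo_hudi_stream (
  id STRING PRIMARY KEY NOT ENFORCED,
  part STRING
) PARTITIONED BY (part) WITH (
  'connector' = 'hudi',
  'path' = 'hdfs://node1.oldlu.cn:8020/ehualu/hudi-warehouse/demo_hudi',
  'table.type' = 'MERGE_ON_READ',
  'read.streaming.enabled' = 'true',       -- poll for new commits instead of a one-off scan
  'read.streaming.check-interval' = '5'    -- seconds between polls
);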

5.2 Creating the MySQL Tables

Each real-time metric is stored in one MySQL table. Create the five tables first; their names differ but the columns are identical. DDL statements:
Metric 1: today's visits

CREATE TABLE `oldlu_rpt`.`realtime_web_pv` (
  `report_date` varchar(255) NOT NULL,
  `report_total` bigint(20) NOT NULL,
  PRIMARY KEY (`report_date`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

Metric 2: today's consultations

CREATE TABLE `oldlu_rpt`.`realtime_stu_consult` (
  `report_date` varchar(255) NOT NULL,
  `report_total` bigint(20) NOT NULL,
  PRIMARY KEY (`report_date`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

Metric 3: today's intentions

CREATE TABLE `oldlu_rpt`.`realtime_stu_intention` (
  `report_date` varchar(255) NOT NULL,
  `report_total` bigint(20) NOT NULL,
  PRIMARY KEY (`report_date`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

Metric 4: today's enrollments

CREATE TABLE `oldlu_rpt`.`realtime_stu_apply` (
  `report_date` varchar(255) NOT NULL,
  `report_total` bigint(20) NOT NULL,
  PRIMARY KEY (`report_date`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

Metric 5: today's valid clues

CREATE TABLE `oldlu_rpt`.`realtime_stu_clue` (
  `report_date` varchar(255) NOT NULL,
  `report_total` bigint(20) NOT NULL,
  PRIMARY KEY (`report_date`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

5.3 Real-Time Metric Analysis

1. Today's visits and today's consultations stream from the table edu_web_chat_ems_hudi.
2. Today's intentions and today's enrollments stream from the table edu_customer_relationship_hudi.
3. Today's valid clues stream from the table edu_customer_clue_hudi.

Start the HDFS services and the Flink Standalone cluster, run the SQL Client, and set the properties:

-- Start HDFS services
hadoop-daemon.sh start namenode
hadoop-daemon.sh start datanode

-- Start the Flink Standalone cluster
export HADOOP_CLASSPATH=`/export/server/hadoop/bin/hadoop classpath`
/export/server/flink/bin/start-cluster.sh

-- Start the SQL Client
/export/server/flink/bin/sql-client.sh embedded \
-j /export/server/flink/lib/hudi-flink-bundle_2.12-0.9.0.jar shell

-- Set properties
set execution.result-mode=tableau;
set execution.checkpointing.interval=3sec;
-- Streaming mode
SET execution.runtime-mode = streaming;

5.3.1 Today's Visits

First create the input table, which streams the Hudi table data:

CREATE TABLE edu_web_chat_ems_hudi (
  id string PRIMARY KEY NOT ENFORCED,
  create_date_time string,
  session_id string,
  sid string,
  create_time string,
  seo_source string,
  seo_keywords string,
  ip string,
  area string,
  country string,
  province string,
  city string,
  origin_channel string,`user` string,
  manual_time string,
  begin_time string,
  end_time string,
  last_customer_msg_time_stamp string,
  last_agent_msg_time_stamp string,
  reply_msg_count string,
  msg_count string,
  browser_name string,
  os_info string,
  part STRING
)
PARTITIONED BY (part) WITH (
  'connector' = 'hudi',
  'path' = 'hdfs://node1.oldlu.cn:8020/ehualu/hudi-warehouse/edu_web_chat_ems_hudi',
  'table.type' = 'MERGE_ON_READ',
  'hoodie.datasource.write.recordkey.field' = 'id',
  'write.precombine.field' = 'create_date_time',
  'read.streaming.enabled' = 'true',
  'read.streaming.check-interval' = '5',
  'read.tasks' = '1'
);

Store the statistics result in a view:

CREATE VIEW IF NOT EXISTS view_tmp_web_pv AS
SELECT day_value, COUNT(id) AS total FROM (
  SELECT
    FROM_UNIXTIME(CAST(create_time AS BIGINT) / 1000, 'yyyy-MM-dd') AS day_value,
    id
  FROM edu_web_chat_ems_hudi
  WHERE part = CAST(CURRENT_DATE AS STRING)
) GROUP BY day_value;

Save to the MySQL database:
-- SQL Connector MySQL

CREATE TABLE realtime_web_pv_mysql (
  report_date STRING,
  report_total BIGINT,
  PRIMARY KEY (report_date) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://node1.oldlu.cn:3306/oldlu_rpt',
  'driver' = 'com.mysql.cj.jdbc.Driver',
  'username' = 'root',
  'password' = '123456',
  'table-name' = 'realtime_web_pv'
);

-- INSERT INTO

INSERT INTO realtime_web_pv_mysql SELECT day_value, total FROM view_tmp_web_pv;

5.3.2 Today's Consultations

Since today's visits and today's consultations both query the Hudi table edu_web_chat_ems_hudi, the streaming input table created above is reused; no new input table is needed here.
Store the statistics result in a view:

CREATE VIEW IF NOT EXISTS view_tmp_stu_consult AS
SELECT day_value, COUNT(id) AS total FROM (
  SELECT
    FROM_UNIXTIME(CAST(create_time AS BIGINT) / 1000, 'yyyy-MM-dd') AS day_value,
    id
  FROM edu_web_chat_ems_hudi
  WHERE part = CAST(CURRENT_DATE AS STRING) AND msg_count > 0
) GROUP BY day_value;

Save to the MySQL database:
-- SQL Connector MySQL

CREATE TABLE realtime_stu_consult_mysql (
  report_date STRING,
  report_total BIGINT,
  PRIMARY KEY (report_date) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://node1.oldlu.cn:3306/oldlu_rpt',
  'driver' = 'com.mysql.cj.jdbc.Driver',
  'username' = 'root',
  'password' = '123456',
  'table-name' = 'realtime_stu_consult'
);

-- INSERT INTO

INSERT INTO realtime_stu_consult_mysql SELECT day_value, total FROM view_tmp_stu_consult;

5.3.3 Today's Intentions

First create the input table, which streams the Hudi table data:

CREATE TABLE edu_customer_relationship_hudi (
  id string PRIMARY KEY NOT ENFORCED,
  create_date_time string,
  update_date_time string,
  deleted string,
  customer_id string,
  first_id string,
  belonger string,
  belonger_name string,
  initial_belonger string,
  distribution_handler string,
  business_scrm_department_id string,
  last_visit_time string,
  next_visit_time string,
  origin_type string,
  oldlu_school_id string,
  oldlu_subject_id string,
  intention_study_type string,
  anticipat_signup_date string,`level` string,
  creator string,
  current_creator string,
  creator_name string,
  origin_channel string,`comment` string,
  first_customer_clue_id string,
  last_customer_clue_id string,
  process_state string,
  process_time string,
  payment_state string,
  payment_time string,
  signup_state string,
  signup_time string,
  notice_state string,
  notice_time string,
  lock_state string,
  lock_time string,
  oldlu_clazz_id string,
  oldlu_clazz_time string,
  payment_url string,
  payment_url_time string,
  ems_student_id string,
  delete_reason string,
  deleter string,
  deleter_name string,
  delete_time string,
  course_id string,
  course_name string,
  delete_comment string,
  close_state string,
  close_time string,
  appeal_id string,
  tenant string,
  total_fee string,
  belonged string,
  belonged_time string,
  belonger_time string,
  transfer string,
  transfer_time string,
  follow_type string,
  transfer_bxg_oa_account string,
  transfer_bxg_belonger_name string,
  part STRING
)
PARTITIONED BY (part) WITH (
  'connector' = 'hudi',
  'path' = 'hdfs://node1.oldlu.cn:8020/ehualu/hudi-warehouse/edu_customer_relationship_hudi',
  'table.type' = 'MERGE_ON_READ',
  'hoodie.datasource.write.recordkey.field' = 'id',
  'write.precombine.field' = 'create_date_time',
  'read.streaming.enabled' = 'true',
  'read.streaming.check-interval' = '5',
  'read.tasks' = '1'
);

Store the statistics result in a view:

CREATE VIEW IF NOT EXISTS view_tmp_stu_intention AS
SELECT day_value, COUNT(id) AS total FROM (
  SELECT
    FROM_UNIXTIME(CAST(create_date_time AS BIGINT) / 1000, 'yyyy-MM-dd') AS day_value,
    id
  FROM edu_customer_relationship_hudi
  WHERE part = CAST(CURRENT_DATE AS STRING)
    AND create_date_time IS NOT NULL
    AND deleted = 'false'
) GROUP BY day_value;

Save to the MySQL database:

-- SQL Connector MySQL
CREATE TABLE realtime_stu_intention_mysql (
  report_date STRING,
  report_total BIGINT,
  PRIMARY KEY (report_date) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://node1.oldlu.cn:3306/oldlu_rpt',
  'driver' = 'com.mysql.cj.jdbc.Driver',
  'username' = 'root',
  'password' = '123456',
  'table-name' = 'realtime_stu_intention'
);

-- INSERT INTO

INSERT INTO realtime_stu_intention_mysql SELECT day_value, total
FROM view_tmp_stu_intention;

5.3.4 Today's Enrollments

Since today's intentions and today's enrollments both query the Hudi table edu_customer_relationship_hudi, the streaming input table created above is reused; no new input table is needed here.
Store the statistics result in a view:

CREATE VIEW IF NOT EXISTS view_tmp_stu_apply AS
SELECT day_value, COUNT(id) AS total FROM (
  SELECT
    FROM_UNIXTIME(CAST(payment_time AS BIGINT) / 1000, 'yyyy-MM-dd') AS day_value,
    id
  FROM edu_customer_relationship_hudi
  WHERE part = CAST(CURRENT_DATE AS STRING)
    AND payment_time IS NOT NULL
    AND payment_state = 'PAID'
    AND deleted = 'false'
) GROUP BY day_value;

Save to the MySQL database:
-- SQL Connector MySQL

CREATE TABLE realtime_stu_apply_mysql (
  report_date STRING,
  report_total BIGINT,
  PRIMARY KEY (report_date) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://node1.oldlu.cn:3306/oldlu_rpt',
  'driver' = 'com.mysql.cj.jdbc.Driver',
  'username' = 'root',
  'password' = '123456',
  'table-name' = 'realtime_stu_apply'
);

-- INSERT INTO

INSERT INTO realtime_stu_apply_mysql SELECT day_value, total FROM view_tmp_stu_apply;

5.3.5 Today's Valid Clues

First create the input table, which streams the Hudi table data:

CREATE TABLE edu_customer_clue_hudi (
  id string PRIMARY KEY NOT ENFORCED,
  create_date_time string,
  update_date_time string,
  deleted string,
  customer_id string,
  customer_relationship_id string,
  session_id string,
  sid string,status string,`user` string,
  create_time string,
  platform string,
  s_name string,
  seo_source string,
  seo_keywords string,
  ip string,
  referrer string,
  from_url string,
  landing_page_url string,
  url_title string,
  to_peer string,
  manual_time string,
  begin_time string,
  reply_msg_count string,
  total_msg_count string,
  msg_count string,`comment` string,
  finish_reason string,
  finish_user string,
  end_time string,
  platform_description string,
  browser_name string,
  os_info string,
  area string,
  country string,
  province string,
  city string,
  creator string,
  name string,
  idcard string,
  phone string,
  oldlu_school_id string,
  oldlu_school string,
  oldlu_subject_id string,
  oldlu_subject string,
  wechat string,
  qq string,
  email string,
  gender string,`level` string,
  origin_type string,
  information_way string,
  working_years string,
  technical_directions string,
  customer_state string,
  valid string,
  anticipat_signup_date string,
  clue_state string,
  scrm_department_id string,
  superior_url string,
  superior_source string,
  landing_url string,
  landing_source string,
  info_url string,
  info_source string,
  origin_channel string,
  course_id string,
  course_name string,
  zhuge_session_id string,
  is_repeat string,
  tenant string,
  activity_id string,
  activity_name string,
  follow_type string,
  shunt_mode_id string,
  shunt_employee_group_id string,
  part STRING
)
PARTITIONED BY (part) WITH (
  'connector' = 'hudi',
  'path' = 'hdfs://node1.oldlu.cn:8020/ehualu/hudi-warehouse/edu_customer_clue_hudi',
  'table.type' = 'MERGE_ON_READ',
  'hoodie.datasource.write.recordkey.field' = 'id',
  'write.precombine.field' = 'create_date_time',
  'read.streaming.enabled' = 'true',
  'read.streaming.check-interval' = '5',
  'read.tasks' = '1'
);

Store the statistics result in a view:

CREATE VIEW IF NOT EXISTS view_tmp_stu_clue AS
SELECT day_value, COUNT(id) AS total FROM (
  SELECT
    FROM_UNIXTIME(CAST(create_date_time AS BIGINT) / 1000, 'yyyy-MM-dd') AS day_value,
    id
  FROM edu_customer_clue_hudi
  WHERE part = CAST(CURRENT_DATE AS STRING)
    AND clue_state IS NOT NULL
    AND deleted = 'false'
) GROUP BY day_value;

Save to the MySQL database:

-- SQL Connector MySQL
CREATE TABLE realtime_stu_clue_mysql (
  report_date STRING,
  report_total BIGINT,
  PRIMARY KEY (report_date) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://node1.oldlu.cn:3306/oldlu_rpt',
  'driver' = 'com.mysql.cj.jdbc.Driver',
  'username' = 'root',
  'password' = '123456',
  'table-name' = 'realtime_stu_clue'
);

-- INSERT INTO

INSERT INTO realtime_stu_clue_mysql SELECT day_value, total FROM view_tmp_stu_clue;

6 Report Visualization with FineBI

FineBI connects to the MySQL database, loads the business metric report data, and presents it with different chart types.


Reprinted from: https://blog.csdn.net/ZGL_cyy/article/details/130370560
Copyright belongs to the original author 赵广陆; in case of infringement, please contact us for removal.
