文章目录
Day01_随堂笔记
一、经典数仓架构
范式建模
优点:避免数据冗余
缺点:取完整数据的时候需要关联,所以查询效率偏低
维度建模
优点:关联融合到了表里面,不需要关联,所以查询效率高
缺点:数据冗余高,容易出现数据不一致
二、传统离线大数据架构
特点:
以hadoop为核心对数据进行存储和计算逐渐成为数据处理中中流砥柱
优点:可以离线处理大规模数据集,在数据量大的时候,首选
缺点:批处理模式无论如何提升性能,也无法满足实时需求
三、lambda架构
总结:
在lambda架构中,为了计算一些实时指标,就在原来的离线基础增加了计算链路、
需要对数据源进行改造:消息队列,把消息发送给下游。在大数据中kafka
完成实时指标数据计算,推送到下游,服务层。有数据服务层完成离线和实时结合
Kafka是什么(可略)
Kafka是一种分布式流处理平台,由Apache软件基金会开发。它最初由LinkedIn公司开发,并于2011年作为开源项目贡献给Apache。
核心概念
- 消息(Message):Kafka处理的基本单位,是一段字节数组,可以包含任何类型的数据。
- 主题(Topic):消息的分类,一个主题可以包含多个消息,生产者将消息发布到特定的主题,消费者从主题中消费消息。
- 分区(Partition):主题的物理分区,一个主题可以有多个分区,每个分区是一个有序的、不可变的消息序列。
- 生产者(Producer):向Kafka主题发布消息的应用程序。
- 消费者(Consumer):从Kafka主题消费消息的应用程序。
- 消费者组(Consumer Group):一组消费者,它们共同消费一个主题的所有消息,每个消息只能被消费者组中的一个消费者消费。
- 偏移量(Offset):每个消息在分区中的唯一标识符,消费者通过偏移量来跟踪已经消费的消息。
- Broker:Kafka集群中的服务器节点,负责存储和管理主题和分区。
特性
- 高吞吐量:Kafka能够处理大量的读写操作,适用于实时数据流处理。
- 持久性:消息被持久化存储在磁盘上,不会因为服务器重启或故障而丢失。
- 分布式:Kafka集群由多个Broker组成,能够提供高可用性和容错性。
- 可扩展性:Kafka集群可以动态扩展,增加或减少Broker节点不会影响系统的正常运行。
- 实时性:Kafka能够实时处理数据流,适用于需要低延迟的数据处理场景。
- 可配置性:Kafka提供了丰富的配置选项,可以根据不同的需求进行定制。
应用场景
Kafka广泛应用于以下场景:
- 日志收集:收集应用程序的日志数据,并将其存储在Kafka中,以便进行后续的分析和处理。
- 流式处理:使用Kafka作为数据源,进行实时的数据流处理,如数据过滤、转换和聚合。
- 事件驱动系统:使用Kafka作为事件总线,实现事件的发布和订阅,构建事件驱动的应用程序。
- 数据集成:使用Kafka作为数据集成平台,将不同的数据源和数据目标进行连接和同步。
- 消息队列:使用Kafka作为消息队列,实现应用程序之间的异步通信。
参考文档
更多关于Kafka的信息,可以参考以下文档:
- Kafka官方文档
- Kafka设计文档
- Kafka用户指南
四、kappa架构
思想:是通过改进流计算系统来解决数据全量处理的问题,使得实时计算和批处理过程使用同一套代码。此外Kappa架构认为只有在有必要的时候才会对历史数据进行重复计算,而如果需要重复计算时,Kappa架构下可以启动很多个实例进行重复计算,方式是通过上游重放完成(从数据源拉取数据重新计算)。
优点:使用一条链路就可以完成实时和离线计算 ,成本低
缺点:
1:kappa架构最大的问题:重新计算历史数据的时候吞吐量会低于批处理
2:数据可能延迟&丢失 ,会造成数据不一致
3:不适用于批处理和流处理代码不一致场景
4:消息中间件缓存的数据和回溯的数据性能有瓶颈
5:无法复用目前已经成熟的离线数据的数据血缘数据质量管理体系
五、混合架构
总结:没有那种架构是最好的,适合业务需求才是最好的
六、传统数据入仓特点
1.0和2.0区别
2.0链路上加入实时链路,解决一些实时计算的问题,最终产出结果表(延迟问题)
七、实时数仓现状
总结:
1:lambda架构 两条链路,开发维护成本高
2:kappa架构:吞吐能力差
八、湖仓一体
1:数据湖:存储更加海量的数据
2:湖仓一体
将离线数仓和实时数仓合并存储数据湖中
即基于kappa架构分层,将kafka存储替换数据湖存储,做到了湖仓一体
3:总结
最大特点:将kappa消息队列替换数据湖组件
优点:中间结果层存储海量的数据
缺点:数据延迟比较高(秒级)、框架比较重
九、统一数仓
总结:
优点:架构极简、功能强大、使用方便(支持mysql协议,主要sql就可以)
缺点:更新不能过于频繁,否则会过度消耗集群计算资源
适合规模适中的数据
十、CDC(Change Data Capture: 变更_数据_捕获)简介
1:什么是CDC
监测并捕获数据库的变动(包括数据 或 数据表的插入INSERT、更新UPDATE、删除DELETE等),将这些变更按发生的顺序完整记录下来,写入到消息中间件中以供其他服务进行订阅及消费。
十一、CDC实现机制
1:基于主动查询CDC
用户通常会在数据源表的某个字段中,保存上次更新的时间戳或版本号等信息,然后下游通过不断的查询和与上次的记录做对比,来确定数据是否有变动,是否需要同步。
2:基于事件接收CDC
当数据源表发生变动时,会通过附加在表上的触发器或者 binlog 等途径,将操作记录下来。下游可以通过数据库底层的协议,订阅并消费这些事件,然后对数据库变动记录做重放,从而实现同步。
十二、FlinkCDC原理和特性
1:Flink SQL CDC 内置了 Debezium 引擎,利用其抽取日志获取变更的能力,将 changelog(更新的日志) 转换为 Flink SQL 认识的 RowData(行数据) 数据。(以下右侧是 Debezium 的数据格式,左侧是 Flink 的 RowData 数据格式)
2:FlinkCDC特性
- 支持数据库级别的快照,读取全量数据,2.0版本可以支持不加锁的方式读取
- 支持 binlog,捕获增量数据
- 支持Exactly-Once
- 支持 Flink DataStream API,不需要额外部署 Debezium 和 Kafka即可在一个 Flink 作业中完成变更数据的捕获和计算
- 支持 Flink Table/SQL API,可使用 SQL DDL 来创建 CDC Source 表,并对表中的数据进行查询。
十三、FlinkCDC案例
需求:使用FlinkCDC获取Mysql数据,并且捕获Mysql数据的变更
第一步:创建数据库源-在mysql中建库建表
Dropdatabaseifexists test;CREATEDATABASE test DEFAULTCHARACTERSET= utf8 COLLATE= utf8_general_ci;Use test;-- 建表语句:-- 建表-- 学生表CREATETABLE`Student`(`s_id`VARCHAR(20),`s_name`VARCHAR(20)NOTNULLDEFAULT'',`s_birth`VARCHAR(20)NOTNULLDEFAULT'',`s_sex`VARCHAR(10)NOTNULLDEFAULT'',PRIMARYKEY(`s_id`));-- 成绩表CREATETABLE`Score`(`s_id`VARCHAR(20),`c_id`VARCHAR(20),`s_score`INT(3),PRIMARYKEY(`s_id`,`c_id`));-- 插入学生表测试数据insertinto Student values('01','赵雷','1990-01-01','男');insertinto Student values('02','钱电','1990-12-21','男');insertinto Student values('03','孙风','1990-05-20','男');insertinto Student values('04','李云','1990-08-06','男');insertinto Student values('05','周梅','1991-12-01','女');insertinto Student values('06','吴兰','1992-03-01','女');insertinto Student values('07','郑竹','1989-07-01','女');insertinto Student values('08','王菊','1990-01-20','女');-- 成绩表测试数据insertinto Score values('01','01',80);insertinto Score values('01','02',90);insertinto Score values('01','03',99);insertinto Score values('02','01',70);insertinto Score values('02','02',60);insertinto Score values('02','03',80);insertinto Score values('03','01',80);insertinto Score values('03','02',80);insertinto Score values('03','03',80);insertinto Score values('04','01',50);insertinto Score values('04','02',30);insertinto Score values('04','03',20);insertinto Score values('05','01',76);insertinto Score values('05','02',87);insertinto Score values('06','01',31);insertinto Score values('06','03',34);insertinto Score values('07','02',89);insertinto Score values('07','03',98);
验证
SELECTcount(*)from Score;--18 SELECTcount(*)from Student;-- 8
第二步:开启MysqlBinlog日志 -在Mysql中执行
验证当前数据是否开启binglog
show variables like'%log_bin%';on 开启 off 未开启
1:如果未开启在linux服务器修改mysql配置
vi /etc/my.cnf
在[mysqld]下面加入如下代码:
server_id=1
log_bin = mysql-bin
binlog_format =ROW
expire_logs_days =302:重启mysql服务
systemctl restart mysqld
第三步:Flink环境准备
1:添加jar 包
添加jar包:flink-sql-connector-mysql-cdc-3.1.1.jar
添加到:cd /export/server/flink/lib
上传jar包
2:启动Flink,进入FlinkSQL客户端
启动Flink
2-1:cd /export/server/flink/bin/2-2: ./start-cluster.sh
验证:jps
38078 StandaloneSessionClusterEntrypoint
38399 TaskManagerRunner
进入FlinkSQL客户端
cd /export/server/flink/bin
sudo ./sql-client.sh
修改flink客户端参数(临时参数)
修改查询显示风格 SETsql-client.execution.result-mode= tableau;
修改检查间隔 set execution.checkpointing.interval=10sec;
第四步:构建FlinkSQL映射表
构建:Student
详解:
createtable mysql_cdc_to_test_student(
s_id string,
s_name string,
s_birth string,
s_sex string
PRIMARYKEY(s_id)NOT ENFORCED
)WITH('connector'='mysql-cdc',-- 指定连接器'hostname'='localhost',-- node1 (192.168.88.161) 你要连接哪台集群mysql'port'='3306',-- 3306'username'='root',-- mysql 用户名'password'='123456',-- mysql 密码'database-name'='mydb',-- 要连接mysql下的哪个数据库实例 mydb'table-name'='orders');-- 要导入哪张表
代码 -FlinkSQL客户端执行
CREATETABLEifnotexists mysql_cdc_to_test_Student(
s_id STRING,
s_name STRING,
s_birth STRING,
s_sex STRING,PRIMARYKEY(`s_id`)NOT ENFORCED
)WITH('connector'='mysql-cdc','hostname'='node1','port'='3306','username'='root','password'='123456','server-time-zone'='Asia/Shanghai','scan.startup.mode'='initial','database-name'='test','table-name'='Student');
FlinkSQL-客户端
select*from mysql_cdc_to_test_Student;
第五步:FlinkCDC捕获功能验证
在Mysql数据库中对源数据进行修改 ,观察FlinkCDC查询窗口
-- 需求 :插入数据 student表里面insertinto Student values('09','王小明','1995-01-01','男');-- 需求 :修改student表中数据update Student set s_name ='王小红'where s_id ='09';-- 需求 :删除student表中的数据deletefrom Student where s_id ='09';
拓展
如果想退出当前查询
crtl+c
需求: 查询学习01课程学生的情况已经考试情况
思路:
1:同步数据 student socre表
1:在Flink-client 构建mysql中 - Score 的映射表 mysql_cdc_to_test_Score
createtable mysql_cdc_to_test_Score(
s_id string,
c_id string,
s_score int,PRIMARYKEY(`s_id`)NOT ENFORCED
)WITH('connector'='mysql-cdc','hostname'='node1','port'='3306','username'='root','password'='123456','server-time-zone'='Asia/Shanghai','scan.startup.mode'='initial','database-name'='test','table-name'='Score');
2:在flinksql里面写业务逻辑
查询学习01课程学生的情况已经考试情况
select s.*,t.s_name,t.s_sex from mysql_cdc_to_test_Student as t
join mysql_cdc_to_test_Score as s
on t.s_id =s.s_id
where s.c_id='01';
十四、目标(重点!!!)
一、理解统一数仓架构
二、实现FlinkCDC案例
一、如何理解统一数仓架构
统一数仓架构是一种将离线数仓和实时数仓进行统一处理的架构。它利用了OLAP(在线分析处理)引擎的发展和成熟,如Doris、StarRocks、TiDB等,这些引擎既支持离线更新,又支持实时更新,并且在查询时支持PB级数据秒级响应。
特点:
- 架构极简:统一数仓架构通过使用一个OLAP引擎来处理离线和实时数据,避免了复杂的多系统集成和维护。
- 功能强大:OLAP引擎具备强大的数据查询能力,支持单表查询、多表关联、联邦查询、交互式查询等,能够满足各种数据分析和应用需求。
- 使用方便:OLAP引擎通常兼容MySQL协议,因此可以使用标准的SQL语言进行数据操作,降低了学习和使用的门槛。
- 降低成本:由于统一数仓架构避免了使用笨重的Hadoop、Spark、Flink等系统,因此降低了系统使用和维护的成本。
适用场景:
统一数仓架构特别适合中小型公司,这些公司的数据量不是特别大,但又有实时和离线的统计需求。它能够提供强大的功能和灵活性,同时保持架构的简洁性和易维护性。
二、如何实现FlinkCDC案例
下面是一个使用FlinkCDC从MySQL数据库中捕获数据变更并进行实时同步的案例。
第一步:准备MySQL数据库
在MySQL中创建一个测试数据库和两张表:学生表(Student)和成绩表(Score)。
Dropdatabaseifexists test;CREATEDATABASE test DEFAULTCHARACTERSET= utf8 COLLATE= utf8_general_ci;Use test;-- 建表语句:-- 学生表CREATETABLE`Student`(`s_id`VARCHAR(20),`s_name`VARCHAR(20)NOTNULLDEFAULT'',`s_birth`VARCHAR(20)NOTNULLDEFAULT'',`s_sex`VARCHAR(10)NOTNULLDEFAULT'',PRIMARYKEY(`s_id`));...................................
第二步:开启MySQL Binlog日志
在MySQL中开启Binlog日志,以便FlinkCDC能够捕获数据变更。
-- 验证当前数据是否开启binglogshow variables like'%log_bin%';-- 如果未开启,在Linux服务器修改mysql配置
vi /etc/my.cnf
在[mysqld]下面加入如下代码:
server_id=1
log_bin = mysql-bin
binlog_format =ROW
expire_logs_days =30-- 重启mysql服务
systemctl restart mysqld
第三步:准备Flink环境
将FlinkCDC的JAR包(如flink-sql-connector-mysql-cdc-3.1.1.jar)添加到Flink的lib目录下,并启动Flink集群和Flink SQL客户端。
# 启动Flink集群cd /export/server/flink/bin/
./start-cluster.sh
# 启动Flink SQL客户端cd /export/server/flink/bin
sudo ./sql-client.sh
第四步:构建Flink SQL映射表
在Flink SQL客户端中创建Student表和Score表的映射,以便FlinkCDC能够读取和同步数据。
-- 创建Student表的映射CREATETABLEifnotexists mysql_cdc_to_test_Student(
s_id STRING,
s_name STRING,
s_birth STRING,
s_sex STRING,PRIMARYKEY(`s_id`)NOT ENFORCED
)WITH('connector'='mysql-cdc','hostname'='node1','port'='3306','username'='root','password'='123456','server-time-zone'='Asia/Shanghai','scan.startup.mode'='initial','database-name'='test','table-name'='Student');-- 创建Score表的映射CREATETABLEifnotexists mysql_cdc_to_test_Score(`s_id` STRING,`c_id` STRING,`s_score`INT,PRIMARYKEY(`s_id`)NOT ENFORCED
)WITH('connector'='mysql-cdc','hostname'='node1','port'='3306','username'='root','password'='123456','server-time-zone'='Asia/Shanghai','scan.startup.mode'='initial','database-name'='test','table-name'='Score');
第五步:查询和验证数据
在Flink SQL客户端中查询Student表和Score表的数据,并验证FlinkCDC是否能够实时捕获数据变更。
-- 查询Student表的数据select*from mysql_cdc_to_test_Student;-- 查询Score表的数据select*from mysql_cdc_to_test_Score;-- 查询学习01课程的学生及考试情况select t.*, s.s_name, s.s_sex from mysql_cdc_to_test_Score AS t
INNERJOIN mysql_cdc_to_test_Student AS s ON t.s_id = s.s_id
WHERE t.c_id ='01';
在MySQL中对Student表和Score表进行增删改操作,观察Flink SQL客户端中的数据是否实时更新,以验证FlinkCDC的功能。
【拓展】
尝试了解一下FlinkCDC几种消费模式
版权归原作者 十六IT 所有, 如有侵权,请联系我们删除。