0


Day01_统一数仓介绍_FlinkCDC

文章目录

Day01_随堂笔记

一、经典数仓架构

​ 范式建模

​ 优点:避免数据冗余

​ 缺点:取完整数据的时候需要关联,所以查询效率偏低

​ 维度建模

​ 优点:关联融合到了表里面,不需要关联,所以查询效率高

​ 缺点:数据冗余高,容易出现数据不一致

二、传统离线大数据架构

在这里插入图片描述

特点:

​ 以hadoop为核心对数据进行存储和计算逐渐成为数据处理中中流砥柱

​ 优点:可以离线处理大规模数据集,在数据量大的时候,首选

​ 缺点:批处理模式无论如何提升性能,也无法满足实时需求

三、lambda架构

在这里插入图片描述

总结:

​ 在lambda架构中,为了计算一些实时指标,就在原来的离线基础增加了计算链路、

​ 需要对数据源进行改造:消息队列,把消息发送给下游。在大数据中kafka

​ 完成实时指标数据计算,推送到下游,服务层。有数据服务层完成离线和实时结合

Kafka是什么(可略)

Kafka是一种分布式流处理平台,由Apache软件基金会开发。它最初由LinkedIn公司开发,并于2011年作为开源项目贡献给Apache。

核心概念
  1. 消息(Message):Kafka处理的基本单位,是一段字节数组,可以包含任何类型的数据。
  2. 主题(Topic):消息的分类,一个主题可以包含多个消息,生产者将消息发布到特定的主题,消费者从主题中消费消息。
  3. 分区(Partition):主题的物理分区,一个主题可以有多个分区,每个分区是一个有序的、不可变的消息序列。
  4. 生产者(Producer):向Kafka主题发布消息的应用程序。
  5. 消费者(Consumer):从Kafka主题消费消息的应用程序。
  6. 消费者组(Consumer Group):一组消费者,它们共同消费一个主题的所有消息,每个消息只能被消费者组中的一个消费者消费。
  7. 偏移量(Offset):每个消息在分区中的唯一标识符,消费者通过偏移量来跟踪已经消费的消息。
  8. Broker:Kafka集群中的服务器节点,负责存储和管理主题和分区。
特性
  1. 高吞吐量:Kafka能够处理大量的读写操作,适用于实时数据流处理。
  2. 持久性:消息被持久化存储在磁盘上,不会因为服务器重启或故障而丢失。
  3. 分布式:Kafka集群由多个Broker组成,能够提供高可用性和容错性。
  4. 可扩展性:Kafka集群可以动态扩展,增加或减少Broker节点不会影响系统的正常运行。
  5. 实时性:Kafka能够实时处理数据流,适用于需要低延迟的数据处理场景。
  6. 可配置性:Kafka提供了丰富的配置选项,可以根据不同的需求进行定制。
应用场景

Kafka广泛应用于以下场景:

  1. 日志收集:收集应用程序的日志数据,并将其存储在Kafka中,以便进行后续的分析和处理。
  2. 流式处理:使用Kafka作为数据源,进行实时的数据流处理,如数据过滤、转换和聚合。
  3. 事件驱动系统:使用Kafka作为事件总线,实现事件的发布和订阅,构建事件驱动的应用程序。
  4. 数据集成:使用Kafka作为数据集成平台,将不同的数据源和数据目标进行连接和同步。
  5. 消息队列:使用Kafka作为消息队列,实现应用程序之间的异步通信。
参考文档

更多关于Kafka的信息,可以参考以下文档:

  • Kafka官方文档
  • Kafka设计文档
  • Kafka用户指南

四、kappa架构

在这里插入图片描述

思想:是通过改进流计算系统来解决数据全量处理的问题,使得实时计算和批处理过程使用同一套代码。此外Kappa架构认为只有在有必要的时候才会对历史数据进行重复计算,而如果需要重复计算时,Kappa架构下可以启动很多个实例进行重复计算,方式是通过上游重放完成(从数据源拉取数据重新计算)。

优点:使用一条链路就可以完成实时和离线计算 ,成本低

缺点:

​ 1:kappa架构最大的问题:重新计算历史数据的时候吞吐量会低于批处理

​ 2:数据可能延迟&丢失 ,会造成数据不一致

​ 3:不适用于批处理和流处理代码不一致场景

​ 4:消息中间件缓存的数据和回溯的数据性能有瓶颈

​ 5:无法复用目前已经成熟的离线数据的数据血缘数据质量管理体系

五、混合架构

在这里插入图片描述

总结:没有那种架构是最好的,适合业务需求才是最好的

在这里插入图片描述

六、传统数据入仓特点

在这里插入图片描述

​ 1.0和2.0区别

​ 2.0链路上加入实时链路,解决一些实时计算的问题,最终产出结果表(延迟问题)

七、实时数仓现状

总结:

​ 1:lambda架构 两条链路,开发维护成本高

​ 2:kappa架构:吞吐能力差

八、湖仓一体

在这里插入图片描述

​ 1:数据湖:存储更加海量的数据

​ 2:湖仓一体

​ 将离线数仓和实时数仓合并存储数据湖中

​ 即基于kappa架构分层,将kafka存储替换数据湖存储,做到了湖仓一体

​ 3:总结

​ 最大特点:将kappa消息队列替换数据湖组件

​ 优点:中间结果层存储海量的数据

​ 缺点:数据延迟比较高(秒级)、框架比较重

九、统一数仓

在这里插入图片描述

总结:

​ 优点:架构极简、功能强大、使用方便(支持mysql协议,主要sql就可以)

​ 缺点:更新不能过于频繁,否则会过度消耗集群计算资源

​ 适合规模适中的数据

十、CDC(Change Data Capture: 变更_数据_捕获)简介

1:什么是CDC

监测并捕获数据库的变动(包括数据 或 数据表的插入INSERT、更新UPDATE、删除DELETE等),将这些变更按发生的顺序完整记录下来,写入到消息中间件中以供其他服务进行订阅及消费

在这里插入图片描述

十一、CDC实现机制

​ 1:基于主动查询CDC

​ 用户通常会在数据源表的某个字段中,保存上次更新的时间戳或版本号等信息,然后下游通过不断的查询和与上次的记录做对比,来确定数据是否有变动,是否需要同步。

​ 2:基于事件接收CDC

​ 当数据源表发生变动时,会通过附加在表上的触发器或者 binlog 等途径,将操作记录下来。下游可以通过数据库底层的协议,订阅并消费这些事件,然后对数据库变动记录做重放,从而实现同步。

十二、FlinkCDC原理和特性

1:Flink SQL CDC 内置了 Debezium 引擎,利用其抽取日志获取变更的能力,将 changelog(更新的日志) 转换为 Flink SQL 认识的 RowData(行数据) 数据。(以下右侧是 Debezium 的数据格式,左侧是 Flink 的 RowData 数据格式)

2:FlinkCDC特性

  1. 支持数据库级别的快照,读取全量数据,2.0版本可以支持不加锁的方式读取
  2. 支持 binlog,捕获增量数据
  3. 支持Exactly-Once
  4. 支持 Flink DataStream API,不需要额外部署 Debezium 和 Kafka即可在一个 Flink 作业中完成变更数据的捕获和计算
  5. 支持 Flink Table/SQL API,可使用 SQL DDL 来创建 CDC Source 表,并对表中的数据进行查询。

十三、FlinkCDC案例

需求:使用FlinkCDC获取Mysql数据,并且捕获Mysql数据的变更

第一步:创建数据库源-在mysql中建库建表
Dropdatabaseifexists test;CREATEDATABASE test DEFAULTCHARACTERSET= utf8 COLLATE= utf8_general_ci;Use test;-- 建表语句:-- 建表-- 学生表CREATETABLE`Student`(`s_id`VARCHAR(20),`s_name`VARCHAR(20)NOTNULLDEFAULT'',`s_birth`VARCHAR(20)NOTNULLDEFAULT'',`s_sex`VARCHAR(10)NOTNULLDEFAULT'',PRIMARYKEY(`s_id`));-- 成绩表CREATETABLE`Score`(`s_id`VARCHAR(20),`c_id`VARCHAR(20),`s_score`INT(3),PRIMARYKEY(`s_id`,`c_id`));-- 插入学生表测试数据insertinto Student values('01','赵雷','1990-01-01','男');insertinto Student values('02','钱电','1990-12-21','男');insertinto Student values('03','孙风','1990-05-20','男');insertinto Student values('04','李云','1990-08-06','男');insertinto Student values('05','周梅','1991-12-01','女');insertinto Student values('06','吴兰','1992-03-01','女');insertinto Student values('07','郑竹','1989-07-01','女');insertinto Student values('08','王菊','1990-01-20','女');-- 成绩表测试数据insertinto Score values('01','01',80);insertinto Score values('01','02',90);insertinto Score values('01','03',99);insertinto Score values('02','01',70);insertinto Score values('02','02',60);insertinto Score values('02','03',80);insertinto Score values('03','01',80);insertinto Score values('03','02',80);insertinto Score values('03','03',80);insertinto Score values('04','01',50);insertinto Score values('04','02',30);insertinto Score values('04','03',20);insertinto Score values('05','01',76);insertinto Score values('05','02',87);insertinto Score values('06','01',31);insertinto Score values('06','03',34);insertinto Score values('07','02',89);insertinto Score values('07','03',98);

验证

SELECTcount(*)from Score;--18  SELECTcount(*)from Student;-- 8

在这里插入图片描述

第二步:开启MysqlBinlog日志 -在Mysql中执行
验证当前数据是否开启binglog
show variables like'%log_bin%';on  开启  off 未开启

1:如果未开启在linux服务器修改mysql配置
vi /etc/my.cnf
在[mysqld]下面加入如下代码:
server_id=1
log_bin = mysql-bin
binlog_format =ROW
expire_logs_days =302:重启mysql服务
systemctl restart mysqld
第三步:Flink环境准备
1:添加jar 包 
   添加jar包:flink-sql-connector-mysql-cdc-3.1.1.jar
   添加到:cd /export/server/flink/lib
   上传jar包
       
2:启动Flink,进入FlinkSQL客户端
   
   启动Flink
    2-1:cd /export/server/flink/bin/2-2: ./start-cluster.sh    
   验证:jps 
           38078 StandaloneSessionClusterEntrypoint
        38399 TaskManagerRunner
        
    进入FlinkSQL客户端
        cd /export/server/flink/bin
        sudo ./sql-client.sh 
        
    修改flink客户端参数(临时参数)
        修改查询显示风格   SETsql-client.execution.result-mode= tableau;
        修改检查间隔      set execution.checkpointing.interval=10sec;
第四步:构建FlinkSQL映射表
构建:Student

详解:
createtable mysql_cdc_to_test_student(
   s_id string,
   s_name string,
   s_birth string,
   s_sex string
   PRIMARYKEY(s_id)NOT ENFORCED 
)WITH('connector'='mysql-cdc',-- 指定连接器'hostname'='localhost',-- node1 (192.168.88.161) 你要连接哪台集群mysql'port'='3306',-- 3306'username'='root',-- mysql 用户名'password'='123456',-- mysql 密码'database-name'='mydb',-- 要连接mysql下的哪个数据库实例 mydb'table-name'='orders');-- 要导入哪张表
     
代码 -FlinkSQL客户端执行
CREATETABLEifnotexists mysql_cdc_to_test_Student(
     s_id     STRING,
     s_name   STRING,
     s_birth  STRING,
     s_sex    STRING,PRIMARYKEY(`s_id`)NOT ENFORCED
)WITH('connector'='mysql-cdc','hostname'='node1','port'='3306','username'='root','password'='123456','server-time-zone'='Asia/Shanghai','scan.startup.mode'='initial','database-name'='test','table-name'='Student');

FlinkSQL-客户端
select*from mysql_cdc_to_test_Student;
第五步:FlinkCDC捕获功能验证
在Mysql数据库中对源数据进行修改 ,观察FlinkCDC查询窗口

-- 需求 :插入数据 student表里面insertinto Student values('09','王小明','1995-01-01','男');-- 需求 :修改student表中数据update Student set s_name ='王小红'where s_id ='09';-- 需求 :删除student表中的数据deletefrom Student where s_id ='09';

拓展

如果想退出当前查询

crtl+c  

需求: 查询学习01课程学生的情况已经考试情况

思路:

1:同步数据 student socre表

1:在Flink-client 构建mysql中 - Score 的映射表 mysql_cdc_to_test_Score
createtable  mysql_cdc_to_test_Score(
    s_id string,
    c_id string,
    s_score int,PRIMARYKEY(`s_id`)NOT ENFORCED
)WITH('connector'='mysql-cdc','hostname'='node1','port'='3306','username'='root','password'='123456','server-time-zone'='Asia/Shanghai','scan.startup.mode'='initial','database-name'='test','table-name'='Score');

2:在flinksql里面写业务逻辑

查询学习01课程学生的情况已经考试情况
select s.*,t.s_name,t.s_sex from  mysql_cdc_to_test_Student as t 
join  mysql_cdc_to_test_Score as s 
on t.s_id =s.s_id
where s.c_id='01';

十四、目标(重点!!!)

一、理解统一数仓架构

二、实现FlinkCDC案例

一、如何理解统一数仓架构

统一数仓架构是一种将离线数仓和实时数仓进行统一处理的架构。它利用了OLAP(在线分析处理)引擎的发展和成熟,如Doris、StarRocks、TiDB等,这些引擎既支持离线更新,又支持实时更新,并且在查询时支持PB级数据秒级响应。

特点:
  1. 架构极简:统一数仓架构通过使用一个OLAP引擎来处理离线和实时数据,避免了复杂的多系统集成和维护。
  2. 功能强大:OLAP引擎具备强大的数据查询能力,支持单表查询、多表关联、联邦查询、交互式查询等,能够满足各种数据分析和应用需求。
  3. 使用方便:OLAP引擎通常兼容MySQL协议,因此可以使用标准的SQL语言进行数据操作,降低了学习和使用的门槛。
  4. 降低成本:由于统一数仓架构避免了使用笨重的Hadoop、Spark、Flink等系统,因此降低了系统使用和维护的成本。
适用场景:

统一数仓架构特别适合中小型公司,这些公司的数据量不是特别大,但又有实时和离线的统计需求。它能够提供强大的功能和灵活性,同时保持架构的简洁性和易维护性。

二、如何实现FlinkCDC案例

下面是一个使用FlinkCDC从MySQL数据库中捕获数据变更并进行实时同步的案例。

第一步:准备MySQL数据库

在MySQL中创建一个测试数据库和两张表:学生表(Student)和成绩表(Score)。

Dropdatabaseifexists test;CREATEDATABASE test DEFAULTCHARACTERSET= utf8 COLLATE= utf8_general_ci;Use test;-- 建表语句:-- 学生表CREATETABLE`Student`(`s_id`VARCHAR(20),`s_name`VARCHAR(20)NOTNULLDEFAULT'',`s_birth`VARCHAR(20)NOTNULLDEFAULT'',`s_sex`VARCHAR(10)NOTNULLDEFAULT'',PRIMARYKEY(`s_id`));...................................
第二步:开启MySQL Binlog日志

在MySQL中开启Binlog日志,以便FlinkCDC能够捕获数据变更。

-- 验证当前数据是否开启binglogshow variables like'%log_bin%';-- 如果未开启,在Linux服务器修改mysql配置
vi /etc/my.cnf
在[mysqld]下面加入如下代码:
server_id=1
log_bin = mysql-bin
binlog_format =ROW
expire_logs_days =30-- 重启mysql服务
systemctl restart mysqld
第三步:准备Flink环境

将FlinkCDC的JAR包(如flink-sql-connector-mysql-cdc-3.1.1.jar)添加到Flink的lib目录下,并启动Flink集群和Flink SQL客户端。

# 启动Flink集群cd /export/server/flink/bin/
./start-cluster.sh

# 启动Flink SQL客户端cd /export/server/flink/bin
sudo ./sql-client.sh
第四步:构建Flink SQL映射表

在Flink SQL客户端中创建Student表和Score表的映射,以便FlinkCDC能够读取和同步数据。

-- 创建Student表的映射CREATETABLEifnotexists mysql_cdc_to_test_Student(
     s_id     STRING,
     s_name   STRING,
     s_birth  STRING,
     s_sex    STRING,PRIMARYKEY(`s_id`)NOT ENFORCED
)WITH('connector'='mysql-cdc','hostname'='node1','port'='3306','username'='root','password'='123456','server-time-zone'='Asia/Shanghai','scan.startup.mode'='initial','database-name'='test','table-name'='Student');-- 创建Score表的映射CREATETABLEifnotexists mysql_cdc_to_test_Score(`s_id`   STRING,`c_id`   STRING,`s_score`INT,PRIMARYKEY(`s_id`)NOT ENFORCED
)WITH('connector'='mysql-cdc','hostname'='node1','port'='3306','username'='root','password'='123456','server-time-zone'='Asia/Shanghai','scan.startup.mode'='initial','database-name'='test','table-name'='Score');
第五步:查询和验证数据

在Flink SQL客户端中查询Student表和Score表的数据,并验证FlinkCDC是否能够实时捕获数据变更。

-- 查询Student表的数据select*from mysql_cdc_to_test_Student;-- 查询Score表的数据select*from mysql_cdc_to_test_Score;-- 查询学习01课程的学生及考试情况select t.*, s.s_name, s.s_sex from mysql_cdc_to_test_Score AS t
INNERJOIN mysql_cdc_to_test_Student AS s ON t.s_id = s.s_id
WHERE t.c_id ='01';

在MySQL中对Student表和Score表进行增删改操作,观察Flink SQL客户端中的数据是否实时更新,以验证FlinkCDC的功能。

【拓展】

https://nightlies.apache.org/flink/flink-cdc-docs-release-3.2/zh/docs/connectors/flink-sources/mysql-cdc/

尝试了解一下FlinkCDC几种消费模式

在这里插入图片描述

在这里插入图片描述


本文转载自: https://blog.csdn.net/weixin_74002941/article/details/144119121
版权归原作者 十六IT 所有, 如有侵权,请联系我们删除。

“Day01_统一数仓介绍_FlinkCDC”的评论:

还没有评论