0


Flink学习之Flink SQL(补)

Flink SQL

1、SQL客户端

1.1 基本使用
  • 启动yarn-sessionyarn-session.sh -d
  • 启动Flink SQL客户端sql-client.sh--退出客户端exit;
  • 测试> 重启SQL客户端之后,需要重新建表-- 构建Kafka Source-- 无界流droptableifexists students_kafka_source;CREATETABLEifnotexists students_kafka_source (`id`BIGINT,`name` STRING,`age`INT,`gender` STRING,`clazz` STRING)WITH('connector'='kafka','topic'='students1000','properties.bootstrap.servers'='master:9092','properties.group.id'='grp1','scan.startup.mode'='earliest-offset','format'='csv',-- 以 ,分隔的数据-- 是否忽略脏数据'csv.ignore-parse-errors'='true');-- 执行查询,基于KafkaSource是无界流,所以查询时连续变化的select*from students_kafka_source;select clazz,count(*)as cnt from students_kafka_source groupby clazz;-- 向Kafka生产数据kafka-console-producer.sh --broker-list master:9092 --topic students1000
1.2 三种显示模式
  • 表格模式> SQL客户端默认的结果显示模式> > 在内存中实体化结果,并将结果用规则的分页表格可视化展示出来SET'sql-client.execution.result-mode'='table';
  • 变更日志模式> 不会实体化和可视化结果,而是由插入(> > +> > )和撤销(> > -> > )组成的持续查询产生结果流SET'sql-client.execution.result-mode'='changelog';
  • Tableau模式> 更接近传统的数据库,会将执行的结果(类似变更日志模式,由插入(> > +> > )和撤销(> > -> > )组成的持续查询产生结果流)以制表的形式直接打在屏幕之上SET'sql-client.execution.result-mode'='tableau';
1.3 不同的执行模式
  • 批处理> 只能处理有界流> > 结果是固定的> > 底层是基于MR模型> > 不会出现由插入(> > +> > )和撤销(> > -> > )组成的持续查询产生结果流这种结果,只会出现最终结果SET'execution.runtime-mode'='batch';
  • 流处理> 默认的方式> > 既可以处理无界流,也可以处理有界流> > 结果是连续变化的> > 底层是基于持续流模型SET'execution.runtime-mode'='streaming';

2、常用的connector

2.1 Kafka
  • 准备工作# 下载依赖https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kafka/1.15.4/flink-sql-connector-kafka-1.15.4.jar# 长传至${FLINK_HOME}/lib# 重启yarn-session.sh# 先找到yarn-session的application idyarn application -list# kill掉yarn-session在Yarn上的进程yarn application -kill application_1722331927004_0007# 再启动yarn-sessionyarn-session.sh -d# 再启动sql-clientsql-client.sh
  • Source-- 构建Kafka Source-- 无界流droptableifexists students_kafka_source;CREATETABLEifnotexists students_kafka_source (`id`BIGINT,`name` STRING,`age`INT,`gender` STRING,`clazz` STRING,-- Kafka Source提供的数据之外的数据(元数据)`event_time`TIMESTAMP(3) METADATA FROM'timestamp',`pt`BIGINT METADATA FROM'partition',`offset`BIGINT METADATA FROM'offset',`topic` STRING METADATA FROM'topic')WITH('connector'='kafka','topic'='students1000','properties.bootstrap.servers'='master:9092','properties.group.id'='grp1','scan.startup.mode'='earliest-offset','format'='csv',-- 是否忽略脏数据'csv.ignore-parse-errors'='true');-- 执行查询select id,name,event_time,pt,`offset`,`topic`from students_kafka_source limit10;
  • Sink- 结果不带更新的Sink- csv:只能添加新数据,不能修改或删除现有数据droptableifexists students_lksb_sink;CREATETABLEifnotexists students_lksb_sink (`id`BIGINT,`name` STRING,`age`INT,`gender` STRING,`clazz` STRING)WITH('connector'='kafka','topic'='students_lksb_sink01','properties.bootstrap.servers'='master:9092','properties.group.id'='grp1','scan.startup.mode'='earliest-offset','format'='csv',-- 是否忽略脏数据'csv.ignore-parse-errors'='true');-- 执行不带更新的查询insertinto students_lksb_sinkselect id,name,age,gender,clazz from students_kafka_source where clazz='理科四班';select*from students_lksb_sink;-- 若执行下述代码则会出错,org.apache.flink.table.api.TableException: Table sink 'default_catalog.default_database.clazz_cnt_sink' doesn't support consuming update changes 指出你正在尝试将包含更新操作的流数据写入到一个不支持更新操作的表接收器(Table Sink)中。droptableifexists clazz_cnt_sink;CREATETABLEifnotexists clazz_cnt_sink (`clazz` String,`cnt`BIGINT)WITH('connector'='kafka','topic'='clazz_cnt_sink_02','properties.bootstrap.servers'='master:9092','properties.group.id'='grp1','scan.startup.mode'='earliest-offset','format'='csv');-- 执行查询并且将查询结果插入到Sink表中insertinto clazz_cnt_sinkselect clazz,count(*)as cnt from students_kafka_source groupby clazz;select*from clazz_cnt_sink;> 结果:数据以追加的形式进行更新在这里插入图片描述- 结果带更新的Sink> Kafka只支持追加的写入,不支持更新数据> > 故有更新的查询结果无法直接编码,写入Kafka> > 虽然Kafka支支持append,但是可以将更新流编码成“ +、-”不断追加到Kafka中> > 如果有更新,那么往Kafka写两条记录即可表示更新,即:先“-”再“+”> > 但是csv这种格式无法表达“-”或“+”操作,故无法在有更新的结果写Kafka时使用> > 需要使用:canal-json或者是debezium-json> > canal-json:{“data”:[{“clazz”:“文科六班”,“cnt”:104}],“type”:“DELETE”}> > debezium-json:{“before”:null,“after”:{“clazz”:“理科四班”,“cnt”:94},“op”:“c”}-- 基于Kafka Source 统计班级人数 最终结果写入Kafkadroptableifexists clazz_cnt_sink;CREATETABLEifnotexists clazz_cnt_sink (`clazz` String,`cnt`BIGINT)WITH('connector'='kafka','topic'='clazz_cnt_sink_02','properties.bootstrap.servers'='master:9092','properties.group.id'='grp1','scan.startup.mode'='earliest-offset','format'='canal-json'-- 或者是指定为debezium-json);-- 执行查询并且将查询结果插入到Sink表中insertinto clazz_cnt_sinkselect clazz,count(*)as cnt from students_kafka_source groupby clazz;select*from clazz_cnt_sink;

结果数据会持续的更新:

在这里插入图片描述

2.2 JDBC

用于连接数据库,例如:MySQL、Oracle、PG、Derby

  • 准备工作# 下载依赖https://repo.maven.apache.org/maven2/org/apache/flink/flink-connector-jdbc/1.15.4/flink-connector-jdbc-1.15.4.jar# 上传依赖至FLINK的lib目录下,还需要将Linu中MySQL的驱动拷贝一份到lib目录下,可以从Hadoop中进行拷贝# 重启yarn-session以及sql客户端
  • Source> 有界流,只会查询一次,查询完后直接结束(从jdbc中读取数据是有界流droptableifexists students_mysql_source;CREATETABLEifnotexists students_mysql_source (`id`INT,`name` STRING,`age`INT,`gender` STRING,`clazz` STRING,PRIMARYKEY(id)NOT ENFORCED)WITH('connector'='jdbc','url'='jdbc:mysql://master:3306/bigdata_30?useSSL=false','table-name'='students2','username'='root','password'='123456');-- 执行查询select*from students_mysql_source;-- 将模式换成tableau 看结果变化的全过程SET'sql-client.execution.result-mode'='tableau';-- 默认会以 流处理的方式 执行,所以可以看到结果连续变化的过程select gender,count(*)as cnt from students_mysql_source groupby gender;-- 将运行模式切换成批处理SET'execution.runtime-mode'='batch';-- 再试执行,只会看到最终的一个结果,没有变化的过程(这是与流处理的区别之处)select gender,count(*)as cnt from students_mysql_source groupby gender;
  • Sink> 从Kafka接收无界流的学生数据,统计班级人数,将最终的结果写入MySQL-- 创建MySQL的结果表-- 查询库中已有表的建表语句showcreatetable xxx;-- 无主键的MySQL建表语句-- 最终发现写入的结果是有连续变换的过程,并不是直接写入最终的结果droptableifexists`clazz_cnt`;CREATETABLEifnotexists`clazz_cnt`(`clazz`varchar(255)DEFAULTNULL,`cnt`bigintDEFAULTNULL)ENGINE=InnoDBDEFAULTCHARSET=utf8;-- 将班级设置为主键-- 最终写入的结果是可以通过主键进行更新,所以可以展示最终的结果,并且可以实时更新droptableifexists`clazz_cnt`;CREATETABLEifnotexists`clazz_cnt`(`clazz`varchar(255)NOTNULL,`cnt`bigint(20)DEFAULTNULL,PRIMARYKEY(`clazz`))ENGINE=InnoDBDEFAULTCHARSET=utf8;-- 创建MySQL的Sink表droptableifexists clazz_cnt_mysql_sink;CREATETABLEifnotexists clazz_cnt_mysql_sink (`clazz` STRING,`cnt`BIGINT,-- 如果查询的结果有更新,则需要设置主键PRIMARYKEY(clazz)NOT ENFORCED)WITH('connector'='jdbc','url'='jdbc:mysql://master:3306/bigdata30?useSSL=false','table-name'='clazz_cnt','username'='root','password'='123456');-- 记得将执行模式切换成流处理,因为Kafka是无界流SET'execution.runtime-mode'='streaming';-- 执行查询:实时统计班级人数,将结果写入MySQLinsertinto clazz_cnt_mysql_sinkselect clazz,count(*)as cnt from students_kafka_source where clazz isnotnullgroupby clazz;
2.3 HDFS
  • Source- 有界流> 默认的方式droptableifexists students_hdfs_source;CREATETABLEifnotexists students_hdfs_source (`id`BIGINT,`name` STRING,`age`INT,`gender` STRING,`clazz` STRING,`file.path` STRING NOTNULL METADATA)WITH('connector'='filesystem','path'='hdfs://master:9000/bigdata30/students.txt','format'='csv',-- 是否忽略脏数据'csv.ignore-parse-errors'='true');-- 查询表中是否有数据select*from students_hdfs_source limit100;- 无界流> 同DataStream的FileSource一致> > 可以通过设置source.monitor-interval参数,来指定一个监控的间隔时间,例如:5s> > FLink就会定时监控目录的一个变换,有新的文件就可以实时进行读取> > 最终得到一个无界流-- 创建HDFS目录hdfs dfs -mkdir /bigdata30/flink-- 创建Source表droptableifexists students_hdfs_unbounded_source;CREATETABLEifnotexists students_hdfs_unbounded_source (`id`BIGINT,`name` STRING,`age`INT,`gender` STRING,`clazz` STRING,`file.path` STRING NOTNULL METADATA)WITH('connector'='filesystem','path'='hdfs://master:9000/bigdata30/flink','source.monitor-interval'='5s','format'='csv',-- 是否忽略脏数据'csv.ignore-parse-errors'='true');-- 执行查询select*from students_hdfs_unbounded_source;-- 向目录上传文件hdfs dfs -cp /bigdata30/students.txt /bigdata30/flink/students.txt1hdfs dfs -cp /bigdata30/students.txt /bigdata30/flink/students.txt2
  • Sink- 查询结果没有更新,写入数据droptableifexists students_hdfs_sink;CREATETABLEifnotexists students_hdfs_sink (`id`BIGINT,`name` STRING,`age`INT,`gender` STRING,`clazz` STRING,`file_path` STRING)WITH('connector'='filesystem','path'='hdfs://master:9000/bigdata30/sink/','format'='csv',-- 是否忽略脏数据'csv.ignore-parse-errors'='true');insertinto students_hdfs_sinkselect*from students_hdfs_source;- 查询结果有更新,写入数据> 同Kafka类似,HDFS不支持更新数据,故需要将变换的结果编码成canal-json或者是debezium-json的格式才能进行insertdroptableifexists clazz_cnt_hdfs_sink;CREATETABLEifnotexists clazz_cnt_hdfs_sink (`clazz` STRING,`cnt`BIGINT)WITH('connector'='filesystem','path'='hdfs://master:9000/bigdata30/clazz_cnt/','format'='canal-json');-- 使用有界的数据源来写入待更新的计算结果insertinto clazz_cnt_hdfs_sinkselect clazz,count(*)as cnt from students_hdfs_source groupby clazz;
2.4 HBase
hbase启动顺序:
zk(三台虚拟机都启动)-->hadoop(主从复制:在master端启动即可)-->hbase(在master端启动即可)

hbase关闭顺序:
hbase-->hadoop-->zk

# 启动
start-hbase.sh

#关闭
stop-hbase.sh

# 进入HBase的客户端
hbase shell
  • 准备工作# 下载依赖https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-hbase-2.2/1.15.4/flink-sql-connector-hbase-2.2-1.15.4.jar# 上传依赖并重启yarn-session及sql客户端
  • Source> 同MySQL类似,得到是一个有界流droptableifexists students_hbase_source;CREATETABLEifnotexists students_hbase_source ( rowkey STRING, info ROW<name STRING, age STRING,gender STRING,clazz STRING>,PRIMARYKEY(rowkey)NOT ENFORCED)WITH('connector'='hbase-2.2','table-name'='students','zookeeper.quorum'='master:2181');select rowkey,info.name,info.age,info.gender,info.clazz from students_hbase_source;
  • Sink> 同MySQL类似-- 在HBase中建表create'stu01','info'-- 构建HBase Sink表droptableifexists stu_hbase_sink;CREATETABLEifnotexists stu_hbase_sink ( id STRING, info ROW<name STRING,clazz STRING>,PRIMARYKEY(id)NOT ENFORCED)WITH('connector'='hbase-2.2','table-name'='stu01','zookeeper.quorum'='master:2181');-- 丢弃null的数据set'table.exec.sink.not-null-enforcer'='DROP';-- 仅追加的结果写入,由于HBase有rk存在,相同的RK会进行覆盖insertinto stu_hbase_sinkselect cast(id as STRING)as id,ROW(name,clazz)as info from students_kafka_source;-- hbase中遍历数据scan "stu01",LIMIT=>50-- 在HBase中建表create'clazz_cnt_01','info'-- 构建HBase Sink表droptableifexists clazz_cnt_hbase_sink;CREATETABLEifnotexists clazz_cnt_hbase_sink ( clazz STRING, info ROW<cnt BIGINT>,PRIMARYKEY(clazz)NOT ENFORCED)WITH('connector'='hbase-2.2','table-name'='clazz_cnt_01','zookeeper.quorum'='master:2181');-- 带更新的查询结果可以实时在HBase中通过RK进行更新insertinto clazz_cnt_hbase_sinkselect clazz,ROW(count(*))as infofrom students_kafka_sourcegroupby clazz;-- hbase中遍历数据scan "clazz_cnt_01",LIMIT=>50
2.5 DataGen

用于按照指定的规则生成数据,一般用于性能测试

droptableifexists datagen;CREATETABLEifnotexists datagen (
    id BIGINT,random_id BIGINT,name STRING
)WITH('connector'='datagen',-- optional options --'rows-per-second'='20',-- 设置每秒钟生成的数据量'fields.id.kind'='random','fields.id.min'='10000000','fields.id.max'='99999999','fields.random_id.kind'='random','fields.random_id.min'='10000000','fields.random_id.max'='99999999','fields.name.length'='5');
2.6 Blackhole

用于性能测试,可以作为Sink端

droptableifexists blackhole_table;CREATETABLEifnotexists  blackhole_table
WITH('connector'='blackhole')LIKE datagen (EXCLUDING ALL);insertinto blackhole_table
select*from datagen groupby name;droptableifexists blackhole_table;CREATETABLEifnotexists  blackhole_table(
    name String,
    cnt BIGINT)WITH('connector'='blackhole');insertinto blackhole_table
select name,count(*)as cnt from datagen groupby name;
2.7 Print

将结果数据在TaskManager中输出

droptableifexists print_table;CREATETABLEifnotexists print_table (
 name STRING,
 cnt BIGINT)WITH('connector'='print');insertinto print_table
select name,count(*)as cnt from datagen groupby name;

3、常用的格式

3.1 CSV

逗号分隔符文件,并非一定是.csv文件

在作为Sink时的format,仅支持写入不带更新的结果

解析每条数据是通过顺序匹配

常用参数:

csv.ignore-parse-errors 默认false,忽略解析错误,不会导致程序直接停止

csv.field-delimiter 默认 逗号,指定数据的列分隔符

3.2 JSON
3.2.1 json

普通的json格式,解析数据是通过列名进行匹配

同csv类似,只支持写入不带更新的结果

droptableifexists cars_json_source;CREATETABLEifnotexists cars_json_source (
    car String
    ,county_code INT,city_code INT,card BIGINT,camera_id String
    ,orientation String
    ,road_id BIGINT,`time`BIGINT,speed Double)WITH('connector'='kafka','topic'='cars_json','properties.bootstrap.servers'='master:9092','properties.group.id'='grp1','scan.startup.mode'='earliest-offset','format'='json');
3.2.2 canal-json

一种特殊的JSON格式

支持写入更新的结果

{“data”:[{“clazz”:“文科六班”,“cnt”:104}],“type”:“DELETE”}

3.2.3 debezium-json

同canal-json,只是数据格式有些许差异

{“before”:null,“after”:{“clazz”:“理科四班”,“cnt”:94},“op”:“c”}

3.3 ORC

一般不用

3.4 PARQUET

一般不用

4、时间属性

4.1 处理时间

基于系统的时间

droptableifexists students_kafka_source;CREATETABLEifnotexists students_kafka_source (`id`BIGINT,`name` STRING,`age`INT,`gender` STRING,`clazz` STRING,-- 通过系统时间给表增加一列,即:处理时间
   proc_time as PROCTIME())WITH('connector'='kafka','topic'='students1000','properties.bootstrap.servers'='master:9092','properties.group.id'='grp1','scan.startup.mode'='earliest-offset','format'='csv',-- 是否忽略脏数据'csv.ignore-parse-errors'='true');-- 查询的结果每个五秒统计一次select  clazz
        ,count(*)as cnt
        ,tumble_start(proc_time,INTERVAL'5' SECONDS)as window_start
        ,tumble_end(proc_time,INTERVAL'5' SECONDS)as window_end
from students_kafka_source 
groupby clazz,tumble(proc_time,INTERVAL'5' SECONDS);-- 向Topic中生产数据
kafka-console-producer.sh --broker-list master:9092 --topic students1000

由于是csv格式,只支持写入不带更新的结果
在这里插入图片描述

4.2 事件时间

基于数据自带的时间

java,2024-08-03 10:41:50

java,2024-08-03 10:41:51

java,2024-08-03 10:41:52

java,2024-08-03 10:41:55

java,2024-08-03 10:41:56

java,2024-08-03 10:42:56

java,2024-08-03 10:43:00

java,2024-08-03 10:43:10

droptableifexists words_kafka_source;CREATETABLEifnotexists words_kafka_source (`word` STRING,-- 从数据中过来的一列,作为事件时间
   event_time TIMESTAMP(3),-- 指定水位线前移策略,并同时声明数据中的哪一列是事件时间;水位线前移五秒,相当于每隔10秒的事件时间,触发一次执行
   WATERMARK FOR event_time AS event_time -INTERVAL'5'SECOND)WITH('connector'='kafka','topic'='words_event_times','properties.bootstrap.servers'='master:9092','properties.group.id'='grp1','scan.startup.mode'='latest-offset','format'='csv',-- 是否忽略脏数据'csv.ignore-parse-errors'='true');-- 创建topic
kafka-topics.sh --zookeeper master:2181/kafka --create --replication-factor 1 --partitions 1 --topic words_event_times-- 执行查询,使用滚动的事件时间窗口进行word count,每5s统计一次select  word
        ,count(*)as cnt
        ,tumble_start(event_time,INTERVAL'5' SECONDS)as window_start
        ,tumble_end(event_time,INTERVAL'5' SECONDS)as window_end
from words_kafka_source 
groupby word,tumble(event_time,INTERVAL'5' SECONDS);-- 向Topic中生产数据
kafka-console-producer.sh --broker-list master:9092 --topic words_event_times

5、SQL语法

5.1 Hints

在SQL查询时动态修改表的参数配置

-- words_kafka_source 默认从最后开始消费select*from words_kafka_source;// 只能查询到最新的数据,不会从头开始消费-- 假设现在需要从头开始消费-- 第一种方案,将words_kafka_source删除重建-- 第二种方案,通过alter table 对表进行修改-- 第三种方案,通过hints动态调整表的配置select*from words_kafka_source /*+OPTIONS('scan.startup.mode' = 'earliest-offset') */;
5.2 With

用于将多次执行的同一查询通过with先定义,后面可以进行多次使用,避免重复的SQL

应用场景:1、多次使用的SQL查询可以缓存提高性能 2、将多级嵌套解开来,降低主SQL的复杂度

-- 创建表时,应注意表中的数据类型要与MySQL数据库中的表的数据类型相符合droptableifexists students_mysql_source;CREATETABLEifnotexists students_mysql_source (`id`INT,`name` STRING,`age`INT,`gender` STRING,`clazz` STRING,PRIMARYKEY(id)NOT ENFORCED
)WITH('connector'='jdbc','url'='jdbc:mysql://master:3306/bigdata_30?useSSL=false','table-name'='students2','username'='root','password'='123456');select id,name from students_mysql_source where clazz ='理科一班'unionallselect id,name from students_mysql_source where clazz ='理科一班';-- 通过with可以将多次使用的SQL进行定义with stu_lkyb as(select id,name from students_mysql_source where clazz ='理科一班')select*from stu_lkyb
unionallselect*from stu_lkyb
unionallselect*from stu_lkyb
;
5.3 Where

可以进行过滤

select id,name,clazz,age from students_mysql_source where clazz ='理科一班'and age >20;-- 找到重复数据并进行过滤,但是这样会将所有重复的数据都过滤掉,一般都是想留下一条,其余重复的数据给它过滤select    id,name,age,gender,clazz
from(select id,name,age,gender,clazz,count(*)as cnt from students_mysql_source groupby id,name,age,gender,clazz
) t1 where t1.cnt =1;-- 聚合后的过滤可以使用Havingselect id,name,age,gender,clazz,count(*)as cnt from students_mysql_source groupby id,name,age,gender,clazz
havingcount(*)=1;
5.4 Distinct

用于去重

需要对每条不同的数据维护一个状态,状态会无限制的增大,最终任务可能会失败

无界流是正常可以去重的

有界流必须在分组之后带上聚合操作才能去重,如果直接distinct或者是groupby不聚合,最终任务里不会产生shuffle,即不会分组,也就无法去重

-- 去重select id,name,age,gender,clazz from students_mysql_source groupby id,name,age,gender,clazz;-- 等价于distinctselectdistinct id,name,age,gender,clazz from students_mysql_source;selectdistinct id from students_mysql_source;
5.5 Windowing TVFs(现在推荐使用)

目前提供了三类TVFs窗口操作:TUMBLE、HOP、CUMULATE

会话SESSION窗口只能通过GROUP WINDOW FUNCTION实现(老版本实现,新版本还没有

计数窗口在FLINK SQL中暂未支持

5.5.1 Tumble

会给bid表增加三个窗口列:window_start、window_end、window_time

需要设置一个滚动时间

每隔一段时间会触发一次窗口的统计

-- 创建Bid订单表droptableifexists bid_kafka_source;CREATETABLEifnotexists bid_kafka_source (`item` STRING,`price`DOUBLE,`bidtime`TIMESTAMP(3),`proc_time`as PROCTIME(),-- 指定水位线前移策略,并同时声明数据中的哪一列是事件时间
   WATERMARK FOR bidtime AS bidtime
)WITH('connector'='kafka','topic'='bid','properties.bootstrap.servers'='master:9092','properties.group.id'='grp1','scan.startup.mode'='earliest-offset','format'='csv',-- 是否忽略脏数据'csv.ignore-parse-errors'='true');-- 准备数据
C,4.00,2020-04-1508:05:00
C,4.00,2020-04-1508:06:00
C,4.00,2020-04-1508:07:00
D,5.00,2020-04-1508:09:00
B,3.00,2020-04-1508:11:00
E,1.00,2020-04-1508:13:00
F,6.00,2020-04-1508:17:00
F,6.00,2020-04-1508:20:00-- 创建Kafka Topic
kafka-topics.sh --zookeeper master:2181/kafka --create --replication-factor 1 --partitions 1 --topic bid-- 生产数据
kafka-console-producer.sh --broker-list master:9092 --topic bid-- 基于事件时间的滚动窗口,每隔十分钟事件时间便会执行一次SELECT window_start,window_end,sum(price)as sum_price
FROMTABLE(-- tumble函数 会给bid表增加三个窗口列:window_start、window_end、window_time-- 如果需要基于窗口的统计则按照窗口列分组即可
    TUMBLE(TABLE bid_kafka_source, DESCRIPTOR(bidtime),INTERVAL'10' MINUTES))groupby window_start,window_end
;-- 基于处理时间的滚动窗口,每隔10秒的处理时间便会执行一次SELECT window_start,window_end,sum(price)as sum_price
FROMTABLE(-- tumble函数 会给bid表增加三个窗口列:window_start、window_end、window_time-- 如果需要基于窗口的统计则按照窗口列分组即可
    TUMBLE(TABLE bid_kafka_source, DESCRIPTOR(proc_time),INTERVAL'10' SECONDS))groupby window_start,window_end
;

基于处理时间的滚动窗口,在时间段内有数据输入程序才会执行。

在这里插入图片描述

5.5.2 HOP

批处理中的窗口不同,有数据来时,才会进行滑动、滚动,根据这个区间进行数据的统计

滑动窗口

需要指定两个时间:滑动的时间、窗口的大小

-- 基于事件时间的滑动窗口SELECT window_start,window_end,sum(price)as sum_price
FROMTABLE(-- HOP函数 会给bid表增加三个窗口列:window_start、window_end、window_time-- 如果需要基于窗口的统计则按照窗口列分组即可
    HOP(TABLE bid_kafka_source, DESCRIPTOR(bidtime),INTERVAL'5' MINUTES,INTERVAL'10' MINUTES))groupby window_start,window_end
;-- 基于处理时间的滑动窗口SELECT window_start,window_end,sum(price)as sum_price
FROMTABLE(-- HOP函数 会给bid表增加三个窗口列:window_start、window_end、window_time-- 如果需要基于窗口的统计则按照窗口列分组即可
    HOP(TABLE bid_kafka_source, DESCRIPTOR(proc_time),INTERVAL'5' SECONDS,INTERVAL'10' SECONDS))groupby window_start,window_end
;

基于事件时间的滑动窗口
在这里插入图片描述
在这里插入图片描述

基于处理时间的滑动窗口:滑动大小为5秒,窗口大小为10秒
在这里插入图片描述
在这里插入图片描述

5.5.3 CUMULATE

累积窗口:首先会按照步长初始化一个窗口大小,然后按照步长的间隔时间触发窗口的统计,接下来窗口大小会不断增大,直到达到设置的最大size,然后重复这个过程

需要指定两个时间间隔:步长、最大的size

例如:步长为2分钟,size为10分钟

每隔2分钟会触发一次统计,第一次统计的最近两分钟的数据,第二次统计是最近4分钟的…第5次统计是最近10分钟的数据,第6次统计是最近2分钟的数据…

-- 基于事件时间的累计窗口SELECT window_start,window_end,sum(price)as sum_price
FROMTABLE(-- CUMULATE函数 会给bid表增加三个窗口列:window_start、window_end、window_time-- 如果需要基于窗口的统计则按照窗口列分组即可
    CUMULATE(TABLE bid_kafka_source, DESCRIPTOR(bidtime),INTERVAL'2' MINUTES,INTERVAL'10' MINUTES))groupby window_start,window_end
;-- 基于处理时间的累计窗口SELECT window_start,window_end,sum(price)as sum_price
FROMTABLE(-- CUMULATE函数 会给bid表增加三个窗口列:window_start、window_end、window_time-- 如果需要基于窗口的统计则按照窗口列分组即可
    CUMULATE(TABLE bid_kafka_source, DESCRIPTOR(proc_time),INTERVAL'2' SECONDS,INTERVAL'10' SECONDS))groupby window_start,window_end
;
5.5.4 SESSION

会话窗口,目前版本不支持TVFs写法

需要使用老版本的写法:GROUP WINDOW FUNCTION

间隔一段时间没有数据就会触发窗口的统计

-- 基于事件时间的会话窗口select session_start(bidtime,INTERVAL'2' MINUTES)as session_start
       ,session_end(bidtime,INTERVAL'2' MINUTES)as session_end
       ,sum(price)as sum_price
from bid_kafka_source
groupbysession(bidtime,INTERVAL'2' MINUTES);-- 基于处理时间的会话窗口select session_start(proc_time,INTERVAL'2' SECONDS)as session_start
       ,session_end(proc_time,INTERVAL'2' SECONDS)as session_end
       ,sum(price)as sum_price
from bid_kafka_source
groupbysession(proc_time,INTERVAL'2' SECONDS);
5.6 Over聚合(注意与hivesql中的用法有所不同)
5.6.1 聚合类

sum、max、min、count、avg

sum 比较特殊:如果指定了order By,则表示累加求和,不指定则表示整个窗口求和

max、min、count、avg 不需要指定order By

-- 准备数据
item,supply_id,price,bidtime
A,001,4.00,2020-04-1508:05:00
A,002,2.00,2020-04-1508:06:00
A,001,5.00,2020-04-1508:07:00
B,002,3.00,2020-04-1508:08:00
A,001,1.00,2020-04-1508:09:00
A,002,6.00,2020-04-1508:10:00
B,001,6.00,2020-04-1508:11:00
A,001,6.00,2020-04-1508:12:00
B,002,6.00,2020-04-1508:13:00
B,002,6.00,2020-04-1508:14:00
A,001,66.00,2020-04-1508:18:00
B,001,7.00,2020-04-1508:16:00
B,001,7.00,2020-04-1508:17:00-- 创建order订单表droptableifexists order_kafka_source;CREATETABLEifnotexists order_kafka_source (`item` STRING,`supply_id` STRING,`price`DOUBLE,`bidtime`TIMESTAMP(3),-- 指定水位线前移策略,并同时声明数据中的哪一列是事件时间
   WATERMARK FOR bidtime AS bidtime
)WITH('connector'='kafka','topic'='order','properties.bootstrap.servers'='master:9092','properties.group.id'='grp1','scan.startup.mode'='latest-offset','format'='csv',-- 是否忽略脏数据'csv.ignore-parse-errors'='true');-- 创建Kafka Topic
kafka-topics.sh --zookeeper master:2181/kafka --create --replication-factor 1 --partitions 1 --topic order-- 生产数据
kafka-console-producer.sh --broker-list master:9092 --topic order-- 聚合类函数在实时的Over窗口上只会产生追加的数据,没有更新-- 最终需要维护的状态大小同partition by指定的字段有关-- 1、统计每种商品的累计成交金额select item
       -- 必须指定order by ,而且必须使用时间列升序,-- 如果本条数据的时间小于上一条数据的时间,则本条数据会被忽略,不参与计算-- 相当于sum只能做累加求和,来一条数据累加一次,无法做全局聚合,sum(price)over(partitionby item orderby bidtime)as sum_price
from order_kafka_source
;-- 2、统计每种商品的最大成交金额select item
       -- 必须指定order by ,而且必须使用时间列升序,-- 如果本条数据的时间小于上一条数据的时间,则本条数据会被忽略,不参与统计-- 来一条数据就会输出一条数据,max会将截止到当前时间戳最大的数据中取最大的一个值,max(price)over(partitionby item orderby bidtime)as max_price
from order_kafka_source
;-- 3、统计每种商品的最小、平均成交金额/交易次数 同上select item
       -- 必须指定order by ,而且必须使用时间列升序,-- 如果本条数据的时间小于上一条数据的时间,则本条数据会被忽略,不参与统计-- 来一条数据就会输出一条数据,max会将截止到当前时间戳最大的数据中取最大的一个值,min(price)over(partitionby item orderby bidtime)as max_price
from order_kafka_source
;-- 4、统计最近10分钟内每种商品的累计成交金额,每隔事件时间的十分钟统计一次select item
       -- 必须指定order by ,而且必须使用时间列升序,-- 如果本条数据的时间小于上一条数据的时间,则本条数据会被忽略,不参与计算-- 相当于sum只能做累加求和,来一条数据累加一次,无法做全局聚合,sum(price)over(partitionby item 
           orderby bidtime 
           -- 每次统计时 只会将和当前数据的时间相差10分钟内的数据进行统计
           RANGE BETWEENINTERVAL'10'MINUTEPRECEDINGANDCURRENTROW-- ROWS BETWEEN 10 PRECEDING AND CURRENT ROW -- 统计最近10条)as sum_price
from order_kafka_source
;
5.6.2 排名类

组内排名

row_number、rank、dense_rank

-- 1、统计每种商品成交时间的排名select item
       ,price
       -- 对item进行分组,同上面的聚合类over一样,order by必须指定时间,而且必须时升序,row_number()over(partitionby item orderby bidtime)as rn
from order_kafka_source
;-- 2、统计每种商品成交金额的排名, 无法统计所有数据的排名,代价太大,所以只能做TopN-- 统计每种商品成交金额的排名Top3select t1.item
       ,t1.price
       ,t1.rn
from(select item
         ,price
         ,row_number()over(partitionby item orderby price desc)as rn
  from order_kafka_source
) t1 where t1.rn <=3;
5.7 Order By

全局排序,注意代价

select*from order_kafka_source /*+OPTIONS('scan.startup.mode' = 'earliest-offset')*/-- 如果直接基于非时间列排序是不被支持的-- order by price desc 只有price是不行的,必须得带上时间orderby bidtime,price desc;-- 会保留排序最大的前10条数据,但是这10条数据并不是按序输出的select*from order_kafka_source /*+OPTIONS('scan.startup.mode' = 'earliest-offset')*/orderby price desc-- 还可以通过limit来限制代价limit10;
5.8 Limit

用于限制返回的条数

在实时场景中一般结合Order By来降低排序的代价

5.9 Join(重点、难懂)

了解清楚,各个join所适用的场景,面试题

join分为:

内连接、外连接、全连接

外连接分为:左外连接、右外连接

union区别于join,为上下连接

5.9.1 Regular Join

更偏向于非实时或批量处理,Regular Join在实时处理过程中需要维护大量的状态信息,从而增加内存和存储的压力。

常规Join,同HiveSQL、SparkSQL一致

可以进行:inner join、left join、right join、full join

注意状态的大小,可以设置TTL(超过TTL,两张表便无法再进行数据的连接

droptableifexists students_join;CREATETABLEifnotexists students_join (`id`BIGINT,`name` STRING,`age`INT,`gender` STRING,`clazz` STRING,
   proc_time AS PROCTIME())WITH('connector'='kafka','topic'='students_join','properties.bootstrap.servers'='master:9092',-- 读数据时,若没有grp1,则会自动创建'properties.group.id'='grp1','scan.startup.mode'='latest-offset','format'='csv',-- 是否忽略脏数据'csv.ignore-parse-errors'='true');droptableifexists score_join;CREATETABLEifnotexists score_join (`id`BIGINT,`subject_id`BIGINT,`score`INT,
   proc_time AS PROCTIME())WITH('connector'='kafka','topic'='score_join','properties.bootstrap.servers'='master:9092','properties.group.id'='grp1','scan.startup.mode'='latest-offset','format'='csv',-- 是否忽略脏数据'csv.ignore-parse-errors'='true');-- 进行join-- 用法同离线join没有任何区别-- 但是要注意:一个流中的数据会一直等待另一个流中的数据达到,意味着状态会一直变大,最终任务肯定会失败-- 可以在join的时候指定状态的过期时间TTL,这样状态不会无限制的变大-- 设置TTLset'table.exec.state.ttl'='100000ms';select t1.id
       ,t1.name
       ,t2.subject_id
       ,t2.score
       -- join没有限制,可以实现内/外joinfrom students_join t1 leftjoin score_join t2 on t1.id = t2.id;-- 向Kafka生产数据
kafka-console-producer.sh --broker-list master:9092 --topic students_join
kafka-console-producer.sh --broker-list master:9092 --topic score_join
5.9.2 Interval Join

时间间隔关联

在Regular Join的基础之上指定一个时间间隔

实际上也是通过时间间隔来让状态不会一直变大,类似TTL;超过时间间隔则数据不会进行join

select t1.id
       ,t1.name
       ,t2.subject_id
       ,t2.score
       -- join没有限制,可以实现内/外joinfrom students_join t1 
leftjoin score_join t2 
on t1.id = t2.id
and t1.proc_time BETWEEN t2.proc_time -INTERVAL'10' SECONDS AND t2.proc_time
;
5.9.3 Temporal Join

时态表关联

适用于流表关联时态表

时态表:随着时间一直变化

001,2,DOL,2024-08-0511:20:20001,3,DOL,2024-08-0511:25:20001,5,DOL,2024-08-0511:30:20001,6,DOL,2024-08-0511:45:20001,4,DOL,2024-08-0512:15:20-- 构建订单表,流表droptableifexists orders_join;CREATETABLEifnotexists orders_join (
    order_id    STRING,
    price       DECIMAL(32,2),
    currency    STRING,
    order_time  TIMESTAMP(3),
    WATERMARK FOR order_time AS order_time
)WITH('connector'='kafka','topic'='orders_join','properties.bootstrap.servers'='master:9092','properties.group.id'='grp1','scan.startup.mode'='latest-offset','format'='csv',-- 是否忽略脏数据'csv.ignore-parse-errors'='true');-- 构建货币汇率表,时态表-- 会随着时间一直变化droptableifexists currency_rates;CREATETABLEifnotexists currency_rates(
    currency STRING,
    conversion_rate DECIMAL(32,2),
    update_time TIMESTAMP(3),
    WATERMARK FOR update_time AS update_time,PRIMARYKEY(currency)NOT ENFORCED
)WITH('connector'='kafka','topic'='currency_rates','properties.bootstrap.servers'='master:9092','properties.group.id'='grp1','scan.startup.mode'='latest-offset','format'='canal-json');-- 准备数据
{"data":[{"currency":"DOL","conversion_rate":7.14,"update_time":"2024-08-05 10:30:00"}],"type":"INSERT"}
{"data":[{"currency":"DOL","conversion_rate":7.24,"update_time":"2024-08-05 11:25:00"}],"type":"INSERT"}
{"data":[{"currency":"DOL","conversion_rate":7.04,"update_time":"2024-08-05 12:00:00"}],"type":"INSERT"}
{"data":[{"currency":"DOL","conversion_rate":7.34,"update_time":"2024-08-05 12:14:00"}],"type":"INSERT"}
{"data":[{"currency":"DOL","conversion_rate":7.14,"update_time":"2024-08-05 12:20:00"}],"type":"INSERT"}

-- 向Kafka生产数据
kafka-console-producer.sh --broker-list master:9092 --topic orders_join
kafka-console-producer.sh --broker-list master:9092 --topic currency_rates-- 时态表Join,订单表中的数据会与汇率表中的小于且距离订单表中的数据最近的数据进行join-- 注:若汇率表中没有大于当前订单表中最新的数据的时间的数据,那么汇率表中没有与这个订单表中的这条最新的数据相join的数据select t1.order_id
       ,t1.currency
       ,t1.price
       ,t1.order_time
       ,t2.conversion_rate
       ,t2.update_time
       ,t1.price * t2.conversion_rate as price_rmb
from orders_join t1 
-- 通过FOR SYSTEM_TIME 来表示进行时态JOINleftjoin currency_rates FOR SYSTEM_TIME ASOF t1.order_time t2
on t1.currency = t2.currency
;
5.9.4 LookUp Join

适用于流表关联维表

维表:存储维度数据,通常变化频率不是很高

-- MySQL直接作为Source ---> 有界流(任务执行完便结束)-- MySQL的学生信息表作为维表droptableifexists students_mysql_join;CREATETABLEifnotexists students_mysql_join (`id`BIGINT,`name` STRING,`age`INT,`gender` STRING,`clazz` STRING,PRIMARYKEY(id)NOT ENFORCED
)WITH('connector'='jdbc','url'='jdbc:mysql://master:3306/bigdata30?useSSL=false','table-name'='students','username'='root','password'='123456',-- 设置LookUp缓存的条数以及过期时间'lookup.cache.max-rows'='1000','lookup.cache.ttl'='60s');-- 使用Regular Join中的score_join作为事实表-- 如果MySQL有数据更新,程序不会识别到,因为MySQL的数据只会加载一次,有数据变更时需要重启任务select t1.id
       ,t1.score
       ,t2.name
       ,t2.clazz
from score_join t1
leftjoin students_mysql_join t2
on t1.id = t2.id
;-- 使用Lookup Joinselect t1.id
       ,t1.score
       ,t2.name
       ,t2.clazz
from score_join t1
-- 来一数据就会去MySQL中查询一次,立马能够识别到更新的数据-- 对MySQL的性能影响较大-- 下述代码与Temporal Join中的一致leftjoin students_mysql_join FOR SYSTEM_TIME ASOF t1.proc_time t2
on t1.id = t2.id
;
5.10 模式匹配(CEP)(难懂)

CEP(Complex Event Processing,复杂事件处理),模式匹配是处理方法

语法:

SELECT T.aid, T.bid, T.cid
FROM MyTable
MATCH_RECOGNIZE (PARTITIONBY userid
ORDERBY proctime
MEASURES -- 相当于select
A.id AS aid,
B.id AS bid,
C.id AS cid
-- 定义A B C三个规则,默认每个规则只需要匹配1次即可-- 当所有的规则都满足,则输出数据
PATTERN (A B C) 
DEFINE -- 定义具体规则
A AS name ='a',
B AS name ='b',
C AS name ='c')AS T
5.10.1 欺诈检测
  • 实时监控某个账户的交易流水,如果出现一笔交易小于1,紧接着下一笔交易大于500,那么就输出一个警告
-- 创建一个交易流水表droptableifexists trans;CREATETABLEifnotexists trans (`id` STRING,`price`DOUBLE,
   proc_time AS PROCTIME())WITH('connector'='kafka','topic'='trans','properties.bootstrap.servers'='master:9092','properties.group.id'='grp1','scan.startup.mode'='latest-offset','format'='csv',-- 是否忽略脏数据'csv.ignore-parse-errors'='true');-- 准备数据001,2001,600001,0.1001,200001,700001,0.8001,600001,0.7001,400-- 向Kafka生产数据
kafka-console-producer.sh --broker-list master:9092 --topic trans-- 进行模式匹配SELECT T.min_price,T.max_price,T.a_proc_time,T.b_proc_time
FROM trans
    MATCH_RECOGNIZE (PARTITIONBY id -- 按什么分组统计ORDERBY proc_time -- 按时间升序排列数据
      MEASURES -- 相当于select
       A.price as min_price
       ,A.proc_time as a_proc_time
       ,B.price as max_price
       ,B.proc_time as b_proc_time
      -- 定义A B 两个规则,当所有的规则都满足,则输出数据
      PATTERN (A B) 
      DEFINE -- 定义具体规则
        A AS price <1,
        B AS price >500)AS T
;
  • 实时监控某个账户的交易流水,如果出现三笔交易小于1,紧接着下一笔交易大于500,那么就输出一个警告
-- 进行模式匹配SELECT*FROM trans
    MATCH_RECOGNIZE (PARTITIONBY id -- 按什么分组统计ORDERBY proc_time -- 按时间升序排列数据
      MEASURES -- 相当于select
       A.price as last_price -- 默认取出来的是最后一条A的记录,avg(A.price)as avg_price
       ,A.proc_time as a_proc_time
       ,B.price as max_price
       ,B.proc_time as b_proc_time
      -- 定义A B 两个规则,当所有的规则都满足,则输出数据-- A{3} 表示需要匹配3次A的规则,才能进行B规则的匹配
      PATTERN (A{3} B) 
      DEFINE -- 定义具体规则
        A AS price <1,
        B AS price >500)AS T
;001,0.1001,0.2001,0.4001,600
  • 实时监控某个账户的交易流水,如果出现一笔交易小于1,紧接着下一笔交易大于500,两笔交易时间差如果小于5s钟,那么就输出一个警告
SELECT T.min_price,T.max_price,T.a_proc_time,T.b_proc_time
FROM trans
    MATCH_RECOGNIZE (PARTITIONBY id -- 按什么分组统计ORDERBY proc_time -- 按时间升序排列数据
      MEASURES -- 相当于select
       A.price as min_price
       ,A.proc_time as a_proc_time
       ,B.price as max_price
       ,B.proc_time as b_proc_time
      -- 定义A B 两个规则,当所有的规则都满足,则输出数据
      PATTERN (A B)WITHININTERVAL'5'SECOND
      DEFINE -- 定义具体规则
        A AS price <1,
        B AS price >500)AS T
;001,0.1001,600001,0.1-- 等待5s001,600
5.10.2 股票检测
  • 实时监控股票价格走势,找出连续下降的区间
droptableifexists symbol;CREATETABLEifnotexists symbol (`symbol` STRING,`rowtime`TIMESTAMP(3),`price`DECIMAL(10,2),
   WATERMARK FOR rowtime AS rowtime
)WITH('connector'='kafka','topic'='symbol','properties.bootstrap.servers'='master:9092','properties.group.id'='grp1','scan.startup.mode'='earliest-offset','format'='csv',-- 是否忽略脏数据'csv.ignore-parse-errors'='true');-- 准备数据
ACME,2024-08-0510:00:00,17
ACME,2024-08-0510:20:00,18
ACME,2024-08-0510:40:00,20
ACME,2024-08-0511:00:00,21
ACME,2024-08-0511:20:00,22
ACME,2024-08-0511:40:00,20
ACME,2024-08-0512:00:00,15
ACME,2024-08-0512:20:00,14
ACME,2024-08-0512:40:00,13
ACME,2024-08-0513:00:00,16
ACME,2024-08-0513:20:00,19-- 创建Topic
kafka-topics.sh --zookeeper master:2181/kafka --create --replication-factor 1 --partitions 1 --topic symbol-- 向Kafka生产数据
kafka-console-producer.sh --broker-list master:9092 --topic symbol-- 进行模式匹配SELECT*FROM symbol
    MATCH_RECOGNIZE (PARTITIONBY symbol -- 按什么分组统计ORDERBY rowtime -- 按时间升序排列数据
      MEASURES -- 相当于select
         A.price as a_price
         ,A.rowtime as a_rowtime
         ,max(B.price)as max_price
         ,min(B.price)as min_price
         ,min(B.rowtime)as start_time
         ,max(B.rowtime)as end_time
         ,C.price as c_price
         ,C.rowtime as c_rowtime
      AFTERMATCH SKIP PAST LASTROW-- 定义A B C三个规则,当所有的规则都满足,则输出数据-- B+ 表示至少要符合一次
      PATTERN (A B{2,} C) 
      DEFINE -- 定义具体规则-- 如果B是顶点数据,那么往前取一条B匹配到的数据是取不到的,则返回null-- 如果B是下降区间的数据,那么往前取一条B匹配到的数据是可以取到数据的
        B as(LAST(B.price,1)> B.price)or(LAST(B.price,1)isnulland B.price > A.price),C asLAST(B.price)< C.price
    )AS T
;

6、整合Hive

获取解析Hive的元数据

Flink整合Hive主要有两个目的:

1、将Flink本身的元数据借助Hive保存

2、可以加载Hive中的数据,正常一般优先通过Spark处理Hive的数据

6.1 准备工作
# 1、下载Hive的connector依赖
https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-hive-3.1.2_2.12/1.15.4/flink-sql-connector-hive-3.1.2_2.12-1.15.4.jar

# 2、上传到FLINK的lib目录下# 3、重启yarn-session以及sql客户端
6.2 创建Catalog

Catalog --> 库 --> 表 --> 数据

-- flinksql中的操作,操作后flinksql端可获得hive中的CATALOG,亦可查询其中的表数据-- 使用catalog之前需要先启动hive的metastore服务-- hive --service metastoreCREATE CATALOG myhive WITH('type'='hive','default-database'='default','hive-conf-dir'='/usr/local/soft/hive-3.1.2/conf/');-- 查看CATALOGshow CATALOGs;-- 切换catalogUSE CATALOG myhive;-- hive中的操作createdatabase testdb;use testdb;droptableifexists students;CREATETABLEifnotexists students (`id` STRING,`name` STRING,`age`INT,`gender` STRING,`clazz` STRING
)row format delimited
fieldsterminatedby',';loaddatalocal inpath "/usr/local/soft/bigdata30/students.txt"intotable students;
6.3 将FlinkSQL的表存入Hive

flink的SQL客户端默认会使用内存保存元数据Catalog,重启之后会丢失,需要重新创建

借助Hive的Catalog来保存Flink表的元数据,重启后还能保留

保存的元数据虽然能在hive中看到,但只能在Flink环境下使用

-- 将数据存储到从hive获取的 CATALOG中droptableifexists students_join;CREATETABLEifnotexists students_join (`id`BIGINT,`name` STRING,`age`INT,`gender` STRING,`clazz` STRING,
   proc_time AS PROCTIME())WITH('connector'='kafka','topic'='students_join','properties.bootstrap.servers'='master:9092','properties.group.id'='grp1','scan.startup.mode'='latest-offset','format'='csv',-- 是否忽略脏数据'csv.ignore-parse-errors'='true');

kafka-console-producer.sh --broker-list master:9092 --topic students_join-- 重启后CREATE CATALOG myhive WITH('type'='hive','default-database'='default','hive-conf-dir'='/usr/local/soft/hive-3.1.2/conf/');-- 查看CATALOGshow CATALOGs;-- 切换catalogUSE CATALOG myhive;showdatabases;use testdb;-- 查询数据,/*+OPTIONS('scan.startup.mode' = 'earliest-offset')*/ :从头开始读取select*from students_join /*+OPTIONS('scan.startup.mode' = 'earliest-offset')*/;
6.4 加载Hive的函数
LOAD MODULE hive WITH('hive-version'='3.1.2');select split('hello,world',',');droptableifexists words;CREATETABLEifnotexists words (
   line STRING
)WITH('connector'='kafka','topic'='words','properties.bootstrap.servers'='master:9092','properties.group.id'='grp1','scan.startup.mode'='latest-offset','format'='csv',-- 是否忽略脏数据'csv.ignore-parse-errors'='true');select word
from words,
lateral table(explode(split(line,'#')))as t(word);

7、CheckPoint

7.1 准备任务

需要通过sql-client.sh -f 来执行

将下列SQL放到word_cnt.sql文件中

droptableifexists words;CREATETABLEifnotexists words (
   word STRING
)WITH('connector'='kafka','topic'='words','properties.bootstrap.servers'='master:9092','properties.group.id'='grp1','scan.startup.mode'='latest-offset','format'='csv',-- 是否忽略脏数据'csv.ignore-parse-errors'='true');droptableifexists word_cnt;CREATETABLEifnotexists word_cnt (
 word STRING,
 cnt BIGINT)WITH('connector'='print');insertinto word_cnt
select word
       ,count(*)as cnt
from words
groupby word
;

kafka-console-producer.sh --broker-list master:9092 --topic words
7.2 提交任务

第一次提交,不需要指定恢复的目录

sql-client.sh -f word_cnt.sql
7.3 故障之后的恢复
-- 1、先找到任务在HDFS保存的CK的路径-- state.checkpoints.dir: hdfs://master:9000/file/checkpoint:flink中checkpoint的保存路径,在flink-conf.yaml中/file/checkpoint/99d18855d54ed2a7e9670822307569f9/chk-25-- 2、在刚刚的word_cnt.sql文件中的insert语句前加入下面内容SET'execution.savepoint.path'='hdfs://master:9000/file/checkpoint/99d18855d54ed2a7e9670822307569f9/chk-25';-- 3、再次通过sql-client.sh提交任务sql-client.sh -f word_cnt.sql

在这里插入图片描述

8、优化

8.1 Source被多次使用
droptableifexists students_01;CREATETABLEifnotexists students_01(
    id STRING,
    name STRING,
    age INT,
    gender STRING,
    clazz STRING
)WITH('connector'='kafka','topic'='students_01',-- 指定topic'properties.bootstrap.servers'='master:9092',-- 指定kafka集群列表'properties.group.id'='testGroup',-- 指定消费者组'scan.startup.mode'='earliest-offset',-- 指定读取数据的位置'format'='csv',-- 指定数据的格式'csv.field-delimiter'=',',-- 指定分隔符'csv.ignore-parse-errors'='true'-- 跳过脏数据);-- 创建sink表CREATETABLE clazz_cnt (
    clazz STRING,
    cnt BIGINT)WITH('connector'='print');CREATETABLE gender_cnt (
    gender STRING,
    cnt BIGINT)WITH('connector'='print');CREATETABLE age_cnt (
    age INT,
    cnt BIGINT)WITH('connector'='print');-- 假设同一个source被使用多次-- 统计班级人数insertinto clazz_cnt
select clazz
       ,count(*)as cnt
from students_01
groupby clazz
;-- 统计性别人数insertinto gender_cnt
select gender
       ,count(*)as cnt
from students_01
groupby gender
;-- 统计年龄人数insertinto age_cnt
select age
       ,count(*)as cnt
from students_01
groupby age
;-- 每个insert都会提交一次job,最终会产生三个job-- 但每个job的source都一样,故可以进行合并-- 执行一组INSERT,最终只会生成一个JobEXECUTE STATEMENT SETBEGIN-- 统计班级人数insertinto clazz_cnt
  select clazz
       ,count(*)as cnt
  from students_01
  groupby clazz
  ;-- 统计性别人数insertinto gender_cnt
  select gender
       ,count(*)as cnt
  from students_01
  groupby gender
  ;-- 统计年龄人数insertinto age_cnt
  select age
       ,count(*)as cnt
  from students_01
  groupby age
  ;END;
8.2 反压

下游任务处理数据的速度 无法跟上 上游Source接收数据的速度

  • 准备数据-- 创建datagen source表droptableifexists words_datagen;CREATETABLE words_datagen ( word STRING)WITH('connector'='datagen','rows-per-second'='50000',-- 指定每秒生成的数据量'fields.word.length'='5'-- 生成word字段的值时,每个单词(或字符串)应该恰好包含5个字符。);droptableifexists blackhole_table;CREATETABLE blackhole_table ( word STRING, cnt BIGINT)WITH('connector'='blackhole');
8.2.1 数据量太大
insertinto blackhole_table
select 
    word,count(1)as cnt
from 
    words_datagen /*+ OPTIONS('rows-per-second'='50000') */groupby 
    word;-- 开启微批处理set'table.exec.mini-batch.enabled'='true';set'table.exec.mini-batch.allow-latency'='5 s';set'table.exec.mini-batch.size'='100000';-- 开启预聚合set'table.optimizer.agg-phase-strategy'='TWO_PHASE';
8.2.2 状态过大

CK消耗的时间过大

insertinto blackhole_table
select 
    word,count(1)as cnt
from 
    words_datagen /*+ OPTIONS('fields.word.length'='6') */groupby 
    word;-- 增加资源-- 1、增加并行度SET'parallelism.default'='8';-- 2、增加TM内存-- 修改配置文件
taskmanager.memory.process.size: 5000m
-- 通过命令提交时,可以通过参数指定: -ytm,--yarntaskManagerMemory
flink run -ytm 5000m
标签: flink 学习 sql

本文转载自: https://blog.csdn.net/m0_58050808/article/details/140966941
版权归原作者 灌木丛中的微风 所有, 如有侵权,请联系我们删除。

“Flink学习之Flink SQL(补)”的评论:

还没有评论