Flink SQL
1、Sql命令行
1、使用方式
# 1. Start a Flink cluster (standalone cluster, or YARN session mode as shown here; -d = detached)
yarn-session.sh -d
# 2. Start the SQL command line
sql-client.sh
-- 3. Define a table on top of a stream.
-- Creating a table in Flink is comparable to creating a view: the table stores no data,
-- the underlying source is only read when the table is queried.
CREATE TABLE abc (
    sid STRING,
    name STRING,
    age INT,
    sex STRING,
    clazz STRING
) WITH (
    'connector' = 'kafka',
    'topic' = 'abc',
    'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092',
    'properties.group.id' = 'testGroup',
    'scan.startup.mode' = 'earliest-offset',
    'format' = 'csv'
);

-- 4. Query the data (continuous query).
-- Reads from abc, the table defined above (the original referenced an undefined table).
SELECT
    clazz,
    COUNT(1) AS c
FROM abc
GROUP BY clazz;
# Start a console producer on topic abc, then enter the CSV rows below at its prompt.
kafka-console-producer.sh --broker-list master:9092,node1:9092,node2:9092 --topic abc
1500100001,施笑槐,22,女,文科十班
1500100002,吕金鹏,24,男,文科六班
1500100003,单乐蕊,22,女,理科六班
1500100004,葛德曜,24,男,理科三班
1500100005,宣谷芹,22,女,理科五班
2、输出结果模式
-- Table mode (default): materializes the result in memory and shows it as a paginated table.
SET 'sql-client.execution.result-mode' = 'table';

-- Changelog mode: does not materialize the result; shows the continuous query's
-- result stream made of insertions (+) and retractions (-).
SET 'sql-client.execution.result-mode' = 'changelog';

-- Tableau mode: closer to a traditional database, prints the result directly on screen
-- in tabular form. What is displayed depends on the job's execution mode (execution.type).
SET 'sql-client.execution.result-mode' = 'tableau';
2、SQL流批一体
1、流处理
1、流处理模式可以用于处理有界流和无界流
2、流处理模式输出连续结果
3、流处理模式底层是持续流模型,上游task和下游task同时启动等待数据到达
-- Switch the session to streaming execution.
SET 'execution.runtime-mode' = 'streaming';
2、批处理
1、只能用于处理有界流
2、输出最终结果
3、批处理模式底层是mr模型,先执行上游task再执行下游task,会在map端对数据进行预聚合
SET 'execution.runtime-mode' = 'batch';

-- Create a table over a bounded stream.
CREATE TABLE students_hdfs (
    sid STRING,
    name STRING,
    age INT,
    sex STRING,
    clazz STRING
) WITH (
    'connector' = 'filesystem',  -- required: connector type
    'path' = 'hdfs://master:9000/data/spark/stu/students.txt',  -- required: path
    'format' = 'csv'  -- required: the filesystem connector needs a format
);

SELECT
    clazz,
    COUNT(1) AS c
FROM students_hdfs
GROUP BY clazz;
3、Flink SQL连接器
1、kafka
1、kafka source
-- Create the Kafka source table.
CREATE TABLE students_kafka (
    `offset` BIGINT METADATA VIRTUAL,  -- Kafka offset
    `event_time` TIMESTAMP(3) METADATA FROM 'timestamp',  -- time the record entered Kafka; usable as event time
    sid STRING,
    name STRING,
    age INT,
    sex STRING,
    clazz STRING
) WITH (
    'connector' = 'kafka',
    'topic' = 'students',  -- topic holding the data
    'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092',  -- broker list
    'properties.group.id' = 'testGroup',  -- consumer group
    'scan.startup.mode' = 'earliest-offset',  -- start position: earliest-offset | latest-offset
    'format' = 'csv'  -- record format
);
2、kafka sink
-- Create the Kafka sink table.
CREATE TABLE students_kafka_sink (
    sid STRING,
    name STRING,
    age INT,
    sex STRING,
    clazz STRING
) WITH (
    'connector' = 'kafka',
    'topic' = 'students_sink',  -- topic holding the data
    'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092',  -- broker list
    'properties.group.id' = 'testGroup',  -- consumer group
    'scan.startup.mode' = 'earliest-offset',  -- start position: earliest-offset | latest-offset
    'format' = 'csv'  -- record format
);

-- Save the query result into Kafka.
INSERT INTO students_kafka_sink
SELECT *
FROM students_hdfs;
# Read topic students_sink from the beginning to inspect what was written to it.
kafka-console-consumer.sh --bootstrap-server master:9092,node1:9092,node2:9092 --from-beginning --topic students_sink
3、将更新的流写入kafka
CREATE TABLE clazz_num_kafka (
    clazz STRING,
    num BIGINT
) WITH (
    'connector' = 'kafka',
    'topic' = 'clazz_num',  -- topic holding the data
    'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092',  -- broker list
    'properties.group.id' = 'testGroup',  -- consumer group
    'scan.startup.mode' = 'earliest-offset',  -- start position: earliest-offset | latest-offset
    'format' = 'canal-json'  -- record format
);

-- Writing an updating stream to Kafka requires the canal-json format;
-- every record then carries its operation type, for example:
--   {"data":[{"clazz":"文科一班","num":71}],"type":"INSERT"}
--   {"data":[{"clazz":"理科三班","num":67}],"type":"DELETE"}

-- Reads from students_kafka, the Kafka source table of this walkthrough
-- (the original referenced an undefined table named students).
INSERT INTO clazz_num_kafka
SELECT
    clazz,
    COUNT(1) AS num
FROM students_kafka
GROUP BY clazz;
# Read topic clazz_num from the beginning to inspect the canal-json change records.
kafka-console-consumer.sh --bootstrap-server master:9092,node1:9092,node2:9092 --from-beginning --topic clazz_num
2、hdfs
1、hdfs source
flink读取文件可以使用有界流方式,也可以使用无界流方式
-- Bounded stream.
CREATE TABLE students_hdfs_batch (
    sid STRING,
    name STRING,
    age INT,
    sex STRING,
    clazz STRING
) WITH (
    'connector' = 'filesystem',  -- required: connector type
    'path' = 'hdfs://master:9000/data/student',  -- required: path
    'format' = 'csv'  -- required: the filesystem connector needs a format
);

SELECT * FROM students_hdfs_batch;

-- Unbounded stream.
-- Stream processing on HDFS reads data file by file, so latency is higher than with Kafka.
CREATE TABLE students_hdfs_stream (
    sid STRING,
    name STRING,
    age INT,
    sex STRING,
    clazz STRING
) WITH (
    'connector' = 'filesystem',  -- required: connector type
    'path' = 'hdfs://master:9000/data/student',  -- required: path
    'format' = 'csv',  -- required: the filesystem connector needs a format
    'source.monitor-interval' = '5000'  -- rescan the directory periodically, producing an unbounded stream
);

SELECT * FROM students_hdfs_stream;
2、hdfs sink
-- 1. Batch mode (usage and underlying principle are similar to Hive).
SET 'execution.runtime-mode' = 'batch';

-- Create the sink table.
CREATE TABLE clazz_num_hdfs (
    clazz STRING,
    num BIGINT
) WITH (
    'connector' = 'filesystem',  -- required: connector type
    'path' = 'hdfs://master:9000/data/clazz_num',  -- required: path
    'format' = 'csv'  -- required: the filesystem connector needs a format
);

-- Save the query result into the table.
INSERT INTO clazz_num_hdfs
SELECT
    clazz,
    COUNT(1) AS num
FROM students_hdfs_batch
GROUP BY clazz;

-- 2. Streaming mode.
SET 'execution.runtime-mode' = 'streaming';

-- Create the sink table; a query that returns an updating stream needs the canal-json format.
CREATE TABLE clazz_num_hdfs_canal_json (
    clazz STRING,
    num BIGINT
) WITH (
    'connector' = 'filesystem',  -- required: connector type
    'path' = 'hdfs://master:9000/data/clazz_num_canal_json',  -- required: path
    'format' = 'canal-json'  -- required: the filesystem connector needs a format
);

INSERT INTO clazz_num_hdfs_canal_json
SELECT
    clazz,
    COUNT(1) AS num
FROM students_hdfs_stream
GROUP BY clazz;
3、MySQL
1、整合
# 1. Upload the dependency jars to Flink's lib directory: /usr/local/soft/flink-1.15.2/lib
#      flink-connector-jdbc-1.15.2.jar
#      mysql-connector-java-5.1.49.jar
# 2. Restart the Flink cluster (replace [appid] with the YARN application id)
yarn application -kill [appid]
yarn-session.sh -d
# 3. Re-enter the SQL command line
sql-client.sh
2、mysql source
-- Bounded stream.
-- Column names and types of the Flink table must match the MySQL table.
CREATE TABLE students_jdbc (
    id BIGINT,
    name STRING,
    age BIGINT,
    gender STRING,
    clazz STRING,
    PRIMARY KEY (id) NOT ENFORCED  -- primary key
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://master:3306/student',
    'table-name' = 'students',
    'username' = 'root',
    'password' = '123456'
);

SELECT *
FROM students_jdbc
LIMIT 10;
3、mysql sink
-- Create the Kafka source table.
CREATE TABLE students_kafka (
    `offset` BIGINT METADATA VIRTUAL,  -- Kafka offset
    `event_time` TIMESTAMP(3) METADATA FROM 'timestamp',  -- time the record entered Kafka; usable as event time
    sid STRING,
    name STRING,
    age INT,
    sex STRING,
    clazz STRING
) WITH (
    'connector' = 'kafka',
    'topic' = 'students',  -- topic holding the data
    'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092',  -- broker list
    'properties.group.id' = 'testGroup',  -- consumer group
    'scan.startup.mode' = 'earliest-offset',  -- start position: earliest-offset | latest-offset
    'format' = 'csv'  -- record format
);

-- Create the MySQL sink table (Flink side).
CREATE TABLE clazz_num_mysql (
    clazz STRING,
    num BIGINT,
    PRIMARY KEY (clazz) NOT ENFORCED  -- primary key
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://master:3306/student',
    'table-name' = 'clazz_num',
    'username' = 'root',
    'password' = '123456'
);

-- Create the receiving table in MySQL (run this statement in MySQL, not in the Flink SQL client).
CREATE TABLE clazz_num (
    clazz VARCHAR(10),
    num BIGINT,
    PRIMARY KEY (clazz)  -- primary key
);

-- Write the query result to MySQL in real time.
-- When an updating stream is written to MySQL, Flink updates rows by primary key automatically.
INSERT INTO clazz_num_mysql
SELECT
    clazz,
    COUNT(1) AS num
FROM students_kafka
GROUP BY clazz;
# Start a console producer on topic students, then enter the CSV row below at its prompt.
kafka-console-producer.sh --broker-list master:9092,node1:9092,node2:9092 --topic students
1500100001,施笑槐,22,女,文科六班
4、DataGen
用于生成随机数据,一般用在高性能测试上
-- Create the table (datagen can only be used for source tables).
CREATE TABLE students_datagen (
    sid STRING,
    name STRING,
    age INT,
    sex STRING,
    clazz STRING
) WITH (
    'connector' = 'datagen',
    'rows-per-second' = '5',  -- number of random rows generated per second
    'fields.age.min' = '1',
    'fields.age.max' = '100',
    'fields.sid.length' = '10',
    'fields.name.length' = '2',
    'fields.sex.length' = '1',
    'fields.clazz.length' = '4'
);
5、print
用于高性能测试
只能用于sink表
-- Sink table using the print connector.
CREATE TABLE print_table (
    sid STRING,
    name STRING,
    age INT,
    sex STRING,
    clazz STRING
) WITH (
    'connector' = 'print'
);

INSERT INTO print_table
SELECT *
FROM students_datagen;
6、BlackHole
用于高性能测试
-- Sink table using the blackhole connector.
CREATE TABLE blackhole_table (
    sid STRING,
    name STRING,
    age INT,
    sex STRING,
    clazz STRING
) WITH (
    'connector' = 'blackhole'
);

INSERT INTO blackhole_table
SELECT *
FROM students_datagen;
4、SQL语法
1、Hints
提示执行,在flink中可以用于动态修改表的属性,在spark中可以用于广播表
CREATE TABLE students_kafka (
    `offset` BIGINT METADATA VIRTUAL,  -- Kafka offset
    `event_time` TIMESTAMP(3) METADATA FROM 'timestamp',  -- time the record entered Kafka; usable as event time
    sid STRING,
    name STRING,
    age INT,
    sex STRING,
    clazz STRING
) WITH (
    'connector' = 'kafka',
    'topic' = 'students',  -- topic holding the data
    'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092',  -- broker list
    'properties.group.id' = 'testGroup',  -- consumer group
    'scan.startup.mode' = 'latest-offset',  -- start position: earliest-offset | latest-offset
    'format' = 'csv'  -- record format
);

-- Dynamically override a table option: the Kafka start position can be changed
-- at query time without re-creating the table.
SELECT *
FROM students_kafka /*+ OPTIONS('scan.startup.mode' = 'earliest-offset') */;

-- Bounded stream.
CREATE TABLE students_hdfs (
    sid STRING,
    name STRING,
    age INT,
    sex STRING,
    clazz STRING
) WITH (
    'connector' = 'filesystem',  -- required: connector type
    'path' = 'hdfs://master:9000/data/student',  -- required: path
    'format' = 'csv'  -- required: the filesystem connector needs a format
);

-- The HDFS table can be turned into an unbounded stream at query time.
SELECT *
FROM students_hdfs /*+ OPTIONS('source.monitor-interval' = '5000' ) */;
2、WITH
-- The alias tmp stands for the subquery and can be reused several times in the SQL that follows.
WITH tmp AS (
    SELECT *
    FROM students_hdfs /*+ OPTIONS('source.monitor-interval' = '5000' ) */
    WHERE clazz = '文科一班'
)
SELECT * FROM tmp
UNION ALL
SELECT * FROM tmp;
3、DISTINCT
在flink 的流处理中,使用distinct,flink需要将之前的数据保存在状态中,如果数据一直增加,状态会越来越大
状态越来越大,checkpoint时间会增加,最终会导致flink任务出问题
-- Count distinct student ids.
SELECT COUNT(DISTINCT sid)
FROM students_kafka /*+ OPTIONS('scan.startup.mode' = 'earliest-offset') */;

-- Count rows after de-duplicating whole rows.
-- NOTE(review): if students_kafka exposes the `offset` metadata column, DISTINCT * never
-- collapses rows because every row has its own offset — confirm which definition is in use.
SELECT COUNT(sid)
FROM (
    SELECT DISTINCT *
    FROM students_kafka /*+ OPTIONS('scan.startup.mode' = 'earliest-offset') */
);
4、窗口函数(TVFs)
1、创建表
-- Create the Kafka table.
CREATE TABLE bid (
    bidtime TIMESTAMP(3),
    price DECIMAL(10, 2),
    item STRING,
    WATERMARK FOR bidtime AS bidtime  -- event time is bidtime, watermark with no delay
) WITH (
    'connector' = 'kafka',
    'topic' = 'bid',  -- topic holding the data
    'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092',  -- broker list
    'properties.group.id' = 'testGroup',  -- consumer group
    'scan.startup.mode' = 'latest-offset',  -- start position: earliest-offset | latest-offset
    'format' = 'csv'  -- record format
);
# Start a console producer on topic bid, then enter the CSV rows below at its prompt.
# The space between date and time has been restored in each timestamp.
kafka-console-producer.sh --broker-list master:9092,node1:9092,node2:9092 --topic bid
2020-04-15 08:05:00,4.00,C
2020-04-15 08:07:00,2.00,A
2020-04-15 08:09:00,5.00,D
2020-04-15 08:11:00,3.00,B
2020-04-15 08:13:00,1.00,E
2020-04-15 08:17:00,6.00,F
2、滚动窗口
1、事件时间
-- TUMBLE: tumbling window function; it extends the input table with the columns
--         window_start, window_end and window_time.
-- TABLE(...): table function call, turns the result of the inner function into a dynamic table.
SELECT *
FROM TABLE(
    TUMBLE(TABLE bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES)
);

-- Aggregate on the columns provided by the window function:
-- total amount per item, computed once every 10 minutes.
SELECT
    item,
    window_start,
    window_end,
    SUM(price) AS sum_price
FROM TABLE(
    -- tumbling event-time window
    TUMBLE(TABLE bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES)
)
GROUP BY item, window_start, window_end;
2、处理时间
CREATE TABLE words (
    word STRING,
    proctime AS PROCTIME()  -- processing-time attribute; PROCTIME() returns the processing time
) WITH (
    'connector' = 'kafka',
    'topic' = 'words',  -- topic holding the data
    'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092',  -- broker list
    'properties.group.id' = 'testGroup',  -- consumer group
    'scan.startup.mode' = 'latest-offset',  -- start position: earliest-offset | latest-offset
    'format' = 'csv'  -- record format
);

-- Shell (not the SQL client): start a producer on topic words and enter the rows below.
kafka-console-producer.sh --broker-list master:9092,node1:9092,node2:9092 --topic words
java
spark

-- In Flink SQL the syntax for processing time and event time is the same.
SELECT *
FROM TABLE(
    TUMBLE(TABLE words, DESCRIPTOR(proctime), INTERVAL '5' SECOND)
);

SELECT
    word,
    window_start,
    window_end,
    COUNT(1) AS c
FROM TABLE(
    TUMBLE(TABLE words, DESCRIPTOR(proctime), INTERVAL '5' SECOND)
)
GROUP BY word, window_start, window_end;
3、滑动窗口
-- HOP: sliding window function.
-- With a sliding window a single row may fall into several windows.
SELECT *
FROM TABLE(
    HOP(TABLE bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES, INTERVAL '10' MINUTES)
);

-- Every 5 minutes, compute the total amount of all items over the last 10 minutes.
SELECT
    window_start,
    window_end,
    SUM(price) AS sum_price
FROM TABLE(
    HOP(TABLE bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES, INTERVAL '10' MINUTES)
)
GROUP BY window_start, window_end;
4、会话窗口
CREATE TABLE words (
    word STRING,
    proctime AS PROCTIME()  -- processing-time attribute; PROCTIME() returns the processing time
) WITH (
    'connector' = 'kafka',
    'topic' = 'words',  -- topic holding the data
    'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092',  -- broker list
    'properties.group.id' = 'testGroup',  -- consumer group
    'scan.startup.mode' = 'latest-offset',  -- start position: earliest-offset | latest-offset
    'format' = 'csv'  -- record format
);

-- Shell (not the SQL client): start a producer on topic words and enter the rows below.
kafka-console-producer.sh --broker-list master:9092,node1:9092,node2:9092 --topic words
java
spark

-- Session window per word with a 5-second gap, using the group-window functions.
SELECT
    word,
    SESSION_START(proctime, INTERVAL '5' SECOND) AS window_start,
    SESSION_END(proctime, INTERVAL '5' SECOND) AS window_end,
    COUNT(1) AS c
FROM words
GROUP BY
    word,
    SESSION(proctime, INTERVAL '5' SECOND);
5、OVER聚合
1、批处理
在flink批处理模式下,over函数和hive是一样的
SET 'execution.runtime-mode' = 'batch';

-- Bounded stream.
CREATE TABLE students_hdfs_batch (
    sid STRING,
    name STRING,
    age INT,
    sex STRING,
    clazz STRING
) WITH (
    'connector' = 'filesystem',  -- required: connector type
    'path' = 'hdfs://master:9000/data/student',  -- required: path
    'format' = 'csv'  -- required: the filesystem connector needs a format
);

-- Functions usable with OVER: row_number, sum, count, avg, lag, lead, max, min.
-- Get the two oldest students of each class.
SELECT *
FROM (
    SELECT
        *,
        ROW_NUMBER() OVER (PARTITION BY clazz ORDER BY age DESC) AS r
    FROM students_hdfs_batch
) AS a
WHERE r <= 2;
2、流处理
flink流处理中over聚合使用限制
1、order by 字段必须是时间字段升序排序,或者使用row_number时增加条件过滤
SET 'execution.runtime-mode' = 'streaming';

-- Create the Kafka table.
CREATE TABLE students_kafka (
    sid STRING,
    name STRING,
    age INT,
    sex STRING,
    clazz STRING,
    proctime AS PROCTIME()
) WITH (
    'connector' = 'kafka',
    'topic' = 'students',  -- topic holding the data
    'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092',  -- broker list
    'properties.group.id' = 'testGroup',  -- consumer group
    'scan.startup.mode' = 'earliest-offset',  -- start position: earliest-offset | latest-offset
    'format' = 'csv'  -- record format
);

-- In streaming mode Flink can only sort ascending on a time attribute.
-- Sorting on an ordinary column would require recomputing the order of all earlier rows
-- for every new record, which is too expensive.
-- Adding a filter on top of row_number keeps that cost from growing without bound.
SELECT *
FROM (
    SELECT
        *,
        ROW_NUMBER() OVER (PARTITION BY clazz ORDER BY age DESC) AS r
    FROM students_kafka
)
WHERE r <= 2;

-- In streaming mode Flink can only sort ascending on a time attribute.
SELECT
    *,
    SUM(age) OVER (PARTITION BY clazz ORDER BY proctime)
FROM students_kafka;

-- Time-based frame: RANGE BETWEEN INTERVAL '10' SECOND PRECEDING AND CURRENT ROW
SELECT
    *,
    SUM(age) OVER (
        PARTITION BY clazz
        ORDER BY proctime
        -- aggregate the rows of the last 10 seconds
        RANGE BETWEEN INTERVAL '10' SECOND PRECEDING AND CURRENT ROW
    )
FROM students_kafka /*+ OPTIONS('scan.startup.mode' = 'latest-offset') */;

-- Row-based frame: ROWS BETWEEN 2 PRECEDING AND CURRENT ROW
SELECT
    *,
    SUM(age) OVER (
        PARTITION BY clazz
        ORDER BY proctime
        -- aggregate the current row and the 2 rows before it
        ROWS BETWEEN 2 PRECEDING AND CURRENT ROW
    )
FROM students_kafka /*+ OPTIONS('scan.startup.mode' = 'latest-offset') */;
# Start a console producer on topic students, then enter the CSV row below at its prompt.
kafka-console-producer.sh --broker-list master:9092,node1:9092,node2:9092 --topic students
1500100003,单乐蕊,22,女,理科六班
6、ORDER BY
-- The sort key must include an ascending time attribute.
SELECT *
FROM students_kafka
ORDER BY proctime, age;

-- Alternatively, bound the cost of sorting with LIMIT.
SELECT *
FROM students_kafka
ORDER BY age
LIMIT 10;
7、row_number去重
CREATE TABLE students_kafka (
    sid STRING,
    name STRING,
    age INT,
    sex STRING,
    clazz STRING,
    proctime AS PROCTIME()
) WITH (
    'connector' = 'kafka',
    'topic' = 'students',  -- topic holding the data
    'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092',  -- broker list
    'properties.group.id' = 'testGroup',  -- consumer group
    'scan.startup.mode' = 'earliest-offset',  -- start position: earliest-offset | latest-offset
    'format' = 'csv'  -- record format
);

-- Shell (not the SQL client): start a producer on topic students and enter the row below.
kafka-console-producer.sh --broker-list master:9092,node1:9092,node2:9092 --topic students
1500100003,单乐蕊,22,女,理科六班

-- Keep only the first row seen for each sid (r = 1 in processing-time order).
SELECT *
FROM (
    SELECT
        sid,
        name,
        age,
        ROW_NUMBER() OVER (PARTITION BY sid ORDER BY proctime) AS r
    FROM students_kafka /*+ OPTIONS('scan.startup.mode' = 'latest-offset') */
)
WHERE r = 1;
8、JOIN
Regular Joins: 主要用于批处理,如果在流处理上使用,状态会越来越大
Interval Join: 主要用于双流join
Temporal Joins:用于流表关联时态表(不同时间状态不一样,比如汇率表)
Lookup Join:用于流表关联维表(不怎么变化的表)
1、Regular Joins
常规join,和hive spark sql的join是一样的
1、批处理模式
CREATE TABLE students_hdfs_batch (
    sid STRING,
    name STRING,
    age INT,
    sex STRING,
    clazz STRING
) WITH (
    'connector' = 'filesystem',  -- required: connector type
    'path' = 'hdfs://master:9000/data/student',  -- required: path
    'format' = 'csv'  -- required: the filesystem connector needs a format
);

CREATE TABLE score_hdfs_batch (
    sid STRING,
    cid STRING,
    score INT
) WITH (
    'connector' = 'filesystem',  -- required: connector type
    'path' = 'hdfs://master:9000/data/score',  -- required: path
    'format' = 'csv'  -- required: the filesystem connector needs a format
);

SET 'execution.runtime-mode' = 'batch';

-- inner join
SELECT
    a.sid,
    a.name,
    b.score
FROM students_hdfs_batch AS a
INNER JOIN score_hdfs_batch AS b
    ON a.sid = b.sid;

-- left join
SELECT
    a.sid,
    a.name,
    b.score
FROM students_hdfs_batch AS a
LEFT JOIN score_hdfs_batch AS b
    ON a.sid = b.sid;

-- full join
SELECT
    a.sid,
    a.name,
    b.score
FROM students_hdfs_batch AS a
FULL JOIN score_hdfs_batch AS b
    ON a.sid = b.sid;
2、流处理模式
CREATE TABLE students_kafka (
    sid STRING,
    name STRING,
    age INT,
    sex STRING,
    clazz STRING
) WITH (
    'connector' = 'kafka',
    'topic' = 'students',  -- topic holding the data
    'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092',  -- broker list
    'properties.group.id' = 'testGroup',  -- consumer group
    'scan.startup.mode' = 'latest-offset',  -- start position: earliest-offset | latest-offset
    'format' = 'csv',  -- record format
    'csv.ignore-parse-errors' = 'true'  -- skip the current row when it cannot be parsed
);

-- Shell (not the SQL client): start a producer on topic students and enter the rows below.
kafka-console-producer.sh --broker-list master:9092,node1:9092,node2:9092 --topic students
1500100001,施笑槐,22,女,文科六班
1500100002,吕金鹏,24,男,文科六班
1500100003,单乐蕊,22,女,理科六班

CREATE TABLE score_kafka (
    sid STRING,
    cid STRING,
    score INT
) WITH (
    'connector' = 'kafka',
    'topic' = 'scores',  -- topic holding the data
    'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092',  -- broker list
    'properties.group.id' = 'testGroup',  -- consumer group
    'scan.startup.mode' = 'latest-offset',  -- start position: earliest-offset | latest-offset
    'format' = 'csv',  -- record format
    'csv.ignore-parse-errors' = 'true'
);

-- Shell (not the SQL client): start a producer on topic scores and enter the rows below.
kafka-console-producer.sh --broker-list master:9092,node1:9092,node2:9092 --topic scores
1500100001,1000001,98
1500100001,1000002,5
1500100001,1000003,137

SET 'execution.runtime-mode' = 'streaming';

-- With a regular join in streaming mode Flink keeps the data of both tables in state
-- forever, so the state keeps growing.
-- A state time-to-live can be set to stop the state from growing without bound.
SET 'table.exec.state.ttl' = '5000';

-- full join
SELECT
    a.sid,
    b.sid,
    a.name,
    b.score
FROM students_kafka AS a
FULL JOIN score_kafka AS b
    ON a.sid = b.sid;
2、Interval Join
两个表在join时只关联一段时间内的数据,之前的数据就不需要保存在状态中,可以避免状态无限增大
CREATE TABLE students_kafka_time (
    sid STRING,
    name STRING,
    age INT,
    sex STRING,
    clazz STRING,
    ts TIMESTAMP(3),
    WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) WITH (
    'connector' = 'kafka',
    'topic' = 'students',  -- topic holding the data
    'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092',  -- broker list
    'properties.group.id' = 'testGroup',  -- consumer group
    'scan.startup.mode' = 'latest-offset',  -- start position: earliest-offset | latest-offset
    'format' = 'csv',  -- record format
    'csv.ignore-parse-errors' = 'true'  -- skip the current row when it cannot be parsed
);

-- Shell (not the SQL client): start a producer on topic students and enter the rows below.
kafka-console-producer.sh --broker-list master:9092,node1:9092,node2:9092 --topic students
1500100001,施笑槐,22,女,文科六班,2023-11-10 17:10:10
1500100001,吕金鹏,24,男,文科六班,2023-11-10 17:10:11
1500100001,单乐蕊,22,女,理科六班,2023-11-10 17:10:12

CREATE TABLE score_kafka_time (
    sid STRING,
    cid STRING,
    score INT,
    ts TIMESTAMP(3),
    WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) WITH (
    'connector' = 'kafka',
    'topic' = 'scores',  -- topic holding the data
    'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092',  -- broker list
    'properties.group.id' = 'testGroup',  -- consumer group
    'scan.startup.mode' = 'latest-offset',  -- start position: earliest-offset | latest-offset
    'format' = 'csv',  -- record format
    'csv.ignore-parse-errors' = 'true'
);

-- Shell (not the SQL client): start a producer on topic scores and enter the rows below.
kafka-console-producer.sh --broker-list master:9092,node1:9092,node2:9092 --topic scores
1500100001,1000001,98,2023-11-10 17:10:09
1500100001,1000002,5,2023-11-10 17:10:11
1500100001,1000003,137,2023-11-10 17:10:12

-- a.ts BETWEEN b.ts - INTERVAL '5' SECOND AND b.ts
-- A row of a only joins when its time lies between (b's time - 5 seconds) and b's time.
SELECT
    a.sid,
    b.sid,
    a.name,
    b.score
FROM students_kafka_time a, score_kafka_time b
WHERE a.sid = b.sid
    AND a.ts BETWEEN b.ts - INTERVAL '5' SECOND AND b.ts;
3、Temporal Joins
用于流表关联时态表,比如订单表和汇率表的关联
-- Orders table.
CREATE TABLE orders (
    order_id STRING,  -- order number
    price DECIMAL(32, 2),  -- order amount
    currency STRING,  -- currency code
    order_time TIMESTAMP(3),  -- order time
    WATERMARK FOR order_time AS order_time  -- watermark
) WITH (
    'connector' = 'kafka',
    'topic' = 'orders',  -- topic holding the data
    'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092',  -- broker list
    'properties.group.id' = 'testGroup',  -- consumer group
    'scan.startup.mode' = 'latest-offset',  -- start position: earliest-offset | latest-offset
    'format' = 'csv'  -- record format
);

-- Shell (not the SQL client): start a producer on topic orders and enter the rows below.
kafka-console-producer.sh --broker-list master:9092,node1:9092,node2:9092 --topic orders
001,100,CN,2023-11-11 09:48:10
002,200,CN,2023-11-11 09:48:11
003,300,CN,2023-11-11 09:48:14
004,400,CN,2023-11-11 09:48:16
005,500,CN,2023-11-11 09:48:18

-- Exchange-rate table.
CREATE TABLE currency_rates (
    currency STRING,  -- currency code
    conversion_rate DECIMAL(32, 2),  -- exchange rate
    update_time TIMESTAMP(3),  -- time the rate was updated
    WATERMARK FOR update_time AS update_time,  -- watermark
    PRIMARY KEY (currency) NOT ENFORCED  -- primary key
) WITH (
    'connector' = 'kafka',
    'topic' = 'currency_rates',  -- topic holding the data
    'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092',  -- broker list
    'properties.group.id' = 'testGroup',  -- consumer group
    'scan.startup.mode' = 'earliest-offset',  -- start position: earliest-offset | latest-offset
    'format' = 'canal-json'  -- record format
);

INSERT INTO currency_rates
VALUES
    ('CN', 7.2, TIMESTAMP '2023-11-11 09:48:05'),
    ('CN', 7.1, TIMESTAMP '2023-11-11 09:48:10'),
    ('CN', 6.9, TIMESTAMP '2023-11-11 09:48:15'),
    ('CN', 7.4, TIMESTAMP '2023-11-11 09:48:20');

-- Shell (not the SQL client): inspect the records written to topic currency_rates.
kafka-console-consumer.sh --bootstrap-server master:9092,node1:9092,node2:9092 --from-beginning --topic currency_rates

-- A regular join picks up the latest rate, not the rate valid at the order's time.
SELECT
    a.order_id,
    b.*
FROM orders AS a
LEFT JOIN currency_rates AS b
    ON a.currency = b.currency;

-- Temporal table join.
-- FOR SYSTEM_TIME AS OF orders.order_time: look up the rate table as of the order's time.
SELECT
    order_id,
    price,
    conversion_rate,
    order_time
FROM orders
LEFT JOIN currency_rates FOR SYSTEM_TIME AS OF orders.order_time
    ON orders.currency = currency_rates.currency;
4、lookup join
-- Students table (dimension table in MySQL).
CREATE TABLE students_jdbc (
    id BIGINT,
    name STRING,
    age BIGINT,
    gender STRING,
    clazz STRING,
    PRIMARY KEY (id) NOT ENFORCED  -- primary key
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://master:3306/student',
    'table-name' = 'students',
    'username' = 'root',
    'password' = '123456',
    'lookup.cache.max-rows' = '1000',  -- maximum number of cached rows
    'lookup.cache.ttl' = '20000'  -- cache expiry time
);

-- Scores table (stream).
CREATE TABLE score_kafka (
    sid BIGINT,
    cid STRING,
    score INT,
    proc_time AS PROCTIME()
) WITH (
    'connector' = 'kafka',
    'topic' = 'scores',  -- topic holding the data
    'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092',  -- broker list
    'properties.group.id' = 'testGroup',  -- consumer group
    'scan.startup.mode' = 'latest-offset',  -- start position: earliest-offset | latest-offset
    'format' = 'csv',  -- record format
    'csv.ignore-parse-errors' = 'true'
);

-- Shell (not the SQL client): start a producer on topic scores and enter the rows below.
kafka-console-producer.sh --broker-list master:9092,node1:9092,node2:9092 --topic scores
1500100001,1000001,98
1500100001,1000002,5
1500100001,1000003,137

-- Regular join against the dimension table.
-- The dimension table is loaded into Flink state when the job starts; later updates to the
-- students table in the database are not seen by Flink, so the newest data is not joined.
SELECT
    b.id,
    b.name,
    a.score
FROM score_kafka AS a
LEFT JOIN students_jdbc AS b
    ON a.sid = b.id;

-- Lookup join.
-- FOR SYSTEM_TIME AS OF a.proc_time: query the dimension table by the join key for the latest data.
-- Advantage: every incoming stream row triggers a lookup in MySQL, so the newest data is joined.
-- Drawback: querying MySQL for every row lowers performance.
SELECT
    b.id,
    b.name,
    a.score
FROM score_kafka AS a
LEFT JOIN students_jdbc FOR SYSTEM_TIME AS OF a.proc_time AS b
    ON a.sid = b.id;
5、Flink SQL Checkpoint
checkpoint可以定时将flink任务的状态持久化到hdfs中,任务执行失败重启可以保证中间结果不丢失
1、开启checkpoint
# Edit the Flink configuration file: vim flink-conf.yaml
# Checkpoint interval
execution.checkpointing.interval: 1min
# Retain the checkpoint when the job is cancelled manually
execution.checkpointing.externalized-checkpoint-retention: RETAIN_ON_CANCELLATION
# Allow only 1 checkpoint to run at a time
execution.checkpointing.max-concurrent-checkpoints: 1
execution.checkpointing.min-pause: 0
# Processing semantics
execution.checkpointing.mode: EXACTLY_ONCE
# Checkpoint timeout
execution.checkpointing.timeout: 10min
execution.checkpointing.tolerable-failed-checkpoints: 0
execution.checkpointing.unaligned: false
# State backend (where state is kept; hashmap: in memory)
state.backend: hashmap
# Checkpoint directory
state.checkpoints.dir: hdfs://master:9000/flink/checkpoint
2、编写一个flink sql脚本
vim word_count.sql
-- Read words from Kafka in real time, count each word and save the result to MySQL.

-- 1. Create the source table.
CREATE TABLE words (
    word STRING
) WITH (
    'connector' = 'kafka',
    'topic' = 'words',  -- topic holding the data
    'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092',  -- broker list
    'properties.group.id' = 'testGroup',  -- consumer group
    'scan.startup.mode' = 'earliest-offset',  -- start position: earliest-offset | latest-offset
    'format' = 'csv'  -- record format
);

-- 2. Create the sink table.
CREATE TABLE word_count (
    word STRING,
    num BIGINT,
    PRIMARY KEY (word) NOT ENFORCED  -- primary key
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://master:3306/student',
    'table-name' = 'word_count',  -- this table has to be created in MySQL by hand
    'username' = 'root',
    'password' = '123456'
);

-- 3. Process the data and save the result into the sink table.
INSERT INTO word_count
SELECT
    word,
    COUNT(1) AS num
FROM words
GROUP BY word;
3、使用sql-client -f 启动任务
sql-client.sh -f word_count.sql
4、任务失败重启
-- 1. Look up the checkpoint path, e.g. /file/checkpoint/47ee348d8c9edadadfc770cf7de8e7ee/chk-23
-- 2. Add the statement below to the SQL script, before the INSERT INTO statement;
--    it names the checkpoint the job restores from.
-- NOTE(review): the directory must match state.checkpoints.dir in flink-conf.yaml — confirm the path.
SET 'execution.savepoint.path' = 'hdfs://master:9000/file/checkpoint/47ee348d8c9edadadfc770cf7de8e7ee/chk-23';

-- 3. Start the SQL job (shell, not the SQL client):
sql-client.sh -f word_count.sql
6、反压
1、测试反压
反压:下游消费数据的速度比上游生成数据的速度小时会出现反压,下游导致上游task反压
-- Source: random 4-character words generated at a high rate.
CREATE TABLE words (
    word STRING
) WITH (
    'connector' = 'datagen',
    'rows-per-second' = '100000',  -- number of random rows generated per second
    'fields.word.length' = '4'
);

-- Sink that accepts rows through the blackhole connector.
-- The first column is named word (the original spelled it wprd).
CREATE TABLE blackhole_table (
    word STRING,
    num BIGINT
) WITH (
    'connector' = 'blackhole'
);

-- The hint overrides the generation rate and word length declared on the source table.
INSERT INTO blackhole_table
SELECT
    word,
    COUNT(1) AS num
FROM words /*+ OPTIONS('rows-per-second'='1000000','fields.word.length'='5') */
GROUP BY word;
2、增加资源
-- 1. Increase the parallelism; each unit of parallelism corresponds to one slot.
SET 'parallelism.default' = '2';

-- 2. Increase memory.
--    Backpressure caused by state that no longer fits in memory can be resolved by adding memory.
--    -tm : TaskManager memory
--    -jm : JobManager memory
-- Shell (not the SQL client):
yarn-session.sh -tm 4G -d
3、微批和预聚合
开启微批处理和预聚合,可以减少shuffle过程中传输的数据量,减轻下游算子计算的压力
-- Enable mini-batch processing.
SET 'table.exec.mini-batch.enabled' = 'true';
-- Maximum latency of a batch.
SET 'table.exec.mini-batch.allow-latency' = '5 s';
-- Batch size.
SET 'table.exec.mini-batch.size' = '5000';
-- Enable pre-aggregation.
SET 'table.optimizer.agg-phase-strategy' = 'TWO_PHASE';
7、Flink整合hive
1、整合
# 1. Upload the dependency jar to Flink's lib directory
#      flink-sql-connector-hive-3.1.2_2.12-1.15.2.jar
# 2. Restart the Flink cluster
yarn application -list
yarn application -kill application_1699579932721_0003
yarn-session.sh -d
# 3. Re-enter the SQL command line
sql-client.sh
2、hive catalog
catalog(元数据) —> database —> table —> 数据 — > 列
-- 1. Start the Hive metastore service (shell, not the SQL client):
nohup hive --service metastore &

-- 2. Create the Hive catalog.
CREATE CATALOG myhive WITH (
    'type' = 'hive',
    'hive-conf-dir' = '/usr/local/soft/hive-3.1.2/conf'
);

-- List all catalogs.
-- default_catalog: the default catalog, which keeps its metadata in memory.
SHOW CATALOGS;

-- 3. Switch catalog.
USE CATALOG myhive;

-- 4. Tables already created in Hive can now be used from Flink.
SELECT * FROM student;

-- A table can be addressed starting from the catalog.
SELECT * FROM myhive.`default`.student;

-- Save Flink table definitions in the Hive catalog.
-- Hive can see the streaming tables created by Flink but cannot query them.
CREATE DATABASE flink;
USE flink;

-- Create a Flink dynamic table.
CREATE TABLE students_kafka (
    `offset` BIGINT METADATA VIRTUAL,  -- Kafka offset
    `event_time` TIMESTAMP(3) METADATA FROM 'timestamp',  -- time the record entered Kafka; usable as event time
    sid STRING,
    name STRING,
    age INT,
    sex STRING,
    clazz STRING
) WITH (
    'connector' = 'kafka',
    'topic' = 'students',  -- topic holding the data
    'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092',  -- broker list
    'properties.group.id' = 'testGroup',  -- consumer group
    'scan.startup.mode' = 'earliest-offset',  -- start position: earliest-offset | latest-offset
    'format' = 'csv'  -- record format
);
3、hive functions
-- Load the Hive functions.
LOAD MODULE hive WITH ('hive-version' = '3.1.2');

-- Use a Hive function.
SELECT split('java,spark', ',');
练习
-- Exercise 1: monitor road congestion in real time.
-- Over the last 15 minutes, recomputed every minute, report the traffic flow and the
-- average speed per road, and save the result to the database.

-- 1. Create the Kafka source table.
CREATE TABLE cars (
    card_id STRING,  -- licence plate
    road_id STRING,  -- road id
    city_id STRING,  -- city id
    car_id STRING,  -- checkpoint id
    com_id STRING,  -- camera id
    fx STRING,  -- direction
    county_id STRING,  -- district / county
    ts BIGINT,  -- time
    speed DOUBLE,  -- speed
    event_time AS TO_TIMESTAMP(FROM_UNIXTIME(ts)),  -- convert the numeric timestamp into a timestamp value
    -- declare event time and watermark; the watermark lags 5 seconds behind
    WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) WITH (
    'connector' = 'kafka',
    'topic' = 'cars',  -- topic holding the data
    'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092',  -- broker list
    'properties.group.id' = 'testGroup',  -- consumer group
    'scan.startup.mode' = 'earliest-offset',  -- start position: earliest-offset | latest-offset
    'format' = 'csv'  -- record format
);

-- Create the MySQL sink table (Flink side).
CREATE TABLE road_flow_avg_speed (
    road_id STRING,
    win_start TIMESTAMP(3),
    win_end TIMESTAMP(3),
    flow BIGINT,
    avg_speed DOUBLE,
    PRIMARY KEY (road_id, win_start) NOT ENFORCED  -- primary key
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://master:3306/student',
    'table-name' = 'road_flow_avg_speed',
    'username' = 'root',
    'password' = '123456'
);

-- Create the table in MySQL (run this statement in MySQL, not in the Flink SQL client).
CREATE TABLE road_flow_avg_speed (
    road_id VARCHAR(10),
    win_start DATETIME,
    win_end DATETIME,
    flow BIGINT,
    avg_speed DOUBLE,
    PRIMARY KEY (road_id, win_start)  -- primary key
);

-- Save the query result to MySQL.
INSERT INTO road_flow_avg_speed
SELECT
    road_id,
    HOP_START(event_time, INTERVAL '1' MINUTE, INTERVAL '15' MINUTE) AS win_start,  -- window start
    HOP_END(event_time, INTERVAL '1' MINUTE, INTERVAL '15' MINUTE) AS win_end,  -- window end
    COUNT(1) AS flow,
    AVG(speed) AS avg_speed
FROM cars
GROUP BY
    road_id,
    -- sliding event-time window
    HOP(event_time, INTERVAL '1' MINUTE, INTERVAL '15' MINUTE);
版权归原作者 晚棠'.ㅅ' 所有, 如有侵权,请联系我们删除。