Flink笔记
一、Flink简介
1、Spark Streaming和Flink的区别
- Spark Streaming- 微批处理(Micro-batching):Spark Streaming 将实时数据流分成小批次(通常每批次几百毫秒到几秒),然后以批处理的方式处理这些数据。每个微批次数据被看作一个 RDD(弹性分布式数据集),由 Spark 的核心引擎处理。- 延迟:由于微批处理的方式,延迟通常较大,通常在秒级。- 优势: 编程原语丰富,编程简单 框架封装层级较高,封装性好 可以共用批处理处理逻辑,兼容性好 基于Spark,可以无缝内嵌Spark其他子项目,如Spark Sql,MLlib等劣势: 微批处理,时间延迟大 稳定性相对较差 机器性能消耗较大
- Flink- 真正的流处理(True Stream Processing):Flink 采用事件驱动模型,逐条处理事件,不分批次。Flink 的处理模型更接近实时,延迟通常在毫秒级。- 低延迟:由于逐条处理数据,Flink 的延迟比 Spark Streaming 更低,适合对延迟要求较高的场景。- 优势: Flink流处理为先的方法可提供低延迟,高吞吐率,近乎逐项处理的能力 Flink的很多组件是自行管理的 通过多种方式对工作进行分析进而优化任务 提供了基于Web的调度视图
- 主流实时计算框架对比模型API保证次数容错机制延时吞吐量批流统一业务模式****易用性StormNative组合式At-least-onceRecord ACKs★★★★不支持需要其他框架★Spark StreamingMirco-batching声明式Exectly-onceRDD Checkpoint★★★★支持需要其他框架★★Apache FlinkNative组合式Exectly-onceCheckpoint★★★★★★支持需要其他框架★★
2、什么是Flink
AApache Flink 是一个实时计算框架和分布式处理引擎,用于在无边界和有边界数据流上进行有状态的计算。Flink 能在所有常见集群环境中运行,并能以内存速度和任意规模进行计算。
这个图展示了一个典型的数据处理架构,强调了 Apache Flink 的应用场景和功能。让我们从左到右分步骤详细解释这个图。
数据输入
图的左侧列出了一些数据来源:
- Transactions(交易)
- Logs(日志)
- IoT(物联网)
- Clicks(点击)
- …(其他数据源)
这些数据源可以生成实时事件(Real-time Events)或存储在数据库、文件系统、KV-Store中。
数据处理框架
中间部分是核心,展示了 Apache Flink 如何处理这些数据:
- Event-driven Applications(事件驱动应用)- 处理实时事件。- Flink 可以处理来自各种实时数据源的事件,以构建实时反应的应用。
- Streaming Pipelines(流处理管道)- 流处理工作流。- Flink 提供了流处理管道,处理连续不断的数据流,支持复杂的事件处理、数据变换和聚合。
- Stream & Batch Analytics(流和批分析)- 同时支持流处理和批处理分析。- Flink 能够在同一个系统中处理流数据和批数据,提供统一的分析平台。
资源与存储
Flink 的处理依赖于各种资源和存储系统:
- Resources(资源):- K8s(Kubernetes)- Yarn- Mesos- 这些资源管理平台用于管理和调度 Flink 集群中的计算资源。
- Storage(存储):- HDFS(Hadoop Distributed File System)- S3(Amazon Simple Storage Service)- NFS(Network File System)- 这些存储系统用于保存 Flink 处理过程中使用和生成的数据。
数据输出
图的右侧展示了 Flink 处理后的数据去向:
- Application(应用):处理后的数据可以直接驱动应用程序。
- Event Log(事件日志):处理后的事件数据可以记录在事件日志中,用于审计或进一步分析。
- Database, File System, KV-Store(数据库、文件系统、键值存储):结果数据可以保存到不同的存储系统中。
数据流动过程
数据从左侧的数据源开始,经过 Flink 的处理框架(事件驱动应用、流处理管道、流和批分析),最终输出到应用、事件日志或存储系统中。在这个过程中,Flink 依赖资源管理平台(如 Kubernetes、Yarn、Mesos)来调度计算资源,并使用各种存储系统(如 HDFS、S3、NFS)来保存数据。
总结
这张图展示了 Flink 在处理实时数据和批数据方面的强大能力。它可以接收多种数据源,通过流处理管道和事件驱动应用实时处理数据,并能同时支持流和批处理分析。处理后的数据可以应用于不同的场景,包括驱动应用程序、记录事件日志和保存到各种存储系统中。Flink 的架构依赖于资源管理平台和分布式存储系统,确保高效、可靠地处理大量数据。
3、Flink的特性
- 支持高吞吐、低延迟、高性能的流处理
- 支持带有事件时间的窗口(Window)操作
- 支持有状态计算的Exactly-once语义
- 支持高度灵活的窗口(Window)操作,支持基于time、count、session,以及data-driven的窗口操作
- 支持具有反压功能的持续流模型
- 支持基于轻量级分布式快照(Snapshot)实现的容错
- 一个运行时同时支持Batch on Streaming处理和Streaming处理
- Flink在JVM内部实现了自己的内存管理,避免了出现oom
- 支持迭代计算
- 支持程序自动优化:避免特定情况下Shuffle、排序等昂贵操作,中间结果有必要进行缓存
4、Flink的组件栈
Apache Flink的组件栈:
Deployment层:
Apache Flink可以部署在单JVM,,独立集群,Yarn,K8S等集群中运行。
Runtime层:
针对不同的执行环境,Flink 提供了一套统一的分布式作业执行引擎,也就是 Flink Runtime 这层。
Flink Runtime 层采用了标准 master-slave 的结构,master(JobManager )负责管理整个集群中的资源和作业;Slave(TaskManager)负责提供具体的资源并实际执行作业。
API层&Libraries层:
API层
API蹭其中包含 DataStream API(应用于有界/无界数据流场景)和 DataSet API(应用于有界数据集场景)两部分。Core APIs 提供的流式 API(Fluent API)为数据处理提供了通用的模块组件,例如各种形式的用户自定义转换(transformations)、联接(joins)、聚合(aggregations)、窗口(windows)和状态(state)操作等。此层API 中处理的数据类型在每种编程语言中都有其对应的类。Process Function 这类底层抽象和 DataStream API 的相互集成使得用户可以选择使用更底层的抽象 API来实现自己的需求。DataSet API 还额外提供了一些原语,比如循环/迭代(loop/iteration)操作。
Libraries层
Table API是以表(Table)为中心的声明式编程(DSL)API,例如在流式数据场景下,它可以表示一张正在动态改变的表。Table API遵循(扩展)关系模型:即表拥有 schema(类似于关系型数据库中的 schema),
SQL这层抽象在语义和程序表达式上都类似于 Table API,但是其程序实现都是 SQL查询表达式。SQL 抽象与 Table API 抽象之间的关联是非常紧密的,并且 SQL 查询语句可以在 Table API中定义的表上执行。
5、Flink 中的 API
Flink 为流式/批式处理应用程序的开发提供了不同级别的抽象。
- Flink API 最底层的抽象为有状态实时流处理。其抽象实现是 Process Function,并且 Process Function 被 Flink 框架集成到了 DataStream API 中来为我们使用。它允许用户在应用程序中自由地处理来自单流或多流的事件(数据),并提供具有全局一致性和容错保障的状态。此外,用户可以在此层抽象中注册事件时间(event time)和处理时间(processing time)回调方法,从而允许程序可以实现复杂计算。
- Flink API 第二层抽象是 Core APIs。实际上,许多应用程序不需要使用到上述最底层抽象的 API,而是可以使用 Core APIs 进行编程:其中包含 DataStream API(应用于有界/无界数据流场景)和 DataSet API(应用于有界数据集场景)两部分。Core APIs 提供的流式 API(Fluent API)为数据处理提供了通用的模块组件,例如各种形式的用户自定义转换(transformations)、联接(joins)、聚合(aggregations)、窗口(windows)和状态(state)操作等。此层 API 中处理的数据类型在每种编程语言中都有其对应的类。Process Function 这类底层抽象和 DataStream API 的相互集成使得用户可以选择使用更底层的抽象 API 来实现自己的需求。DataSet API 还额外提供了一些原语,比如循环/迭代(loop/iteration)操作。
- Flink API 第三层抽象是 Table API。Table API 是以表(Table)为中心的声明式编程(DSL)API,例如在流式数据场景下,它可以表示一张正在动态改变的表。Table API 遵循(扩展)关系模型:即表拥有 schema(类似于关系型数据库中的 schema),并且 Table API 也提供了类似于关系模型中的操作,比如 select、project、join、group-by 和 aggregate 等。Table API 程序是以声明的方式定义应执行的逻辑操作,而不是确切地指定程序应该执行的代码。尽管 Table API 使用起来很简洁并且可以由各种类型的用户自定义函数扩展功能,但还是比 Core API 的表达能力差。此外,Table API 程序在执行之前还会使用优化器中的优化规则对用户编写的表达式进行优化。表和 DataStream/DataSet 可以进行无缝切换,Flink 允许用户在编写应用程序时将 Table API 与 DataStream/DataSet API 混合使用。
- Flink API 最顶层抽象是 SQL。这层抽象在语义和程序表达式上都类似于 Table API,但是其程序实现都是 SQL 查询表达式。SQL 抽象与 Table API 抽象之间的关联是非常紧密的,并且 SQL 查询语句可以在 Table API 中定义的表上执行。
6、Flink的流批处理
- 流处理的代码举例
package com.shujia.core;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class Demo1StreamWordCount {
public static void main(String[] args) throws Exception {
//加载flink环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//设置任务的并行度;一个并行度相当于一个task
env.setParallelism(2);
//设置延迟时间,默认的时间是200毫秒,单位是毫秒
env.setBufferTimeout(100);
//读取数据
DataStream<String> dataStream = env.socketTextStream("master", 12345);
//计算每个单词的个数
DataStream<Tuple2<String, Integer>> wordDS = dataStream.map((word) -> Tuple2.of(word, 1), Types.TUPLE(Types.STRING, Types.INT));
//通过keyBy进行分组
KeyedStream<Tuple2<String, Integer>, String> keyByDS = wordDS.keyBy((kv) -> kv.f0);
//通过sum进行聚合
DataStream<Tuple2<String, Integer>> wordCountDS = keyByDS.sum(1);
//打印数据
wordCountDS.print();
//触发程序执行
env.execute();
}
}
- flink持续流处理的模型
- 批处理的代码
packagecom.shujia.core;importorg.apache.flink.api.common.RuntimeExecutionMode;importorg.apache.flink.api.common.typeinfo.Types;importorg.apache.flink.api.java.tuple.Tuple2;importorg.apache.flink.streaming.api.datastream.DataStream;importorg.apache.flink.streaming.api.datastream.KeyedStream;importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment;publicclassDemo2BatchWordCount{publicstaticvoidmain(String[] args)throwsException{//加载flink环境StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();//设置任务的并行度;一个并行度相当于一个task
env.setParallelism(2);//设置延迟时间,默认的时间是200毫秒,单位是毫秒
env.setBufferTimeout(100);/*
*处理模式
* RuntimeExecutionMode.BATCH:批处理模式(MapReduce模型)
* 1、输出最终结果
* 2、批处理模式只能用于处理有界流
*
* RuntimeExecutionMode.STREAMING:流处理模式(持续流模型)
* 1、输出连续结果
* 2、流处理模式,有界流核无界流都可以处理
*///设置数据的处理模式
env.setRuntimeMode(RuntimeExecutionMode.BATCH);//读取数据DataStream<String> dataStream = env.readTextFile("flink/data/words.txt");//计算每个单词的个数DataStream<Tuple2<String,Integer>> wordDS = dataStream.map((word)->Tuple2.of(word,1),Types.TUPLE(Types.STRING,Types.INT));//通过keyBy进行分组KeyedStream<Tuple2<String,Integer>,String> keyByDS = wordDS.keyBy((kv)-> kv.f0);//通过sum进行聚合DataStream<Tuple2<String,Integer>> wordCountDS = keyByDS.sum(1);//打印数据
wordCountDS.print();//触发程序执行
env.execute();}}
二、Data Sources
1、分为四类
Flink 在流处理和批处理上的 source 大概有 4 类:
基于本地集合的 source、
基于文件的 source、
基于网络套接字的 source、
自定义的 source。自定义的 source 常见的有 Apache kafka、Amazon Kinesis Streams、RabbitMQ、Twitter Streaming API、Apache NiFi 等,当然你也可以定义自己的 source。
2、具体介绍
2.1、基于本地集合的source
fromCollection(Collection)
- 从 Java Java.util.Collection 创建数据流。集合中的所有元素必须属于同一类型。
//创建flink环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//创建List集合
ArrayList<String> wordsList = new ArrayList<>();
//添加集合元素
wordsList.add("java");
wordsList.add("java");
wordsList.add("flink");
wordsList.add("flink");
wordsList.add("hadoop");
wordsList.add("hadoop");
/*
*基于集合的source --- 有界流
*/
//读取数据
DataStream<String> wordDS = env.fromCollection(wordsList);
//打印结果
wordDS.print();
//启动任务
env.execute();
2.2、基于文件的source
readTextFile(path)
- 读取文本文件,例如遵守 TextInputFormat 规范的文件,逐行读取并将它们作为字符串返回。readFile(fileInputFormat, path)
- 按照指定的文件输入格式读取(一次)文件。readFile(fileInputFormat, path, watchType, interval, pathFilter, typeInfo)
- 这是前两个方法内部调用的方法。它基于给定的fileInputFormat
读取路径path
上的文件。根据提供的watchType
的不同,source 可能定期(每interval
毫秒)监控路径上的新数据(watchType 为FileProcessingMode.PROCESS_CONTINUOUSLY
),或者处理一次当前路径中的数据然后退出(watchType 为FileProcessingMode.PROCESS_ONCE
)。使用pathFilter
,用户可以进一步排除正在处理的文件。实现:在底层,Flink 将文件读取过程拆分为两个子任务,即 目录监控 和 数据读取。每个子任务都由一个单独的实体实现。监控由单个非并行(并行度 = 1)任务实现,而读取由多个并行运行的任务执行。后者的并行度和作业的并行度相等。单个监控任务的作用是扫描目录(定期或仅扫描一次,取决于watchType
),找到要处理的文件,将它们划分为 分片,并将这些分片分配给下游 reader。Reader 是将实际获取数据的角色。每个分片只能被一个 reader 读取,而一个 reader 可以一个一个地读取多个分片。重要提示:1. 如果watchType
设置为FileProcessingMode.PROCESS_CONTINUOUSLY
,当一个文件被修改时,它的内容会被完全重新处理。这可能会打破 “精确一次” 的语义,因为在文件末尾追加数据将导致重新处理文件的所有内容。2. 如果watchType
设置为FileProcessingMode.PROCESS_ONCE
,source 扫描一次路径然后退出,无需等待 reader 读完文件内容。当然,reader 会继续读取数据,直到所有文件内容都读完。关闭 source 会导致在那之后不再有检查点。这可能会导致节点故障后恢复速度变慢,因为作业将从最后一个检查点恢复读取。
//加载flink环境StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();/*
* 老版本读取数据
*///读取数据DataStream<String> wordDs = env.readTextFile("flink/data/words.txt");//打印数据// wordDs.print();/*
* 新版本读取数据的方式,可以读取有界流和无界流
*///读取有界流// FileSource<String> fileSource = FileSource.forRecordStreamFormat(// new TextLineInputFormat("UTF-8"),// new Path("flink/data/words.txt")// ).build();//// DataStreamSource<String> fileSourceDS = env.fromSource(fileSource, WatermarkStrategy.noWatermarks(), "fileSourceDS");//// fileSourceDS.print();//读取无界流FileSource<String> fileSource =FileSource.forRecordStreamFormat(//指定读取数据的编码newTextLineInputFormat("UTF-8"),//指定读取数据的路径newPath("flink/data/stu"))每隔一段时间读取目录下新的文件,构建无界流.monitorContinuously(Duration.ofSeconds(5)).build();DataStream<String> fileSourceDS = env.fromSource(fileSource,WatermarkStrategy.noWatermarks(),"fileSourceDS");DataStream<Tuple2<String,Integer>> clazzDS = fileSourceDS.map(lines ->Tuple2.of(lines.split(",")[4],1),Types.TUPLE(Types.STRING,Types.INT));KeyedStream<Tuple2<String,Integer>,String> keyByDS = clazzDS.keyBy(kv -> kv.f0);DataStream<Tuple2<String,Integer>> countDS = keyByDS.sum(1);
countDS.print();//启动任务
env.execute();
2.3、基于套接字的source
- 通过 nc -lk post(端口号) 启动一个输入流
socketTextStream
- 从套接字读取。元素可以由分隔符分隔。
//加载flink环境StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();//设置任务的并行度;一个并行度相当于一个task
env.setParallelism(2);//设置延迟时间,默认的时间是200毫秒,单位是毫秒
env.setBufferTimeout(100);//读取数据DataStream<String> dataStream = env.socketTextStream("master",12345);
2.4、基于自定义的source
addSource
- 关联一个新的 source function。
public class Demo3MySource {
public static void main(String[] args) throws Exception{
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//读取自定义数据
DataStreamSource<Integer> sourceFunctionDS = env.addSource(new MySource());
//打印结果
sourceFunctionDS.print();
//启动任务
env.execute();
}
}
/**
* 自定义source
* 实现SourceFunction接口,实现接口中的run方法
*/
class MySource implements SourceFunction<Integer> {
/**
* flink启动的时候会执行一次,再run方法中读取外部的数据,将数据发送到下游
*/
@Override
public void run(SourceContext<Integer> ctx) throws Exception {
while (true) {
ctx.collect(100);
Thread.sleep(100);
}
}
//cancel方法再任务取消的时候执行,一般用于回收资源
@Override
public void cancel() {
}
}
可以通过自定义的source连接到MySQL数据库等
publicclassDemo4MySQLSource{publicstaticvoidmain(String[] args)throwsException{StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();//读取自定义数据DataStreamSource<Student> sourceFunctionDS = env.addSource(newMySQLSource());
sourceFunctionDS.print();
env.execute();}}@Data@AllArgsConstructorclassStudent{privateint id;privateString name;privateint age;privateString gender;privateString clazz;}/**
* 自定义source 读取mysql中的数据
* 练习:自定义source 读取redis中的数据
*/classMySQLSourceimplementsSourceFunction<Student>{//run方法在任务启动的时候执行一次@Overridepublicvoidrun(SourceContext<Student> ctx)throwsException{//加载驱动Class.forName("com.mysql.jdbc.Driver");//创建连接对象Connection conn =DriverManager.getConnection("jdbc:mysql://master:3306/bigdata29?useSSL=false","root","123456");//获取操作对象PreparedStatement statement = conn.prepareStatement("select * from students");//执行SQL语句ResultSet resultSet = statement.executeQuery();while(resultSet.next()){int id = resultSet.getInt(1);String name = resultSet.getString(2);int age = resultSet.getInt(3);String gender = resultSet.getString(4);String clazz = resultSet.getString(5);//封装为学生对象Student student =newStudent(id, name, age, gender, clazz);//将数据发送到下游
ctx.collect(student);}//关闭资源
statement.close();
conn.close();}@Overridepublicvoidcancel(){}}
三、DataStream Transformations(算子)
1、什么是算子
- 用户通过算子能将一个或多个 DataStream 转换成新的 DataStream,在应用程序中可以将多个数据转换算子合并成一个复杂的数据流拓扑。
2、数据流转换
2.1、Map
- DataStream → DataStream #
- 输入一个元素同时输出一个元素。下面是将输入流中元素数值加倍的 map function:
publicclassDemo1Map{publicstaticvoidmain(String[] args)throwsException{StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();DataStream<String> linesDS = env.socketTextStream("master",12345);//匿名内部类的方式// SingleOutputStreamOperator<Tuple2<String, Integer>> wordDS = linesDS.map(new MapFunction<String, Tuple2<String, Integer>>() {//// @Override// public Tuple2<String, Integer> map(String value) throws Exception {// return Tuple2.of(value, 1);// }// });//声明式写法DataStream<Tuple2<String,Integer>> wordDS = linesDS.map(word ->Tuple2.of(word,1),Types.TUPLE(Types.STRING,Types.INT));KeyedStream<Tuple2<String,Integer>,String> keyByDS = wordDS.keyBy(kv -> kv.f0);DataStream<Tuple2<String,Integer>> countDS = keyByDS.sum(1);
countDS.print();
env.execute();}}
2.2、FlatMap
- DataStream → DataStream #
- 输入一个元素同时产生零个、一个或多个元素。下面是将句子拆分为单词的 flatmap function:
publicclassDemo2FlatMap{publicstaticvoidmain(String[] args)throwsException{StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();DataStream<String> linesDS = env.socketTextStream("master",12345);//匿名内部类方式// SingleOutputStreamOperator<Tuple2<String, Integer>> flatMapDS = linesDS.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {//// @Override// public void flatMap(String line, Collector<Tuple2<String, Integer>> out) throws Exception {// String[] words = line.split(" ");// for (String word : words) {// out.collect(Tuple2.of(word, 1));//// }// }// });//声明式要指定数据类型SingleOutputStreamOperator<Tuple2<String,Integer>>flatMapDS = linesDS.flatMap((line, out)->{String[] words = line.split(" ");for(String word : words){
out.collect(Tuple2.of(word,1));}},Types.TUPLE(Types.STRING,Types.INT));KeyedStream<Tuple2<String,Integer>,String> keyByDS = flatMapDS.keyBy(kv -> kv.f0);SingleOutputStreamOperator<Tuple2<String,Integer>> countDS = keyByDS.sum(1);
countDS.print();
env.execute();}}
2.3、Filter
- DataStream → DataStream #
- 为每个元素执行一个布尔 function,并保留那些 function 输出值为 true 的元素。下面是过滤掉零值的 filter:
publicclassDemo3Filter{publicstaticvoidmain(String[] args)throwsException{StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<String> studentDS = env.readTextFile("flink/data/students.csv");//匿名内部类方式SingleOutputStreamOperator<String> filterDS = studentDS.filter(newFilterFunction<String>(){@Overridepublicbooleanfilter(String line)throwsException{return"文科一班".equals(line.split(",")[4]);}});//声明式// SingleOutputStreamOperator<String> filterDS = studentDS.filter(line -> "文科一班".equals(line.split(",")[4]));
filterDS.print();
env.execute();}}
2.4、KeyBy
- DataStream → KeyedStream #
- 在逻辑上将流划分为不相交的分区。具有相同 key 的记录都分配到同一个分区。在内部, keyBy() 是通过哈希分区实现的。有多种指定 key 的方式。
- 以下情况,一个类不能作为 key:1. 它是一种 POJO 类,但没有重写 hashCode() 方法而是依赖于 Object.hashCode() 实现。2. 它是任意类的数组。
publicclassDemo4KeyBy{publicstaticvoidmain(String[] args)throwsException{StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<String> linesDS = env.socketTextStream("master",12345);//匿名内部类方式KeyedStream<String,String> keyByDS = linesDS.keyBy(newKeySelector<String,String>(){@OverridepublicStringgetKey(String word)throwsException{return word;}});
keyByDS.print();//lambda表达式//keyBy:将相同的key发送到同一个task中// KeyedStream<String, String> keyByDS = linesDS.keyBy(word -> word);
env.execute();}}
2.5、Reduce
- KeyedStream → DataStream #
- 在相同 key 的数据流上“滚动”执行 reduce。将当前元素与最后一次 reduce 得到的值组合然后输出新值。
publicclassDemo5Reduce{publicstaticvoidmain(String[] args)throwsException{StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();DataStream<String> streamSource = env.readTextFile("flink/data/words.txt");DataStream<String> flatMapDS = streamSource.flatMap((FlatMapFunction<String,String>)(line, out)->{String[] words = line.split("\\|");for(String word : words){
out.collect(word);}},Types.STRING);;DataStream<Tuple2<String,Integer>> mapDS = flatMapDS.map(word ->Tuple2.of(word,1),Types.TUPLE(Types.STRING,Types.INT));KeyedStream<Tuple2<String,Integer>,String> keyByDS = mapDS.keyBy(kv -> kv.f0);//匿名内部类// DataStream<Tuple2<String, Integer>> reduceDS = keyByDS.reduce(new ReduceFunction<Tuple2<String, Integer>>() {// @Override// public Tuple2<String, Integer> reduce(Tuple2<String, Integer> kv1, Tuple2<String, Integer> kv2) throws Exception {// return Tuple2.of(kv1.f0, kv1.f1 + kv2.f1);// }// });//声明式SingleOutputStreamOperator<Tuple2<String,Integer>> reduceDS = keyByDS.reduce((kv1, kv2)->Tuple2.of(kv1.f0, kv1.f1 + kv2.f1));
reduceDS.print();
env.execute();}}
2.6、Window
- KeyedStream → WindowedStream #
- 可以在已经分区的 KeyedStreams 上定义 Window。Window 根据某些特征(例如,最近 5 秒内到达的数据)对每个 key Stream 中的数据进行分组。请参阅 windows 获取有关 window 的完整说明。
publicclassDemo6Window{publicstaticvoidmain(String[] args)throwsException{//加载环境StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();//读取数据DataStream<String> streamSource = env.socketTextStream("master",12345);//转换kvDataStream<Tuple2<String,Integer>> mapDS = streamSource.map(word ->Tuple2.of(word,1),Types.TUPLE(Types.STRING,Types.INT));//分组KeyedStream<Tuple2<String,Integer>,String> keyByDS = mapDS.keyBy(kv -> kv.f0);//设置滑动窗口//SlidingProcessingTimeWindows:滑动的处理时间窗口WindowedStream<Tuple2<String,Integer>,String,TimeWindow> windowDS = keyByDS.window(//传入窗口的大小以及滑动的距离SlidingProcessingTimeWindows.of(Time.seconds(15),Time.seconds(5)));//计算每个单词的个数DataStream<Tuple2<String,Integer>> countDS = windowDS.sum(1);//打印结果
countDS.print();//启动任务
env.execute();}}
2.7、Union
- DataStream → DataStream #*
- 将两个或多个数据流联合来创建一个包含所有流中数据的新流。注意:如果一个数据流和自身进行联合,这个流中的每个数据将在合并后的流中出现两次。
public class Demo7Union {
public static void main(String[] args) throws Exception{
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> streamSource = env.socketTextStream("master", 12345);
DataStreamSource<String> streamSource1 = env.socketTextStream("master", 8888);
/*
* union:合并两个DS
* 在数据层面并没有合并,只是在逻辑层面合并了
*/
DataStream<String> unionDS = streamSource.union(streamSource1);
unionDS.print();
env.execute();
}
}
2.8、ProcessFunction
ProcessFunction
是一种低级流处理操作,可以访问所有(非循环)流应用程序的基本构建块
public class Demo8ProcessFunction {
public static void main(String[] args) throws Exception{
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> streamSource = env.socketTextStream("master", 12345);
/*
* process算子是flink的底层算子吗,可以代替map flatMap filter算子
*/
SingleOutputStreamOperator<Tuple2<String, Integer>> processDS = streamSource.process(new ProcessFunction<String, Tuple2<String, Integer>>() {
/**
* processElement:相当于flatMap。每一条数据执行一次,可以返回一条也可以返回多条
* line:一行数据
* ctx:上下文对象(代表flink的执行环境)
* out: 用于将数据发送到下游
*/
@Override
public void processElement(String line, ProcessFunction<String, Tuple2<String, Integer>>.Context ctx, Collector<Tuple2<String, Integer>> out) throws Exception {
String[] words = line.split(" ");
for (String word : words) {
Tuple2<String, Integer> tuple2 = Tuple2.of(word, 1);
out.collect(tuple2);
}
}
});
processDS.print();
env.execute();
}
}
四、Data Sinks
1、概述
- 在 Apache Flink 中,数据接收器(Data Sinks)用于将处理后的数据发送到外部系统
2、分为四大类
- 写入文件、 打印出来、 写入 socket 、 自定义的 sink 。自定义的 sink 常见的有 Apache kafka、RabbitMQ、MySQL、ElasticSearch、Apache Cassandra、Hadoop FileSystem 等,同理你也可以定义自己的 sink。
3、具体介绍
- 主要介绍常用的两类
3.1、FileSink
publicclassDemo1FileSink{publicstaticvoidmain(String[] args)throwsException{StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<String> wordDs = env.socketTextStream("master",12345);FileSink<String> fileSink =FileSink.<String>forRowFormat(newPath("flink/data/words"),newSimpleStringEncoder<>("UTF-8"))//指定滚动策略.withRollingPolicy(DefaultRollingPolicy.builder()//包含了至少10秒的数据量.withRolloverInterval(Duration.ofSeconds(10))//从没接收延时10秒之外的新纪录.withInactivityInterval(Duration.ofSeconds(10))//文件大小已经达到 1MB(写入最后一条记录之后).withMaxPartSize(MemorySize.ofMebiBytes(1)).build()).build();
wordDs.sinkTo(fileSink);
env.execute();}}
3.2、自定义Sink
publicclassDemo2MySink{publicstaticvoidmain(String[] args)throwsException{StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<String> wordsDs = env.socketTextStream("master",12345);
wordsDs.addSink(newMySink());
env.execute();}}//自定义sinkclassMySinkimplementsSinkFunction<String>{@Overridepublicvoidinvoke(String value,Context context)throwsException{//自定义数据sink的位置System.out.println("自定义sink:"+ value);}}
- 通过自定义Sink将数据保存到MySQL数据库中
public class Demo3MySQLSink {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Student> studentDS = env.addSource(new MySQLSource());
DataStream<Tuple2<String, Integer>> genderDS = studentDS.map(stu -> Tuple2.of(stu.getGender(), 1), Types.TUPLE(Types.STRING, Types.INT));
KeyedStream<Tuple2<String, Integer>, String> keyByDS = genderDS.keyBy(kv -> kv.f0);
SingleOutputStreamOperator<Tuple2<String, Integer>> sumDS = keyByDS.sum(1);
sumDS.addSink(new MySQLSink());
env.execute();
}
}
@Data
@AllArgsConstructor
class Student {
private int id;
private String name;
private int age;
private String gender;
private String clazz;
}
/**
* 自定义source 读取mysql中的数据
*
*/
class MySQLSource implements SourceFunction<Student> {
//run方法在任务启动的时候执行一次
@Override
public void run(SourceContext<Student> ctx) throws Exception {
//加载驱动
Class.forName("com.mysql.jdbc.Driver");
//创建连接对象
Connection conn = DriverManager.getConnection("jdbc:mysql://master:3306/bigdata29?useSSL=false&characterEncoding=UTF-8", "root", "123456");
//获取操作对象
PreparedStatement statement = conn.prepareStatement("select * from students");
//执行SQL语句
ResultSet resultSet = statement.executeQuery();
while (resultSet.next()) {
int id = resultSet.getInt(1);
String name = resultSet.getString(2);
int age = resultSet.getInt(3);
String gender = resultSet.getString(4);
String clazz = resultSet.getString(5);
//封装为学生对象
Student student = new Student(id, name, age, gender, clazz);
//将数据发送到下游
ctx.collect(student);
}
//关闭资源
statement.close();
conn.close();
}
@Override
public void cancel() {
}
}
class MySQLSink extends RichSinkFunction<Tuple2<String, Integer>> {
private Connection conn;
private PreparedStatement statement;
@Override
public void open(Configuration parameters) throws Exception {
//加载驱动
Class.forName("com.mysql.jdbc.Driver");
//创建连接对象
conn = DriverManager.getConnection("jdbc:mysql://master:3306/bigdata29?useSSL=false&characterEncoding=UTF-8", "root", "123456");
//获取操作对象
statement = conn.prepareStatement("replace into gender_count values(?,?)");
}
@Override
public void close() throws Exception {
statement.close();
conn.close();
}
@Override
public void invoke(Tuple2<String, Integer> kv, Context context) throws Exception {
statement.setString(1, kv.f0);
statement.setInt(2, kv.f1);
statement.execute();
}
}
五、Flink的集群搭建
1、Standalone(独立集群)
1.1、上传解压配置环境变量
# 解压tar-xvf flink-1.15.2-bin-scala_2.12.tgz
# 配置环境变量vim /etc/profile
exportFLINK_HOME=/usr/local/soft/flink-1.15.2
exportPATH=$PATH:$FLINK_HOME/bin
source /etc/profile
1.2、修改配置文件
- flink-conf.yaml
jobmanager.rpc.address: master
jobmanager.bind-host: 0.0.0.0
taskmanager.bind-host: 0.0.0.0
taskmanager.host: localhost # noe1和node2需要单独修改
taskmanager.numberOfTaskSlots: 4
rest.address: master
rest.bind-address: 0.0.0.0
- masters
master:8081
- workers
node1
node2
1.3、同步到所有节点
scp-r flink-1.15.2 node1:`pwd`scp-r flink-1.15.2 node2:`pwd`# 修改node1和node2中地taskmanager.host
taskmanager.host: node1
taskmanager.host: node2
1.4、启动Flink独立集群
start-cluster.sh
# stop-cluster.sh# flink web ui
http://master:8081
1.5、提交任务
- 将代码打包上传到服务器提交
flink run -c com.shujia.core.Demo1StreamWordCount flink-1.0.jar
- 在flink web ui中直接提交
2、Flink on yarn
flink on yarn模式:将flink地任务提交到yarn上运行
2.1、整合到yarn
# 在环境变量中配置HADOOP_CLASSSPATHvim /etc/profile
exportHADOOP_CLASSPATH=`hadoop classpath`source /etc/profile
2.2、Flink on yarn部署模式
部署模式总共有三种
- Application Mode
1、将任务提交到yarn上运行,yarn会为每一个flink地任务启动一个jobmanager和一个或者多个taskmanasger
2、代码main函数不再本地运行,dataFlow不再本地构建,如果代码报错在本地看不到详细地错误日志
flink run-application -t yarn-application -c com.shujia.core.Demo1StreamWordCount flink-1.0.jar
# 查看yarn的日志yarn logs -applicationId application_1717039073374_0001
- Per-Job Cluster Mode
1、将任务提交到yarn上运行,yarn会为每一个flink地任务启动一个jobmanager和一个或者多个taskmanasger
2、代码地main函数在本地启动,在本地构建dataflow,再将dataflow提交给jobmanager,如果代码报错再本地可以烂到部分错误日志
flink run -t yarn-per-job -c com.shujia.core.Demo1StreamWordCount flink-1.0.jar
- Session Mode
1、先再yarn中启动一个jobmanager, 不启动taskmanager
2、提交任务地时候再动态申请taskmanager
3、所有使用session模式提交的任务共享同一个jobmanager
4、类似独立集群,只是集群在yarn中启动了,可以动态申请资源
5、一般用于测试
# 1、先启动会话集群
yarn-session.sh -d# 2、在提交任务
flink run -t yarn-session -Dyarn.application.id=application_1717068381486_0003 -c com.shujia.core.Demo1StreamWordCount flink-1.0.jar
# 在网页中直接提交
六、Flink的架构及其工作原理
1、解析Flink集群
Flink 运行时由两种类型的进程组成:一个 JobManager 和一个或者多个 TaskManager。
- Flink Program: - Program Code:这是用户编写的Flink程序,包含数据处理的逻辑。- Optimizer / Graph Builder:当用户提交Flink程序时,首先由客户端(Client)解析程序并构建一个数据流图(Dataflow Graph)。
- Client: - Actor System:Flink的Client通过Actor System与JobManager进行通信。- Dataflow Graph:客户端生成的数据流图(Dataflow Graph)会被提交给JobManager。
- JobManager: - Actor System:JobManager也通过Actor System与Client和TaskManager进行通信。- Dataflow Graph:JobManager接收到数据流图后,负责将其转换为实际的执行图。- Scheduler:调度器负责分配任务,并决定在集群中的哪个TaskManager上运行哪些任务。- Checkpoint Coordinator:负责协调检查点操作,以确保在出现故障时能够从最近的检查点恢复。
- TaskManager: - Task Slots:每个TaskManager拥有多个任务槽(Task Slot),每个任务槽可以独立执行一个任务(Task)。- Memory & I/O Manager:管理内存和I/O操作,以确保高效的数据处理。- Network Manager:管理网络通信,确保TaskManager之间的数据流传输顺畅。- Actor System:与JobManager和其他TaskManager进行通信。
2、TaskSlot和并行度
2.1、TaskSlot
Slot 是指 TaskManager 最大能并发执行的能力
Task Slot 是 Flink 中用于执行并行任务的物理资源单元。它们可以被视为一个 TaskManager(任务管理器)上的资源分配单位。
每个 Task Slot 可以运行一个并行任务,但也可以配置多个 Task Slot 来支持更高的并行度。
Task Slot 通常与计算资源相关联,比如一个 CPU 核心、一段内存等。
2.2、Parallelism(并行度)
- parallelism 是指 TaskManager 实际使用的并发能力
Parallelism 是指在同一时间处理数据的任务数量。在 Flink 中,它可以应用于整个作业(全局并行度)或单个算子(局部并行度)。
Flink 作业的并行度可以在作业提交时指定,也可以在算子级别进行设置。通过设置并行度,可以控制作业的性能和资源利用率。
2.3、设置并行度的方式
- 在代码中设置:env.setParallelism(2)
- 在提交任务是通过参数设置 -p (推荐使用)
- 在配置文件中统一设置
- 每一个算子可以单独设置并行度
算子设置并行度 > env 设置并行度 > 配置文件默认并行度。
2.4、Flink的共享资源
1、flink需要资源的数量和task数量无关
2、一个并行度对应一个资源(slot)
3、上游task的下游task共享同一个资源
2.5、并行度设置的原则
1、实时计算的任务并行度取决于数据的吞吐量
2、聚合计算(有shuffle)的代码一个并行度大概一秒可以处理10000条数据左右
3、非聚合计算是,一个并行度大概一秒可以处理10万条左右
3、Event Time(事件时间) and Processing Time (处理时间)
3.1、Processing Time
处理时间是指执行相应操作的机器的系统时间。
publicclassDemo4ProcTime{publicstaticvoidmain(String[] args)throwsException{StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();DataStream<String> wordsDS = env.socketTextStream("master",8888);//转换成kvDataStream<Tuple2<String,Integer>> kvDS = wordsDS
.map(word ->Tuple2.of(word,1),Types.TUPLE(Types.STRING,Types.INT));//按照单词分组KeyedStream<Tuple2<String,Integer>,String> keyByDS = kvDS.keyBy(kv -> kv.f0);//划分窗口//TumblingProcessingTimeWindows:股东的处理时间窗口WindowedStream<Tuple2<String,Integer>,String,TimeWindow> windowDS = keyByDS
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)));//统计单词的数量DataStream<Tuple2<String,Integer>> countDS = windowDS.sum(1);
countDS.print();
env.execute();}}
3.2、Event Time
事件时间是每个单独事件在其产生设备上发生的时间。这个时间通常在记录进入Flink之前嵌入到记录中,并且可以从每个记录中提取事件时间戳。在事件时间中,时间的进展取决于数据,而不是任何操作机器的系统时间(例如:window系统时间)。事件时间程序必须指定如何生成事件时间水位线,这是在事件时间中表示进程的机制。
3.3、水位线
水位线等于最新一条数据的时间戳,在Flink中测量事件时间进展的机制是水位线。水位线作为数据流的一部分,并携带时间戳t。水位线(t)声明事件时间在该流中已经达到时间t,这意味着流中不应该再有时间戳t <= t的元素(即时间戳比水位线更早或等于水位线的事件)。这种情况只对于数据流是按照事件中的时间戳顺序排列的
但是,数据流可能是无序的,这会导致watermark 一旦越过窗口结束的 timestamp,小于watermark 的事件时间不能触发窗口,从而导致数据丢失,可以设置一些参数让水位线延迟,使小于水位线的数据可以再次触发窗口
- 水位线的对齐
1、当上游有多个task时,下游task会取上游task水位线的最小值,如果数据量小。水位线就很难对齐,窗口就不会触发计算
public class Demo5EventTime {
public static void main(String[] args) throws Exception {
/*
* 事件时间:数据中有一个时间字段,使用数据的时间字段触发计算,代替真实的时间,可以反应数据真实发生的顺序,计算更有意义
*/
/*
java,1685433130000
java,1685433131000
java,1685433132000
java,1685433134000
java,1685433135000
java,1685433137000
java,1685433139000
java,1685433140000
java,1685433170000
*/
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
/*
*水位线对齐
* 1、当上游有多个task时,下游task会取上游task水位线的最小值,如果数据量小。水位线就很难对齐,窗口就不会触发计算
*/
env.setParallelism(1);
DataStream<String> linesDS = env.socketTextStream("master", 8888);
//解析数据
DataStream<Tuple2<String, Long>> tsDS = linesDS.map(line -> {
String[] split = line.split(",");
String word = split[0];
long ts = Long.parseLong(split[1]);
return Tuple2.of(word, ts);
}, Types.TUPLE(Types.STRING, Types.LONG));
/*
* 指定时间字段和水位线生成策略
*/
DataStream<Tuple2<String, Long>> assDS = tsDS
.assignTimestampsAndWatermarks(
WatermarkStrategy
//指定水位线生产策略,水位线等于最新一条数据的时间戳,如果数据乱序可能会丢失数据
//.<Tuple2<String, Long>>forMonotonousTimestamps()
//水位线前移时间(数据最大乱序时间)
.<Tuple2<String, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(5))
//指定时间字段
.withTimestampAssigner((event, ts) -> event.f1)
);
/*
*每隔5秒统计单词的数量
*/
DataStream<Tuple2<String, Integer>> kvDS = assDS
.map(kv -> Tuple2.of(kv.f0, 1), Types.TUPLE(Types.STRING, Types.INT));
KeyedStream<Tuple2<String, Integer>, String> keyByDS = kvDS
.keyBy(kv -> kv.f0);
//TumblingEventTimeWindows:滚动的事件时间窗口
WindowedStream<Tuple2<String, Integer>, String, TimeWindow> windowDS = keyByDS
.window(TumblingEventTimeWindows.of(Time.seconds(5)));
windowDS.sum(1).print();
env.execute();
}
}
4、窗口
窗口总共分为三大类:
1、Time Window
2、Count Window
3、Session Window
4.1、Time Window
时间窗口分为四种:
1、滚动处理时间窗口(TumblingProcessingTimeWindows)
2、滚动事件时间窗口(TumblingEventTimeWindows)
3、滑动处理时间窗口(SlidingProcessingTimeWindows)
4、滑动事件时间窗口(SlidingEventTimeWindows)
- 滚动处理时间窗口(TumblingProcessingTimeWindows)
滚动窗口的大小是固定的,且各自范围之间不重叠。时间是根据系统时间来计算的
public static void main(String[] args) throws Exception{
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> streamSource = env.socketTextStream("master", 12345);
SingleOutputStreamOperator<Tuple2<String, Integer>> wordDS = streamSource.map(word -> Tuple2.of(word, 1), Types.TUPLE(Types.STRING, Types.INT));
KeyedStream<Tuple2<String, Integer>, String> keyByDS = wordDS.keyBy(kv -> kv.f0);
WindowedStream<Tuple2<String, Integer>, String, TimeWindow> windowDS = keyByDS.window(TumblingProcessingTimeWindows.of(Time.seconds(5)));
SingleOutputStreamOperator<Tuple2<String, Integer>> countDS = windowDS.sum(1);
countDS.print();
env.execute();
}
- 滚动事件时间窗口(TumblingEventTimeWindows)
滚动窗口的大小是固定的,且各自范围之间不重叠。时间是根据所带的时间字段来计算的
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
/*
*水位线对齐
* 1、当上游有多个task时,下游task会取上游task水位线的最小值,如果数据量小。水位线就很难对齐,窗口就不会触发计算
*因此并行度设为1
*/
env.setParallelism(1);
DataStreamSource<String> streamSource = env.socketTextStream("master", 12345);
SingleOutputStreamOperator<Tuple2<String, Long>> wordDS = streamSource.map(lines -> {
String[] line = lines.split(",");
String word = line[0];
long ts = Long.parseLong(line[1]);
return Tuple2.of(word, ts);
}, Types.TUPLE(Types.STRING, Types.LONG));
SingleOutputStreamOperator<Tuple2<String, Long>> eventDS = wordDS.assignTimestampsAndWatermarks(WatermarkStrategy
//设置水位线延迟时间
.<Tuple2<String, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(2))
//不设置水位线的延迟
//.<Tuple2<String, Long>>forMonotonousTimestamps()
//指定时间字段
.withTimestampAssigner((event, ts) -> event.f1)
);
SingleOutputStreamOperator<Tuple2<String, Integer>> mapDS = eventDS.map(kv -> Tuple2.of(kv.f0, 1), Types.TUPLE(Types.STRING, Types.INT));
KeyedStream<Tuple2<String, Integer>, String> keyByDS = mapDS.keyBy(kv -> kv.f0);
WindowedStream<Tuple2<String, Integer>, String, TimeWindow> windowDS = keyByDS.window(TumblingEventTimeWindows.of(Time.seconds(5)));
SingleOutputStreamOperator<Tuple2<String, Integer>> countDS = windowDS.sum(1);
countDS.print();
env.execute();
}
- 滑动处理时间窗口(SlidingProcessingTimeWindows)
与滚动窗口类似,滑动窗口的 assigner 分发元素到指定大小的窗口,窗口大小通过 window size 参数设置。 滑动窗口需要一个额外的滑动距离(window slide)参数来控制生成新窗口的频率,时间是根据系统时间来计算的。
publicstaticvoidmain(String[] args)throwsException{StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<String> streamSource = env.socketTextStream("master",12345);SingleOutputStreamOperator<Tuple2<String,Integer>> lineDS = streamSource.map(line ->Tuple2.of(line,1),Types.TUPLE(Types.STRING,Types.INT));KeyedStream<Tuple2<String,Integer>,String> keyByDS = lineDS.keyBy(kv -> kv.f0);WindowedStream<Tuple2<String,Integer>,String,TimeWindow> windowDS = keyByDS.window(SlidingProcessingTimeWindows.of(Time.seconds(15),Time.seconds(5)));SingleOutputStreamOperator<Tuple2<String,Integer>> countDS = windowDS.sum(1);
countDS.print();
env.execute();}
- 滑动事件时间窗口(SlidingEventTimeWindows)
与滚动窗口类似,滑动窗口的 assigner 分发元素到指定大小的窗口,窗口大小通过 window size 参数设置。 滑动窗口需要一个额外的滑动距离(window slide)参数来控制生成新窗口的频率,时间是根据所带的时间字段来计算的。
public static void main(String[] args) throws Exception{
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStreamSource<String> streamSource = env.socketTextStream("master", 12345);
SingleOutputStreamOperator<Tuple2<String, Long>> wordDS = streamSource.map(line -> {
String[] words = line.split(",");
String word = words[0];
long ts = Long.parseLong(words[1]);
return Tuple2.of(word, ts);
}, Types.TUPLE(Types.STRING, Types.LONG));
SingleOutputStreamOperator<Tuple2<String, Long>> watermarks = wordDS.assignTimestampsAndWatermarks(WatermarkStrategy
.<Tuple2<String, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(2))
.withTimestampAssigner((event, ts) -> event.f1)
);
SingleOutputStreamOperator<Tuple2<String, Integer>> mapDS = watermarks.map(kv -> Tuple2.of(kv.f0, 1), Types.TUPLE(Types.STRING, Types.INT));
KeyedStream<Tuple2<String, Integer>, String> keyByDS = mapDS.keyBy(kv -> kv.f0);
WindowedStream<Tuple2<String, Integer>, String, TimeWindow> windowDS = keyByDS.window(SlidingEventTimeWindows.of(Time.seconds(15), Time.seconds(5)));
SingleOutputStreamOperator<Tuple2<String, Integer>> countDS = windowDS.sum(1);
countDS.print();
env.execute();
}
4.2、Count Window
计数窗口基于元素的个数来截取数据,到达固定的个数时就触发计算并关闭窗口
下面的代码是基于同一个单词达到3个就会计算
publicstaticvoidmain(String[] args)throwsException{StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<String> streamSource = env.socketTextStream("master",12345);SingleOutputStreamOperator<Tuple2<String,Integer>> lineDS = streamSource.map(line ->Tuple2.of(line,1),Types.TUPLE(Types.STRING,Types.INT));KeyedStream<Tuple2<String,Integer>,String> keyByDS = lineDS.keyBy(kv -> kv.f0);WindowedStream<Tuple2<String,Integer>,String,GlobalWindow> countWindow = keyByDS.countWindow(3);SingleOutputStreamOperator<Tuple2<String,Integer>> countDS = countWindow.sum(1);
countDS.print();
env.execute();}
4.3、Session Window
会话窗口的 assigner 会把数据按活跃的会话分组。 与滚动窗口和滑动窗口不同,会话窗口不会相互重叠,且没有固定的开始或结束时间。 会话窗口在一段时间没有收到数据之后会关闭,即在一段不活跃的间隔之后。 会话窗口的 assigner 可以设置固定的会话间隔(session gap)或 用 session gap extractor 函数来动态地定义多长时间算作不活跃。 当超出了不活跃的时间段,当前的会话就会关闭,并且将接下来的数据分发到新的会话窗口。
publicstaticvoidmain(String[] args)throwsException{StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<String> streamSource = env.socketTextStream("master",12345);SingleOutputStreamOperator<Tuple2<String,Integer>> lineDS = streamSource.map(line ->Tuple2.of(line,1),Types.TUPLE(Types.STRING,Types.INT));KeyedStream<Tuple2<String,Integer>,String> keyByDS = lineDS.keyBy(kv -> kv.f0);WindowedStream<Tuple2<String,Integer>,String,TimeWindow> windowDS = keyByDS.window(ProcessingTimeSessionWindows.withGap(Time.seconds(5)));SingleOutputStreamOperator<Tuple2<String,Integer>> countDS = windowDS.sum(1);
countDS.print();
env.execute();}
七、Kafka(消息队列)
1、概述
Kafka是一款分布式消息发布和订阅系统,它的特点是高性能、高吞吐量。
2、Kafka性能好的原因
1、kafka写磁盘是顺序的,所以不断的往前产生,不断的往后写
2、kafka还用了sendFile的0拷贝技术,提高速度
3、而且还用到了批量读写,一批批往里写,64K为单位
- 非零拷贝
数据在内核空间和用户空间之间需要多次拷贝,增加了 CPU 和内存带宽的使用,性能较低。
- 非零拷贝
通过减少数据拷贝次数,降低了 CPU 和内存带宽的使用,提高了数据传输性能。常见的实现技术包括
sendfile()
、
mmap()
和 DMA。
3、Kafka的架构
- Broker
Kafka集群中包含的服务器,有一个或多个服务器,这种服务器被称为 Broker。
Broker 端不维护数据的消费状态,提升了性能。直接使用磁盘进行存储,线性读写,速度快。避免了在JVM 内存和系统内存之间的复制,减少耗性能的创建对象和垃圾回收。
- Producer(生产者)
负责发布消息到Kafka Broker
producer自己决定往哪个partition写消息,可以是轮询的负载均衡,或者是基于hash的partition策略
- Consumer(消费者)
负责从Broker 拉取(pull)数据并进行处理。
- Topic
每条发布到kafka集群的消息都有一个类别,这个类别被称为Topic
物理上不同Topic的消息分开存储,逻辑上一个Topic 的消息虽然保存于一个或多个Broker上但是用户只需指定消费的Topic即课生产或消费数据而不必关心数据存于何处。一个topic分成多个partition
- Partition
Partition 是物理上的概念,每个Topic 包含一个或多个Partition。kafka分配的单位是Partition
每个partition内部消息强有序,其中的每个消息都有一个序号叫offset
一个partition只对应一个broker,一个broker可以管多个partition
- Consumer Group
每个Consumer 属于一个特定的Consumer Group
可为每个Consumer 指定Group name,若不指定group name 则属于默认的group
每条消息只可以被Consumer Goup 组中中的一个Consumer消费,但是可以指定多个Consumer Group
所以一个消息在Consumer Group 里面只可以被消费一次。已确定!
4、Kafka的Java API
4.1、Produce
public static void main(String[] args) {
Properties properties = new Properties();
properties.setProperty("bootstrap.servers","master:9092,node2:9092,node2:9092");
//指定key和value的数据格式
properties.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
//创建生产者
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
//生产数据
kafkaProducer.send(new ProducerRecord<>("words","java"));
//刷新数据
kafkaProducer.flush();
//关闭资源
kafkaProducer.close();
}
4.2、Consumer
public static void main(String[] args) {
Properties properties = new Properties();
//kafka 集群列表
properties.setProperty("bootstrap.servers", "master:9092,node2:9092,node2:9092");
//读取数据的格式
properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
/*
* earliest
* 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
* latest 默认
* 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产认值生的该分区下的数据
* none
* topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常
*
*/
properties.setProperty("auto.offset.reset", "earliest");
//指定消费者组,一条数据在一个组内只消费一次
properties.setProperty("group.id", "asddffas");
//创建消费者
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
//订阅生产者
ArrayList<String> list = new ArrayList<>();
list.add("student_hash");
kafkaConsumer.subscribe(list);
while (true){
//拉去数据
ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(1000);
//获取数据的信息
for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
String topic = consumerRecord.topic();
long offset = consumerRecord.offset();
int partition = consumerRecord.partition();
String value = consumerRecord.value();
long timestamp = consumerRecord.timestamp();
System.out.println(topic + "\t" + offset + "\t" + partition + "\t" + value + "\t" + timestamp);
}
}
}
5、Flink on Kafka
5.1、KafkaSource
- 起始消费位点> Kafka source 能够通过位点初始化器(>
> OffsetsInitializer>
> )来指定从不同的偏移量开始消费 。内置的位点初始化器包括:
KafkaSource.builder()// 从消费组提交的位点开始消费,不指定位点重置策略.setStartingOffsets(OffsetsInitializer.committedOffsets())// 从消费组提交的位点开始消费,如果提交位点不存在,使用最早位点.setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))// 从时间戳大于等于指定时间戳(毫秒)的数据开始消费.setStartingOffsets(OffsetsInitializer.timestamp(1657256176000L))// 从最早位点开始消费.setStartingOffsets(OffsetsInitializer.earliest())// 从最末尾位点开始消费.setStartingOffsets(OffsetsInitializer.latest());
public static void main(String[] args) throws Exception{
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
KafkaSource<String> source = KafkaSource.<String>builder()
//kafka集群
.setBootstrapServers("master:9092,node1:9092,node2:9092")
//指定topic
.setTopics("students")
//指定消费者组
.setGroupId("student-group")
//指定读取的数据的起始点
.setStartingOffsets(OffsetsInitializer.earliest())
//指定读取数据的格式
.setValueOnlyDeserializer(new SimpleStringSchema())
.build();
DataStreamSource<String> kafkaSourceDS = env.fromSource(source, WatermarkStrategy.noWatermarks(), "kafkaSource");
kafkaSourceDS.print();
env.execute();
}
5.2、KafkaSink
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> scoreDS = env.readTextFile("flink/data/score.txt");
KafkaSink<String> kafkaSink = KafkaSink.<String>builder()
.setBootstrapServers("master:9092,node1:9092,node2:9092")
.setRecordSerializer(KafkaRecordSerializationSchema.builder()
.setTopic("score")
.setValueSerializationSchema(new SimpleStringSchema())
.build()
)
.setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
.build();
scoreDS.sinkTo(kafkaSink);
env.execute();
}
八、状态与容错
1、Flink的状态
虽然数据流中的许多操作一次只查看一个单独的事件(例如事件解析器),但有些操作记住跨多个事件的信息(例如窗口操作符)。这些操作称为有状态操作。
1.1、Keyed State
keyed state 接口提供不同类型状态的访问接口,这些状态都作用于当前输入数据的 key 下。换句话说,这些状态仅可在
KeyedStream
上使用,在Java/Scala API上可以通过
stream.keyBy(...)
得到
KeyedStream
,在Python API上可以通过
stream.key_by(...)
得到
KeyedStream
。
接下来,我们会介绍不同类型的状态,然后介绍如何使用他们。所有支持的状态类型如下所示:
ValueState<T>
: 保存一个可以更新和检索的值(如上所述,每个值都对应到当前的输入数据的 key,因此算子接收到的每个 key 都可能对应一个值)。 这个值可以通过update(T)
进行更新,通过T value()
进行检索。ListState<T>
: 保存一个元素的列表。可以往这个列表中追加数据,并在当前的列表上进行检索。可以通过add(T)
或者addAll(List<T>)
进行添加元素,通过Iterable<T> get()
获得整个列表。还可以通过update(List<T>)
覆盖当前的列表。ReducingState<T>
: 保存一个单值,表示添加到状态的所有值的聚合。接口与ListState
类似,但使用add(T)
增加元素,会使用提供的ReduceFunction
进行聚合。AggregatingState<IN, OUT>
: 保留一个单值,表示添加到状态的所有值的聚合。和ReducingState
相反的是, 聚合类型可能与 添加到状态的元素的类型不同。 接口与ListState
类似,但使用add(IN)
添加的元素会用指定的AggregateFunction
进行聚合。MapState<UK, UV>
: 维护了一个映射列表。 你可以添加键值对到状态中,也可以获得反映当前所有映射的迭代器。使用put(UK,UV)
或者putAll(Map<UK,UV>)
添加映射。 使用get(UK)
检索特定 key。 使用entries()
,keys()
和values()
分别检索映射、键和值的可迭代视图。你还可以通过isEmpty()
来判断是否包含任何键值对。
所有类型的状态还有一个
clear()
方法,清除当前 key 下的状态数据,也就是当前输入元素的 key。
- 传统求wordcount的代码以及出现的问题
publicstaticvoidmain(String[] args)throwsException{StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();DataStream<String> lineDS = env.socketTextStream("master",12345);KeyedStream<String,String> keyByDS = lineDS.keyBy(word -> word);DataStream<Tuple2<String,Integer>> countDS = keyByDS.process(newProcessFunction<String,Tuple2<String,Integer>>(){//保存之前统计的结果(状态)//问题:同一个task中的数据共享同一个count变量//int count = 0;//问题:如果关闭这个程序或者系统坏了数据就会丢失,使用hashmap保存计算的中间结果,flink的checkpoint不会将hashmap中的数据持久化到hdfs上finalHashMap<String,Integer> hashMap =newHashMap<>();/**
* processElement方法每一条数据执行一次
* @param word 一行数据
* @param ctx 上下文对象,可以获取到flink的key和时间属性
* @param out 用于将处理结果发送到下游
*/@OverridepublicvoidprocessElement(String word,ProcessFunction<String,Tuple2<String,Integer>>.Context ctx,Collector<Tuple2<String,Integer>> out)throwsException{Integer count = hashMap.getOrDefault(word,0);
count++;
out.collect(Tuple2.of(word, count));
hashMap.put(word, count);}});
countDS.print();
env.execute();}
- 修改后的状态代码
public static void main(String[] args) throws Exception{
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> lineDS = env.socketTextStream("master", 12345);
KeyedStream<String, String> keyByDS = lineDS.keyBy(word -> word);
SingleOutputStreamOperator<Tuple2<String, Integer>> countDS = keyByDS.process(new ProcessFunction<String, Tuple2<String, Integer>>() {
ValueState<Integer> state;
@Override
public void open(Configuration parameters) throws Exception {
RuntimeContext context = getRuntimeContext();
ValueStateDescriptor<Integer> valueStateDescriptor = new ValueStateDescriptor<>("count", Types.INT, 0);
state = context.getState(valueStateDescriptor);
}
@Override
public void processElement(String word, ProcessFunction<String, Tuple2<String, Integer>>.Context ctx, Collector<Tuple2<String, Integer>> out) throws Exception {
Integer count = state.value();
count++;
out.collect(Tuple2.of(word, count));
state.update(count);
}
});
countDS.print();
env.execute();
}
- 基于 DataStream API 实现欺诈检测(官网案例关于状态的)
我们先实现第一版报警程序,对于一个账户,如果出现小于 $1 美元的交易后紧跟着一个大于 $500 的交易,就输出一个报警信息。
publicstaticvoidmain(String[] args)throwsException{StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<String> streamSource = env.socketTextStream("master",12345);SingleOutputStreamOperator<User> userDS = streamSource.map(line ->{String[] split = line.split(",");String id = split[0];double price =Double.parseDouble(split[1]);returnnewUser(id, price);});KeyedStream<User,String> keyByDS = userDS.keyBy(User::getId);SingleOutputStreamOperator<String> process = keyByDS.process(newProcessFunction<User,String>(){ValueState<Boolean> state;@Overridepublicvoidopen(Configuration parameters)throwsException{RuntimeContext context =getRuntimeContext();ValueStateDescriptor<Boolean> flag =newValueStateDescriptor<>("flag",Types.BOOLEAN,false);
state = context.getState(flag);}@OverridepublicvoidprocessElement(User value,ProcessFunction<User,String>.Context ctx,Collector<String> out)throwsException{if(state.value()){if(value.getPrice()>500){System.out.println("报警");}
state.update(false);}if(value.getPrice()<1){
state.update(true);}}});
env.execute();}@AllArgsConstructor@DataclassUser{privateString id;privateDouble price;}
1.2、算子状态
算子状态(或者非 keyed 状态)是绑定到一个并行算子实例的状态。Kafka Connector 是 Flink 中使用算子状态一个很具有启发性的例子。Kafka consumer 每个并行实例维护了 topic partitions 和偏移量的 map 作为它的算子状态。
当并行度改变的时候,算子状态支持将状态重新分发给各并行算子实例。处理重分发过程有多种不同的方案。
在典型的有状态 Flink 应用中你无需使用算子状态。它大都作为一种特殊类型的状态使用。用于实现 source/sink,以及无法对 state 进行分区而没有主键的这类场景中。
1.3、广播状态
广播状态是一种特殊的算子状态。引入它的目的在于支持一个流中的元素需要广播到所有下游任务的使用情形。在这些任务中广播状态用于保持所有子任务状态相同。 该状态接下来可在第二个处理记录的数据流中访问。可以设想包含了一系列用于处理其他流中元素规则的低吞吐量数据流,这个例子自然而然地运用了广播状态。 考虑到上述这类使用情形,广播状态和其他算子状态的不同之处在于:
- 它具有 map 格式,
- 它仅在一些特殊的算子中可用。这些算子的输入为一个广播数据流和非广播数据流,
- 这类算子可以拥有不同命名的多个广播状态 。
2、Checkpoint
可以定时将flink计算的状态持久化到hdfs中,如果任务执行失败,可以基于hdfs中保存到的状态恢复任务,保证之前的结果不丢失
Flink 的 checkpoint 机制会和持久化存储进行交互,读写流与状态。一般需要:
- 一个能够回放一段时间内数据的持久化数据源,例如持久化消息队列(例如 Apache Kafka、RabbitMQ、 Amazon Kinesis、 Google PubSub 等)或文件系统(例如 HDFS、 S3、 GFS、 NFS、 Ceph 等)。
- 存放状态的持久化存储,通常为分布式文件系统(比如 HDFS、 S3、 GFS、 NFS、 Ceph 等)。
2.1、开启Checkpoint的方式
- 在代码中单独开启
public static void main(String[] args) throws Exception{
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 每 1000ms 开始一次 checkpoint
env.enableCheckpointing(20000);
// 高级选项:
// 设置模式为精确一次 (这是默认值)
// env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.setStateBackend(new HashMapStateBackend());
// 使用 externalized checkpoints,这样 checkpoint 在作业取消后仍就会被保留
env.getCheckpointConfig().setExternalizedCheckpointCleanup(
CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
// 开启实验性的 unaligned checkpoints
env.getCheckpointConfig().setCheckpointStorage("hdfs://master:9000/flink/checkpoint");
DataStream<String> streamSource = env.socketTextStream("master", 12345);
DataStream<Tuple2<String, Integer>> wordDS = streamSource.map(line -> Tuple2.of(line, 1), Types.TUPLE(Types.STRING, Types.INT));
KeyedStream<Tuple2<String, Integer>, String> keyByDS = wordDS.keyBy(kv -> kv.f0);
DataStream<Tuple2<String, Integer>> countDS = keyByDS.sum(1);
countDS.print();
env.execute();
// flink run -t yarn-session -p 3 -Dyarn.application.id=application_1717413381741_0001 -c com.shujia.state.Demo2Checkpoint flink-1.0.jar
}
- 在配置文件中统一开启vim flink-conf.yaml
execution.checkpointing.interval: 5000execution.checkpointing.externalized-checkpoint-retention: RETAIN_ON_CANCELLATIONexecution.checkpointing.max-concurrent-checkpoints: 1execution.checkpointing.min-pause: 0execution.checkpointing.mode: EXACTLY_ONCEexecution.checkpointing.timeout: 10minexecution.checkpointing.tolerable-failed-checkpoints: 0execution.checkpointing.unaligned: falsestate.backend: hashmapstate.checkpoints.dir: hdfs://master:9000/flink/checkpoint
2.2、使用checkpoint
- 第一次提交任务之间提交
flink run -t yarn-session -p3-Dyarn.application.id=application_1717413381741_0001 -c com.shujia.state.Demo2Checkpoint flink-1.0.jar
- 重启任务时基于hdfs中的快照重启
# -s 指定恢复任务的位置
flink run -t yarn-session -p3-Dyarn.application.id=application_1717413381741_0001 -c com.shujia.state.Demo2Checkpoint -s hdfs://master:9000/flink/checkpoint/303212ccf121165b0d12713d61608dc3/chk-14 flink-1.0.jar
3、Exactly Once
数据只处理一次
3.1、Kafka的Exactly Once
Kafka保证数据处理的唯一一次
1、幂等性:保证数据不重复
2、事务:保证数据不重复
3、Acks+副本:保证数据不丢失
acks机制:
acks=1(默认):当主分区写入成功,就会返回成功,如果这时主分区挂了,刚写入的数据就会丢失
acks=0:生产者只负责生产数据,不负责验证数据是否成功写入,可能会造成数据丢失,但是写入的性能好
acks=-1或者all:生产者生产数据后必须等待数据都同步到副本之后才会返回成功,但是性能差
3.2、Flink的Exactly Once
Flink 分布式快照保存数据计算的状态(checkpoint)和消费的偏移量,保证程序重启之后不丢失状态和消费偏移量
3.3、Exactly Once的代码
- 聚合运算
如果任务在执行过程中失败了,可以恢复到上一次成功的checkpoint的位置,保证计算的状态和消费偏移量不丢失,保证数据处理的唯一一次
public static void main(String[] args) throws Exception{
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
.setBootstrapServers("master:9092,node1:9092,node2:9092")
.setTopics("words")
.setGroupId("group-id")
.setStartingOffsets(OffsetsInitializer.earliest())
.setValueOnlyDeserializer(new SimpleStringSchema())
.build();
DataStream<String> streamSource = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafkaSource");
DataStream<Tuple2<String, Integer>> wordDS = streamSource.map(line -> Tuple2.of(line, 1), Types.TUPLE(Types.STRING, Types.INT));
KeyedStream<Tuple2<String, Integer>, String> keyByDS = wordDS.keyBy(kv -> kv.f0);
DataStream<Tuple2<String, Integer>> countDS = keyByDS.sum(1);
countDS.print();
env.execute();
// flink run -t yarn-session -p 3 -Dyarn.application.id=application_1717499706759_0001 -c com.shujia.state.Demo5ExactlyOnce flink-1.0.jar
}
- 非聚合运算
只利用checkpoint和偏移量不能保证数据处理是唯一一次,需要将两次的checkpoint放到一个事务中,上一次checkpoint完成时开启事务,下一次事务完成时提交事务,这样做会增加数据处理的延迟,但是保证了数据处理的唯一一次
publicstaticvoidmain(String[] args)throwsException{StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();KafkaSource<String> kafkaSource =KafkaSource.<String>builder().setBootstrapServers("master:9092,node1:9092,node2:9092").setTopics("words").setGroupId("group-id").setStartingOffsets(OffsetsInitializer.earliest()).setValueOnlyDeserializer(newSimpleStringSchema()).build();DataStream<String> streamSource = env.fromSource(kafkaSource,WatermarkStrategy.noWatermarks(),"kafkaSource");DataStream<String> filterDS = streamSource.filter(word ->!"".equals(word));//flink处理事务的时间要小于Kafka中的事务时间15minProperties properties =newProperties();
properties.setProperty("transaction.timeout.ms",10*60*1000+"");KafkaSink<String> kafkaSink =KafkaSink.<String>builder().setBootstrapServers("master:9092,node1:9092,node2:9092")//同步flink处理事务的时间.setKafkaProducerConfig(properties).setRecordSerializer(KafkaRecordSerializationSchema.builder().setTopic("filter").setValueSerializationSchema(newSimpleStringSchema()).build())//指定数据处理的语义.setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE).build();
filterDS.sinkTo(kafkaSink);
env.execute();}
九、FlinkSql
1、DataStream 上的关系查询
下表比较了传统的关系代数和流处理与输入数据、执行和输出结果的关系。
关系代数 / SQL流处理关系(或表)是有界(多)元组集合。流是一个无限元组序列。对批数据(例如关系数据库中的表)执行的查询可以访问完整的输入数据。流式查询在启动时不能访问所有数据,必须“等待”数据流入。批处理查询在产生固定大小的结果后终止。流查询不断地根据接收到的记录更新其结果,并且始终不会结束。
2、动态表 & 连续查询
动态表 是 Flink 的支持流数据的 Table API 和 SQL 的核心概念。与表示批处理数据的静态表不同,动态表是随时间变化的。可以像查询静态批处理表一样查询它们。查询动态表将生成一个 连续查询 。一个连续查询永远不会终止,结果会生成一个动态表。查询不断更新其(动态)结果表,以反映其(动态)输入表上的更改。本质上,动态表上的连续查询非常类似于定义物化视图的查询。
需要注意的是,连续查询的结果在语义上总是等价于以批处理模式在输入表快照上执行的相同查询的结果。
下图显示了流、动态表和连续查询之间的关系:
- 将流转换为动态表。
- 在动态表上计算一个连续查询,生成一个新的动态表。
- 生成的动态表被转换回流。
注意: 动态表首先是一个逻辑概念。在查询执行期间不一定(完全)物化动态表。
3、表到流的转换
- Append-only 流: 仅通过
INSERT
操作修改的动态表可以通过输出插入的行转换为流。 - Retract 流: retract 流包含两种类型的 message: add messages 和 retract messages 。通过将
INSERT
操作编码为 add message、将DELETE
操作编码为 retract message、将UPDATE
操作编码为更新(先前)行的 retract message 和更新(新)行的 add message,将动态表转换为 retract 流。下图显示了将动态表转换为 retract 流的过程。
- Upsert 流: upsert 流包含两种类型的 message: upsert messages 和delete messages。转换为 upsert 流的动态表需要(可能是组合的)唯一键。通过将
INSERT
和UPDATE
操作编码为 upsert message,将DELETE
操作编码为 delete message ,将具有唯一键的动态表转换为流。消费流的算子需要知道唯一键的属性,以便正确地应用 message。与 retract 流的主要区别在于UPDATE
操作是用单个 message 编码的,因此效率更高。下图显示了将动态表转换为 upsert 流的过程。
4、SQL命令行
# 启动flink集群
yarn-seesion.sh -d
# 进入sql命令行sql-client.sh
# 1、创建表,数据源时kafkaCREATETABLE students (
id STRING,
name STRING,
age INT,
sex STRING,
clazz STRING
)WITH('connector'='kafka','topic'='students',-- 指定topic'properties.bootstrap.servers'='master:9092,node1:9092,node2:9092',-- 指定kafka集群列表'properties.group.id'='testGroup',-- 指定消费者组'scan.startup.mode'='earliest-offset',-- 指定读取数据的位置'format'='csv'-- 指定数据的格式);# 2、编写sql进行连续查询select
clazz,count(1)as num
from students
groupby clazz;# 3、生产数据
kafka-console-producer.sh --broker-list master:9092,node1:9092,node2:9092 --topic students
5、SQL命令行打印结果模式
5.1表格模式(table mode)默认
在内存中实体化结果,并将结果用规则的分页表格可视化展示出来。执行如下命令启用:
SET 'sql-client.execution.result-mode' = 'table';
5.2、变更日志模式(changelog mode)
不会实体化和可视化结果,而是由插入(
+
)和撤销(
-
)组成的持续查询产生结果流。
SET 'sql-client.execution.result-mode' = 'changelog';
5.3、Tableau模式(tableau mode)
更接近传统的数据库,会将执行的结果以制表的形式直接打在屏幕之上。具体显示的内容会取决于作业 执行模式的不同(
execution.type
):
SET 'sql-client.execution.result-mode' = 'tableau';
6、处理模式
6.1、流处理
1、可以用于处理有界流和无界流
2、流处理模式输出连续结果
3、流处理模式底层时持续流模型
SET 'execution.runtime-mode' = 'streaming';
6.2、批处理
1、批处理模式只能用于处理有界流
2、输出最终结果
3、底层是MapReduce模型
SET'execution.runtime-mode'='batch';
7、连接器
7.1、Kafka
- KafkaSource
-- 创建表 --- 无界流
-- TIMESTAMP(3): 时flink总的时间字段
CREATE TABLE students_kafka (
id STRING,
name STRING,
age INT,
sex STRING,
clazz STRING,
`event_time` TIMESTAMP(3) METADATA FROM 'timestamp',-- 获取kfka时间戳
`partition` BIGINT METADATA VIRTUAL, -- 获取kafka数据所在的分区
`offset` BIGINT METADATA VIRTUAL,-- 偏移量
-- 指定时间字段和水位线生成策略
WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) WITH (
'connector' = 'kafka',
'topic' = 'students',
'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092',
'properties.group.id' = 'testGroup',
'scan.startup.mode' = 'earliest-offset',
'format' = 'csv',
'csv.ignore-parse-errors' ='true' -- 当有脏数据时是否跳过当前行
);
select id,name,event_time,`partition`,`offset` from students_kafka;
-- 每隔5秒统计每个班级的人数
select
clazz,
TUMBLE_START(event_time,INTERVAL '5' SECOND) as win_start,
TUMBLE_END(event_time,INTERVAL '5' SECOND) as win_end,
count(id) as num
from
students_kafka
group by
clazz,
-- 滚动的事件时间窗口
TUMBLE(event_time,INTERVAL '5' SECOND);
- kafka sink
CREATE TABLE students_kafka_num (
clazz STRING,
win_start TIMESTAMP(3),
win_end TIMESTAMP(3),
num BIGINT
) WITH (
'connector' = 'kafka',
'topic' = 'clazz_num',
'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092',
'properties.group.id' = 'testGroup',
'scan.startup.mode' = 'earliest-offset',
'format' = 'csv',
'csv.ignore-parse-errors' ='true' -- 当有脏数据时是否跳过当前行
);
insert into students_kafka_num
select
clazz,
TUMBLE_START(event_time,INTERVAL '5' SECOND) as win_start,
TUMBLE_END(event_time,INTERVAL '5' SECOND) as win_end,
count(id) as num
from
students_kafka
group by
clazz,
-- 滚动的事件时间窗口
TUMBLE(event_time,INTERVAL '5' SECOND);
7.2、JDBC
- 整合
# 将依赖包上传到flink的lib目录下
flink-connector-jdbc-1.15.2.jar
mysql-connector-java-5.1.47.jar
# 依赖更新后需要重启集群才会生效yarn application -listyarn application -kill[appid]
yarn-session.sh -d
sql-client.sh
- MySQL Source
CREATE TABLE students_mysql (
id INT,
name STRING,
age INT,
gender STRING,
clazz STRING
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://master:3306/bigdata29?useUnicode=true&characterEncoding=UTF-8',
'table-name' = 'students',
'username'='root',
'password'='123456'
);
-- 求每个班的人数
select
clazz,
count(1) as num
from
students_mysql
group by
clazz;
- MySQL Sink
注意:[ERROR] Could not execute SQL statement. Reason:
java.lang.IllegalStateException: please declare primary key for sink table when query contains update/delete record.这个错误是这是一个Retract 流有更新和删除的作用为了数据的最终结果,要保证主键的唯一性
CREATETABLE clazz_num (
clazz STRING,
num BIGINT,PRIMARYKEY(clazz)NOT ENFORCED
)WITH('connector'='jdbc','url'='jdbc:mysql://master:3306/bigdata29?useUnicode=true&characterEncoding=UTF-8','table-name'='clazz_num','username'='root','password'='123456');insertinto clazz_num
select
clazz,count(1)as num
from
students_mysql
groupby
clazz;
7.3、HDFS
- HDFS Source
CREATETABLE students_hdfs (
id INT,
name STRING,
age INT,
gender STRING,
clazz STRING
)WITH('connector'='filesystem','path'='hdfs://master:9000/bigdata29/data/students.txt','format'='csv');-- 求每个班的男生和女生各有多少人select
clazz,
gender,count(1)as num
from
students_hdfs
groupby
clazz,
gender;
- HDFS Sink
--仅追加的写入到HDFS中CREATETABLE clazz_gender_hdfs (
id INT,
name STRING,
age INT,
gender STRING,
clazz STRING
)WITH('connector'='filesystem','path'='hdfs://master:9000/bigdata29/flink/data','format'='csv');insertinto clazz_gender_hdfs
select id,name,age,gender,clazz from students_mysql;---- 2、将更新更改的结果写入hdfsCREATETABLE clazz_gender_num_hdfs (
clazz STRING,
gender STRING,
num BIGINT)WITH('connector'='filesystem',-- 必选:指定连接器类型'path'='hdfs://master:9000/bigdata29/flink/sqlSink/clazz_gender_num_hdfs',--必选:指定路径'format'='canal-json'-- 必选:文件系统连接器指定 format);insertinto clazz_gender_num_hdfs
-- 求每个班的男生和女生各有多少人select
clazz,
gender,count(1)as num
from
students_hdfs
groupby
clazz,
gender;
7.4、HBase
- 整合
# 将依赖包上传到flink的lib目录下
flink-sql-connector-hbase-2.2-1.15.2.jar
# 依赖更新后需要重启集群才会生效yarn application -listyarn application -kill[appid]
yarn-session.sh -d
sql-client.sh
- HBase sink
--创建hbase表create'students_flink','info'-- 创建hbase sink表CREATETABLE students_hbase (
id INT,
info ROW<name STRING,age INT,gender STRING,clazz STRING>,-- 指定列簇中的列PRIMARYKEY(id)NOT ENFORCED -- 设置hbaserowkey)WITH('connector'='hbase-2.2','table-name'='students_flink','zookeeper.quorum'='master:2181,node1:2181,node2:2181');insertinto students_hbase
select
id,ROW(name,age,gender,clazz)as info
from students_mysql;--查看结果select*from students_hbase;
scan 'students_flink'
7.5、datagen
用于生成测试数据,可以用于高性能测试
CREATE TABLE students_datagen (
id STRING,
name STRING,
age INT,
sex STRING,
clazz STRING
) WITH (
'connector' = 'datagen',
'rows-per-second'='5', -- 指定每秒生成的数据量
'fields.id.length'='5',
'fields.name.length'='3',
'fields.age.min'='1',
'fields.age.max'='100',
'fields.sex.length'='1',
'fields.clazz.length'='4'
);
7.6、print
在task manager中打印结果
CREATETABLE print_table (
id STRING,
name STRING,
age INT,
sex STRING,
clazz STRING
)WITH('connector'='print');CREATETABLE print_table
WITH('connector'='print')-- 应用目标表的字段创建新的LIKE students_datagen (EXCLUDING ALL);insertinto print_table
select*from students_datagen;
7.7、BlackHole
用于高性能测试
CREATETABLE blackhole_table (
id STRING,
name STRING,
age INT,
sex STRING,
clazz STRING
)WITH('connector'='blackhole');insertinto blackhole_table
select*from students_datagen;
8、数据格式
8.1、csv
数据中字段的顺序需要和建表语句字段的顺序保持一致 (顺序映射)
默认按照逗号分割
CREATETABLE students_csv (
id STRING,
name STRING,
age INT,
sex STRING,
clazz STRING
)WITH('connector'='kafka','topic'='students',-- 指定topic'properties.bootstrap.servers'='master:9092,node1:9092,node2:9092',-- 指定kafka集群列表'properties.group.id'='testGroup',-- 指定消费者组'scan.startup.mode'='earliest-offset',-- 指定读取数据的位置'format'='csv',-- 指定数据的格式'csv.field-delimiter'=',',-- 指定分隔符'csv.ignore-parse-errors'='true'-- 跳过脏数据);
8.2、json
link表中的字段和类型需要和json中保持一致(同名映射)
CREATETABLE cars (
car STRING,
city_code STRING,
county_code STRING,
card BIGINT,
camera_id STRING,
orientation STRING,
road_id BIGINT,`time`BIGINT,
speed DOUBLE)WITH('connector'='kafka','topic'='cars',-- 指定topic'properties.bootstrap.servers'='master:9092,node1:9092,node2:9092',-- 指定kafka集群列表'properties.group.id'='testGroup',-- 指定消费者组'scan.startup.mode'='earliest-offset',-- 指定读取数据的位置'format'='json',-- 指定数据的格式'json.ignore-parse-errors'='true');
8.3、canal-json
用于保存更新更改的结果流
CREATETABLE clazz_num (
clazz STRING,
num BIGINT)WITH('connector'='kafka','topic'='clazz_num','properties.bootstrap.servers'='master:9092,node1:9092,node2:9092','properties.group.id'='testGroup','scan.startup.mode'='earliest-offset','format'='canal-json');insertinto clazz_num
select
clazz,count(1)as num
from
students_kafka
groupby
clazz;
9、时间属性
9.1、处理时间
处理时间是基于机器的本地时间来处理数据,它是最简单的一种时间概念,但是它不能提供确定性。它既不需要从数据里获取时间,也不需要生成 watermark。
-- PROCTIME() 生成处理时间的函数CREATETABLE words (
word STRING,
proctime AS PROCTIME()-- 声明一个额外的列作为处理时间属性)WITH('connector'='kafka','topic'='words','properties.bootstrap.servers'='master:9092,node1:9092,node2:9092','properties.group.id'='testGroup','scan.startup.mode'='earliest-offset','format'='csv','csv.ignore-parse-errors'='true'-- 当有脏数据时是否跳过当前行);-- 实时统计每个单词最近5秒单词的数量select
word,
TUMBLE_START(proctime,INTERVAL'5'SECOND) win_start,
TUMBLE_END(proctime,INTERVAL'5'SECOND) win _end,count(word)as num
from
words
groupby
word,
TUMBLE(proctime,INTERVAL'5'SECOND);
9.2、事件时间
事件时间允许程序按照数据中包含的时间来处理,这样可以在有乱序或者晚到的数据的情况下产生一致的处理结果。它可以保证从外部存储读取数据后产生可以复现(replayable)的结果。
除此之外,事件时间可以让程序在流式和批式作业中使用同样的语法。在流式程序中的事件时间属性,在批式程序中就是一个正常的时间字段。
为了能够处理乱序的事件,并且区分正常到达和晚到的事件,Flink 需要从事件中获取事件时间并且产生 watermark(watermarks)。
事件时间属性也有类似于处理时间的三种定义方式:在DDL中定义、在 DataStream 到 Table 转换时定义、用 TableSource 定义。
CREATETABLE word_event_time (
word STRING,
word_event_time TIMESTAMP(3),-- 声明 user_action_time 是事件时间属性,并且用 延迟 5 秒的策略来生成 watermark
WATERMARK FOR word_event_time AS word_event_time -INTERVAL'5'SECOND)WITH('connector'='kafka','topic'='words_event_time','properties.bootstrap.servers'='master:9092,node1:9092,node2:9092','properties.group.id'='testGroup','scan.startup.mode'='earliest-offset','format'='csv','csv.ignore-parse-errors'='true'-- 当有脏数据时是否跳过当前行);select
word,
TUMBLE_START(word_event_time ,INTERVAL'5'SECOND) win_start,
TUMBLE_END(word_event_time ,INTERVAL'5'SECOND) win_end,count(word)as num
from
word_event_time
groupby
word,
TUMBLE(word_event_time ,INTERVAL'5'SECOND);
10、SQL语法
10.1、Hints
动态表选择:可以在查询表的时候动态修改表的参数配置
CREATETABLE students (
id STRING,
name STRING,
age INT,
sex STRING,
clazz STRING
)WITH('connector'='kafka','topic'='students',-- 指定topic'properties.bootstrap.servers'='master:9092,node1:9092,node2:9092',-- 指定kafka集群列表'properties.group.id'='testGroup',-- 指定消费者组'scan.startup.mode'='earliest-offset',-- 指定读取数据的位置'format'='csv'-- 指定数据的格式);select*from students /*+ OPTIONS('csv.ignore-parse-errors' ='true') */;-- latest-offset: 读取任务启动之后生产的数据select*from students /*+ OPTIONS('csv.ignore-parse-errors' ='true','scan.startup.mode' = 'latest-offset') */;CREATETABLE students_hdfs_stream (
id int,
name STRING,
age INT,
gender STRING,
clazz STRING
)WITH('connector'='filesystem',-- 必选:指定连接器类型'path'='hdfs://master:9000/data/students',-- 必选:指定路径'format'='csv'-- 必选:文件系统连接器指定 format);select*from students_hdfs_stream /*+OPTIONS('source.monitor-interval' = '5000') */
10.2、WITH 子句
WITH
子句提供了一种用于更大查询而编写辅助语句的方法。这些编写的语句通常被称为公用表表达式,表达式可以理解为仅针对某个查询而存在的临时视图
with tmp as(select
id,name,age,clazz
from
students_mysql
where age >22)select*from tmp
unionallselect*from tmp;
10.3、SELECT WHERE
select*from students_hdfs_stream
where
age >21and gender in('男','女');
10.4、SELECT DISTINCT
对于流处理的问题
1、flink会将之前的数据保存在状态中,用于判断是否重复
2、如果表的数据量很大,随着时间的推移状态会越来越大,状态的数据时先保存在TM的内存中的,时间长了可能会出问题
selectdistinct*from students /*+ OPTIONS('csv.ignore-parse-errors' ='true','scan.startup.mode' = 'latest-offset') */;
10.5、窗口函数
- TUMBLE(滚动窗口)
TUMBLE函数将每个元素分配给指定窗口大小的窗口。翻滚窗口有固定的大小,不重叠。例如,假设您指定了一个大小为5分钟的滚动窗口。在这种情况下,Flink将评估当前窗口,并且每五分钟启动一个新窗口,如下图所示。
TUMBLE函数根据时间属性字段为关系的每一行分配一个窗口。在流模式下,时间属性字段必须是事件或处理时间属性。在批处理模式下,窗口表函数的时间属性字段必须是TIMESTAMP或TIMESTAMP_LTZ类型的属性。TUMBLE的返回值是一个新的关系,它包括原关系的所有列,以及额外的名为“window_start”,“window_end”,“window_time”的3列,以表示分配的窗口。原来的时间属性“timecol”将是一个常规的时间戳列
TUMBLE(TABLEdata, DESCRIPTOR(timecol), size [,offset])
data
:是一个表参数,可以与时间属性列有任意关系。timecol
:是一个列描述符,指示应将数据的哪些时间属性列映射到滚动窗口。size
:是指定滚动窗口宽度的持续时间。offset
:是一个可选参数,用于指定窗口起始位置的偏移量。
--创建bid的表CREATETABLE bid (
item STRING,
price DECIMAL(10,2),
bidtime TIMESTAMP(3),
WATERMARK FOR bidtime AS bidtime -INTERVAL'5'SECOND)WITH('connector'='kafka','topic'='bid','properties.bootstrap.servers'='master:9092,node1:9092,node2:9092','properties.group.id'='testGroup','scan.startup.mode'='earliest-offset','format'='csv','csv.ignore-parse-errors'='true'-- 当有脏数据时是否跳过当前行);--生产数据
kafka-console-producer.sh --broker-list master:9092,node1:9092,node2:9092 --topic bid
C,4.00,2020-04-1508:05:01
A,2.00,2020-04-1508:07:01
D,5.00,2020-04-1508:09:01
B,3.00,2020-04-1508:11:01
E,1.00,2020-04-1508:13:01
F,6.00,2020-04-1508:17:01-- TUMBLE:滚动窗口函数,在原表的基础上增加窗口开始时间,窗口结束时间,窗口时间SELECT item,price,bidtime,window_start,window_end,window_time FROMTABLE(
TUMBLE(TABLE bid, DESCRIPTOR(bidtime),INTERVAL'10' MINUTES));--或者SELECT*FROMTABLE(
TUMBLE(TABLE bid, DESCRIPTOR(bidtime),INTERVAL'10' MINUTES));-- 或者SQL>SELECT*FROMTABLE(
TUMBLE(DATA=>TABLE Bid,
TIMECOL => DESCRIPTOR(bidtime),
SIZE =>INTERVAL'10' MINUTES));--每隔十分钟统计总价格SELECT window_start, window_end,SUM(price)FROMTABLE(
TUMBLE(TABLE bid, DESCRIPTOR(bidtime),INTERVAL'10' MINUTES))GROUPBY window_start, window_end;
- HOP(滑动窗口)
HOP函数将元素分配给固定长度的窗口。与TUMBLE窗口函数一样,窗口的大小由窗口大小参数配置。另一个窗口滑动参数控制启动跳窗的频率。因此,如果幻灯片小于窗口大小,则跳跃窗口可以重叠。在这种情况下,元素被分配给多个窗口。跳窗也被称为“滑动窗”。
例如,您可以设置大小为10分钟的窗口,每隔5分钟滑动一次。这样,每隔5分钟就会出现一个窗口,其中包含最近10分钟内到达的事件,如下图所示。
HOP函数分配的窗口覆盖大小间隔内的行,并根据时间属性字段移动每张幻灯片。在流模式下,时间属性字段必须是事件或处理时间属性。在批处理模式下,窗口表函数的时间属性字段必须是TIMESTAMP或TIMESTAMP_LTZ类型的属性。HOP的返回值是一个新的关系,它包含了原关系的所有列,并增加了名为“window_start”,“window_end”,“window_time”的3列,以表示分配的窗口。原始的时间属性“timecol”将是窗口TVF之后的一个常规时间戳列。
HOP(TABLEdata, DESCRIPTOR(timecol), slide, size [,offset])
data
:是一个表参数,可以与时间属性列有任意关系。timecol
:是列描述符,指示应将数据的哪些时间属性列映射到跳跃窗口。slide
:是指定连续跳跃窗口开始之间的持续时间size
:是指定跳跃窗口宽度的持续时间。offset
:是一个可选参数,用于指定窗口起始位置的偏移量。
CREATETABLE bid_proctime (
item STRING,
price DECIMAL(10,2),
proctime AS PROCTIME())WITH('connector'='kafka','topic'='bid_proctime','properties.bootstrap.servers'='master:9092,node1:9092,node2:9092','properties.group.id'='testGroup','scan.startup.mode'='earliest-offset','format'='csv','csv.ignore-parse-errors'='true'-- 当有脏数据时是否跳过当前行);
kafka-console-producer.sh --broker-list master:9092,node1:9092,node2:9092 --topic bid_proctime
C,4.00
A,2.00
D,5.00
B,3.00
E,1.00
F,6.00-- HOP: 滑动窗口函数SELECT item,price,proctime,window_start,window_end,window_time FROMTABLE(
HOP(TABLE bid_proctime, DESCRIPTOR(proctime),INTERVAL'5'SECOND,INTERVAL'10'SECOND));-- 窗口聚合SELECT
window_start,
window_end,avg(price)as avg_price
FROMTABLE(
HOP(TABLE bid_proctime, DESCRIPTOR(proctime),INTERVAL'5'SECOND,INTERVAL'10'SECOND))groupby
window_start,
window_end;
- CUMULATE (累积窗口)
累积窗口(Cumulative Window,也称为累积窗口或滚动累积窗口)是一种用于处理流数据的窗口类型,它允许窗口的长度随时间逐步增加,直到达到指定的最大长度。
CUMULATE(TABLEdata, DESCRIPTOR(timecol), step, size)
data
:是一个表参数,可以与时间属性列有任意关系。timecol
:是一个列描述符,指示应将数据的哪些时间属性列映射到累积窗口。step
:是指定连续累积窗口末尾之间增加的窗口大小的持续时间。size
:是指定累积窗口最大宽度的持续时间。size
必须是的整数倍step
。offset
:是一个可选参数,用于指定窗口起始位置的偏移量。
CREATETABLE bid (
item STRING,
price DECIMAL(10,2),
bidtime TIMESTAMP(3),
WATERMARK FOR bidtime AS bidtime -INTERVAL'5'SECOND)WITH('connector'='kafka','topic'='bid','properties.bootstrap.servers'='master:9092,node1:9092,node2:9092','properties.group.id'='testGroup','scan.startup.mode'='earliest-offset','format'='csv','csv.ignore-parse-errors'='true'-- 当有脏数据时是否跳过当前行);
kafka-console-producer.sh --broker-list master:9092,node1:9092,node2:9092 --topic bid
C,4.00,2020-04-1508:05:01
A,2.00,2020-04-1508:07:01
D,5.00,2020-04-1508:09:01
B,3.00,2020-04-1508:11:01
E,1.00,2020-04-1508:13:01
F,6.00,2020-04-1508:17:01SELECT*FROMTABLE(
CUMULATE(TABLE bid, DESCRIPTOR(bidtime),INTERVAL'2' MINUTES,INTERVAL'10' MINUTES));--每2分钟累积一次数据,直到累积时间达到10分钟SELECT window_start, window_end,SUM(price)FROMTABLE(
CUMULATE(TABLE bid, DESCRIPTOR(bidtime),INTERVAL'2' MINUTES,INTERVAL'10' MINUTES))GROUPBY window_start, window_end;
- 会话窗口
CREATETABLE bid_proctime (
item STRING,
price DECIMAL(10,2),
proctime AS PROCTIME())WITH('connector'='kafka','topic'='bid_proctime','properties.bootstrap.servers'='master:9092,node1:9092,node2:9092','properties.group.id'='testGroup','scan.startup.mode'='earliest-offset','format'='csv','csv.ignore-parse-errors'='true'-- 当有脏数据时是否跳过当前行);
kafka-console-producer.sh --broker-list master:9092,node1:9092,node2:9092 --topic bid_proctime
C,4.00
C,2.00
C,5.00
C,3.00
C,1.00
C,6.00-- 实时统计每个商品的总的金额,隔5秒没有数据开始统计select
item,
SESSION_START(proctime,INTERVAL'5'SECOND)as session_start,
SESSION_END(proctime,INTERVAL'5'SECOND)as session_end,sum(price)as sum_price
from
bid_proctime
groupby
item,SESSION(proctime,INTERVAL'5'SECOND);
10.6、GROUP BY
-- 出啊关键datagen source表CREATETABLE words_datagen (
word STRING
)WITH('connector'='datagen','rows-per-second'='50000',-- 指定每秒生成的数据量'fields.word.length'='5');CREATETABLE blackhole_table (
word STRING,
num BIGINT)WITH('connector'='blackhole');-- 分组聚合需要将之前的计算结果保存在状态中,-- 如果状态无限增长,会导致checkpoint时间拉长,如果checkpoint超时失败了,也会导致任务失败insertinto blackhole_table
select
word,count(1)as num
from
words_datagen /*+ OPTIONS('fields.word.length'='7') */groupby
word;
10.7、OVER
- sum max min avg count
CREATETABLE`order`(
order_id STRING,
amount DECIMAL(10,2),
product STRING,
order_time TIMESTAMP(3),
WATERMARK FOR order_time AS order_time -INTERVAL'5'SECOND)WITH('connector'='kafka','topic'='order','properties.bootstrap.servers'='master:9092,node1:9092,node2:9092','properties.group.id'='testGroup','scan.startup.mode'='earliest-offset','format'='csv','csv.ignore-parse-errors'='true'-- 当有脏数据时是否跳过当前行);
kafka-console-producer.sh --broker-list master:9092,node1:9092,node2:9092 --topic order1,4.00,001,2020-04-1508:05:012,2.00,001,2020-04-1508:07:013,5.00,001,2020-04-1508:09:014,3.00,001,2020-04-1508:11:015,1.00,001,2020-04-1508:13:016,6.00,001,2020-04-1508:17:016,6.00,001,2020-04-1508:20:016,6.00,001,2020-04-1508:21:016,10.00,001,2020-04-1508:21:026,11.00,001,2020-04-1508:21:036,12.00,001,2020-04-1508:21:04-- 1、实时统计每个商品的累计总金额,将总金额放在每一条数据的后面-- 流处理的问题-- a、sum over必须按照时间升序排序,因为数据时一条一套过来的,只能做累加求和,不能做全局求和-- b、只能按照时间升序排序,如果按照其他的字段排序,每来一条数据都需要重新排序,计算代价太大,影响性能select
order_id,
amount,
product,
order_time,sum(amount)over(partitionby product
orderby order_time
)from`order`;/**
RANGE 间隔
RANGE ORDER BY 列的值上定义了一个间隔,在 Flink 中,该间隔始终是时间属性。以下 RANGE 间隔定义所有时间属性比当前行小最多 30 分钟的行都包含在聚合中。
RANGE BETWEEN INTERVAL '30' MINUTE PRECEDING AND CURRENT ROW
*/-- 2、实时统计每个商品的累计总金额,将总金额放在每一条数据的后面,只统计最近10分钟的数据select
order_id,
amount,
product,
order_time,sum(amount)over(partitionby product
orderby order_time
-- 统计10分钟前到当前行的数据
RANGE BETWEENINTERVAL'10' MINUTES PRECEDINGANDCURRENTROW)from`order`;/**
行间隔 #
间隔ROWS是基于计数的间隔。它精确定义了聚合中包含多少行。以下ROWS间隔定义了当前行和当前行之前的 10 行(因此总共 11 行)包含在聚合中。
ROWS BETWEEN 10 PRECEDING AND CURRENT ROW
*/-- 3、实时统计每个商品的累计总金额,将总金额放在每一条数据的后面,计算最近5条数据select
order_id,
amount,
product,
order_time,sum(amount)over(partitionby product
orderby order_time
-- 从前4条数据到当前行ROWSBETWEEN4PRECEDINGANDCURRENTROW)from`order`;-- 4、实时统计每个商品的最大金额,将总金额放在每一条数据的后面,计算最近5条数据select
order_id,
amount,
product,
order_time,max(amount)over(partitionby product
orderby order_time
-- 从前4条数据到当前行ROWSBETWEEN4PRECEDINGANDCURRENTROW)from`order`;
10.8、TOP-N
- row_number
-- 如果只是增加排名,只能按照时间字段升序排序select
order_id,
amount,
product,
order_time,
row_number()over(partitionby product orderby order_time)as r
from`order`;-- 实时统计每个商品金额最高的前两个商品 -- TOPN-- 去完topn之后需要计算的排名的数据较少了,计算代价降低了select*from(select
order_id,
amount,
product,
order_time,
row_number()over(partitionby product orderby amount desc)as r
from`order`)where r <=2;
10.9、ORDER BY
-- 子流处理模式中,order by 需要按照时间字段升序排序select*from`order`orderby
order_time,amount
-- 加上limit ,计算代价就不高了,就可以按照普通字段进行排序了select*from`order`orderby
amount
limit2;
10.10、模式检测(CEP)
- 案例1
我们先实现第一版报警程序,对于一个账户,如果出现小于 $1 美元的交易后紧跟着一个大于 $500 的交易,就输出一个报警信息。
CREATETABLEtran(
id STRING,
amount DECIMAL(10,2),
proctime as PROCTIME())WITH('connector'='kafka','topic'='tran','properties.bootstrap.servers'='master:9092,node1:9092,node2:9092','properties.group.id'='testGroup','scan.startup.mode'='latest-offset','format'='csv','csv.ignore-parse-errors'='true'-- 当有脏数据时是否跳过当前行);
kafka-console-producer.sh --broker-list master:9092,node1:9092,node2:9092 --topic tran1,4.001,2.001,5.001,0.901,600.001,4.001,2.001,0.101,200.001,700.00-- MATCH_RECOGNIZE(模式检测)-- 在数据流上对数据进行匹配,当数满足我们定义的规则后,返回匹配的结果-- 我们先实现第一版报警程序,对于一个账户,如果出现小于 $1 美元的交易后紧跟着一个大于 $500 的交易,就输出一个报警信息。SELECT*FROMtran
MATCH_RECOGNIZE (PARTITIONBY id -- 分组字段ORDERBY proctime -- 排序字段,只能按照时间字段升序排序
MEASURES -- 相当于select
A.amount as min_amount,
A.proctime as min_proctime,
B.amount as max_amount,
B.proctime as max_proctime
PATTERN (A B)-- 定义规则
DEFINE -- 定义条件
A as amount <1,
B as amount >500)AS T
-- 我们先实现第一版报警程序,对于一个账户,如果出现小于 $1 美元的交易后紧跟着一个大于 $500 的交易,就输出一个报警信,两次事件需要在10秒内出现SELECT*FROMtran
MATCH_RECOGNIZE (PARTITIONBY id -- 分组字段ORDERBY proctime -- 排序字段,只能按照时间字段升序排序
MEASURES -- 相当于select
A.amount as min_amount,
A.proctime as min_proctime,
B.amount as max_amount,
B.proctime as max_proctime
PATTERN (A B)WITHININTERVAL'5'SECOND-- 定义规则,增加事件约束,需要在5秒内匹配出结果
DEFINE -- 定义条件
A as amount <1,
B as amount >500)AS T;-- 我们先实现第一版报警程序,对于一个账户,如果连续出现三次出现小于 $1 美元的交易后紧跟着一个大于 $500 的交易,就输出一个报警信息SELECT*FROMtran
MATCH_RECOGNIZE (PARTITIONBY id -- 分组字段ORDERBY proctime -- 排序字段,只能按照时间字段升序排序
MEASURES -- 相当于select
A.amount as a_amount,-- 获取最后一条min(A.amount)as min_a_amount,-- 取最小的max(A.amount)as max_a_amount,-- 取最大的sum(A.amount)as sum_a_amount,-- 求和avg(A.amount)as avg_a_amount,-- 平均FIRST(A.amount)AS first_a_amount,-- 取前面第一条LAST(A.amount)AS LAST_a_amount,-- 取后面第一条
B.amount as b_amount
PATTERN (A{3} B)-- 定义规则
DEFINE -- 定义条件
A as amount <1,
B as amount >500)AS T;1,0.901,0.101,0.201,600.00
- 案例2
找出一个单一股票价格不断下降的时期
CREATETABLE ticker (
symbol STRING,
rowtime TIMESTAMP(3),-- 时间字段
price DECIMAL(10,2),
tax DECIMAL(10,2),-- 指定时间字段和水位线生成策略
WATERMARK FOR rowtime AS rowtime
)WITH('connector'='kafka','topic'='ticker','properties.bootstrap.servers'='master:9092,node1:9092,node2:9092','properties.group.id'='testGroup','scan.startup.mode'='latest-offset','format'='csv','csv.ignore-parse-errors'='true'-- 当有脏数据时是否跳过当前行);
kafka-console-producer.sh --broker-list master:9092,node1:9092,node2:9092 --topic ticker
ACME,2024-06-0410:00:00,12,1
ACME,2024-06-0410:00:01,17,2
ACME,2024-06-0410:00:02,19,1
ACME,2024-06-0410:00:03,21,3
ACME,2024-06-0410:00:04,25,2
ACME,2024-06-0410:00:05,18,1
ACME,2024-06-0410:00:06,15,1
ACME,2024-06-0410:00:07,14,2
ACME,2024-06-0410:00:08,24,2
ACME,2024-06-0410:00:09,25,2
ACME,2024-06-0410:00:10,19,1-- 找出一个单一股票价格不断下降的时期select*from
ticker
MATCH_RECOGNIZE (PARTITIONBY symbol -- 分组字段ORDERBY rowtime -- 排序字段,只能按照时间字段升序排序
MEASURES -- 相当于select
A.price as a_price,FIRST(B.price)as FIRST_b_price,LAST(B.price)as last_b_price,
C.price as c_price
AFTERMATCH SKIP PAST LASTROW-- 从当前匹配成功止呕的下一行开始匹配
PATTERN (A B+ C)-- 定义规则
DEFINE -- 定义条件-- 如果时第一个B,就和A比较,如果时后面的B,就和前一个B比较
B as(LAST(B.price,1)isnulland B.price < A.price)or B.price <LAST(B.price,1),
C as C.price >LAST(B.price))AS T;
10.11、Joins
- Regular Joins
和hive sql中的join是一样的,
CREATETABLE students (
id STRING,
name STRING,
age INT,
sex STRING,
clazz STRING
)WITH('connector'='kafka','topic'='students',-- 指定topic'properties.bootstrap.servers'='master:9092,node1:9092,node2:9092',-- 指定kafka集群列表'properties.group.id'='testGroup',-- 指定消费者组'scan.startup.mode'='latest-offset',-- 指定读取数据的位置'format'='csv'-- 指定数据的格式);
kafka-console-producer.sh --broker-list master:9092,node1:9092,node2:9092 --topic students1500100001,施笑槐,22,女,文科六班
1500100002,吕金鹏,24,男,文科六班
1500100003,单乐蕊,22,女,理科六班
1500100004,葛德曜,24,男,理科三班
1500100005,宣谷芹,22,女,理科五班
1500100006,边昂雄,21,男,理科二班
1500100007,尚孤风,23,女,文科六班
CREATETABLE scores (
sid STRING,
cid STRING,
score INT)WITH('connector'='kafka','topic'='scores',-- 指定topic'properties.bootstrap.servers'='master:9092,node1:9092,node2:9092',-- 指定kafka集群列表'properties.group.id'='testGroup',-- 指定消费者组'scan.startup.mode'='latest-offset',-- 指定读取数据的位置'format'='csv'-- 指定数据的格式);
kafka-console-producer.sh --broker-list master:9092,node1:9092,node2:9092 --topic scores1500100001,1000001,981500100001,1000002,51500100001,1000003,1371500100001,1000004,291500100001,1000005,851500100001,1000006,521500100002,1000001,1391500100002,1000002,102-- inner jion(内连接)select
a.id,a.name,b.sid,b.score
from
students as a
innerjoin
scores as b
on a.id=b.sid;--外连接-- left join (左连接)select
a.id,a.name,b.sid,b.score
from
students as a
leftjoin
scores as b
on a.id=b.sid;-- full join (全连接)select
a.id,a.name,b.sid,b.score
from
students as a
fulljoin
scores as b
on a.id=b.sid;-- 常规的关联方式,会将两个表的数据一直保存在状态中,时间长了,状态会越来越大,导致任务执行失败-- 状态有效期,状态在flink中保存的事件,状态保留多久需要根据实际业务分析SET'table.exec.state.ttl'='10000';
Interval Joins
Interval Join 用于在两个流之间进行时间间隔内的 Join。它允许你指定一个时间间隔,在这个时间间隔内匹配流中的元素。
CREATETABLE students_proctime (
id STRING,
name STRING,
age INT,
sex STRING,
clazz STRING,
proctime AS PROCTIME())WITH('connector'='kafka','topic'='students',-- 指定topic'properties.bootstrap.servers'='master:9092,node1:9092,node2:9092',-- 指定kafka集群列表'properties.group.id'='testGroup',-- 指定消费者组'scan.startup.mode'='latest-offset',-- 指定读取数据的位置'format'='csv'-- 指定数据的格式);
kafka-console-producer.sh --broker-list master:9092,node1:9092,node2:9092 --topic students1500100001,施笑槐,22,女,文科六班
1500100002,吕金鹏,24,男,文科六班
1500100003,单乐蕊,22,女,理科六班
1500100004,葛德曜,24,男,理科三班
1500100005,宣谷芹,22,女,理科五班
1500100006,边昂雄,21,男,理科二班
1500100007,尚孤风,23,女,文科六班
CREATETABLE scores_proctime (
sid STRING,
cid STRING,
score INT,
proctime AS PROCTIME())WITH('connector'='kafka','topic'='scores',-- 指定topic'properties.bootstrap.servers'='master:9092,node1:9092,node2:9092',-- 指定kafka集群列表'properties.group.id'='testGroup',-- 指定消费者组'scan.startup.mode'='latest-offset',-- 指定读取数据的位置'format'='csv'-- 指定数据的格式);
kafka-console-producer.sh --broker-list master:9092,node1:9092,node2:9092 --topic scores1500100001,1000001,981500100001,1000002,51500100001,1000003,1371500100001,1000004,291500100001,1000005,851500100001,1000006,521500100002,1000001,1391500100002,1000002,102select a.id,a.name,b.sid,b.score from
students_proctime a, scores_proctime b
where a.id=b.sid
-- a表的时间需要在b表时间10秒内and(
a.proctime BETWEEN b.proctime -INTERVAL'10'SECONDAND b.proctime
or b.proctime BETWEEN a.proctime -INTERVAL'10'SECONDAND a.proctime
);
Temporal Joins
时态表是一个随时间变化的表,在Flink中也称为动态表。时态表中的行与一个或多个时态周期相关联,所有Flink表都是时态的(动态的)。时态表包含一个或多个版本表快照,它可以是一个不断变化的历史表,用于跟踪变化(例如:数据库变更日志,包含所有快照)或一个变化的维度表,它具体化了变化(例如:包含最新快照的数据库表)。
CREATETABLE orders (
order_id STRING,
price DECIMAL(32,2),
currency STRING,
order_time TIMESTAMP(3),
WATERMARK FOR order_time AS order_time --设置水位线)WITH('connector'='kafka','topic'='orders',-- 指定topic'properties.bootstrap.servers'='master:9092,node1:9092,node2:9092',-- 指定kafka集群列表'properties.group.id'='testGroup',-- 指定消费者组'scan.startup.mode'='latest-offset',-- 指定读取数据的位置'format'='csv'-- 指定数据的格式);
kafka-console-producer.sh --broker-list master:9092,node1:9092,node2:9092 --topic orders
o_001,1,EUR,2024-06-0612:00:00
o_002,100,EUR,2024-06-0612:00:07
o_003,200,EUR,2024-06-0612:00:16
o_004,10,EUR,2024-06-0612:00:21
o_005,20,EUR,2024-06-0612:00:25-- 汇率表CREATETABLE currency_rates (
currency STRING,
conversion_rate DECIMAL(32,2),
update_time TIMESTAMP(3),
WATERMARK FOR update_time AS update_time,--设置水位线PRIMARYKEY(currency)NOT ENFORCED -- 主键,区分不同的汇率)WITH('connector'='kafka','topic'='currency_rates1',-- 指定topic'properties.bootstrap.servers'='master:9092,node1:9092,node2:9092',-- 指定kafka集群列表'properties.group.id'='testGroup',-- 指定消费者组'scan.startup.mode'='earliest-offset',-- 指定读取数据的位置'format'='canal-json'-- 指定数据的格式);insertinto currency_rates
values('EUR',0.12,TIMESTAMP'2024-06-06 12:00:00'),('EUR',0.11,TIMESTAMP'2024-06-06 12:00:09'),('EUR',0.15,TIMESTAMP'2024-06-06 12:00:17'),('EUR',0.14,TIMESTAMP'2024-06-06 12:00:23');
kafka-console-consumer.sh --bootstrap-server master:9092,node1:9092,node2:9092 --from-beginning --topic currency_rates-- 使用常规关联方式关联时态表只能关联到最新的数据select
a.price,a.order_time,b.conversion_rate,b.update_time
from
orders as a
join
currency_rates as b
on a.currency=b.currency;-- 时态表join-- FOR SYSTEM_TIME AS OF a.order_time: 使用a表的时间到b表中查询对应时间段的数据select
a.price,a.order_time,b.conversion_rate,b.update_time
from
orders as a
join
currency_rates FOR SYSTEM_TIME ASOF a.order_time as b
on a.currency=b.currency;
- Lookup Joins
用于流表关联维度表
流表:动态表
维度表:不怎么变化的变,维度表的数据一般可以放在hdfs或者mysql
CREATETABLE scores (
sid INT,
cid STRING,
score INT,
proctime AS PROCTIME())WITH('connector'='kafka','topic'='scores',-- 指定topic'properties.bootstrap.servers'='master:9092,node1:9092,node2:9092',-- 指定kafka集群列表'properties.group.id'='testGroup',-- 指定消费者组'scan.startup.mode'='latest-offset',-- 指定读取数据的位置'format'='csv'-- 指定数据的格式);
kafka-console-producer.sh --broker-list master:9092,node1:9092,node2:9092 --topic scores1500100001,1000001,981500100002,1000002,51500100001,1000003,137CREATETABLE students (
id INT,
name STRING,
age INT,
gender STRING,
clazz STRING
)WITH('connector'='jdbc','url'='jdbc:mysql://master:3306/bigdata29','table-name'='student','username'='root','password'='123456','lookup.cache.max-rows'='1000',-- 最大缓存行数'lookup.cache.ttl'='10000'-- 缓存过期时间);--1、使用常规关联方式-- 维表的数据只在任务启动的时候读取一次,后面不再实时读取,-- 只能关联到任务启动时读取的数据select a.sid,a.score,b.id,b.name from
scores as a
leftjoin
students as b
on a.sid=b.id;-- lookup join-- 当流表每来一条数据时,使用关联字段到维表的数据源中查询-- 每一次都需要查询数据库,性能会降低select a.sid,a.score,b.id,b.name from
scores as a
leftjoin
students FOR SYSTEM_TIME ASOF a.proctime as b
on a.sid=b.id;
10.12、整合hive
- 整合
# 上传依赖到flink的lib目录下
flink-sql-connector-hive-3.1.2_2.12-1.15.2.jar
# 重启flink集群yarn application -listyarn application -kill XXX
yarn-session.sh -d
sql-client.sh
- hive catalog
catalog—>database—>table---->字段---->数据
catalog是数据库上面的一个概念,一个cataloglog中可以i有多个database,
catalog就是flink抽象的元数据层default_catalog:是flink默认的元数据,将元数据保存在jobmanager的内存中
-- 1、启动hive的元数据服务
nohup hive --service metastore &-- 2、创建hive catalogCREATE CATALOG hive_catalog WITH('type'='hive','hive-conf-dir'='/usr/local/soft/hive-3.1.2/conf');show catalogs;--3、切换catalog use catalog hive_catalog;-- 查询hive中的表select*from hive_catalog.bigdata29.students;-- 创建数据库createdatabase flink;-- flink可以查询hive的表,hive不能查询flink创建的动态表-- 在hive cagalog 中保存flink的动态表CREATETABLE students_csv (
id STRING,
name STRING,
age INT,
sex STRING,
clazz STRING
)WITH('connector'='kafka','topic'='students',-- 指定topic'properties.bootstrap.servers'='master:9092,node1:9092,node2:9092',-- 指定kafka集群列表'properties.group.id'='testGroup',-- 指定消费者组'scan.startup.mode'='earliest-offset',-- 指定读取数据的位置'format'='csv',-- 指定数据的格式'csv.field-delimiter'=',',-- 指定分隔符'csv.ignore-parse-errors'='true'-- 跳过脏数据);
- hive function
-- 加载hive函数LOAD MODULE hive WITH('hive-version'='3.1.2');select split('java,flink',',');CREATETABLElines(
line STRING
)WITH('connector'='kafka','topic'='lines',-- 指定topic'properties.bootstrap.servers'='master:9092,node1:9092,node2:9092',-- 指定kafka集群列表'properties.group.id'='testGroup',-- 指定消费者组'scan.startup.mode'='earliest-offset',-- 指定读取数据的位置'format'='csv',-- 指定数据的格式'csv.field-delimiter'='|',-- 指定分隔符'csv.ignore-parse-errors'='true'-- 跳过脏数据);
kafka-console-producer.sh --broker-list master:9092,node1:9092,node2:9092 --topic lines
java,java,flink
select
word,count(1)as num
fromlines,
lateral table(explode(split(line,','))) t(word)groupby
word;
10.13、Checkpoint
- 编写sql文件
vim word_count.sql
-- 1、创建source表CREATETABLElines(
line STRING
)WITH('connector'='kafka','topic'='lines',-- 指定topic'properties.bootstrap.servers'='master:9092,node1:9092,node2:9092',-- 指定kafka集群列表'properties.group.id'='testGroup',-- 指定消费者组'scan.startup.mode'='earliest-offset',-- 指定读取数据的位置'format'='csv',-- 指定数据的格式'csv.field-delimiter'='|',-- 指定分隔符'csv.ignore-parse-errors'='true'-- 跳过脏数据);-- 创建sink表CREATETABLE print_table (
word STRING,
num BIGINT)WITH('connector'='print');-- 加载hive函数LOAD MODULE hive WITH('hive-version'='3.1.2');-- 执行sqlinsertinto print_table
select
word,count(1)as num
fromlines,
lateral table(explode(split(line,','))) t(word)groupby
word;
- 执行SQL文件
-- 第一次直接提交任务
sql-client.sh -f word_count.sql
- 失败重启
-- 基于hdfs中保存的快照重启任务
-- 在inert into 语句的前面增加
SET 'execution.savepoint.path'='hdfs://master:9000/flink/checkpoint/d915e6278f156a9278156e67105f914e/chk-36';
-- 重启任务
sql-client.sh -f word_count.sql
10.14、一个表被多次使用的时候
vim student.sql
CREATETABLE students_csv (
id STRING,
name STRING,
age INT,
sex STRING,
clazz STRING
)WITH('connector'='kafka','topic'='students',-- 指定topic'properties.bootstrap.servers'='master:9092,node1:9092,node2:9092',-- 指定kafka集群列表'properties.group.id'='testGroup',-- 指定消费者组'scan.startup.mode'='earliest-offset',-- 指定读取数据的位置'format'='csv',-- 指定数据的格式'csv.field-delimiter'=',',-- 指定分隔符'csv.ignore-parse-errors'='true'-- 跳过脏数据);-- 创建sink表CREATETABLE clazz_num (
clazz STRING,
num BIGINT)WITH('connector'='print');CREATETABLE sex_num (
sex STRING,
num BIGINT)WITH('connector'='print');-- 执行一组sql,如果多个sql中使用了同一张表,flink只会读取一次EXECUTE STATEMENT SETBEGINinsertinto clazz_num
select
clazz,count(1)as num
from
students_csv
groupby
clazz;insertinto sex_num
select
sex,count(1)as num
from
students_csv
groupby
sex;END;
10.15、反压
如果你看到一个 task 发生 反压警告(例如:
High
),意味着它生产数据的速率比下游 task 消费数据的速率要快。 在工作流中数据记录是从上游向下游流动的(例如:从 Source 到 Sink)。反压沿着相反的方向传播,沿着数据流向上游传播。
- 测试反压
-- 出啊关键datagen source表CREATETABLE words_datagen (
word STRING
)WITH('connector'='datagen','rows-per-second'='50000',-- 指定每秒生成的数据量'fields.word.length'='5');CREATETABLE blackhole_table (
word STRING,
num BIGINT)WITH('connector'='blackhole');-- 反压发生情况--1、单词太多,状态太大导致反压insertinto blackhole_table
select
word,count(1)as num
from
words_datagen /*+ OPTIONS('fields.word.length'='6') */groupby
word;--2、数据量太大导致反压insertinto blackhole_table
select
word,count(1)as num
from
words_datagen /*+ OPTIONS('fields.word.length'='5','rows-per-second'='400000') */groupby
word;
- 解决反压的方法- 增加资源
-- 1、增加Taskmanager的内存-- 启动汲取设置tm的内存yarn-session.sh -tm 6G -d-- 2、增加并行度SET'parallelism.default'='8';
- 预聚合-- 开启微批处理set'table.exec.mini-batch.enabled'='true';set'table.exec.mini-batch.allow-latency'='5 s';set'table.exec.mini-batch.size'='100000';-- 开启预聚合set'table.optimizer.agg-phase-strategy'='TWO_PHASE';
‘csv.field-delimiter’ = ‘|’ ,-- 指定分隔符
‘csv.ignore-parse-errors’ =‘true’ – 跳过脏数据
);
kafka-console-producer.sh --broker-list master:9092,node1:9092,node2:9092 --topic lines
java,java,flink
select
word,count(1) as num
from
lines,
lateral table(explode(split(line,‘,’))) t(word)
group by
word;
#### 10.13、Checkpoint
- 编写sql文件
> vim word_count.sql
~~~sql
-- 1、创建source表
CREATE TABLE lines (
line STRING
) WITH (
'connector' = 'kafka',
'topic' = 'lines', -- 指定topic
'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092', -- 指定kafka集群列表
'properties.group.id' = 'testGroup', -- 指定消费者组
'scan.startup.mode' = 'earliest-offset', -- 指定读取数据的位置
'format' = 'csv', -- 指定数据的格式
'csv.field-delimiter' = '|' ,-- 指定分隔符
'csv.ignore-parse-errors' ='true' -- 跳过脏数据
);
-- 创建sink表
CREATE TABLE print_table (
word STRING,
num BIGINT
) WITH (
'connector' = 'print'
);
-- 加载hive函数
LOAD MODULE hive WITH ('hive-version' = '3.1.2');
-- 执行sql
insert into print_table
select
word,count(1) as num
from
lines,
lateral table(explode(split(line,','))) t(word)
group by
word;
- 执行SQL文件
-- 第一次直接提交任务
sql-client.sh -f word_count.sql
- 失败重启
-- 基于hdfs中保存的快照重启任务
-- 在inert into 语句的前面增加
SET 'execution.savepoint.path'='hdfs://master:9000/flink/checkpoint/d915e6278f156a9278156e67105f914e/chk-36';
-- 重启任务
sql-client.sh -f word_count.sql
10.14、一个表被多次使用的时候
vim student.sql
CREATETABLE students_csv (
id STRING,
name STRING,
age INT,
sex STRING,
clazz STRING
)WITH('connector'='kafka','topic'='students',-- 指定topic'properties.bootstrap.servers'='master:9092,node1:9092,node2:9092',-- 指定kafka集群列表'properties.group.id'='testGroup',-- 指定消费者组'scan.startup.mode'='earliest-offset',-- 指定读取数据的位置'format'='csv',-- 指定数据的格式'csv.field-delimiter'=',',-- 指定分隔符'csv.ignore-parse-errors'='true'-- 跳过脏数据);-- 创建sink表CREATETABLE clazz_num (
clazz STRING,
num BIGINT)WITH('connector'='print');CREATETABLE sex_num (
sex STRING,
num BIGINT)WITH('connector'='print');-- 执行一组sql,如果多个sql中使用了同一张表,flink只会读取一次EXECUTE STATEMENT SETBEGINinsertinto clazz_num
select
clazz,count(1)as num
from
students_csv
groupby
clazz;insertinto sex_num
select
sex,count(1)as num
from
students_csv
groupby
sex;END;
10.15、反压
如果你看到一个 task 发生 反压警告(例如:
High
),意味着它生产数据的速率比下游 task 消费数据的速率要快。 在工作流中数据记录是从上游向下游流动的(例如:从 Source 到 Sink)。反压沿着相反的方向传播,沿着数据流向上游传播。
- 测试反压
-- 出啊关键datagen source表CREATETABLE words_datagen (
word STRING
)WITH('connector'='datagen','rows-per-second'='50000',-- 指定每秒生成的数据量'fields.word.length'='5');CREATETABLE blackhole_table (
word STRING,
num BIGINT)WITH('connector'='blackhole');-- 反压发生情况--1、单词太多,状态太大导致反压insertinto blackhole_table
select
word,count(1)as num
from
words_datagen /*+ OPTIONS('fields.word.length'='6') */groupby
word;--2、数据量太大导致反压insertinto blackhole_table
select
word,count(1)as num
from
words_datagen /*+ OPTIONS('fields.word.length'='5','rows-per-second'='400000') */groupby
word;
- 解决反压的方法- 增加资源
-- 1、增加Taskmanager的内存-- 启动汲取设置tm的内存yarn-session.sh -tm 6G -d-- 2、增加并行度SET'parallelism.default'='8';
- 预聚合-- 开启微批处理set'table.exec.mini-batch.enabled'='true';set'table.exec.mini-batch.allow-latency'='5 s';set'table.exec.mini-batch.size'='100000';-- 开启预聚合set'table.optimizer.agg-phase-strategy'='TWO_PHASE';
版权归原作者 尔等都不懂涐 所有, 如有侵权,请联系我们删除。