文章目录
Flink中的DataStream程序是对数据流(例如 过滤、更新状态、定义窗口、聚合)进行转换的常规程序。
Source 源算子 — 连接数据源,读取数据源
Transformation 转换算子 — 操作读取到的数据 ----工具就是 DataStream API
这个Flink的依赖比较难整,可以注意版本的依赖关系
<?xml version="1.0" encoding="UTF-8"?><project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>org.example</groupId><artifactId>FlinkDemo-1-17</artifactId><version>1.0-SNAPSHOT</version><dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>1.16.3</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java</artifactId><version>1.16.3</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients</artifactId><version>1.16.3</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java-bridge</artifactId><version>1.16.3</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner_2.12</artifactId><version>1.16.3</version></dependency><!-- 从jdbc中读取数据的依赖--><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-jdbc</artifactId><version>1.16.3</version></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>5.1.30</version></dependency><!-- 从文件中读取数据的依赖--><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-files</artifactId><version>1.16.3</version></dependency><!-- 从kafka中读取数据的依赖--><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka</artifactId><version>1.16.3</version></dependency><!-- 从数据生成器中地区数据的依赖--><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-datagen</artifactId><version>1.17.0</version></dependency></dependencies></project>
一、DataStream是什么
官网介绍:https://nightlies.apache.org/flink/flink-docs-release-1.18/zh/docs/dev/datastream/overview/
DataStream API 得名于特殊的DataStream类,该类用于表示Flink程序中的数据集合。
可以认为它们是可以包含重复项的不可变数据集合。这些数据可以是有界(有限)的,可以是无界(无限)的,但是用于处理他们的API事相同的。
DataStream在用法上类似常规的java集合,但是在某些关键方面却大不相同。
他们是不可变的,这意味着一但他们被创建,你就不能添加或删除元素。你也不能简单地查看内部元素,而是只能使用DataStream API操作来处理它们
DataStream API 操作也叫作转换(transformation 转换算子)
可以通过在Flink程序中添加source创建一个初始的DataStream,然后,基于DataStream派生新的流,并使用map、filter等API方法把DataStream 和 派生的流 连接在一起。
二、 Flink的代码介绍
Flink 程序看起来像一个转换 DataStream 的常规程序。每个程序由相同的基本部分组成:
- 获取执行环境(execution environment);
- 加载/创建初始数据(读取数据源);
- 指定数据相关的转换(转换操作);
- 指定计算结果的存储位置(输出);
- 触发程序执行(触发执行);
2.1 执行环境
创建执行环境StreamExecutionEnvironment
StreamExecutionEnvironment 是所有 Flink 程序的基础。
创建流执行环境的三种方法:
- 根据当前运行环境创建
- 根据本地执行环境创建
- 根据集群执行环境创建
// 方法一:getExecutionEnvironment 方法。这个方法会根据当前运行的方式,自行决定该返回什么样的运行环境StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();// 方法二:createLocalEnvironment,返回本地执行环境;可以在调用时传入一个参数,指定默认的并行度;如果不传入,则默认并行度就是本地的 CPU 核心数StreamExecutionEnvironment localEnv =StreamExecutionEnvironment.createLocalEnvironment();// 方法三:createRemoteEnvironment返回集群执行环境。需要在调用时指定 JobManager 的主机名和端口号,并指定要在集群中运行的 Jar 包StreamExecutionEnvironment remoteEnv =StreamExecutionEnvironment.createRemoteEnvironment("host",// JobManager 主机名1234,// JobManager 进程端口号"path/to/jarFile.jar"// 提交给 JobManager 的JAR包);
2.2 执行模式Execution Mode
从 Flink 1.12 开始,官方推荐的做法是直接使用 DataStream API,在提交任务时通过将执行模式设为 BATCH 来进行批处理。不建议使用 DataSet API。
// 流处理环境StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();
2.2.1 DataStream API 执行模式
DataStream API 执行模式包括:流执行模式、批执行模式和自动模式:
- 流执行模式(Streaming)DataStream API 最经典的模式,一般用于需要持续实时处理的无界数据流。默认情况下,程序使用的就是 Streaming 执行模式。
- 批执行模式(Batch)专门用于批处理的执行模式。
- 自动模式(AutoMatic)在这种模式下,将由程序根据输入数据源是否有界,来自动选择执行模式。
2.2.2 配置执行环境
- 方法一:通过命令行配置
bin/flink run -Dexecution.runtime-mode=BATCH ...
在提交作业时,增加 execution.runtime-mode 参数,指定值为BATCH。
- 方法二:通过代码配置
实际应用中一般不会在代码中配置,而是使用命令行,这样更加灵活。
3. 方法三:触发程序执行
Flink 是由事件驱动的,只有等到数据到来,才会触发真正的计算,这也被称为“延迟执行”或“懒执行”
同步执行 or 异步执行(了解)
env.execute();// 同步
env.executeAsync();// 异步
exexute总结:
- 默认 env.execute() 触发一个Flink job 一个main方法可以调用多个execute,但是没有意义,指定一个就会阻塞住(同步)
- env.executeAsync(),异步,不阻塞 一个main方法里 executeAsync()个数 = 生成Flink job数
三、源算子(Source)—数据源
Flink可以从各种来源获取数据,然后构建DataStream进行转换处理。
一般将 数据的输入来源称为数据源(Data Source),而读取数据的算子就是源算子(Source Operator)。
所以,source是整个处理程序的输入端。
3.1 添加source
# Flink1.2开始,流批统一的新 Source 架构(fromSource()方法)
DataStreamSource<String> stream = env.fromSource(…);
Flink 直接提供了很多预实现的接口,此外还有很多外部连接工具也实现了对应的Source,通常情况下足以应对实际需求
3.2 使用source
3.2.1 从集合中读取数据-fromElements
fromElements的作用
fromElements主要用于创建一个基于本地元素集合的数据来源,主要用户测试flink环境的是否可行的。
StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();
env.fromElements(1,2,3,4,5)// 直接填写元素// .fromCollection(Arrays.asList(1, 2, 3, 4, 5)) // 从集合读取元素.print();
env.execute();
3.2.2 从文件/数据库/消息队列中读取数据 - fromSource
这样就需要添加对应的依赖才可以;
fromSource的作用:
fromSource是一种更通用的创建数据源的方式,用于从各种外部系统或存储中获取数据。它可以连接到不同类型的数据源,如文件系统(通过FileSource等)、消息队列(如 KafkaSource)、数据库等eg :连接kafka
DataStream stream = env.fromSource(new KafkaSource<>(), WatermarkStrategy.noWatermarks(), “Kafka Source”);
通常情况,会从存储介质中读取数据,一个比较常见的方式就是读取日志文件。
这也是批处理中最常见的读取方式。
读取文件,需要添加文件连接器依赖
<-- 注意版本号,和flink要一致!--><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-files</artifactId><version>${flink.version}</version></dependency>
示例代码:
importorg.apache.flink.api.common.eventtime.WatermarkStrategy;importorg.apache.flink.connector.file.src.FileSource;importorg.apache.flink.connector.file.src.reader.TextLineInputFormat;importorg.apache.flink.core.fs.Path;importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment;importjava.io.File;// - 执行环境StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();// - FileSource 是一种数据源,用于从文件系统读取数据 // FileSource.forRecordStreamFormat定义数据读取格式,也就是如何解析文件中的数据// TextLineInputFormat用于读取文本文件的输入格式,主要功能是将文本文件按行进行读取FileSource<String> fileSource =FileSource.forRecordStreamFormat(newTextLineInputFormat(),// 文件路径,相对路径不行就用绝对路径Path.fromLocalFile(newFile("D:\\workspace\\IdeaProjects\\second-java\\day5-flink\\src\\main\\resources\\input\\words.txt"))).build();// - 创建数据源// WatermarkStrategy.noWatermarks()表示不使用 Watermark 机制,不生产Watermark // Watermark 的作用:Watermark 是 Flink 处理乱序流数据时用于衡量事件时间(Event Time)进度的机制, 确定数据的完整性,判断何时可以触发窗口计算等操作
env.fromSource(fileSource,WatermarkStrategy.noWatermarks(),"fileSource").print();
env.execute();
代码说明:
- 参数可以是目录,也可以是文件;还可以从 HDFS 目录(hadoop)下读取,使用路径hdfs://…;
- 路径可以是相对路径,也可以是绝对路径;
- 相对路径是从系统属性 user.dir 获取路径:idea 下是project 的根目录,standalone模式下是集群节点根目录。
3.2.3 从Socket(服务器)读取数据 – socketTextStream
读取socket文本流,就是流处理场景。但是这种方式由于吞吐量小、稳定性差,一般只用于测试
// 读取数据:socket,第一个参数书服务器名称,第二个参数是端口号DataStreamSource<String> lineStream = env.socketTextStream("localhost",7777);
3.2.4 从kafka读取数据
Flink官网提供了连接工具 flink-connector-kafka,直接实现了一个消费者FlinkKafkaConsumer,它就是用来读取Kafka数据的SouceFunction
引入kafka连接器的依赖
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka</artifactId><version>${flink.version}</version></dependency>
代码:
// 执行环境StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();// kafka数据源的连接配置KafkaSource<String> kafkaSource =KafkaSource.<String>builder()// .<>写法没错.setBootstrapServers("124.222.253.33:9092")// ip:port.setTopics("topic_1")// topic.setGroupId("group1")// 消费者组// latest 将偏移初始化为最新偏移的OffsetInitializer// earliest 偏移初始化为最早可用偏移的OffsetInitializer.setStartingOffsets(OffsetsInitializer.latest()).setValueOnlyDeserializer(newSimpleStringSchema()).build();// 仅Value反序列化// 连接数据源source
env.fromSource(kafkaSource,WatermarkStrategy.noWatermarks(),"kafka-source").print();// 执行
env.execute();
3.2.5 从数据生成器读取数据
Flink 从 1.11 开始提供了一个内置的 DataGen 连接器,主要是用于生成一些随机数,用于在没有数据源的时候,进行流任务的测试以及性能测试等。
导入依赖:
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-datagen</artifactId><version>${flink.version}</version></dependency>
代码
StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);DataGeneratorSource<String> dataGeneratorSource =newDataGeneratorSource<>(// 数据转换生成函数newGeneratorFunction<Long,String>(){@OverridepublicStringmap(Long value)throwsException{return"Number:"+ value;}},// 从0开始递增至这个数Long.MAX_VALUE,// 数据生产效率,每秒10个RateLimiterStrategy.perSecond(10),Types.STRING
);
env.fromSource(dataGeneratorSource,WatermarkStrategy.noWatermarks(),"dataGenerator").print();
env.execute();
3.2.6 自定义SourceFunction来实现JDBC数据源(mysql)的读取
第一步:创建自定义的SourceFunction
packagecom.flink17.demo;importorg.apache.flink.streaming.api.functions.source.RichSourceFunction;importjava.sql.Connection;importjava.sql.DriverManager;importjava.sql.PreparedStatement;importjava.sql.ResultSet;/**
* @author lc
* @version 1.0
* @date 2024/10/31 0031 15:23
*/publicclassCustomJDBCSourceFunctionextendsRichSourceFunction<String>{privateboolean running =true;privateString dbURL;privateString query;privateString user;privateString pw;publicCustomJDBCSourceFunction(String dbURL,String query,String user,String pw){this.dbURL = dbURL;this.query = query;this.user = user;this.pw = pw;}/**
* 只返回某一列的数据
*
* @param sourceContext
* @throws Exception
*//* @Override
public void run(SourceContext<String> sourceContext) throws Exception {
Class.forName("com.mysql.jdbc.Driver");
try (Connection connection = DriverManager.getConnection(dbURL,user,pw);
PreparedStatement statement = connection.prepareStatement(query);
ResultSet resultSet = statement.executeQuery()) {
while (running && resultSet.next()) {
sourceContext.collect(resultSet.getString(2)); // Assuming the query returns a single column
}
}
}*//** 返回查询的所有数据 */@Overridepublicvoidrun(SourceContext<String> sourceContext)throwsException{Class.forName("com.mysql.jdbc.Driver");Connection connection =DriverManager.getConnection(dbURL,user,pw);PreparedStatement statement = connection.prepareStatement(query);try{while(running){ResultSet resultSet = statement.executeQuery();while(resultSet.next()){// 获取所有列的数据StringBuilder sb =newStringBuilder();int columnCount = resultSet.getMetaData().getColumnCount();for(int i =1; i <= columnCount; i++){
sb.append(resultSet.getString(i));if(i < columnCount){
sb.append(",");}}
sourceContext.collect(sb.toString());}// 休眠指定的时间间隔Thread.sleep(1000);// 暂停一秒,避免过快查询数据库}}finally{
statement.close();
connection.close();}}@Overridepublicvoidcancel(){
running =false;}}
第二步:
String jdbcUrl ="jdbc:mysql://localhost:3306/eam_pc";String driveName ="com.mysql.cj.jdbc.Driver";String username ="root";String password ="root";String query ="SELECT id,apply_name FROM pc_apply";CustomJDBCSourceFunction jdbcSourceFunction =newCustomJDBCSourceFunction(jdbcUrl, query, username, password);
env.addSource(jdbcSourceFunction).print();
env.execute();
数据库中数据是:
代码查询数据为:
四、Flink支持的数据类型
Flink 使用“类型信息”(TypeInfomation)来统一表示数据类型。
TypeInfomation类是Flink中所有类型描述符的基类。
TypeInfomation涵盖了类型的一些基本属性,并为每个数据类型生成特定的序列化器、反序列化器和比较器。
对于常见的java和scala数据类型,Flink都支持。
Flink对支持不同的类型进行了划分,这些类型可以在Types工具类中找到。
- 基本类型 所有java基本类型 及其 包装类,再加上Void、String、Date、BigDecimal、BigInteger
- 数据类型 基本类型数组 和对象数组
- 复合数据类型- Java 元组类型(TUPLE):这是 Flink 内置的元组类型,是Java API 的一部分。最多25 个字段,也就是从 Tuple0~Tuple25,不支持空字段。- 行类型(ROW):可以认为是具有任意个字段的元组,并支持空字段。- POJO:Flink 自定义的类似于 Java bean 模式的类。
Flink 对 POJO 类型的要求如下:
1.类是公有(public)的
2.有一个无参的构造方法
3.所有属性都是公有(public)的
4.所有属性的类型都是可以序列化的
对象被Flink视为POJO对象,序列化使用的是 PojoSerializer【效率比较高】
序列化的时候, 就只会序列化字段,方法不会被序列化
反序列化的时候,会根据公有的无参构造方法, 去构造对象
可以使用 TypeExtractor.createTypeInfo(xxx.class) 判断对象是不是Flink世界中的POJO对象:
1.返回的是PojoType,对象会被Flink视为POJO对象
2.返回的是GenericType,对象不会被Flink视为POJO对象
- 辅助类型 Option、Either、List、Map 等。
- 泛型类型(GENERIC) Flink 会把泛型类型当作黑盒,无法获取它们内部的属性;它们也不是由 Flink 本身序列化的,而是由 Kryo 序列化的。
五、类型提示(Type Hints)
有时,需要显式地告诉系统当前的返回类型,才能正确地解析出完整数据。
.map(word ->Tuple2.of(word,1L)).returns(Types.TUPLE(Types.STRING,Types.LONG));
或者,使用TypeHint 类,它可以捕获泛型的类型信息,并且一直记录下来,为运行时提供足够的信息。仍通过.returns()方法,明确地指定转换之后的DataStream 里元素的类型。
returns(newTypeHint<Tuple2<Integer,SomeType>>(){})
示例代码:
publicstaticvoidmain(String[] args)throwsException{//1.创建执行环境StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();//2.读取数据:socket,第一个参数书服务器名称,第二个参数是端口号DataStreamSource<String> socketDS = env.socketTextStream("hadoop102",7777);//3.处理数据:切分,转换,分组,聚合,输出
socketDS
.flatMap((String value,Collector<Tuple2<String,Integer>> out)->{String[] words = value.split(" ");// 空格切分for(String word : words){// 转换成二元组Tuple2<String,Integer> wordsAndOne =Tuple2.of(word,1);// 使用Collector向下游发送数据
out.collect(wordsAndOne);}}).returns(Types.TUPLE(Types.STRING,Types.INT))//需要执行返回类型,(word,1).keyBy(value -> value.f0).sum(1).print();// 输出//4.执行
env.execute();}
六、转换算子 Transformation–操作获得的数据 – 工具就是DataStream API
这里就是我们要真正要说的 Flink DataStream的使用;
转换算子一般有三种写法:
- 匿名类
- lambda表达式
- 实现函数接口 数据源读入数据之后,就可以使用各种转换算子,将一个或多个DataStream转换为新的 DataStream。
6.1 准备数据模型类
publicclassWaterSensor{publicString id;publicLong ts;publicInteger vc;publicWaterSensor(){}publicWaterSensor(String id,Long ts,Integer vc){this.id = id;this.ts = ts;this.vc = vc;}publicStringgetId(){return id;}publicvoidsetId(String id){this.id = id;}publicLonggetTs(){return ts;}publicvoidsetTs(Long ts){this.ts = ts;}publicIntegergetVc(){return vc;}publicvoidsetVc(Integer vc){this.vc = vc;}@Overridepublicbooleanequals(Object o){if(this== o)returntrue;if(o ==null||getClass()!= o.getClass())returnfalse;WaterSensor that =(WaterSensor) o;returnObjects.equals(id, that.id)&&Objects.equals(ts, that.ts)&&Objects.equals(vc, that.vc);}@OverridepublicinthashCode(){returnObjects.hash(id, ts, vc);}@OverridepublicStringtoString(){return"WaterSensor{"+"id='"+ id +'\''+", ts="+ ts +", vc="+ vc +'}';}}
6.2 基本转换算子(map/filter/flatMap)
6.2.1 映射 map ---- 直接收集某个字段的数据
消费一个元素就产生一个元素、
需求案例:提取WaterSensor中的id字段
StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();// 这里就是上面将source的时候自己定义的集合数据源// fromElements主要用于创建一个基于本地元素集合的数据来源,主要用户测试flink环境的是否可行的。DataStreamSource<WaterSensor> stream = env.fromElements(newWaterSensor("sensor_1",1L,1),newWaterSensor("sensor_2",2L,2));
stream.map(WaterSensor::getId).print();
env.execute();}
6.2.2 过滤filter ----- 类似if
需求:将数据流中传感器id为sensor_1的数据过滤出来
StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<WaterSensor> stream = env.fromElements(newWaterSensor("sensor_1",1L,1),newWaterSensor("sensor_2",2L,2));
stream.filter(el ->"sensor_1".equals(el.getId())).print();
env.execute();
6.2.3 扁平映射flatMap -----里面可以写ifelse
flatMap一进多出
需求:如果输入的数据是sensor_1,指打印VC;如果输入的数据是sensor_2,既打印ts又打印vc。
StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<WaterSensor> stream = env.fromElements(newWaterSensor("sensor_1",1L,1),newWaterSensor("sensor_2",2L,2));
stream.flatMap(newFlatMapFunction<WaterSensor,String>(){@OverridepublicvoidflatMap(WaterSensor value,Collector<String> out)throwsException{if("sensor_1".equals(value.getId())){
out.collect(value.getVc().toString());}elseif("sensor_2".equals(value.getId())){
out.collect(value.getTs().toString());
out.collect(value.getVc().toString());}}}).print();
env.execute();
问题:
- map如何控制一进一出: 使用return
- flatmap怎么控制一进多出 通过Collector输出,调用几次就输出几条(向下游输送数据)
6.3 聚合算子Aggregation
计算的结果不仅依赖当前数据,还跟之前的数据有关,相当于要把所有数据聚在一起进行汇总合并—这就是所谓“聚合”aggregation,类似于MapReduce中的reduce操作。
6.3.1 按键分区 keyBy
在Flink中,要做聚合,需要先进行分区;这个操作就是通过keyBy来完成的。
keyBy是聚合前必须要用到的一个算子。keyBy通过指定键key,可以将一条流从逻辑上划分成不同的分区partitions。
这里所说的分区,其实就是并行处理的子任务。
通过计算key的哈希值hash code,对分区数进行取模运算来实现的。
所以这里key如果是POJO的话,必须要重写hashCode()方法。
id 作为key做分区操作,代码:
StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<WaterSensor> stream = env.fromElements(newWaterSensor("sensor_1",1L,1),newWaterSensor("sensor_1",2L,2),newWaterSensor("sensor_2",2L,2),newWaterSensor("sensor_3",3L,3));KeyedStream<WaterSensor,String> keyedStream = stream.keyBy(WaterSensor::getId);
keyedStream.print();
env.execute();
前面的编号就是并行度编号,也就是线程数编号;
keyBy:
返回的是 KeyedStream ,键控流
不是转换算子,只是对数据进行重分区,不能设置并行度
keyBy分组 和 分区 的关系:
keyBy对数据分组(相同key的数据被分为一组),同时保证 相同的key的数据 在同一个分区
分区):一个子任务可以理解为一个分区,一个分区(子任务)中可以存在多个分组(key)
KeyedStream 是一个非常重要的数据结构,只有基于它才可以做后续的聚合操作(比如 sum,reduce)。
6.3.2 简单聚合(sum/min/max/minBy/maxBy)
有了按键分区的数据流KeyedStream,就可以基于它进行聚合操作了。
StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);DataStreamSource<WaterSensor> stream = env.fromElements(newWaterSensor("sensor_1",2L,1),newWaterSensor("sensor_1",1L,2),newWaterSensor("sensor_1",3L,3),newWaterSensor("sensor_1",4L,3),newWaterSensor("sensor_1",5L,7),newWaterSensor("sensor_1",6L,6),newWaterSensor("sensor_2",2L,27),newWaterSensor("sensor_2",1L,51),newWaterSensor("sensor_2",5L,2),newWaterSensor("sensor_2",2L,21),newWaterSensor("sensor_2",2L,2),newWaterSensor("sensor_3",3L,50),newWaterSensor("sensor_3",2L,34),newWaterSensor("sensor_3",3L,37));KeyedStream<WaterSensor,String> keyedStream = stream.keyBy(WaterSensor::getId);
keyedStream
.maxBy("vc")// 指定字段名称.print("maxBy");//.print();
env.execute();
执行后的结果
Flink内置实现了一些最基本、最简单的聚合API,主要有以下几种:
- max:取指定字段的当前的最大值,如果有多个字段,其他非比较字段,以第一条为准
- maxBy:取指定字段的当前的最大值,如果有多个字段,其他字段以最大值那条数据为准;
- min:取指定字段的当前的最小值,如果有多个字段,其他非比较字段,以第一条为准
- minBy:取指定字段的当前的最小值,如果有多个字段,其他字段以最大值那条数据为准;
只是用方法定义的做比较多字段的值进行比较,然后按照指定的去更新当前的数据;
聚合方法调用时,也需要传入参数,聚合指定的字段。指定字段的方式有两种:指定位置,和指定名称。【指定位置索引,适用于 Tuple类型,POJP不行】
keyBy和聚合是成对出现的,先分区、后聚合,得到的依然是一个 DataStream。
一个聚合算子,会为每一个 key 保存一个聚合的值,在Flink 中把它叫作“状态”(state)。
每当有一个新的数据输入,算子就会更新保存的聚合结果,并发送一个带有更新后聚合值的事件到下游算子。对于无界流来说,这些状态是永远不会被清除的。
使用聚合算子,应该只用在含有有限个 key 的数据流上。
6.3.3 归约聚合(reduce)
案例:使用reduce实现max和maxBy的功能
StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);DataStreamSource<WaterSensor> stream = env.fromElements(newWaterSensor("sensor_1",1L,1),newWaterSensor("sensor_1",2L,2),newWaterSensor("sensor_2",2L,2),newWaterSensor("sensor_3",3L,3));KeyedStream<WaterSensor,String> keyedStream = stream.keyBy(WaterSensor::getId);
keyedStream
.reduce(newReduceFunction<WaterSensor>(){// 同组元素规约处理@OverridepublicWaterSensorreduce(WaterSensor value1,WaterSensor value2)throwsException{System.out.println("value1: "+ value1);System.out.println("value2: "+ value2);int maxVc =Math.max(value1.getVc(), value2.getVc());if(value1.getVc()> value2.getVc()){
value1.setVc(maxVc);return value1;}else{
value2.setVc(maxVc);return value2;}}}).print();
env.execute();
reduce 输入类型 = 输出类型,类型不变
每个key的第一条数据来的时候,不会执行reduce方法,存起来,直接输出
reduce方法中的两个参数
value1:之前计算结果,有状态
value2:现在来的数据
七、 用户自定义函数(UDF)
用户自定义函数(user-defined function ,UDF),即用户可以根据自身需求,重新实现算子的逻辑。
用户自定义函数氛围:函数类、匿名函数、富函数类。
7.1 函数类
函数类可以传递参数更加灵活
publicclassFilterIdFunctionimplementsFilterFunction<WaterSensor>{privatefinalString id;publicFilterIdFunction(String id){this.id = id;}@Overridepublicbooleanfilter(WaterSensor value)throwsException{return id.equals(value.getId());}}
7.2 富函数类(rich function classes)
“富函数类”也是 DataStream API 提供的一个函数类的接口,所有的Flink 函数类都有其 Rich版本 。
富函数类一般是以抽象类的形式出现的。例如:RichMapFunction、RichFilterFunction、RichReduceFunction 等。
富函数类可以获取运行环境的上下文,并拥有一些生命周期方法,所以可以实现更复杂的功能。
- open()方法,是 Rich Function 的初始化方法,每个子任务启动时,调用一次。
- close()方法,是生命周期中的最后一个调用的方法,类似于结束方法。一般用来做一些清理工作。 - 如果是Flink程序异常挂掉,不会调用close - 正常调用cancel命令,可以close
代码实现:
StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(2);
env
.fromElements(1,2,3,4).map(newRichMapFunction<Integer,Integer>(){@Overridepublicvoidopen(Configuration parameters)throwsException{super.open(parameters);System.out.println(" 索引是:"+getRuntimeContext().getIndexOfThisSubtask()+" 的任务的生命周期开始");}@OverridepublicIntegermap(Integer integer)throwsException{return integer +1;}@Overridepublicvoidclose()throwsException{super.close();System.out.println(" 索引是:"+getRuntimeContext().getIndexOfThisSubtask()+" 的任务的生命周期结束");}}).print();
env.execute();
八、 物理分区算子 (Physical Partitioning)
常见的物理分区策略有:
- 随机分配Random
- 轮询分配Round-Robin
- 重缩放Rescale
- 广播Broadcast
8.1 随机 分区 shuffle
将数据随机地分配到下游算子 的并行任务中去。可以打乱数据
StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(2);DataStreamSource<String> stream = env.socketTextStream("124.222.253.33",7777);
stream.shuffle().print();
env.execute();
8.2 轮询 分区 Round-Robin
雨露均沾,下游的所有算子一个一个来
stream.rebalance()
8.3 重缩放分区rescale
重缩放分区和轮询分区非常相似。当调用rescale()方法时,其实底层也是使用Round-Robin算法进行轮询,
但是只会将 数据轮询发送到下游并行任务的一部分 中。(部分算子)
stream.rescale()
8.4 广播
将输入数据复制并发送到下游算子的所有并行任务中去。
stream.broadcast()
8.5 全局分区Global
一种特殊的分区方式。这种做法非常极端,通过调用。global()方法,会将所有的输入数据流都发送到下游算子的第一个并行子任务中区。这就相当于强行让下游任务并行度变成1.
stream.global()
8.6 自定义分区 custom
Flink内置所有分区策略都不能满足用户的需求是,可以通过使用partitionCustom()方法自定义分区策略
第一步:自定义分区器
publicclassMyPartitionerimplementsPartitioner<String>{// numPartitions: 子任务数量(分区数量)@Overridepublicintpartition(String key,int numPartitions){returnInteger.parseInt(key)% numPartitions;}}
第二步:使用自定义分区
publicclassPartitionCustomDemo{publicstaticvoidmain(String[] args)throwsException{StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(2);DataStreamSource<String> socketDS = env.socketTextStream("124.222.253.33",7777);DataStream<String> myDS = socketDS
.partitionCustom(newMyPartitioner(),
value -> value);
myDS.print();
env.execute();}}
九、分流 – 筛数据
将一条数据流拆分成完全独立的两条、甚至多条流。
也就是基于一个DataStream,定义一些筛选条件,将符合条件的数据拣选出来放到对应的流中。
9.1 简单实现
使用filter筛选多次,将原始数据stream复制多分,然后对每一份分别做筛选;
不够高效;
9.2 侧输出流 - process算子
调用上下文 ctx 的.output()方法,就可以输出任意类型的数据了。而侧输出流的标记和提取,都离不开一个“输出标签”(OutputTag),指定了侧输出流的 id 和类型。
StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<WaterSensor> stream = env.fromElements(newWaterSensor("sensor_1",2L,1),newWaterSensor("sensor_1",1L,2),newWaterSensor("sensor_1",3L,3),newWaterSensor("sensor_1",4L,3),newWaterSensor("sensor_1",5L,7),newWaterSensor("sensor_1",6L,6),newWaterSensor("sensor_2",2L,27),newWaterSensor("sensor_2",1L,51),newWaterSensor("sensor_2",5L,2),newWaterSensor("sensor_2",2L,21),newWaterSensor("sensor_2",2L,2),newWaterSensor("sensor_3",3L,50),newWaterSensor("sensor_3",2L,34),newWaterSensor("sensor_3",3L,37));/* stream.keyBy(WaterSensor::getId)
.min("vc").print("min");*/// 分流 - 筛数据//第一步:定义标签OutputTag<WaterSensor> s1 =newOutputTag<>("sensor_1",Types.POJO(WaterSensor.class));OutputTag<WaterSensor> s2 =newOutputTag<>("sensor_2",Types.POJO(WaterSensor.class));// 第二步:返回数据,下面的逻辑把 数据 数据分为了三种 ,一种是if逻辑里面的s1标签。一种是elseif里面的s2标签,最后 else是剩下的未标签的数据// ProcessFunction 参数一 输入,参数二 输出SingleOutputStreamOperator<WaterSensor> cxtProcess = stream.process(newProcessFunction<WaterSensor,WaterSensor>(){@OverridepublicvoidprocessElement(WaterSensor waterSensor,Context context,Collector<WaterSensor> collector)throwsException{if("sensor_1".equals(waterSensor.getId())){
context.output(s1, waterSensor);}elseif("sensor_2".equals(waterSensor.getId())){
context.output(s2, waterSensor);}else{
collector.collect(waterSensor);}}});// 打印主流----未打标签的数据
cxtProcess.print("cxtProcess");// 通过主流获取侧边流
cxtProcess.getSideOutput(s1).printToErr("侧边流s1");
cxtProcess.getSideOutput(s2).printToErr("侧边流s2");
env.execute();
从上面可以看到,ProcessFunction类的processElement()的参数含义:
十、合流
10.1 联合union
联合操作要求必须流中的数据类型必须相同,合并之后的新流会包括所有流中的元素,数据类型不变。
可以将多条流联合在一起。
StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);DataStreamSource<Integer> ds1 = env.fromElements(1,2,3);DataStreamSource<Integer> ds2 = env.fromElements(2,2,3);DataStreamSource<String> ds3 = env.fromElements("2","2","3");// ds3 类型不一致,不能联合
ds1.union(ds2).print();
env.execute();
10.2 连接Connect
使用connect合流:
- 一次只能连接2条流
- 流的数据类型可以不一样
- 连接后可用调用map、flatMap、process来处理,但是各处理各的
10.2.1 简单使用:
只能连接2条流,流动数据类型可以不一样
代码:
StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);DataStreamSource<Integer> source1 = env.fromElements(1,2,3);DataStreamSource<String> source2 = env.fromElements("a","b","c");ConnectedStreams<Integer,String> connect = source1.connect(source2);/*
connect:
1、一次只能连接 2 条流
2、流的数据类型可以不一样
3、连接后可以调用 map、flatmap、process 来处理,但是各处理各的
*/
connect.map(newCoMapFunction<Integer,String,String>(){@OverridepublicStringmap1(Integer value)throwsException{return"来源于数字流:"+ value.toString();}@OverridepublicStringmap2(String value)throwsException{return"来源于字母流:"+ value;}}).print();
env.execute();
10.2.2 进阶使用:CoProcessFunction
需求:连接两条流,输出根据id匹配上的数据
注意:connectedStreams.keyBy(keySelector1, keySelector2);
StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();// env.setParallelism(2);DataStreamSource<Tuple2<Integer,String>> source1 = env.fromElements(Tuple2.of(1,"a1"),Tuple2.of(1,"a2"),Tuple2.of(2,"b"),Tuple2.of(3,"c"));DataStreamSource<Tuple3<Integer,String,Integer>> source2 = env.fromElements(Tuple3.of(1,"aa1",1),Tuple3.of(1,"aa2",2),Tuple3.of(2,"bb",1),Tuple3.of(3,"cc",1));// 定义HashMap , 缓存来过的数据,key=id,value=list<数据>Map<Integer,List<Tuple2<Integer,String>>> s1Cache =newHashMap<>();Map<Integer,List<Tuple3<Integer,String,Integer>>> s2Cache =newHashMap<>();ConnectedStreams<Tuple2<Integer,String>,Tuple3<Integer,String,Integer>> connect = source1.connect(source2);// 将合流元素,按key分到同一分区才能得到如下结果 ***ConnectedStreams<Tuple2<Integer,String>,Tuple3<Integer,String,Integer>> connectKey = connect.keyBy(v -> v.f0, v1 -> v1.f0);
connectKey.process(newCoProcessFunction<Tuple2<Integer,String>,Tuple3<Integer,String,Integer>,String>(){@OverridepublicvoidprocessElement1(Tuple2<Integer,String> value,CoProcessFunction<Tuple2<Integer,String>,Tuple3<Integer,String,Integer>,String>.Context ctx,Collector<String> out)throwsException{Integer id = value.f0;if(!s1Cache.containsKey(id)){List<Tuple2<Integer,String>> s1Values =newArrayList<>();
s1Values.add(value);
s1Cache.put(id, s1Values);}else{
s1Cache.get(id).add(value);}if(s2Cache.containsKey(id)){for(Tuple3<Integer,String,Integer> s2Element : s2Cache.get(id)){
out.collect("s1:"+ value +"<--------->s2:"+ s2Element);}}}@Overridepublicvoid processElement2
(Tuple3<Integer,String,Integer> value,CoProcessFunction<Tuple2<Integer,String>,Tuple3<Integer,String,Integer>,String>.Context ctx,Collector<String> out)throwsException{Integer id = value.f0;if(!s2Cache.containsKey(id)){List<Tuple3<Integer,String,Integer>> s2Values =newArrayList<>();
s2Values.add(value);
s2Cache.put(id, s2Values);}else{
s2Cache.get(id).add(value);}if(s1Cache.containsKey(id)){for(Tuple2<Integer,String> s1Element : s1Cache.get(id)){
out.collect("s1:"+ s1Element +"<--------->s2:"+ value);}}}}).print();
env.execute();
十一、输出算子sink
11.1连接到外部系统
Flink1.12以前,Sink算子的创建是通过调用DataStream的.addSink方法实现的。
Flink1.12以后,同样重构了Sink架构,使用.sinkTo() 方法实现。
11.2输出到文件
FlinkSink支持行编码Row-encoded 和批量编码Bulk-encoded 格式。
这两种不同的方式都有各自的构建起builder,可以直接调用FileSink 的静态方法:
- 行编码:FileSink.forRowFormat(basePath,rowEncoder)
- 批量编码:FileSink.forBulkFormat(basePath,bulkWriterFactory)
代码:
StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();// 每个目录中,都有并行度个数的文件在写入
env.setParallelism(2);// 【必须开启】 checkpoint,否则一直都是 .inprogress
env.enableCheckpointing(2000,CheckpointingMode.EXACTLY_ONCE);// 数据生成器DataGeneratorSource<String> dataGeneratorSource =newDataGeneratorSource<>(newGeneratorFunction<Long,String>(){@OverridepublicStringmap(Long value)throwsException{return"Number:"+ value;}},Long.MAX_VALUE,RateLimiterStrategy.perSecond(1000),Types.STRING
);DataStreamSource<String> dataGen = env.fromSource(dataGeneratorSource,WatermarkStrategy.noWatermarks(),"data-generator");// 输出到文件系统FileSink<String> fieSink =FileSink// 输出行式存储的文件,指定路径、指定编码.<String>forRowFormat(newPath("d:/tmp"),newSimpleStringEncoder<>("UTF-8"))// 输出文件的一些配置: 文件名的前缀、后缀.withOutputFileConfig(OutputFileConfig.builder().withPartPrefix("li-").withPartSuffix(".log").build())// 按照目录分桶:如下,就是每个小时一个目录.withBucketAssigner(newDateTimeBucketAssigner<>("yyyy-MM-dd HH",ZoneId.systemDefault()))// 文件滚动策略: 1 分钟 或 1m.withRollingPolicy(DefaultRollingPolicy.builder().withRolloverInterval(Duration.ofMinutes(1)).withMaxPartSize(newMemorySize(1024*1024)).build()).build();
dataGen.sinkTo(fieSink);
env.execute();
11.3 输出到kafka
添加kafka连接器依赖
StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);// 如果是【精准一次,必须开启】 checkpoint
env.enableCheckpointing(2000,CheckpointingMode.EXACTLY_ONCE);SingleOutputStreamOperator<String> sensorDS = env.socketTextStream("124.222.253.33",7777);/*
Kafka Sink:
注意:如果要使用 【精准一次】 写入 Kafka,需要满足以下条件,缺一不可
1、开启 checkpoint
2、设置事务前缀
3、设置事务超时时间: checkpoint 间隔 < 事务超时时间 < max的 15分钟
*/KafkaSink<String> kafkaSink =KafkaSink.<String>builder()// 指定 kafka 的地址和端口.setBootstrapServers("124.222.253.33:9092")// 指定序列化器:指定 Topic 名称、具体的序列化.setRecordSerializer(KafkaRecordSerializationSchema.<String>builder().setTopic("ws").setValueSerializationSchema(newSimpleStringSchema()).build())// 写到 kafka 的一致性级别: 精准一次、至少一次.setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)// 如果是精准一次,必须设置 事务的前缀.setTransactionalIdPrefix("li-")// 如果是精准一次,必须设置 事务超时时间: 大于checkpoint间隔,小于 max 15 分钟.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG,10*60*1000+"").build();
sensorDS.sinkTo(kafkaSink);
env.execute();
自定义序列化器,实现带 key 的 record:
StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.enableCheckpointing(2000,CheckpointingMode.EXACTLY_ONCE);
env.setRestartStrategy(RestartStrategies.noRestart());SingleOutputStreamOperator<String> sensorDS = env.socketTextStream("124.222.253.33",7777);/*
如果要指定写入 kafka 的 key,可以自定义序列化器:
1、实现 一个接口,重写 序列化 方法
2、指定 key,转成 字节数组
3、指定 value,转成 字节数组
4、返回一个 ProducerRecord 对象,把 key、value 放进去
*/KafkaSink<String> kafkaSink =KafkaSink.<String>builder().setBootstrapServers("124.222.253.33:9092").setRecordSerializer(newKafkaRecordSerializationSchema<String>(){@OverridepublicProducerRecord<byte[],byte[]>serialize(String element,KafkaSinkContext context,Long timestamp){String[] datas = element.split(",");byte[] key = datas[0].getBytes(StandardCharsets.UTF_8);byte[] value = element.getBytes(StandardCharsets.UTF_8);returnnewProducerRecord<>("ws", key, value);}}).setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE).setTransactionalIdPrefix("li-").setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG,10*60*1000+"").build();
sensorDS.sinkTo(kafkaSink);
env.execute();
11.4 输出到MySql(JDBC)
添加mysql驱动和jdbc连接器的依赖
<dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.27</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-jdbc</artifactId><version>3.1.0-1.17</version></dependency>
官方案例:
publicclassJdbcSinkExample{staticclassBook{publicBook(Long id,String title,String authors,Integer year){this.id = id;this.title = title;this.authors = authors;this.year = year;}finalLong id;finalString title;finalString authors;finalInteger year;}publicstaticvoidmain(String[] args)throwsException{StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();/*
写入 mysql
1、只能用老的 sink 写法: addSink
2、JDBCSink 的 4 个参数:
第一个参数: 执行的 sql,一般就是 insert into
第二个参数: 预编译 sql, 对占位符填充值
第三个参数: 执行选项 ---》 攒批、重试
第四个参数: 连接选项 ---》 url、用户名、密码
*/
env.fromElements(newBook(101L,"Stream Processing with Apache Flink","Fabian Hueske, Vasiliki Kalavri",2019),newBook(102L,"Streaming Systems","Tyler Akidau, Slava Chernyak, Reuven Lax",2018),newBook(103L,"Designing Data-Intensive Applications","Martin Kleppmann",2017),newBook(104L,"Kafka: The Definitive Guide","Gwen Shapira, Neha Narkhede, Todd Palino",2017)).addSink(JdbcSink.sink("insert into books (id, title, authors, year) values (?, ?, ?, ?)",(statement, book)->{
statement.setLong(1, book.id);
statement.setString(2, book.title);
statement.setString(3, book.authors);
statement.setInt(4, book.year);},JdbcExecutionOptions.builder().withBatchSize(1000)// 批次的大小:条数.withBatchIntervalMs(200)// 批次的时间.withMaxRetries(5)// 重试次数.build(),newJdbcConnectionOptions.JdbcConnectionOptionsBuilder().withUrl("jdbc:mysql://localhost:3306/test?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=UTF8").withUsername("root").withPassword("root").withConnectionCheckTimeoutSeconds(60)// 重试的超时时间.build()));
env.execute();}}
11.5 自定义Sink输出
实现RichSinkDunction抽象类,自定义逻辑比较麻烦,不建议。
版权归原作者 LC超人在良家 所有, 如有侵权,请联系我们删除。