文章目录
Flink和Spark类似,也是一种一站式处理的框架;既可以进行批处理(DataSet),也可以进行实时处理(DataStream)。
所以下面将Flink的算子分为两大类:一类是DataSet,一类是DataStream。
DataSet 批处理算子
一、Source算子
1. fromCollection
fromCollection:从本地集合读取数据
例:
publicstaticvoidmain(String[] args){StreamExecutionEnvironment streamEnv =StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<String> stringDataStreamSource = streamEnv.fromCollection(Arrays.asList("1,张三","2,李四","3,王五","4,赵六"));}
2. readTextFile
readTextFile:从文件中读取
StreamExecutionEnvironment streamEnv =StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<String> stringDataStreamSource = streamEnv.readTextFile("/data/a.txt");
3. readTextFile:读取压缩文件
对于以下压缩类型,不需要指定任何额外的inputformat方法,flink可以自动识别并且解压。但是,压缩文件可能不会并行读取,可能是顺序读取的,这样可能会影响作业的可伸缩性。
压缩方法文件扩展名是否可并行读取DEFLATE.deflatenoGZip.gz .gzipnoBzip2.bz2noXZ.xzno
StreamExecutionEnvironment streamEnv =StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<String> stringDataStreamSource = streamEnv.readTextFile("/data/file.gz");
二、Transform转换算子
因为Transform算子基于Source算子操作,所以首先构建Flink执行环境及Source算子,后续Transform算子操作基于此:
publicstaticvoidmain(String[] args){StreamExecutionEnvironment streamEnv =StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<String> stringDataStreamSource = streamEnv.fromCollection(Arrays.asList("1,张三","2,李四","3,王五","4,赵六"));}
1: map
将DataSet中的每一个元素转换为另外一个元素
importorg.apache.flink.api.common.functions.MapFunction;importorg.apache.flink.api.java.DataSet;importorg.apache.flink.api.java.ExecutionEnvironment;publicclassMapOperatorExample{publicstaticvoidmain(String[] args)throwsException{// 获取 ExecutionEnvironment,用于创建数据集ExecutionEnvironment env =ExecutionEnvironment.getExecutionEnvironment();// 假设已经有一个包含整数的数据集DataSet<Integer> inputDataSet = env.fromElements(1,2,3,4,5);// 使用 Map 算子对数据集进行转换,将每个整数加上 10,并输出结果DataSet<Integer> outputDataSet = inputDataSet.map(newMapFunction<Integer,Integer>(){@OverridepublicIntegermap(Integer value)throwsException{// 对每个整数都加上 10,并将结果作为新的数据集return value +10;}});// 输出转换后的数据集
outputDataSet.print();}}
2:flatMap
- 将DataSet中的每一个元素转换为0…n个元素。
- FlatMap 算子是 Flink 中的一种数据转换算子,它将输入的每个元素通过用户自定义的函数进行处理,并生成零个、一个或多个新的元素。FlatMap 算子的底层逻辑是对数据集中的每个元素应用用户定义的函数,并将函数返回的多个元素平铺成新的数据集
importorg.apache.flink.api.common.functions.FlatMapFunction;importorg.apache.flink.api.java.DataSet;importorg.apache.flink.api.java.ExecutionEnvironment;importorg.apache.flink.util.Collector;publicclassFlatMapOperatorExample{publicstaticvoidmain(String[] args)throwsException{// 获取 ExecutionEnvironment,用于创建数据集ExecutionEnvironment env =ExecutionEnvironment.getExecutionEnvironment();// 假设已经有一个包含文本行的数据集DataSet<String> inputDataSet = env.fromElements("Flink is a powerful framework for stream and batch processing","It provides support for event time processing");// 使用 FlatMap 算子对数据集进行拆分并生成单词列表DataSet<String> wordDataSet = inputDataSet.flatMap(newFlatMapFunction<String,String>(){@OverridepublicvoidflatMap(String value,Collector<String> out)throwsException{// 按空格拆分文本行,并将拆分后的单词逐个添加到输出集合String[] words = value.split(" ");for(String word : words){
out.collect(word);}}});// 输出单词列表
wordDataSet.print();}}
3:Filter 算子
- Filter 算子是 Flink 中的一种数据转换算子,它通过用户自定义的条件函数对数据集中的每个元素进行过滤,只保留满足条件的元素。
- Filter 算子的底层逻辑是对数据集中的每个元素应用用户定义的条件函数,只保留函数返回值为 true 的元素,过滤掉返回值为 false 的元素。
importorg.apache.flink.api.common.functions.FilterFunction;importorg.apache.flink.api.java.DataSet;importorg.apache.flink.api.java.ExecutionEnvironment;publicclassFilterOperatorExample{publicstaticvoidmain(String[] args)throwsException{// 获取 ExecutionEnvironment,用于创建数据集ExecutionEnvironment env =ExecutionEnvironment.getExecutionEnvironment();// 假设已经有一个包含整数的数据集DataSet<Integer> inputDataSet = env.fromElements(1,2,3,4,5,6,7,8,9,10);// 使用 Filter 算子对数据集进行过滤,只保留偶数DataSet<Integer> evenDataSet = inputDataSet.filter(newFilterFunction<Integer>(){@Overridepublicbooleanfilter(Integer value)throwsException{// 判断是否为偶数,保留返回 true 的元素return value %2==0;}});// 输出只包含偶数的数据集
evenDataSet.print();}}
4:Reduce 算子
- 可以对一个dataset或者一个group来进行聚合计算,最终聚合成一个元素
- Reduce 算子是 Flink 中的一个基本聚合算子,用于对数据集中的元素进行二元聚合操作。
- Reduce 算子会将数据集中的元素两两配对,并使用用户提供的二元操作函数对配对的元素进行聚合,然后将聚合结果继续与下一个元素配对,直至处理完所有元素。最终,Reduce 算子会返回一个单一的结果值。
importorg.apache.flink.api.common.functions.ReduceFunction;importorg.apache.flink.api.java.DataSet;importorg.apache.flink.api.java.ExecutionEnvironment;publicclassReduceOperatorExample{publicstaticvoidmain(String[] args)throwsException{// 获取 ExecutionEnvironment,用于创建数据集ExecutionEnvironment env =ExecutionEnvironment.getExecutionEnvironment();// 假设已经有一个包含整数的数据集DataSet<Integer> inputDataSet = env.fromElements(1,2,3,4,5);// 使用 Reduce 算子计算数据集中所有元素的总和DataSet<Integer> resultDataSet = inputDataSet.reduce(newReduceFunction<Integer>(){@OverridepublicIntegerreduce(Integer value1,Integer value2)throwsException{return value1 + value2;// 将两个元素相加得到新的结果}});// 输出计算结果
resultDataSet.print();}}
5:Aggregations
KeyedStream → DataStream
- Aggregations 算子是 Flink 中用于对数据集进行聚合操作的一组函数。它可以用于计算数据集中的最小值、最大值、求和、平均值等统计信息。Flink 提供了一系列内置的聚合函数,如 min、max、sum、avg 等。
//算平均值importorg.apache.flink.api.java.DataSet;importorg.apache.flink.api.java.ExecutionEnvironment;importorg.apache.flink.api.java.aggregation.Aggregations;importorg.apache.flink.api.java.tuple.Tuple2;publicclassAggregationsOperatorExample{publicstaticvoidmain(String[] args)throwsException{// 获取 ExecutionEnvironment,用于创建数据集ExecutionEnvironment env =ExecutionEnvironment.getExecutionEnvironment();// 假设已经有一个包含学生姓名和成绩的数据集DataSet<Tuple2<String,Double>> inputDataSet = env.fromElements(newTuple2<>("Alice",85.0),newTuple2<>("Bob",78.5),newTuple2<>("Cathy",92.5),newTuple2<>("David",65.0),newTuple2<>("Eva",88.5));// 使用 Aggregations 算子计算成绩的平均值double avgScore = inputDataSet.aggregate(Aggregations.SUM,1).div(inputDataSet.count());// 输出计算结果System.out.println("平均成绩:"+ avgScore);}}
主要的聚合方法:
- keyedStream.sum(0);
- keyedStream.sum(“key”);
- keyedStream.min(0);
- keyedStream.min(“key”);
- keyedStream.max(0);
- keyedStream.max(“key”);
- keyedStream.minBy(0);
- keyedStream.minBy(“key”);
- keyedStream.maxBy(0);
- keyedStream.maxBy(“key”);
6:Distinct 算子
- Distinct 算子是 Flink 中的一个转换算子,它用于去除输入流中重复的元素,并将去重后的结果作为输出流。Distinct 算子是在整个数据流上进行去重操作,不需要进行分组。
- 在底层,Distinct 算子通过维护一个状态来记录已经出现过的元素,当新的元素到达时,会与状态中的元素进行比较,如果状态中不存在该元素,则将其输出,并将其添加到状态中,以便后续去重。
Distinct 算子在很多场景下都很有用,例如:
- 数据去重:在流式计算中,经常会有重复的数据到达,而我们只关心每个数据的第一次出现,可以使用 Distinct 算子来去重。
- 实时数据摘要:有时候需要根据某个字段的特征选择每个分组的代表元素,例如选择每个用户的首次登录信息作为数据摘要。
- 数据清洗:在处理实时数据流时,可能会有一些无效或异常的数据需要清洗,Distinct 算子可以帮助去除重复的无效数据。
packagecom.wenge.datagroup.storage;importorg.apache.flink.api.java.DataSet;importorg.apache.flink.api.java.ExecutionEnvironment;importorg.apache.flink.api.java.operators.DistinctOperator;/**
* @author wangkanglu
* @version 1.0
* @description
* @date 2024-07-24 16:58
*/publicclassTestFlink{publicstaticvoidmain(String[] args)throwsException{ExecutionEnvironment env =ExecutionEnvironment.getExecutionEnvironment();// 模拟输入流,包含重复的元素DataSet<Integer> inputStream = env.fromElements(1,2,3,2,4,5,1,6,7,5);// 对输入流进行去重操作DistinctOperator<Integer> distinct = inputStream.distinct();
distinct.print();
env.execute("Distinct Example");}}
7:First 算子
First 算子是 Flink 中的一个转换算子,它用于从输入流中选择每个 Key 的第一个元素,并将其作为输出流中的结果。在流式计算中,经常需要根据某个特定的字段进行分组,并选择每个分组中的第一个元素,这时可以使用 First 算子来实现这个功能。First 算子是 KeyedStream 上的操作,所以在使用之前,需要先将数据流进行分组。
First 算子在许多场景下都很有用,例如:
- 数据去重:如果数据流中可能包含重复的元素,而我们只关心每个元素的第一次出现,可以使用 First 算子来去重。
- 实时数据摘要:在流式计算中,有时需要根据某个字段的特征选择每个分组的代表元素,例如选择每个用户的首次登录信息作为数据摘要。
- 时间窗口操作:在流式计算中,经常需要对窗口内的数据进行处理,而 First 算子可以用于选择每个窗口的起始元素。
packagecom.wenge.datagroup.storage;importorg.apache.flink.api.java.DataSet;importorg.apache.flink.api.java.ExecutionEnvironment;importorg.apache.flink.api.java.operators.GroupReduceOperator;/**
* @author wangkanglu
* @version 1.0
* @description
* @date 2024-07-24 16:58
*/publicclassTestFlink{publicstaticvoidmain(String[] args)throwsException{ExecutionEnvironment env =ExecutionEnvironment.getExecutionEnvironment();// 模拟输入流,包含重复的元素DataSet<Integer> inputStream = env.fromElements(1,2,3,2,4,5,1,6,7,5);// 对输入流进行去重操作GroupReduceOperator<Integer,Integer> first = inputStream.first(2);//取前两个数
first.print();
env.execute("Distinct Example");}}
8:Join 算子
Join 算子是 Flink 中用于将两个数据集进行连接操作的一种算子。它通过指定连接的键(Key)将两个数据集中的元素按照某种条件进行关联,从而生成一个包含连接结果的新数据集。
应用场景
- Join 算子适用于需要将两个数据集进行关联的场景。常见的应用场景包括关联用户信息和订单信息、关联商品信息和销售信息等。
importorg.apache.flink.api.java.DataSet;importorg.apache.flink.api.java.ExecutionEnvironment;importorg.apache.flink.api.java.functions.KeySelector;importorg.apache.flink.api.java.tuple.Tuple3;publicclassJoinOperatorExample{publicstaticvoidmain(String[] args)throwsException{// 获取 ExecutionEnvironment,用于创建数据集ExecutionEnvironment env =ExecutionEnvironment.getExecutionEnvironment();// 假设已经有包含订单信息的数据集DataSet<Tuple3<String,String,Integer>> orders = env.fromElements(newTuple3<>("Order001","Product001",2),newTuple3<>("Order002","Product002",1),newTuple3<>("Order003","Product001",3),newTuple3<>("Order004","Product003",5));// 假设已经有包含商品信息的数据集DataSet<Tuple2<String,String>> products = env.fromElements(newTuple2<>("Product001","Apple"),newTuple2<>("Product002","Banana"),newTuple2<>("Product003","Orange"));// 使用 Join 算子将订单信息和商品信息按照商品编号进行连接DataSet<Tuple3<String,String,Integer>> result = orders.join(products).where(newOrderProductJoinKeySelector()).equalTo(0).projectFirst(0,1).projectSecond(1);// 输出连接结果
result.print();}// 自定义 KeySelector,用于指定连接的键(商品编号)publicstaticclassOrderProductJoinKeySelectorimplementsKeySelector<Tuple3<String,String,Integer>,String>{@OverridepublicStringgetKey(Tuple3<String,String,Integer> value){return value.f1;// 商品编号是连接的键}}}
在上面的代码中,我们首先导入了 Flink 的相关类,然后创建了一个 ExecutionEnvironment 对象 env,用于创建数据集。接着,使用 env.fromElements() 方法创建了包含订单信息的数据集 orders 和包含商品信息的数据集 products。
然后,我们使用 join 方法对 orders 和 products 数据集进行连接。在 join 方法中,我们需要通过自定义 KeySelector 对象 OrderProductJoinKeySelector 指定连接的键(商品编号)。接着,我们通过 equalTo(0) 方法指定连接条件,表示连接键在 orders 数据集中的位置是 0,在 products 数据集中的位置也是 0。
最后,我们使用 projectFirst(0, 1) 方法和 projectSecond(1) 方法分别指定连接后要输出的字段,从而生成包含连接结果的新数据集 result。
最终,我们输出连接结果,即关联后的订单信息和商品名称。在本例中,输出结果为:
(Order001, Product001, 2) Apple
(Order002, Product002, 1) Banana
(Order003, Product001, 3) Apple
(Order004, Product003, 5) Orange
即订单信息和商品信息已按照商品编号进行连接,并输出了关联后的订单信息和商品名称。
9:Outer Join 算子
Outer Join 是 Flink 中用于进行外连接操作的算子。外连接是关系型数据库中的概念,在 Flink 中,它允许将两个数据流中的元素按照指定的键进行连接,并返回所有的元素,包括那些在其中一个数据流中存在而在另一个数据流中不存在的元素。外连接操作可以分为左外连接、右外连接和全外连接。
在底层,Outer Join 算子会维护一个状态来记录两个数据流中的匹配关系,并根据指定的键进行匹配。对于左外连接和右外连接,当某个数据流中的元素找不到匹配项时,会生成一个空值或者指定的默认值。对于全外连接,无论两个数据流中是否存在匹配项,都会输出所有的元素。
应用场景
- 外连接是一种常用的数据合并和关联操作,适用于以下场景:
- 合并数据:将两个数据流中的数据按照指定的键进行合并,可以用于数据的联合分析和展示。
- 补充缺失信息:在关联操作中,可能会有一些数据流中的元素在另一个数据流中找不到匹配项,使用外连接可以填充缺失信息。
- 数据清洗:有时候需要对两个数据流进行关联,去除不匹配的数据或者添加默认值。
importorg.apache.flink.streaming.api.datastream.DataStream;importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment;publicclassOuterJoinExample{publicstaticvoidmain(String[] args)throwsException{StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();// 模拟两个数据流DataStream<Tuple2<String,Integer>> stream1 = env.fromElements(newTuple2<>("A",1),newTuple2<>("B",2),newTuple2<>("C",3));DataStream<Tuple2<String,String>> stream2 = env.fromElements(newTuple2<>("A","X"),newTuple2<>("B","Y"),newTuple2<>("D","Z"));// 对两个数据流进行外连接操作,连接键为第一个元素DataStream<Tuple3<String,Integer,String>> result = stream1
.leftOuterJoin(stream2).where(tuple -> tuple.f0)// 第一个数据流的连接键.equalTo(tuple -> tuple.f0)// 第二个数据流的连接键.with((tuple1, tuple2)->{// 匹配成功的处理逻辑if(tuple2 ==null){// 若tuple2为空,表示匹配失败,使用默认值"UNKNOWN"returnnewTuple3<>(tuple1.f0, tuple1.f1,"UNKNOWN");}else{returnnewTuple3<>(tuple1.f0, tuple1.f1, tuple2.f1);}});
result.print();
env.execute("Outer Join Example");}}
在上面的示例中,我们使用 env.fromElements 方法创建了两个模拟数据流 stream1 和 stream2,分别包含不同的元素。然后,我们调用 leftOuterJoin 方法对这两个数据流进行外连接操作,连接键为第一个元素。在 with 方法中,我们定义了匹配成功的处理逻辑:当第二个数据流中找不到匹配项时,使用默认值"UNKNOWN"填充;否则,输出匹配成功的元素。在输出结果中,我们可以看到两个数据流中的元素都被连接在一起,并且在匹配失败的情况下填充了默认值"UNKNOWN"。
10:Cross 算子
- Cross 是 Flink 中的一个算子,用于将两个数据流中的所有元素进行两两组合,产生所有可能的组合结果。在底层,Cross 算子会维护两个数据流的状态,并对其中的每个元素进行遍历,将两个数据流的所有元素进行两两组合,并输出所有可能的组合结果。
- 交叉操作,通过形成这个数据集和其他数据集的笛卡尔积,创建一个新的数据集
- 和join类似,但是这种交叉操作会产生笛卡尔积,在数据比较大的时候,是非常消耗内存的操作
应用场景
Cross 算子在实际应用中相对较少,因为它会产生较大的输出结果。但是在某些特定场景下,它仍然有一些用途,例如:
- 排列组合:在某些场景下,需要对两个数据流中的元素进行排列组合,生成所有可能的组合结果。
- 笛卡尔积:对于两个数据流之间的笛卡尔积操作,可以使用 Cross 算子进行实现。
importorg.apache.flink.streaming.api.datastream.DataStream;importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment;publicclassCrossExample{publicstaticvoidmain(String[] args)throwsException{StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();// 模拟两个数据流DataStream<Integer> stream1 = env.fromElements(1,2,3);DataStream<String> stream2 = env.fromElements("A","B");// 对两个数据流进行笛卡尔积操作DataStream<String> result = stream1.cross(stream2).with((num, str)-> num +"-"+ str);
result.print();
env.execute("Cross Example");}}
在上面的示例中,我们使用 env.fromElements 方法创建了两个模拟数据流 stream1 和 stream2,其中 stream1 包含整数 1、2 和 3,stream2 包含字符串 “A” 和 “B”。然后,我们调用 cross 方法对这两个数据流进行笛卡尔积操作,并在 with 方法中定义了组合结果的逻辑:将整数和字符串进行组合,用"-"分隔。在输出结果中,我们可以看到所有可能的组合结果,即整数和字符串之间的所有组合。请注意,Cross 算子会产生较大的输出结果,因此在实际应用中需要谨慎使用。
11:Union 算子
Union 算子是 Flink 中用于将多个数据集合并成一个新数据集的算子。它将多个数据集的元素合并在一起,形成一个新的数据集。
应用场景
- Union 算子适用于需要将多个数据集合并在一起的场景。例如,在流处理中,可能需要将多个数据流合并为一个数据流进行后续处理;在批处理中,可能需要将多个数据集合并在一起进行并行处理。
importorg.apache.flink.api.java.DataSet;importorg.apache.flink.api.java.ExecutionEnvironment;importorg.apache.flink.api.java.tuple.Tuple2;publicclassUnionOperatorExample{publicstaticvoidmain(String[] args)throwsException{// 获取 ExecutionEnvironment,用于创建数据集ExecutionEnvironment env =ExecutionEnvironment.getExecutionEnvironment();// 假设已经有两个数据集,包含整数和字符串DataSet<Integer> dataSet1 = env.fromElements(1,2,3);DataSet<Integer> dataSet2 = env.fromElements(4,5,6);DataSet<String> dataSet3 = env.fromElements("A","B","C");// 使用 Union 算子将两个数据集合并成一个新数据集DataSet<Integer> mergedDataSet = dataSet1.union(dataSet2);// 使用 Union 算子将三个数据集合并成一个新数据集DataSet<Tuple2<Integer,String>> combinedDataSet = dataSet1.union(dataSet2).union(dataSet3);// 输出合并结果
mergedDataSet.print();
combinedDataSet.print();}}
三、Sink算子
1. collect
将数据输出到本地集合
importorg.apache.flink.api.common.functions.FlatMapFunction;importorg.apache.flink.api.java.tuple.Tuple2;importorg.apache.flink.streaming.api.datastream.DataStream;importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment;importorg.apache.flink.util.Collector;publicclassTransformationsFlatmap{publicstaticvoidmain(String[]arv)throwsException{StreamExecutionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment();DataStream dsSocket=env.socketTextStream("192.168.23.210",9000);//函数式//输入spark,hive,hbaseDataStream ds1=dsSocket.flatMap(newFlatMapFunction<String,Tuple2<String,Integer>>(){@OverridepublicvoidflatMap(String value,Collector<Tuple2<String,Integer>> collector)throwsException{String[] words=value.split(",");for(String word:words){
collector.collect(Tuple2.of(word,1));}}});
ds1.print();
env.execute("TransformationsMap");}}
2. writeAsText
将数据输出到文件
- Flink支持多种存储设备上的文件,包括本地文件,hdfs文件等
- Flink支持多种文件的存储格式,包括text文件,CSV文件等
// 将数据写入本地文件
result.writeAsText("/data/a",WriteMode.OVERWRITE)// 将数据写入HDFS
result.writeAsText("hdfs://node01:9000/data/a",WriteMode.OVERWRITE)
DataStream流处理算子
和DataSet一样,DataStream也包括一系列的Transformation操作
一、Source算子
Flink可以使用 StreamExecutionEnvironment.addSource(source) 来为我们的程序添加数据来源。
Flink 已经提供了若干实现好了的 source functions,当然我们也可以通过实现 SourceFunction 来自定义非并行的source或者实现 ParallelSourceFunction 接口或者扩展 RichParallelSourceFunction 来自定义并行的 source。
Flink在流处理上的source和在批处理上的source基本一致。大致有4大类:
- 基于本地集合的source(Collection-based-source)
- 基于文件的source(File-based-source)- 读取文本文件,即符合 TextInputFormat 规范的文件,并将其作为字符串返回
- 基于网络套接字的source(Socket-based-source)- 从 socket 读取。元素可以用分隔符切分。
- 自定义的source(Custom-source)
1:基于文件
- readTextFile(path) - 读取文本文件,例如遵守 TextInputFormat 规范的文件,逐行读取并将它们作为字符串返回。
- readFile(fileInputFormat, path) - 按照指定的文件输入格式读取(一次)文件。
- readFile(fileInputFormat, path, watchType, interval, pathFilter, typeInfo) - 这是前两个方法内部调用的方法。它基于给定的 fileInputFormat 读取路径 path 上的文件。根据提供的 watchType 的不同,source 可能定期(每 interval 毫秒)监控路径上的新数据(watchType 为 FileProcessingMode.PROCESS_CONTINUOUSLY),或者处理一次当前路径中的数据然后退出(watchType 为 FileProcessingMode.PROCESS_ONCE)。使用 pathFilter,用户可以进一步排除正在处理的文件。
importorg.apache.flink.api.common.RuntimeExecutionMode;importorg.apache.flink.streaming.api.datastream.DataStream;importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment;/**
* @author alanchan
*
*/publicclassSource_File{/**
* 一般用于学习测试 env.readTextFile(本地/HDFS文件/文件夹);//压缩文件也可以
*
* @param args
* @throws Exception
*/publicstaticvoidmain(String[] args)throwsException{// envStreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);// sourceDataStream<String> ds1 = env.readTextFile("D:/workspace/flink1.12-java/flink1.12-java/source_transformation_sink/src/main/resources/words.txt");DataStream<String> ds2 = env.readTextFile("D:/workspace/flink1.12-java/flink1.12-java/source_transformation_sink/src/main/resources/input/distribute_cache_student");DataStream<String> ds3 = env.readTextFile("D:/workspace/flink1.12-java/flink1.12-java/source_transformation_sink/src/main/resources/words.tar.gz");DataStream<String> ds4 = env.readTextFile("hdfs://server2:8020///flinktest/wc-1688627439219");// transformation// sink
ds1.print();
ds2.print();
ds3.print();
ds4.print();// execute
env.execute();}}
2:基于套接字
socketTextStream - 从套接字读取。元素可以由分隔符分隔。
importorg.apache.flink.api.common.RuntimeExecutionMode;importorg.apache.flink.api.common.functions.FlatMapFunction;importorg.apache.flink.api.java.tuple.Tuple2;importorg.apache.flink.streaming.api.datastream.DataStream;importorg.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment;importorg.apache.flink.util.Collector;/**
* @author alanchan
* 在server2上使用nc -lk 9999 向指定端口发送数据
* nc是netcat的简称,原本是用来设置路由器,我们可以利用它向某个端口发送数据
* 如果没有该命令可以下安装 yum install -y nc
*
*/publicclassSource_Socket{/**
* @param args
* @throws Exception
*/publicstaticvoidmain(String[] args)throwsException{//envStreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);//sourceDataStream<String> lines = env.socketTextStream("server2",9999);//transformation/*SingleOutputStreamOperator<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
@Override
public void flatMap(String value, Collector<String> out) throws Exception {
String[] arr = value.split(" ");
for (String word : arr) {
out.collect(word);
}
}
});
words.map(new MapFunction<String, Tuple2<String,Integer>>() {
@Override
public Tuple2<String, Integer> map(String value) throws Exception {
return Tuple2.of(value,1);
}
});*///注意:下面的操作将上面的2步合成了1步,直接切割单词并记为1返回// SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = lines.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {// @Override// public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {// String[] arr = value.split(" ");// for (String word : arr) {// out.collect(Tuple2.of(word, 1));// }// }// });//// SingleOutputStreamOperator<Tuple2<String, Integer>> result = wordAndOne.keyBy(t -> t.f0).sum(1);//sink
lines.print();//execute
env.execute();}}
3:基于集合
- fromCollection(Collection) - 从 Java Java.util.Collection 创建数据流。集合中的所有元素必须属于同一类型。
- fromCollection(Iterator, Class) - 从迭代器创建数据流。class 参数指定迭代器返回元素的数据类型。
- fromElements(T …) - 从给定的对象序列中创建数据流。所有的对象必须属于同一类型。
- fromParallelCollection(SplittableIterator, Class) - 从迭代器并行创建数据流。class 参数指定迭代器返回元素的数据类型。
- generateSequence(from, to) - 基于给定间隔内的数字序列并行生成数据流。
importjava.util.Arrays;importorg.apache.flink.api.common.RuntimeExecutionMode;importorg.apache.flink.streaming.api.datastream.DataStream;importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment;/**
* @author alanchan
*
*/publicclassSource_Collection{/**
* 一般用于学习测试时编造数据时使用
* 1.env.fromElements(可变参数);
* 2.env.fromColletion(各种集合);
* 3.env.generateSequence(开始,结束);
* 4.env.fromSequence(开始,结束);
*
* @param args 基于集合
* @throws Exception
*/publicstaticvoidmain(String[] args)throwsException{// envStreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);// sourceDataStream<String> ds1 = env.fromElements("i am alanchan","i like flink");DataStream<String> ds2 = env.fromCollection(Arrays.asList("i am alanchan","i like flink"));DataStream<Long> ds3 = env.generateSequence(1,10);//已过期,使用fromSequence方法DataStream<Long> ds4 = env.fromSequence(1,100);// transformation// sink
ds1.print();
ds2.print();
ds3.print();
ds4.print();// execute
env.execute();}}
4:自定义
addSource - 关联一个新的 source function。例如,你可以使用 addSource(new FlinkKafkaConsumer<>(…)) 来从 Apache Kafka 获取数据。
kafka
packagecom.wenge.datagroup.storage;importcom.wenge.datagroup.storage.common.ArgsConstants;importcom.wenge.datagroup.storage.utils.ConfigUtil;importcom.wenge.datagroup.storage.utils.Funnel;importorg.apache.flink.api.common.serialization.SimpleStringSchema;importorg.apache.flink.streaming.api.datastream.DataStreamSource;importorg.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment;importorg.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;importorg.apache.kafka.clients.consumer.ConsumerConfig;importjava.util.Properties;/**
* @author wangkanglu
* @version 1.0
* @description
* @date 2024-07-24 16:58
*/publicclassTestFlink{publicstaticvoidmain(String[] args){StreamExecutionEnvironment streamEnv =StreamExecutionEnvironment.getExecutionEnvironment();int sourceParallelism =1;String topic ="topic_name";Properties properties =getParameters();FlinkKafkaConsumer<String> consumer =newFlinkKafkaConsumer<>(
topic,newSimpleStringSchema(),
properties
);DataStreamSource<String> kafkaDataStreamSource = streamEnv.addSource(consumer);DataStream<String> dataStream = kafkaDataStreamSource.setParallelism(sourceParallelism).name("KafkaSource-"+ topic);
dataStream .print();// execute
streamEnv .execute();}privatestaticPropertiesgetParameters(){Properties properties =newProperties();// 集群地址
properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.1.1:9092");// 消费者组id
properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG,"wkl_test");
properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
properties.setProperty("partition.discovery.interval.ms","10000");//消费者定期发现动态创建的Kafka主题和分区的时间间隔// latest有offset记录从记录位置开始消费,没有记录从最新的/最后的消息开始消费// earliest有offset记录从记录位置开始消费,没有记录从最早的/最开始的消息开始消费
properties.setProperty("auto.offset.reset","latest");// properties.setProperty("enable.auto.commit", "true");// properties.setProperty("auto.commit.interval.ms", "1000");//自动提交的时间间隔
properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"true");//自动提交的时间间隔
properties.setProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"20000");//自动提交的时间间隔//每次从kafka中获取的数量
properties.setProperty("max.poll.records","2");return properties;}}
mysql
importjava.sql.Connection;importjava.sql.DriverManager;importjava.sql.PreparedStatement;importjava.sql.ResultSet;importorg.apache.flink.api.common.RuntimeExecutionMode;importorg.apache.flink.configuration.Configuration;importorg.apache.flink.streaming.api.datastream.DataStream;importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment;importorg.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;importorg.source_transformation_sink.bean.User;/**
* @author alanchan
* 自定义数据源-MySQL
*/publicclassSource_MySQL{/**
* @param args
* @throws Exception
*/publicstaticvoidmain(String[] args)throwsException{// envStreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);// sourceDataStream<User> studentDS = env.addSource(newMySQLSource()).setParallelism(1);// transformation// sink
studentDS.print();// execute
env.execute();}privatestaticclassMySQLSourceextendsRichParallelSourceFunction<User>{privateboolean flag =true;privateConnection conn =null;privatePreparedStatement ps =null;privateResultSet rs =null;// open只执行一次,适合开启资源@Overridepublicvoidopen(Configuration parameters)throwsException{
conn =DriverManager.getConnection("jdbc:mysql://server4:3306/test?useUnicode=true&characterEncoding=UTF-8","root","123456");String sql ="select id,name,pwd,email,age,balance from user";
ps = conn.prepareStatement(sql);}@Overridepublicvoidrun(SourceContext<User> ctx)throwsException{while(flag){
rs = ps.executeQuery();while(rs.next()){User user =newUser(
rs.getInt("id"),
rs.getString("name"),
rs.getString("pwd"),
rs.getString("email"),
rs.getInt("age"),
rs.getDouble("balance"));
ctx.collect(user);}Thread.sleep(5000);}}// 接收到cancel命令时取消数据生成@Overridepublicvoidcancel(){
flag =false;}// close里面关闭资源@Overridepublicvoidclose()throwsException{if(conn !=null)
conn.close();if(ps !=null)
ps.close();if(rs !=null)
rs.close();}}}importlombok.AllArgsConstructor;importlombok.Data;importlombok.NoArgsConstructor;/**
* @author alanchan
*
*/@Data@AllArgsConstructor@NoArgsConstructorpublicclassUser{privateint id;privateString name;privateString pwd;privateString email;privateint age;privatedouble balance;}
二、Transform转换算子
Flink算子,其实就是”数据转换算子“,对数据进行处理的方法或者程序封装就是算子
1: map
Map算子,就是映射算子,将一个数据映射为另一个数据,与Java8 stream 流式操作中的map一致
publicstaticvoidmain(String[] args)throwsException{List<String> stringList =Arrays.asList("a","b","c");StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();// 设置并行度为1 (1个线程执行,以便观察)
env.setParallelism(1);
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);// 加载数据源DataStreamSource<String> stream = env.fromCollection(stringList);// 使用map算子SingleOutputStreamOperator<String> source = stream.map(newMapFunction<String,String>(){@OverridepublicStringmap(String value)throwsException{return value.toUpperCase()+"aa";}});// Aaa Baa Caa
source.print();
env.execute();}
2:FlatMap算子
FlatMap算子,可以将数据进行摊平化处理 例如 原本每一个元素都是集合或者数数组,我们使用FlatMap后,可以将(集合,数组)进行再次拆解取出其中的数据,再新组合为集合,与Java8 stream 流式操作中的Flatmap功能一致
publicstaticvoidmain(String[] args)throwsException{List<String> str1 =Arrays.asList("a","b","c");List<String> str2 =Arrays.asList("关羽","张飞","马超","黄忠","赵云");List<List<String>> originalData =newArrayList<>();
originalData.add(str1);
originalData.add(str2);StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();// 设置并行度为1 (1个线程执行,以便观察)
env.setParallelism(1);
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);// 加载数据源DataStreamSource<List<String>> stream = env.fromCollection(originalData);// 使用flatMap算子SingleOutputStreamOperator<String> source = stream.flatMap(newFlatMapFunction<List<String>,String>(){@OverridepublicvoidflatMap(List<String> value,Collector<String> out)throwsException{for(String s : value){
out.collect(s);}}});// a b c 关羽 张飞 马超 黄忠 赵云
source.print();
env.execute();}
3:Filter算子
Filter为筛选(过滤)算子,可以根据条件过滤数据源中数据,例如现有数据源 1,2,3,4,5 现在要过滤大于3的数据,过滤后,数据源中仅有 4 5 数据了,与Java8 stream 流式操作中的filter功能一致
publicstaticvoidmain(String[] args)throwsException{List<Integer> str1 =Arrays.asList(1,2,3,4,5,6,7,8);StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();// 设置并行度为1 (1个线程执行,以便观察)
env.setParallelism(1);
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);// 加载数据源DataStreamSource<Integer> stream = env.fromCollection(str1);// 使用filter算子SingleOutputStreamOperator<Integer> source = stream.filter(newFilterFunction<Integer>(){@Overridepublicbooleanfilter(Integer value)throwsException{return value >3;}});// 4 5 6 7 8
source.print();
env.execute();}
4:KeyBy算子
分组算子,根据数据源中元素某一特性进行分组,与Java8 stream 流式操作中的groupBy功能一致
publicstaticvoidmain(String[] args)throwsException{List<User> users =Arrays.asList(newUser("张三",12),newUser("张三",18),newUser("李四",22),newUser("麻子",35));StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();// 设置并行度为1 (1个线程执行,以便观察)
env.setParallelism(1);
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);// 加载数据源DataStreamSource<User> stream = env.fromCollection(users);// 使用filter算子KeyedStream<User,String> keyedStream = stream.keyBy(newKeySelector<User,String>(){@OverridepublicStringgetKey(User value)throwsException{return value.getName();}});
keyedStream.print();
env.execute();}
5:Union算子
union :联合算子, 使用此算子,可对多个数据源进行合并操作(数据源数据必须类型必须相同),其可合并多个,合并后可直接对数据进行处理 (计算或输出)
publicstaticvoidmain(String[] args)throwsException{StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
env.setParallelism(1);DataStreamSource<String> source = env.fromElements("zs","li","we");DataStreamSource<String> source2 = env.fromElements("zs2","li2","we2");DataStreamSource<String> source3 = env.fromElements("zs3","li3","we3");//此操作将 source 、source2 、source3 三个数据源的数据联合起来了DataStream<String> union = source.union(source2, source3);SingleOutputStreamOperator<String> streamOperator = union.map(newMapFunction<String,String>(){@OverridepublicStringmap(String value)throwsException{return value.toUpperCase();}});
streamOperator.print("union").setParallelism(1);
env.execute();}
6:Connect算子
connect与union算子一样,都可以进行数据源合并处理,但与union不同的是,connect 可以合并不同类型的数据源,但最多只能合并两个数据流,且合并后无法直接操作(计算 输出),需要对连接流进行数据处理(选择最终合并后的数据类型,不符合最终数据类型的转换)
publicstaticvoidmain(String[] args)throwsException{StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
env.setParallelism(1);DataStreamSource<String> source = env.fromElements("zs","li","we");DataStreamSource<Integer> source2 = env.fromElements(1,2,3,4,5,6,7);ConnectedStreams<String,Integer> connect = source.connect(source2);// 我这里是将最终合并类型定为String.SingleOutputStreamOperator<String> streamOperator = connect.map(newCoMapFunction<String,Integer,String>(){@OverridepublicStringmap1(String value){return value +"是字符串类型,直接加后缀";}@OverridepublicStringmap2(Integer value){return"原本是Integer类型:"+ value +"现在也变为String";}});
streamOperator.print("connect");
env.execute();}
7:算子链式调用
publicstaticvoidmain(String[] args)throwsException{StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
env.setParallelism(1);DataStreamSource<String> streamSource = env.socketTextStream("xx",9999);SingleOutputStreamOperator<Tuple2<String,Integer>> result = streamSource.flatMap(newFlatMapFunction<String,String>(){@OverridepublicvoidflatMap(String s,Collector<String> collector)throwsException{for(String s1 : s.split(",")){
collector.collect(s1);}}}).filter(s ->!s.equals("sb")).map(newMapFunction<String,String>(){@OverridepublicStringmap(String s)throwsException{return s.toUpperCase();}}).map(newMapFunction<String,Tuple2<String,Integer>>(){@OverridepublicTuple2<String,Integer>map(String s)throwsException{returnTuple2.of(s,1);}}).keyBy(tp -> tp.f0).reduce(newReduceFunction<Tuple2<String,Integer>>(){@OverridepublicTuple2<String,Integer>reduce(Tuple2<String,Integer> stringIntegerTuple2,Tuple2<String,Integer> t1)throwsException{returnTuple2.of(t1.f0, t1.f1 + stringIntegerTuple2.f1);}});
result.print();
env.execute();}
8:异步IO调用
packagecom.wenge.datagroup.storage.process;importcn.hutool.http.HttpRequest;importcom.alibaba.fastjson.JSON;importcom.alibaba.fastjson.JSONObject;importlombok.extern.slf4j.Slf4j;importorg.apache.commons.lang3.StringUtils;importorg.apache.flink.api.common.functions.RichFilterFunction;importorg.apache.flink.api.common.functions.RichMapFunction;importorg.apache.flink.configuration.Configuration;importorg.apache.flink.streaming.api.datastream.AsyncDataStream;importorg.apache.flink.streaming.api.datastream.DataStream;importorg.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;importorg.apache.flink.streaming.api.functions.async.ResultFuture;importorg.apache.flink.streaming.api.functions.async.RichAsyncFunction;importjava.util.Collections;importjava.util.Objects;importjava.util.concurrent.CompletableFuture;importjava.util.concurrent.ExecutorService;importjava.util.concurrent.Executors;importjava.util.concurrent.TimeUnit;importjava.util.function.Supplier;@Slf4jpublicclassAnalyerAsyncIOProcess{publicstaticDataStream<JSONObject>process(DataStream<String> dataStream){//线程数int asyncNum =10;//过滤不需要的数据SingleOutputStreamOperator<JSONObject> filterDataStream = dataStream.filter(newRichFilterFunction<String>(){@Overridepublicbooleanfilter(String str){//TODO:根据业务逻辑进行判断JSONObject data =null;try{
data =JSON.parseObject(str);String id = data.getString("id");if(Objects.isNull(id)){returnfalse;}else{returntrue;}}catch(Exception e){returnfalse;}}}).name("aysncIOFilter").setParallelism(1).map(newRichMapFunction<String,JSONObject>(){@OverridepublicJSONObjectmap(String str)throwsException{JSONObject data =null;try{
data =JSON.parseObject(str);return data;}catch(Exception e){
log.error("数据处理异常:{}", str);returnnull;}}}).name("aysncIOMAP").setParallelism(2);// 异步IORichAsyncFunction richAsyncFunction =newRichAsyncFunction<JSONObject,JSONObject>(){privatetransientExecutorService executorService;@Overridepublicvoidopen(Configuration parameters){// 加载配置文件,每一个并行度只执行一次
log.error("加载配置文件base");this.executorService =Executors.newFixedThreadPool(asyncNum);}@Overridepublicvoidclose()throwsException{// 关闭线程池if(executorService !=null){
executorService.shutdown();}
log.error("----------------------------线程池关闭----------------------");}@Overridepublicvoidtimeout(JSONObject input,ResultFuture<JSONObject> resultFuture){
log.error("------------------------数据超时----------------------:{}", input);JSONObject data = input;//对超时数据进行处理
resultFuture.complete(Collections.singleton(data));}@OverridepublicvoidasyncInvoke(JSONObject json,ResultFuture<JSONObject> resultFuture){CompletableFuture.supplyAsync(newSupplier<JSONObject>(){@OverridepublicJSONObjectget(){//识别语种String postResult =newString();String id = json.getString("id");long start =System.currentTimeMillis();try{//TODO: 根据业务逻辑进行处理
log.error("异步处理数据base:{}", json.getString("id"));
postResult =HttpRequest.post("http://127.0.0.1:8080").body(newJSONObject().toJSONString()).execute().body();if(StringUtils.isNotBlank(postResult)){
json.put("postResult", postResult);}long end =System.currentTimeMillis();
log.error("id:{},请求接口:{},耗时:{} ms", id, postResult,(end - start));return json;}catch(Exception e){
log.error("----------语种识别异步IO处理异常:{},数据:{}", id, e);return json;}}}, executorService).thenAccept((JSONObject dbResult)->{
resultFuture.complete(Collections.singleton(dbResult));});}};DataStream<JSONObject> downloadStream =AsyncDataStream.unorderedWait(
filterDataStream,
richAsyncFunction,5000,TimeUnit.MILLISECONDS,
asyncNum).name("IO").setParallelism(2);return downloadStream;}}
9:异步IO调用 sql
packagecom.wenge.datagroup.storage.process;importcom.alibaba.druid.pool.DruidDataSource;importcom.alibaba.fastjson.JSONObject;importlombok.extern.slf4j.Slf4j;importorg.apache.commons.lang3.StringUtils;importorg.apache.flink.configuration.Configuration;importorg.apache.flink.streaming.api.datastream.AsyncDataStream;importorg.apache.flink.streaming.api.datastream.DataStream;importorg.apache.flink.streaming.api.functions.async.ResultFuture;importorg.apache.flink.streaming.api.functions.async.RichAsyncFunction;importjavax.sql.DataSource;importjava.sql.Connection;importjava.sql.PreparedStatement;importjava.sql.ResultSet;importjava.sql.SQLException;importjava.util.Collections;importjava.util.concurrent.CompletableFuture;importjava.util.concurrent.ExecutorService;importjava.util.concurrent.Executors;importjava.util.concurrent.TimeUnit;importjava.util.function.Supplier;@Slf4jpublicclassAnalyerAsyncIOSQL1{publicstaticDataStream<JSONObject>process(DataStream<JSONObject> dataStream){//sql 连接数int dataAsyncNum =10;//并行度int asyncNum =10;String databaseUrl ="jdbc:mysql://127.0.0.1:3306/test_data?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=GMT%2B8";String databaseUsername ="root";String databasePassword ="123456";// 异步IORichAsyncFunction richAsyncFunction =newRichAsyncFunction<JSONObject,JSONObject>(){privatetransientExecutorService executorService;privateDruidDataSource dataSource;@Overridepublicvoidopen(Configuration parameters){// 重新加载配置文件
log.error("重新加载配置文件-SQL");this.executorService =Executors.newFixedThreadPool(dataAsyncNum);
dataSource =newDruidDataSource();
dataSource.setDriverClassName("com.mysql.cj.jdbc.Driver");
dataSource.setUrl(databaseUrl);
dataSource.setUsername(databaseUsername);
dataSource.setPassword(databasePassword);
dataSource.setMaxActive(dataAsyncNum);}@Overridepublicvoidclose()throwsException{// 关闭线程池
executorService.shutdown();
dataSource.close();
log.error("----------------------------线程池关闭----------------------");}@Overridepublicvoidtimeout(JSONObject input,ResultFuture<JSONObject> resultFuture){
log.error("------------------------数据超时----------------------:{}", input);JSONObject data = input;//对超时数据进行处理
resultFuture.complete(Collections.singleton(data));}@OverridepublicvoidasyncInvoke(JSONObject json,ResultFuture<JSONObject> resultFuture){CompletableFuture.supplyAsync(newSupplier<JSONObject>(){@OverridepublicJSONObjectget(){String id = json.getString("id");long start =System.currentTimeMillis();try{//TODO: 根据业务逻辑进行处理String country_id =newAnalyerAsyncIOSQL1().queryFromMySql("name", dataSource);
log.error("----------SQL,id:{},数据:{}", id, country_id);if(StringUtils.isNotBlank(country_id)){
json.put("country_id",country_id);}long end =System.currentTimeMillis();
log.error("id:{},sql,耗时:{} ms", id,(end-start));return json;}catch(Exception e){
log.error("----------SQL异步IO处理异常:{},数据:{}", id, e);return json;}}}, executorService).thenAccept((JSONObject dbResult)->{
resultFuture.complete(Collections.singleton(dbResult));});}};DataStream<JSONObject> downloadStream =AsyncDataStream.unorderedWait(
dataStream,
richAsyncFunction,5000,TimeUnit.MILLISECONDS,
dataAsyncNum).name("IO_SQL").setParallelism(asyncNum);return downloadStream;}/**
* SQL 查询代码实现
*/publicStringqueryFromMySql(String name,DataSource dataSource)throwsSQLException{String sql ="select id,name,country_id ,status from nation_info where status =1 and name = ?";String result =null;Connection connection =null;PreparedStatement stmt =null;ResultSet rs =null;try{
connection = dataSource.getConnection();
stmt = connection.prepareStatement(sql);
stmt.setString(1, name);
rs = stmt.executeQuery();while(rs.next()){
result = rs.getString("country_id");}}finally{if(rs !=null){
rs.close();}if(stmt !=null){
stmt.close();}if(connection !=null){
connection.close();}}return result;}}
三、Data Sinks
Data sinks 使用 DataStream 并将它们转发到文件、套接字、外部系统或打印它们。Flink 自带了多种内置的输出格式,这些格式相关的实现封装在 DataStreams 的算子里:
- writeAsText() / TextOutputFormat - 将元素按行写成字符串。通过调用每个元素的 toString() 方法获得字符串。
- writeAsCsv(…) / CsvOutputFormat - 将元组写成逗号分隔值文件。行和字段的分隔符是可配置的。每个字段的值来自对象的 toString() 方法。
- print() / printToErr() - 在标准输出/标准错误流上打印每个元素的 toString() 值。 可选地,可以提供一个前缀(msg)附加到输出。这有助于区分不同的 print 调用。如果并行度大于1,输出结果将附带输出任务标识符的前缀。
- writeUsingOutputFormat() / FileOutputFormat - 自定义文件输出的方法和基类。支持自定义 object 到 byte 的转换。
- writeToSocket - 根据 SerializationSchema 将元素写入套接字。 addSink - 调用自定义 sink function。Flink 捆绑了连接到其他系统(例如 Apache Kafka)的连接器,这些连接器被实现为 sink functions。
注意,DataStream 的 write*() 方法主要用于调试目的。它们不参与 Flink 的 checkpointing,这意味着这些函数通常具有至少有一次语义。刷新到目标系统的数据取决于 OutputFormat 的实现。这意味着并非所有发送到 OutputFormat 的元素都会立即显示在目标系统中。此外,在失败的情况下,这些记录可能会丢失。
为了将流可靠地、精准一次地传输到文件系统中,请使用 FileSink。此外,通过 .addSink(…) 方法调用的自定义实现也可以参与 Flink 的 checkpointing,以实现精准一次的语义。
kafka
importorg.apache.kafka.common.serialization.ByteArrayDeserializer;importjava.util.Properties;importorg.apache.flink.api.common.RuntimeExecutionMode;importorg.apache.flink.api.common.functions.FilterFunction;importorg.apache.flink.streaming.api.datastream.DataStream;importorg.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment;importorg.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;importorg.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;importorg.apache.flink.streaming.util.serialization.SimpleStringSchema;/**
* @author alanchan
*
*/publicclassSinkKafka{publicstaticvoidmain(String[] args)throwsException{// envStreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);// source// 准备kafka连接参数Properties props =newProperties();// 集群地址
props.setProperty("bootstrap.servers","server1:9092");// 消费者组id
props.setProperty("group.id","flink");// latest有offset记录从记录位置开始消费,没有记录从最新的/最后的消息开始消费// earliest有offset记录从记录位置开始消费,没有记录从最早的/最开始的消息开始消费
props.setProperty("auto.offset.reset","latest");// 会开启一个后台线程每隔5s检测一下Kafka的分区情况,实现动态分区检测
props.setProperty("flink.partition-discovery.interval-millis","5000");// 自动提交
props.setProperty("enable.auto.commit","true");// 自动提交的时间间隔
props.setProperty("auto.commit.interval.ms","2000");// 使用连接参数创建FlinkKafkaConsumer/kafkaSourceFlinkKafkaConsumer<String> kafkaSource =newFlinkKafkaConsumer<String>("t_kafkasource",newSimpleStringSchema(),
props);// 使用kafkaSourceDataStream<String> kafkaDS = env.addSource(kafkaSource);// transformation算子,业务计算//以alan作为结尾SingleOutputStreamOperator<String> etlDS = kafkaDS.filter(newFilterFunction<String>(){@Overridepublicbooleanfilter(String value)throwsException{return value.contains("alan");}});// sink
etlDS.print();Properties props2 =newProperties();
props2.setProperty("bootstrap.servers","server1:9092");
props2.setProperty(ConsumerConfig.GROUP_ID_CONFIG, grouId);
props2.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
props2.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
props2.setProperty("enable.idempotence","false");FlinkKafkaProducer<String> kafkaSink =newFlinkKafkaProducer<>("send_topic",newSimpleStringSchema(),
props2);
etlDS.addSink(kafkaSink);// execute
env.execute();}}
flie
importorg.apache.flink.api.common.RuntimeExecutionMode;importorg.apache.flink.streaming.api.datastream.DataStream;importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment;/**
* @author alanchan
*/publicclassSinkDemo{publicstaticvoidmain(String[] args)throwsException{// envStreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);// sourceDataStream<String> ds = env.readTextFile("D:/workspace/flink1.12-java/flink1.12-java/source_transformation_sink/src/main/resources/words.txt");System.setProperty("HADOOP_USER_NAME","alanchan");// transformation// sink// ds.print();// ds.print("输出标识");// ds.printToErr();// 会在控制台上以红色输出// ds.printToErr("输出标识");// 会在控制台上以红色输出// 并行度与写出的文件个数有关,一个并行度写一个文件,多个并行度写多个文件// ds.writeAsText("D:/workspace/flink1.12-java/flink1.12-java/source_transformation_sink/src/main/resources/output/result1").setParallelism(1);
ds.writeAsText("hdfs://server2:8020///flinktest/words").setParallelism(2);// execute
env.execute();}}
mysql
importjava.sql.Connection;importjava.sql.DriverManager;importjava.sql.PreparedStatement;importorg.apache.flink.api.common.RuntimeExecutionMode;importorg.apache.flink.configuration.Configuration;importorg.apache.flink.streaming.api.datastream.DataStream;importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment;importorg.apache.flink.streaming.api.functions.sink.RichSinkFunction;importorg.source_transformation_sink.bean.User;/**
* @author alanchan
*
*/publicclassSinkToMySQL{publicstaticvoidmain(String[] args)throwsException{// 0.envStreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);// 1.sourceDataStream<User> studentDS = env.fromElements(newUser(1,"alanchan","sink mysql","[email protected]",19,800));// 2.transformation// 3.sink
studentDS.addSink(newMySQLSink());// 4.execute
env.execute();}privatestaticclassMySQLSinkextendsRichSinkFunction<User>{privateConnection conn =null;privatePreparedStatement ps =null;@Overridepublicvoidopen(Configuration parameters)throwsException{
conn =DriverManager.getConnection("jdbc:mysql://server4:3306/test?useUnicode=true&characterEncoding=UTF-8&useSSL=false","root","123456");// private int id;// private String name;// private String pwd;// private String email;// private int age;// private double balance;String sql ="INSERT INTO `user` (`id`, `name`, `pwd`, `email`, `age`, `balance`) VALUES (null, ?, ?, ?, ?, ?);";
ps = conn.prepareStatement(sql);}@Overridepublicvoidinvoke(User value,Context context)throwsException{// 设置?占位符参数值
ps.setString(1, value.getName());
ps.setString(2, value.getPwd());
ps.setString(3, value.getEmail());
ps.setInt(4, value.getAge());
ps.setDouble(5, value.getBalance());// 执行sql
ps.executeUpdate();}@Overridepublicvoidclose()throwsException{if(conn !=null)
conn.close();if(ps !=null)
ps.close();}}}importlombok.AllArgsConstructor;importlombok.Data;importlombok.NoArgsConstructor;/**
* @author alanchan
*
*/@Data@AllArgsConstructor@NoArgsConstructorpublicclassUser{privateint id;privateString name;privateString pwd;privateString email;privateint age;privatedouble balance;}
ES
packagecom.wenge.datagroup.storage;importcom.alibaba.fastjson.JSONObject;importcom.wenge.datagroup.storage.utils.*;importorg.apache.commons.lang3.StringUtils;importorg.apache.flink.api.common.RuntimeExecutionMode;importorg.apache.flink.api.common.functions.RuntimeContext;importorg.apache.flink.api.common.serialization.SimpleStringSchema;importorg.apache.flink.streaming.api.datastream.DataStream;importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment;importorg.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;importorg.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;importorg.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;importorg.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink;importorg.apache.flink.streaming.connectors.elasticsearch7.RestClientFactory;importorg.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;importorg.apache.http.HttpHost;importorg.apache.kafka.clients.consumer.ConsumerConfig;importorg.elasticsearch.action.update.UpdateRequest;importjava.net.MalformedURLException;importjava.net.URL;importjava.util.ArrayList;importjava.util.List;importjava.util.Properties;/**
* @author wangkanglu
* @version 1.0
* @description
* @date 2024-07-24 16:58
*/publicclassTestFlink{staticString topic ="test_topic";staticInteger bulkSize =100;staticInteger sinkParallelism =2;publicstaticvoidmain(String[] args){// envStreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);// source// 准备kafka连接参数Properties props =getParameters();// 使用连接参数创建FlinkKafkaConsumer/kafkaSourceFlinkKafkaConsumer<String> kafkaSource =newFlinkKafkaConsumer<String>(
topic,newSimpleStringSchema(),
props);// 使用kafkaSourceDataStream<String> kafkaDS = env.addSource(kafkaSource);//更新进ESList<HttpHost> esAddresses =null;try{
esAddresses =getEsAddresses("192.168.1.1:9200,192.168.1.2:9200");}catch(MalformedURLException e){
log.error("解析ES地址报错", e);
e.printStackTrace();}ElasticsearchSinkFunction<JSONObject> elasticsearchSinkUpdateFunction =newElasticsearchSinkFunction<JSONObject>(){@Overridepublicvoidprocess(JSONObject jsonObject,RuntimeContext runtimeContext,RequestIndexer requestIndexer){//TODO:这里按照发布时间存到不同的索引,需要根据项目自行修改String index ="writeIndex";String uuid = jsonObject.getString("uuid");
jsonObject.remove("_index");
jsonObject.put("topic_flag",topic);
jsonObject.put("es_insert_time",DateUtils.getCurrentDateTime());if(StringUtils.isEmpty(index)){
log.error("UUID:{} index为空,不再更新ES", uuid);return;}else{
log.error("索引:{}中更新数据:UUID:{} ",index, uuid);}UpdateRequest updateRequest =newUpdateRequest(index, uuid).docAsUpsert(true).doc(jsonObject)//版本冲突中的重试次数.retryOnConflict(5);
requestIndexer.add(updateRequest);}};ESSinkUtil.addUpdateSink(esAddresses, bulkSize, sinkParallelism, kafkaDS, elasticsearchSinkUpdateFunction,"saveEs");}/**
* 解析配置文件的 es hosts
*
* @param hosts hosts字符串
* @throws MalformedURLException 地址异常
*/publicstaticList<HttpHost>getEsAddresses(String hosts)throwsMalformedURLException{String[] hostList = hosts.split(",");List<HttpHost> addresses =newArrayList<>();for(String host : hostList){if(host.startsWith("http")){URL url =newURL(host);
addresses.add(newHttpHost(url.getHost(), url.getPort()));}else{String[] parts = host.split(":",2);if(parts.length >1){
addresses.add(newHttpHost(parts[0],Integer.parseInt(parts[1])));}else{thrownewMalformedURLException("invalid elasticsearch hosts format");}}}return addresses;}/**
* es sink
*
* @param hosts es hosts
* @param bulkFlushMaxActions bulk flush size
* @param parallelism 并行数
* @param dataStream 数据
* @param func es写入方法
* @param <T> 泛型
* @param sinkName 算子名称
*/publicstatic<T>voidaddUpdateSink(List<HttpHost> hosts,int bulkFlushMaxActions,int parallelism,DataStream<T> dataStream,ElasticsearchSinkFunction<T> func,String sinkName){ElasticsearchSink.Builder<T> esSinkBuilder =newElasticsearchSink.Builder<>(hosts, func);
esSinkBuilder.setBulkFlushMaxActions(bulkFlushMaxActions);newRetryRequestFailureHandler();RestClientFactory restClientFactory =ESConstant.restClientFactory;//设置用户名密码
esSinkBuilder.setRestClientFactory(restClientFactory);
esSinkBuilder.setFailureHandler(newUpdateRetryRequestFailureHandler());//Bulk刷新间隔
esSinkBuilder.setBulkFlushInterval(1000);//重试次数
esSinkBuilder.setBulkFlushBackoffRetries(10);//重试间隔
esSinkBuilder.setBulkFlushBackoffDelay(5000);//重试类型
esSinkBuilder.setBulkFlushBackoffType(ElasticsearchSinkBase.FlushBackoffType.CONSTANT);
dataStream.addSink(esSinkBuilder.build()).name(sinkName).setParallelism(parallelism);}privatestaticPropertiesgetParameters(){Properties properties =newProperties();// 集群地址
properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.1.1:9092");// 消费者组id
properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG,"wkl_test");
properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
properties.setProperty("partition.discovery.interval.ms","10000");//消费者定期发现动态创建的Kafka主题和分区的时间间隔// latest有offset记录从记录位置开始消费,没有记录从最新的/最后的消息开始消费// earliest有offset记录从记录位置开始消费,没有记录从最早的/最开始的消息开始消费
properties.setProperty("auto.offset.reset","latest");// properties.setProperty("enable.auto.commit", "true");// properties.setProperty("auto.commit.interval.ms", "1000");//自动提交的时间间隔
properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"true");//自动提交的时间间隔
properties.setProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"20000");//自动提交的时间间隔//每次从kafka中获取的数量
properties.setProperty("max.poll.records","2");return properties;}}
版权归原作者 苍煜 所有, 如有侵权,请联系我们删除。