Flink 学习三 Flink 流&process function API
1.Flink 多流操作
1.1.split 分流 (deprecated)
把一个数据流根据数据内容拆分成多个数据流; 该算子已在 Flink 1.12 版本中移除, 请改用下面的侧流输出
1.2.分流操作 (使用侧流输出)
publicclass _02_SplitStream {publicstaticvoidmain(String[] args)throwsException{// 获取环境StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);DataStreamSource<Integer> streamSource = env.fromElements(1,2,3,4,5);SingleOutputStreamOperator<Integer> processed = streamSource.process(newProcessFunction<Integer,Integer>(){/**
*
* @param value 输出的数据
* @param ctx A 上下文
* @param out 主要流输出器
* @throws Exception
*/@OverridepublicvoidprocessElement(Integer value,ProcessFunction<Integer,Integer>.Context ctx,Collector<Integer> out)throwsException{if(value %3==0){//测流数据
ctx.output(newOutputTag<Integer>("3%0",TypeInformation.of(Integer.class)), value);}if(value %3==1){//测流数据
ctx.output(newOutputTag<Integer>("3%1",TypeInformation.of(Integer.class)), value);}//主流 ,数据
out.collect(value);}});DataStream<Integer> output0 = processed.getSideOutput(newOutputTag<>("3%0",TypeInformation.of(Integer.class)));DataStream<Integer> output1 = processed.getSideOutput(newOutputTag<>("3%1",TypeInformation.of(Integer.class)));
output1.print();
env.execute();}}
1.3.connect
connect 连接 DataStream ,DataStream ==> ConnectedStream
两个DataStream 连接成一个新的ConnectedStream ,虽然两个流连接在一起,但是两个流依然是相互独立的,这个方法的最大用处是: 两个流共享State 状态
两个流在内部还是各自处理各自的逻辑 比如 CoMapFunction 内的map1,map2 还是各自处理 streamSource,streamSource2;
数据类型可以不一致
publicclass _03_ConnectedStream {publicstaticvoidmain(String[] args)throwsException{// 获取环境StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);DataStreamSource<Integer> streamSource = env.fromElements(1,2,3,4,5);DataStreamSource<Integer> streamSource2 = env.fromElements(10,20,30,40,50);ConnectedStreams<Integer,Integer> connected = streamSource.connect(streamSource2);// 原来的 MapFunction ==> CoMapFunction ; flatMap ==> CoMapFunctionSingleOutputStreamOperator<Object> mapped = connected.map(newCoMapFunction<Integer,Integer,Object>(){@OverridepublicObjectmap1(Integer value)throwsException{return value +1;}@OverridepublicObjectmap2(Integer value)throwsException{return value *10;}});
mapped.print();
env.execute();}}----------------------------------------------------------------------------------------------------------
streamSource ---> map1
--------------------------------------------------------------------------------
streamSource2 ---> map2
----------------------------------------------------------------------------------------------------------
1.4.union
可以合并多个流,流数据类型必须一致,
publicclass _04_UnionStream {publicstaticvoidmain(String[] args)throwsException{// 获取环境StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);DataStreamSource<Integer> streamSource = env.fromElements(1,2,3,4,5);DataStreamSource<Integer> streamSource2 = env.fromElements(10,20,30,40,50,80,1110);DataStream<Integer> unioned = streamSource.union(streamSource2);SingleOutputStreamOperator<String> union = unioned.map(newMapFunction<Integer,String>(){@OverridepublicStringmap(Integer value)throwsException{return"union"+ value;}});
union.print();
env.execute();}}--------------------------------------------------------------------------------------
streamSource
----------------------------------------=====> map
----------------------------------------
streamSource2
--------------------------------------------------------------------------------------
1.5.coGroup
coGroup 本质上是join 算子的底层算子
用有界流的思想去处理; 比如按时间窗口: 每 5s 内的数据作为一批进行分组匹配
<左边流>.coGroup(<右边流>).where(<KeySelector>).equalTo(<KeySelector>).window(<窗口>).apply(<处理逻辑>)
数据组比如说是时间窗口是5或者是10s 为一批数据, 时间窗口内的数据完成后,根据 where,和 equalTo 选择的key 数据一致 来分组
publicclass _05_CoGroupStream {publicstaticvoidmain(String[] args)throwsException{// 获取环境StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();DataStream<Person> name_idCard = env.socketTextStream("192.168.141.131",8888).map(x ->{Person person =newPerson();
person.setName(x.split(",")[0]);
person.setIdCard(x.split(",")[1]);return person;}).returns(TypeInformation.of(Person.class)).name("==idCard==");//name_idCard.print();DataStream<Person> name_addr = env.socketTextStream("192.168.141.131",7777).map(x ->{Person person =newPerson();
person.setName(x.split(",")[0]);
person.setAddr(x.split(",")[1]);return person;}).returns(TypeInformation.of(Person.class)).name("==addr==");//name_addr.print();DataStream<Person> dataStream = name_idCard.coGroup(name_addr)// 左边流的key.where(newKeySelector<Person,Object>(){@OverridepublicObjectgetKey(Person value)throwsException{return value.getName();}})// 右边流的key.equalTo(newKeySelector<Person,Object>(){@OverridepublicObjectgetKey(Person value)throwsException{return value.getName();}})//时间窗口.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))//处理逻辑 左边 Person ,右边 Person ,输出 Person.apply(newCoGroupFunction<Person,Person,Person>(){/**
* first 协调组第一个流个数据
* second 协调组第二个流数据
*/@OverridepublicvoidcoGroup(Iterable<Person> first,Iterable<Person> second,Collector<Person> out)throwsException{//左连接实现Iterator<Person> iterator = first.iterator();while(iterator.hasNext()){Person next1 = iterator.next();Iterator<Person> iterator1 = second.iterator();Boolean noDataFlag =true;while(iterator1.hasNext()){Person result =newPerson(next1);Person next = iterator1.next();
result.setAddr(next.getAddr());
out.collect(result);
noDataFlag =false;}if(noDataFlag){
out.collect(next1);}}}});
dataStream.print();
env.execute();}}
1.6. join 关联操作
用于关联两个流,需要指定join 条件;需要在窗口中进行关联后的计算逻辑
join 使用coGroup 实现的
publicclass _06_JoinStream {publicstaticvoidmain(String[] args)throwsException{// 获取环境StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();//Perple 数据打平为Tuple name,idCard,addrDataStream<Tuple3<String,String,String>> name_idCard = env.socketTextStream("192.168.141.131",8888).map(x ->{returnTuple3.of(x.split(",")[0],x.split(",")[1],"");}).returns(TypeInformation.of(newTypeHint<Tuple3<String,String,String>>(){}));DataStream<Tuple3<String,String,String>> name_addr = env.socketTextStream("192.168.141.131",7777).map(x ->{returnTuple3.of(x.split(",")[0],"",x.split(",")[1]);}).returns(TypeInformation.of(newTypeHint<Tuple3<String,String,String>>(){}));//name_addr.print();DataStream<Tuple3<String,String,String>> dataStream = name_idCard.join(name_addr)// 左边流的f0 字段.where(tp3->tp3.f0)// 右边流的f0 字段.equalTo(tp3->tp3.f0)//时间窗口.window(TumblingProcessingTimeWindows.of(Time.seconds(20)))//处理逻辑 左边 Person ,右边 Person ,输出 Person.apply(newJoinFunction<Tuple3<String,String,String>,Tuple3<String,String,String>,Tuple3<String,String,String>>(){/**
* @param first 匹配到的数据 first input.
* @param second 匹配到的数据 second input.
* @return
* @throws Exception
*/@OverridepublicTuple3join(Tuple3 first,Tuple3 second)throwsException{returnTuple3.of(first.f0,first.f1,second.f2);}});
dataStream.print();
env.execute();}}
1.7.broadcast
datastream1: 用户id|行为|操作数据            datastream2: 用户id|用户name|用户phone
windows time1 -------------------------------------------------------------------
12|click| xxdssd                             12|aa|131
13|click| dasd                               13|cc|1331
14|click| ad                                 14|dd|1321
windows time2 -------------------------------------------------------------------
12|click| sfs
13|click| sdfs
15|click| ghf                                17|dd|1321
windows time3 -------------------------------------------------------------------
14|click| ghf
17|click| ghf
注: 右边流(datastream2)的数据是基础数据, 窗口 join 只能关联到同一窗口内的数据, 因此使用 join 不合适, 适合 broadcast
broadcast 适用于关联字典表
主流算子 <<<---------------------------------- 广播状态
publicclass _07_BroadcastStream {publicstaticvoidmain(String[] args)throwsException{// 获取环境StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();// 数据打平为 用户id|行为|操作数据DataStream<Tuple3<String,String,String>> operationInfo = env.socketTextStream("192.168.141.131",8888).map(x ->{returnTuple3.of(x.split(",")[0], x.split(",")[1], x.split(",")[2]);}).returns(TypeInformation.of(newTypeHint<Tuple3<String,String,String>>(){}));// 数据打平为 用户id|用户name|用户phoneDataStream<Tuple3<String,String,String>> baseInfo = env.socketTextStream("192.168.141.131",7777).map(x ->{returnTuple3.of(x.split(",")[0], x.split(",")[1], x.split(",")[2]);}).returns(TypeInformation.of(newTypeHint<Tuple3<String,String,String>>(){}));//状态描述MapStateDescriptor<String,Tuple3<String,String,String>> userBaseInfoStateDesc =newMapStateDescriptor<>("user base info",TypeInformation.of(String.class),TypeInformation.of(newTypeHint<Tuple3<String,String,String>>(){}));// 基础信息 变成广播流BroadcastStream<Tuple3<String,String,String>> userBaseInfoBroadcast = baseInfo
.broadcast(userBaseInfoStateDesc);// 关联行为流和广播流BroadcastConnectedStream<Tuple3<String,String,String>,Tuple3<String,String,String>> connected = operationInfo
.connect(userBaseInfoBroadcast);SingleOutputStreamOperator<Tuple5<String,String,String,String,String>> processed =// 连接后,处理的逻辑// connected 如果是keyedStream ===> 参数就是 KeyedBroadcastProcessFunction// connected 如果不是keyedStream ===> 参数就是 BroadcastProcessFunction
connected.process(newBroadcastProcessFunction<Tuple3<String,String,String>,// 左流的数据Tuple3<String,String,String>,// 广播的类型Tuple5<String,String,String,String,String>// 返回数据类型>(){/**
* 此方法是处理主流方法 主流来一条处理一下
*
* @throws Exception
*/@OverridepublicvoidprocessElement(Tuple3<String,String,String> value,// 左流 主流 数据BroadcastProcessFunction<Tuple3<String,String,String>,Tuple3<String,String,String>,Tuple5<String,String,String,String,String>>.ReadOnlyContext ctx,// 上下文Collector<Tuple5<String,String,String,String,String>> out // 输出器)throwsException{// 基础数据还没有 broadcastStateReadOnly// 和 processBroadcastElement 里面获取的 broadcastState 数据一致,只是是只读的// 数据是一致的ReadOnlyBroadcastState<String,Tuple3<String,String,String>> broadcastStateReadOnly = ctx
.getBroadcastState(userBaseInfoStateDesc);if(broadcastStateReadOnly ==null){
out.collect(Tuple5.of(value.f0, value.f1, value.f2,null,null));}else{Tuple3<String,String,String> baseInfo = broadcastStateReadOnly.get(value.f0);// 基础数据为空if(baseInfo ==null){
out.collect(Tuple5.of(value.f0, value.f1, value.f2,null,null));}else{
out.collect(Tuple5.of(value.f0, value.f1, value.f2, baseInfo.f1, baseInfo.f2));}}}/**
*
* 处理广播流数据:拿到数据后,存到状态里面
*/@OverridepublicvoidprocessBroadcastElement(Tuple3<String,String,String> value,// 广播流里面的一条数据BroadcastProcessFunction<Tuple3<String,String,String>,Tuple3<String,String,String>,Tuple5<String,String,String,String,String>>.Context ctx,// 上下文Collector<Tuple5<String,String,String,String,String>> out // 输出器)throwsException{// 上下文 里面获取状态BroadcastState<String,Tuple3<String,String,String>> broadcastState = ctx
.getBroadcastState(userBaseInfoStateDesc);//状态里面 以用户id 作为key , 基础信息为value
broadcastState.put(value.f0, value);}});
processed.print();
env.execute();}}
2.Flink 编程 process function
2.1 process function 简介
process function相对于前面的map , flatmap ,filter 的区别就是,对数据的处理有更大的自由度; 可以获取到数据的上下文,数据处理逻辑 ,如何控制返回等交给编写者;
在事件驱动的应用中,使用最频繁的api 就是process function
注: 针对不同类型的流, 需要使用的 process function 类型也不相同
数据流的转换
不同的DataStream 的process 处理方法需要的参数类型有如下几种
2.2 ProcessFunction
publicclass _01_ProcessFunction {publicstaticvoidmain(String[] args)throwsException{// 获取环境StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();// 数据打平为 用户id|行为|操作数据DataStreamSource<String> streamSource = env.fromElements("1,click,data1","2,click1,data2","10,flow,data1","22,doubleclick,data22");DataStream<Tuple3<String,String,String>> operationInfo = streamSource.map(x ->{returnTuple3.of(x.split(",")[0], x.split(",")[1], x.split(",")[2]);}).returns(TypeInformation.of(newTypeHint<Tuple3<String,String,String>>(){}));// ProcessFunctionSingleOutputStreamOperator<String> processed = operationInfo
.process(newProcessFunction<Tuple3<String,String,String>,String>(){// 处理元素@OverridepublicvoidprocessElement(Tuple3<String,String,String> value,ProcessFunction<Tuple3<String,String,String>,String>.Context ctx,Collector<String> out)throwsException{// 可以做主流输出
out.collect(value.f0 + value.f1 + value.f2);// 可以做侧流输出
ctx.output(newOutputTag<Tuple3<String,String,String>>("adasd",TypeInformation.of(newTypeHint<Tuple3<String,String,String>>(){})), value);}// 其余 声明周期方法 ... 任务状态 ... 都可以获取@Overridepublicvoidopen(Configuration parameters)throwsException{super.open(parameters);}});
processed.print();
env.execute();}}
2.3 KeyedProcessFunction
publicclass _02_KeyedProcessFunction {publicstaticvoidmain(String[] args)throwsException{// 获取环境StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();// 数据打平为 用户id|行为|操作数据DataStreamSource<String> streamSource = env.fromElements("1,click,data1","2,click1,data2","10,flow,data1","22,doubleclick,data22","2,doubleclick,data22");DataStream<Tuple3<String,String,String>> operationInfo = streamSource.map(x ->{returnTuple3.of(x.split(",")[0], x.split(",")[1], x.split(",")[2]);}).returns(TypeInformation.of(newTypeHint<Tuple3<String,String,String>>(){}));// keyedStreamKeyedStream<Tuple3<String,String,String>,String> keyedStream = operationInfo.keyBy(tp3 -> tp3.f0);// ProcessFunctionSingleOutputStreamOperator<String> processed = keyedStream
.process(newProcessFunction<Tuple3<String,String,String>,String>(){@OverridepublicvoidprocessElement(Tuple3<String,String,String> value,ProcessFunction<Tuple3<String,String,String>,String>.Context ctx,Collector<String> out)throwsException{
out.collect((value.f0 + value.f1 + value.f2).toUpperCase(Locale.ROOT));}});
processed.print();
env.execute();}}
2.4 ProcessWindowFunction
2.5 ProcessAllWindowFunction
2.6 CoProcessFunction
2.7 ProcessJoinFunction
2.8 BroadcastProcessFunction
参考1.7
2.9 KeyedBroadcastProcessFunction
3.测试
package demo.sff.flink.exercise;

import demo.sff.flink.source.Person;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.parquet.ParquetWriterFactory;
import org.apache.flink.formats.parquet.avro.ParquetAvroWriters;
import org.apache.flink.streaming.api.datastream.*;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Random;

/**
 * Exercise combining the operators covered above.
 *
 * <p>Stream 1: id | event | count, e.g. {@code 1,event1,3  2,event1,5  3,event1,4}
 *
 * <p>Stream 2: id | gender | city, e.g. {@code 1,male,beijin  2,female,shanghai}
 *
 * <p>Requirements:
 * <ol>
 *   <li>Expand every Stream 1 record into {@code count} records, each carrying a random number
 *       (id=1 becomes 3 records, id=2 becomes 5 records).</li>
 *   <li>Join Stream 1 with the Stream 2 data.</li>
 *   <li>Records that cannot be joined go to a side output, the rest to the main output.</li>
 *   <li>Main output: group by gender and take the record with the largest random number.</li>
 *   <li>Write the main output to MySQL.</li>
 *   <li>Write the side output to Parquet.</li>
 * </ol>
 */
public class Test1 {

    /** Type information shared by all three-string tuples in this job. */
    private static final TypeInformation<Tuple3<String, String, String>> STRING_TRIPLE =
            TypeInformation.of(new TypeHint<Tuple3<String, String, String>>() {});

    /** Broadcast state: user id -> (id, gender, city). */
    private static final MapStateDescriptor<String, Tuple3<String, String, String>> USER_INFO =
            new MapStateDescriptor<String, Tuple3<String, String, String>>(
                    "userinfo", TypeInformation.of(String.class), STRING_TRIPLE);

    /** Side output for records without matching user info; one tag for emitter and consumer. */
    private static final OutputTag<String> NO_JOIN =
            new OutputTag<String>("nojoin", TypeInformation.of(String.class));

    /**
     * Parses a comma-separated line into its first three fields.
     *
     * <p>Fields are trimmed so that stray whitespace (as in {@code " 2,female,shanghai"}) does
     * not prevent the id from matching during the join.
     */
    private static Tuple3<String, String, String> parseTriple(String line) {
        String[] fields = line.split(",");
        return Tuple3.of(fields[0].trim(), fields[1].trim(), fields[2].trim());
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Sources; sockets can be substituted later. Stream 2 has to reach the broadcast state
        // before Stream 1 records arrive, otherwise those records cannot be joined.
        DataStreamSource<String> stream1 = env.fromElements("1,event1,3", "2,event1,5", "3,event3,4");
        DataStreamSource<String> stream2 = env.fromElements("1,male,beijin", " 2,female,shanghai");

        DataStream<Tuple3<String, String, String>> streamOperator1 =
                stream1.map(Test1::parseTriple).returns(STRING_TRIPLE);
        DataStream<Tuple3<String, String, String>> streamOperator2 =
                stream2.map(Test1::parseTriple).returns(STRING_TRIPLE);

        // Requirement 1: expand each record `count` times with a random number in [0, 100).
        DataStream<Tuple3<String, String, String>> mapDataStream = streamOperator1
                .flatMap(new FlatMapFunction<Tuple3<String, String, String>, Tuple3<String, String, String>>() {
                    @Override
                    public void flatMap(
                            Tuple3<String, String, String> value,
                            Collector<Tuple3<String, String, String>> out) throws Exception {
                        int copies = Integer.parseInt(value.f2);
                        Random random = new Random();
                        for (int i = 0; i < copies; i++) {
                            out.collect(Tuple3.of(value.f0, value.f1, String.valueOf(random.nextInt(100))));
                        }
                    }
                })
                .returns(STRING_TRIPLE);
        // mapDataStream.print();

        // Requirement 2: broadcast Stream 2 and connect it with the expanded Stream 1.
        BroadcastStream<Tuple3<String, String, String>> tuple3BroadcastStream =
                streamOperator2.broadcast(USER_INFO);
        BroadcastConnectedStream<Tuple3<String, String, String>, Tuple3<String, String, String>>
                tuple3BroadcastConnectedStream = mapDataStream.connect(tuple3BroadcastStream);

        SingleOutputStreamOperator<Tuple5<String, String, String, String, String>> processed =
                tuple3BroadcastConnectedStream
                        .process(new BroadcastProcessFunction<
                                Tuple3<String, String, String>,
                                Tuple3<String, String, String>,
                                Tuple5<String, String, String, String, String>>() {

                            /** Joins a main-stream record with the broadcast user info. */
                            @Override
                            public void processElement(
                                    Tuple3<String, String, String> value,
                                    ReadOnlyContext ctx,
                                    Collector<Tuple5<String, String, String, String, String>> out)
                                    throws Exception {
                                // getBroadcastState does not return null for a registered
                                // descriptor, so only the lookup result is checked.
                                ReadOnlyBroadcastState<String, Tuple3<String, String, String>> broadcastState =
                                        ctx.getBroadcastState(USER_INFO);
                                Tuple3<String, String, String> userInfo = broadcastState.get(value.f0);
                                if (userInfo == null) {
                                    // Requirement 3: unjoined records go to the side output.
                                    ctx.output(NO_JOIN, value.f0 + value.f1 + value.f2);
                                } else {
                                    out.collect(Tuple5.of(value.f0, value.f1, value.f2, userInfo.f1, userInfo.f2));
                                }
                            }

                            /** Stores every broadcast record in the state, keyed by user id. */
                            @Override
                            public void processBroadcastElement(
                                    Tuple3<String, String, String> value,
                                    Context ctx,
                                    Collector<Tuple5<String, String, String, String, String>> out)
                                    throws Exception {
                                BroadcastState<String, Tuple3<String, String, String>> broadcastState =
                                        ctx.getBroadcastState(USER_INFO);
                                broadcastState.put(value.f0, value);
                            }
                        })
                        .returns(TypeInformation.of(
                                new TypeHint<Tuple5<String, String, String, String, String>>() {}));

        // Main output.
        processed.print();

        // Side output.
        DataStream<String> sideOutput = processed.getSideOutput(NO_JOIN);
        // sideOutput.print();

        // Requirement 4: convert the random number to Integer, then group by gender and keep the
        // record with the largest number. Keying is only needed before maxBy, not before the map.
        SingleOutputStreamOperator<Tuple5<String, String, Integer, String, String>> streamOperator = processed
                .map(new MapFunction<
                        Tuple5<String, String, String, String, String>,
                        Tuple5<String, String, Integer, String, String>>() {
                    @Override
                    public Tuple5<String, String, Integer, String, String> map(
                            Tuple5<String, String, String, String, String> value) throws Exception {
                        return Tuple5.of(value.f0, value.f1, Integer.valueOf(value.f2), value.f3, value.f4);
                    }
                })
                .returns(TypeInformation.of(new TypeHint<Tuple5<String, String, Integer, String, String>>() {}));

        SingleOutputStreamOperator<Tuple5<String, String, Integer, String, String>> maxBy =
                streamOperator.keyBy(tp5 -> tp5.f3).maxBy(2);
        maxBy.print();

        // Requirement 5: write the main output to MySQL (not verified against a database yet).
        // MySQL upsert syntax is "ON DUPLICATE KEY UPDATE col = ?".
        String sql = " insert into testa values (?,?,?,?,?) on duplicate key update a=?,b=?,c=?,d=?,e=? ";
        SinkFunction<Tuple5<String, String, Integer, String, String>> jdbcSink = JdbcSink.sink(
                sql,
                new JdbcStatementBuilder<Tuple5<String, String, Integer, String, String>>() {
                    @Override
                    public void accept(
                            PreparedStatement preparedStatement,
                            Tuple5<String, String, Integer, String, String> tuple5) throws SQLException {
                        // JDBC parameter indices start at 1.
                        // Parameters 1-5: values to insert.
                        preparedStatement.setString(1, tuple5.f0);
                        preparedStatement.setString(2, tuple5.f1);
                        preparedStatement.setInt(3, tuple5.f2);
                        preparedStatement.setString(4, tuple5.f3);
                        preparedStatement.setString(5, tuple5.f4);
                        // Parameters 6-10: the same values for the update branch.
                        preparedStatement.setString(6, tuple5.f0);
                        preparedStatement.setString(7, tuple5.f1);
                        preparedStatement.setInt(8, tuple5.f2);
                        preparedStatement.setString(9, tuple5.f3);
                        preparedStatement.setString(10, tuple5.f4);
                    }
                },
                JdbcExecutionOptions.builder()
                        .withBatchSize(2)   // insert in batches of two records
                        .withMaxRetries(3)  // retries for a failed insert
                        .build(),
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                        .withPassword("root") // JDBC connection settings
                        .withUsername("root")
                        .withUrl("jdbc:mysql://192.168.141.131:3306/flinkdemo")
                        .build());
        streamOperator.addSink(jdbcSink);

        // Requirement 6: write the side output to Parquet (not verified yet).
        // NOTE(review): bulk-format file sinks finalize part files on checkpoints and no
        // checkpointing is enabled in this job - confirm before relying on the written files.
        ParquetWriterFactory<String> writerFactory = ParquetAvroWriters.forReflectRecord(String.class);
        FileSink<String> build = FileSink.forBulkFormat(new Path("d:/sink"), writerFactory)
                .withBucketAssigner(new DateTimeBucketAssigner<String>()) // bucketing strategy
                .withBucketCheckInterval(5)                               // bucket check interval
                .withOutputFileConfig(OutputFileConfig.builder()
                        .withPartPrefix("flinkdemo") // file prefix
                        .withPartSuffix(".txt")      // file suffix
                        .build())
                .build();
        sideOutput.sinkTo(build);

        env.execute();
    }
}
版权归原作者 C0oOder 所有, 如有侵权,请联系我们删除。