

Flink Learning Part 3: Flink Streams & the Process Function API


1. Flink Multi-Stream Operations

1.1. split (deprecated)

split divided one data stream into several streams based on the data itself. The API has been removed in later Flink versions; use side outputs instead.

1.2. Splitting a stream (using side outputs)

public class _02_SplitStream {
    public static void main(String[] args) throws Exception {
        // get the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStreamSource<Integer> streamSource = env.fromElements(1, 2, 3, 4, 5);

        SingleOutputStreamOperator<Integer> processed = streamSource.process(new ProcessFunction<Integer, Integer>() {
            /**
             * @param value the input element
             * @param ctx   the context
             * @param out   the collector of the main stream
             * @throws Exception
             */
            @Override
            public void processElement(Integer value, ProcessFunction<Integer, Integer>.Context ctx, Collector<Integer> out) throws Exception {
                if (value % 3 == 0) {
                    // side-output data
                    ctx.output(new OutputTag<Integer>("3%0", TypeInformation.of(Integer.class)), value);
                }
                if (value % 3 == 1) {
                    // side-output data
                    ctx.output(new OutputTag<Integer>("3%1", TypeInformation.of(Integer.class)), value);
                }
                // main-stream data
                out.collect(value);
            }
        });

        DataStream<Integer> output0 = processed.getSideOutput(new OutputTag<>("3%0", TypeInformation.of(Integer.class)));
        DataStream<Integer> output1 = processed.getSideOutput(new OutputTag<>("3%1", TypeInformation.of(Integer.class)));
        output1.print();

        env.execute();
    }
}

1.3.connect

connect joins two streams: DataStream + DataStream ==> ConnectedStreams

Two DataStreams are connected into a new ConnectedStreams. Although the two streams are connected, they remain independent of each other; the main benefit of this method is that the two streams can share state.

Internally each stream is still processed by its own logic; for example, map1 and map2 inside a CoMapFunction still handle streamSource and streamSource2 separately.

The two streams may have different data types.

public class _03_ConnectedStream {
    public static void main(String[] args) throws Exception {
        // get the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStreamSource<Integer> streamSource = env.fromElements(1, 2, 3, 4, 5);
        DataStreamSource<Integer> streamSource2 = env.fromElements(10, 20, 30, 40, 50);
        ConnectedStreams<Integer, Integer> connected = streamSource.connect(streamSource2);

        // the plain MapFunction becomes a CoMapFunction; flatMap becomes a CoFlatMapFunction
        SingleOutputStreamOperator<Object> mapped = connected.map(new CoMapFunction<Integer, Integer, Object>() {
            @Override
            public Object map1(Integer value) throws Exception {
                return value + 1;
            }

            @Override
            public Object map2(Integer value) throws Exception {
                return value * 10;
            }
        });

        mapped.print();

        env.execute();
    }
}

  ------------------------------------------------------------
    streamSource     --->     map1
  ------------------------------------------------------------
    streamSource2    --->     map2
  ------------------------------------------------------------

1.4.union

union can merge multiple streams; the data types of the streams must be identical.

public class _04_UnionStream {
    public static void main(String[] args) throws Exception {
        // get the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStreamSource<Integer> streamSource = env.fromElements(1, 2, 3, 4, 5);
        DataStreamSource<Integer> streamSource2 = env.fromElements(10, 20, 30, 40, 50, 80, 1110);
        DataStream<Integer> unioned = streamSource.union(streamSource2);

        SingleOutputStreamOperator<String> union = unioned.map(new MapFunction<Integer, String>() {
            @Override
            public String map(Integer value) throws Exception {
                return "union" + value;
            }
        });

        union.print();
        env.execute();
    }
}

  ------------------------------------------------------------
    streamSource
  ------------------------------------------------------------  =====>  map
  ------------------------------------------------------------
    streamSource2
  ------------------------------------------------------------

1.5.coGroup

coGroup is essentially the lower-level operator that the join operator is built on.

It processes the data with a bounded-stream mindset; for example, with a time window, the data arriving within 5 seconds is grouped and matched as one batch.

<leftStream>.coGroup(<rightStream>).where(<KeySelector>).equalTo(<KeySelector>).window(<window>).apply(<processing logic>)


Data is batched by a window, for example a 5s or 10s tumbling time window; when the window closes, elements whose keys (selected by where and equalTo) are equal are grouped together.

public class _05_CoGroupStream {
    public static void main(String[] args) throws Exception {
        // get the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Person> name_idCard = env.socketTextStream("192.168.141.131", 8888).map(x -> {
            Person person = new Person();
            person.setName(x.split(",")[0]);
            person.setIdCard(x.split(",")[1]);
            return person;
        }).returns(TypeInformation.of(Person.class)).name("==idCard==");
        // name_idCard.print();

        DataStream<Person> name_addr = env.socketTextStream("192.168.141.131", 7777).map(x -> {
            Person person = new Person();
            person.setName(x.split(",")[0]);
            person.setAddr(x.split(",")[1]);
            return person;
        }).returns(TypeInformation.of(Person.class)).name("==addr==");
        // name_addr.print();

        DataStream<Person> dataStream = name_idCard.coGroup(name_addr)
                // key of the left stream
                .where(new KeySelector<Person, Object>() {
                    @Override
                    public Object getKey(Person value) throws Exception {
                        return value.getName();
                    }
                })
                // key of the right stream
                .equalTo(new KeySelector<Person, Object>() {
                    @Override
                    public Object getKey(Person value) throws Exception {
                        return value.getName();
                    }
                })
                // time window
                .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
                // processing logic: left Person, right Person, output Person
                .apply(new CoGroupFunction<Person, Person, Person>() {
                    /**
                     * first:  the elements of the first stream in this co-group
                     * second: the elements of the second stream in this co-group
                     */
                    @Override
                    public void coGroup(Iterable<Person> first, Iterable<Person> second, Collector<Person> out) throws Exception {
                        // left-join implementation
                        Iterator<Person> iterator = first.iterator();
                        while (iterator.hasNext()) {
                            Person next1 = iterator.next();
                            Iterator<Person> iterator1 = second.iterator();
                            boolean noDataFlag = true;
                            while (iterator1.hasNext()) {
                                Person result = new Person(next1);
                                Person next = iterator1.next();
                                result.setAddr(next.getAddr());
                                out.collect(result);
                                noDataFlag = false;
                            }
                            if (noDataFlag) {
                                out.collect(next1);
                            }
                        }
                    }
                });

        dataStream.print();

        env.execute();
    }
}

1.6. join

join associates two streams. You must specify the join condition, and the computation on the joined result is performed inside a window.

join is implemented on top of coGroup.

public class _06_JoinStream {
    public static void main(String[] args) throws Exception {
        // get the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // flatten the Person data into Tuple3: name, idCard, addr
        DataStream<Tuple3<String, String, String>> name_idCard = env.socketTextStream("192.168.141.131", 8888).map(x -> {
            return Tuple3.of(x.split(",")[0], x.split(",")[1], "");
        }).returns(TypeInformation.of(new TypeHint<Tuple3<String, String, String>>() {}));

        DataStream<Tuple3<String, String, String>> name_addr = env.socketTextStream("192.168.141.131", 7777).map(x -> {
            return Tuple3.of(x.split(",")[0], "", x.split(",")[1]);
        }).returns(TypeInformation.of(new TypeHint<Tuple3<String, String, String>>() {}));
        // name_addr.print();

        DataStream<Tuple3<String, String, String>> dataStream = name_idCard.join(name_addr)
                // field f0 of the left stream
                .where(tp3 -> tp3.f0)
                // field f0 of the right stream
                .equalTo(tp3 -> tp3.f0)
                // time window
                .window(TumblingProcessingTimeWindows.of(Time.seconds(20)))
                // processing logic: left tuple, right tuple, output tuple
                .apply(new JoinFunction<Tuple3<String, String, String>, Tuple3<String, String, String>, Tuple3<String, String, String>>() {
                    /**
                     * @param first  a matched element from the first input
                     * @param second a matched element from the second input
                     */
                    @Override
                    public Tuple3<String, String, String> join(Tuple3<String, String, String> first,
                                                               Tuple3<String, String, String> second) throws Exception {
                        return Tuple3.of(first.f0, first.f1, second.f2);
                    }
                });

        dataStream.print();
        env.execute();
    }
}

1.7.broadcast

   datastream1: userId | action | data            datastream2: userId | userName | userPhone
   window time1 --------------------------------------------------------------------------
       12|click|xxdssd                             12|aa|131
       13|click|dasd                               13|cc|1331
       14|click|ad                                 14|dd|1321
   window time2 --------------------------------------------------------------------------
       12|click|sfs
       13|click|sdfs
       15|click|ghf                                17|dd|1321
   window time3 --------------------------------------------------------------------------
       14|click|ghf
       17|click|ghf

   Note: datastream2 holds base (dictionary) data, so a windowed join is a poor fit here;
         broadcast is the better choice. broadcast is well suited to joining against dictionary tables.

       main-stream operator   <<<----------------------------------   broadcast state
public class _07_BroadcastStream {
    public static void main(String[] args) throws Exception {
        // get the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // flattened as userId | action | data
        DataStream<Tuple3<String, String, String>> operationInfo = env.socketTextStream("192.168.141.131", 8888).map(x -> {
            return Tuple3.of(x.split(",")[0], x.split(",")[1], x.split(",")[2]);
        }).returns(TypeInformation.of(new TypeHint<Tuple3<String, String, String>>() {}));

        // flattened as userId | userName | userPhone
        DataStream<Tuple3<String, String, String>> baseInfo = env.socketTextStream("192.168.141.131", 7777).map(x -> {
            return Tuple3.of(x.split(",")[0], x.split(",")[1], x.split(",")[2]);
        }).returns(TypeInformation.of(new TypeHint<Tuple3<String, String, String>>() {}));

        // state descriptor
        MapStateDescriptor<String, Tuple3<String, String, String>> userBaseInfoStateDesc = new MapStateDescriptor<>(
                "user base info",
                TypeInformation.of(String.class),
                TypeInformation.of(new TypeHint<Tuple3<String, String, String>>() {}));

        // turn the base-info stream into a broadcast stream
        BroadcastStream<Tuple3<String, String, String>> userBaseInfoBroadcast = baseInfo
                .broadcast(userBaseInfoStateDesc);

        // connect the action stream with the broadcast stream
        BroadcastConnectedStream<Tuple3<String, String, String>, Tuple3<String, String, String>> connected = operationInfo
                .connect(userBaseInfoBroadcast);

        // processing logic after connecting:
        // if connected comes from a KeyedStream     ===> the parameter is a KeyedBroadcastProcessFunction
        // if connected is not from a KeyedStream    ===> the parameter is a BroadcastProcessFunction
        SingleOutputStreamOperator<Tuple5<String, String, String, String, String>> processed =
                connected.process(new BroadcastProcessFunction<
                        Tuple3<String, String, String>,                  // type of the main (left) stream
                        Tuple3<String, String, String>,                  // type of the broadcast stream
                        Tuple5<String, String, String, String, String>  // output type
                        >() {
                    /**
                     * Handles the main stream, one element at a time.
                     */
                    @Override
                    public void processElement(
                            Tuple3<String, String, String> value,  // element of the main (left) stream
                            BroadcastProcessFunction<Tuple3<String, String, String>, Tuple3<String, String, String>, Tuple5<String, String, String, String, String>>.ReadOnlyContext ctx,  // context
                            Collector<Tuple5<String, String, String, String, String>> out  // collector
                    ) throws Exception {
                        // read-only view of the broadcast state; it holds the same data
                        // that processBroadcastElement sees, but cannot be modified here
                        ReadOnlyBroadcastState<String, Tuple3<String, String, String>> broadcastStateReadOnly = ctx
                                .getBroadcastState(userBaseInfoStateDesc);
                        if (broadcastStateReadOnly == null) {
                            // no base data has arrived yet
                            out.collect(Tuple5.of(value.f0, value.f1, value.f2, null, null));
                        } else {
                            Tuple3<String, String, String> baseInfo = broadcastStateReadOnly.get(value.f0);
                            if (baseInfo == null) {
                                // no base data for this user
                                out.collect(Tuple5.of(value.f0, value.f1, value.f2, null, null));
                            } else {
                                out.collect(Tuple5.of(value.f0, value.f1, value.f2, baseInfo.f1, baseInfo.f2));
                            }
                        }
                    }

                    /**
                     * Handles the broadcast stream: store each element into the broadcast state.
                     */
                    @Override
                    public void processBroadcastElement(
                            Tuple3<String, String, String> value,  // one element of the broadcast stream
                            BroadcastProcessFunction<Tuple3<String, String, String>, Tuple3<String, String, String>, Tuple5<String, String, String, String, String>>.Context ctx,  // context
                            Collector<Tuple5<String, String, String, String, String>> out  // collector
                    ) throws Exception {
                        // obtain the broadcast state from the context
                        BroadcastState<String, Tuple3<String, String, String>> broadcastState = ctx
                                .getBroadcastState(userBaseInfoStateDesc);
                        // key the state by user id, value is the base info
                        broadcastState.put(value.f0, value);
                    }
                });

        processed.print();

        env.execute();
    }
}

2. Flink Programming: Process Functions

2.1 Introduction to process functions

Compared with the map, flatMap and filter operators shown earlier, a process function gives you far more freedom in how data is handled: you can access the element's context, and the processing logic and how results are emitted are left entirely to the author.

In event-driven applications, the most frequently used API is the process function.

Note: different kinds of streams require different process function types.

(Figure: transformations between the different stream types)

The process() methods of the different DataStream types take the following kinds of process functions:

2.2 ProcessFunction

public class _01_ProcessFunction {
    public static void main(String[] args) throws Exception {
        // get the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // flattened as userId | action | data
        DataStreamSource<String> streamSource = env.fromElements("1,click,data1", "2,click1,data2", "10,flow,data1", "22,doubleclick,data22");
        DataStream<Tuple3<String, String, String>> operationInfo = streamSource.map(x -> {
            return Tuple3.of(x.split(",")[0], x.split(",")[1], x.split(",")[2]);
        }).returns(TypeInformation.of(new TypeHint<Tuple3<String, String, String>>() {}));

        // ProcessFunction
        SingleOutputStreamOperator<String> processed = operationInfo
                .process(new ProcessFunction<Tuple3<String, String, String>, String>() {
                    // process one element
                    @Override
                    public void processElement(Tuple3<String, String, String> value,
                                               ProcessFunction<Tuple3<String, String, String>, String>.Context ctx,
                                               Collector<String> out) throws Exception {
                        // emit to the main stream
                        out.collect(value.f0 + value.f1 + value.f2);
                        // emit to a side output
                        ctx.output(new OutputTag<Tuple3<String, String, String>>("adasd",
                                TypeInformation.of(new TypeHint<Tuple3<String, String, String>>() {})), value);
                    }

                    // the other lifecycle methods, runtime context, task state, ... are also available
                    @Override
                    public void open(Configuration parameters) throws Exception {
                        super.open(parameters);
                    }
                });

        processed.print();

        env.execute();
    }
}

2.3 KeyedProcessFunction

public class _02_KeyedProcessFunction {
    public static void main(String[] args) throws Exception {
        // get the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // flattened as userId | action | data
        DataStreamSource<String> streamSource = env.fromElements("1,click,data1", "2,click1,data2", "10,flow,data1", "22,doubleclick,data22", "2,doubleclick,data22");
        DataStream<Tuple3<String, String, String>> operationInfo = streamSource.map(x -> {
            return Tuple3.of(x.split(",")[0], x.split(",")[1], x.split(",")[2]);
        }).returns(TypeInformation.of(new TypeHint<Tuple3<String, String, String>>() {}));

        // keyedStream
        KeyedStream<Tuple3<String, String, String>, String> keyedStream = operationInfo.keyBy(tp3 -> tp3.f0);

        // KeyedProcessFunction<key type, input type, output type>
        SingleOutputStreamOperator<String> processed = keyedStream
                .process(new KeyedProcessFunction<String, Tuple3<String, String, String>, String>() {
                    @Override
                    public void processElement(Tuple3<String, String, String> value,
                                               KeyedProcessFunction<String, Tuple3<String, String, String>, String>.Context ctx,
                                               Collector<String> out) throws Exception {
                        out.collect((value.f0 + value.f1 + value.f2).toUpperCase(Locale.ROOT));
                    }
                });

        processed.print();

        env.execute();
    }
}

2.4 ProcessWindowFunction
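A ProcessWindowFunction is applied after keyBy(...).window(...): instead of one element at a time, it receives all elements of one key for one window, plus window metadata through its Context. Below is a minimal sketch in the style of the earlier examples (same socket source 192.168.141.131:8888 and userId|action|data layout); the class name and the counting logic are illustrative, not a tested example.

public class _03_ProcessWindowFunction {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // userId | action | data, as in 2.2
        DataStream<Tuple3<String, String, String>> operationInfo = env.socketTextStream("192.168.141.131", 8888)
                .map(x -> Tuple3.of(x.split(",")[0], x.split(",")[1], x.split(",")[2]))
                .returns(TypeInformation.of(new TypeHint<Tuple3<String, String, String>>() {}));

        SingleOutputStreamOperator<String> processed = operationInfo
                .keyBy(tp3 -> tp3.f0)
                .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
                // ProcessWindowFunction<input, output, key, window type>
                .process(new ProcessWindowFunction<Tuple3<String, String, String>, String, String, TimeWindow>() {
                    @Override
                    public void process(String key, Context context,
                                        Iterable<Tuple3<String, String, String>> elements,
                                        Collector<String> out) throws Exception {
                        // all elements of this key in this window are available at once
                        long count = 0;
                        for (Tuple3<String, String, String> e : elements) {
                            count++;
                        }
                        // window metadata comes from the context
                        out.collect("user " + key + " had " + count + " actions in window ending at " + context.window().getEnd());
                    }
                });

        processed.print();
        env.execute();
    }
}

Because process(...) buffers every element of the window, combining an aggregate(...) pre-aggregation with a ProcessWindowFunction is usually preferable for large windows.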

2.5 ProcessAllWindowFunction
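ProcessAllWindowFunction is the non-keyed counterpart of ProcessWindowFunction: it is applied after windowAll(...), so all elements pass through a single, non-parallel window. A minimal sketch, again reading from the socket used in the earlier examples; the summing logic and class name are illustrative.

public class _04_ProcessAllWindowFunction {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // one integer per line from the socket
        DataStream<Integer> numbers = env.socketTextStream("192.168.141.131", 8888)
                .map(x -> Integer.valueOf(x))
                .returns(TypeInformation.of(Integer.class));

        SingleOutputStreamOperator<String> processed = numbers
                // windowAll: no keyBy, all elements go through one non-parallel window
                .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(5)))
                // ProcessAllWindowFunction<input, output, window type>
                .process(new ProcessAllWindowFunction<Integer, String, TimeWindow>() {
                    @Override
                    public void process(Context context, Iterable<Integer> elements, Collector<String> out) throws Exception {
                        int sum = 0;
                        for (Integer e : elements) {
                            sum += e;
                        }
                        out.collect("sum of this window = " + sum);
                    }
                });

        processed.print();
        env.execute();
    }
}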

2.6 CoProcessFunction
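CoProcessFunction is the process-function counterpart of the CoMapFunction used in 1.3: it is applied to a ConnectedStreams and has one processElement method per input, and like every process function it also gets a context (side outputs, and timers when the streams are keyed). A minimal sketch reusing the two integer sources from 1.3; the output strings are illustrative.

public class _05_CoProcessFunction {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStreamSource<Integer> streamSource = env.fromElements(1, 2, 3, 4, 5);
        DataStreamSource<Integer> streamSource2 = env.fromElements(10, 20, 30, 40, 50);

        // like the CoMapFunction example in 1.3, but with full process-function power
        SingleOutputStreamOperator<String> processed = streamSource.connect(streamSource2)
                .process(new CoProcessFunction<Integer, Integer, String>() {
                    // called for every element of the first stream
                    @Override
                    public void processElement1(Integer value, Context ctx, Collector<String> out) throws Exception {
                        out.collect("stream1: " + (value + 1));
                    }

                    // called for every element of the second stream
                    @Override
                    public void processElement2(Integer value, Context ctx, Collector<String> out) throws Exception {
                        out.collect("stream2: " + (value * 10));
                    }
                });

        processed.print();
        env.execute();
    }
}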

2.7 ProcessJoinFunction
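ProcessJoinFunction is used with an interval join between two keyed streams: it is called once for every pair of elements whose event timestamps fall within the configured bounds. Interval joins work on event time, so the sketch below assigns timestamps and watermarks explicitly; the element values, timestamps and class name are made up for illustration.

public class _06_ProcessJoinFunction {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // userId | idCard | event timestamp (ms)
        SingleOutputStreamOperator<Tuple3<String, String, Long>> idCards = env
                .fromElements(Tuple3.of("1", "id-100", 1000L), Tuple3.of("2", "id-200", 2000L))
                .assignTimestampsAndWatermarks(WatermarkStrategy
                        .<Tuple3<String, String, Long>>forMonotonousTimestamps()
                        .withTimestampAssigner((e, ts) -> e.f2));

        // userId | addr | event timestamp (ms)
        SingleOutputStreamOperator<Tuple3<String, String, Long>> addrs = env
                .fromElements(Tuple3.of("1", "beijing", 1500L), Tuple3.of("2", "shanghai", 9000L))
                .assignTimestampsAndWatermarks(WatermarkStrategy
                        .<Tuple3<String, String, Long>>forMonotonousTimestamps()
                        .withTimestampAssigner((e, ts) -> e.f2));

        // interval join: match elements of the two keyed streams whose event times are within +/- 2 seconds
        SingleOutputStreamOperator<String> joined = idCards.keyBy(t -> t.f0)
                .intervalJoin(addrs.keyBy(t -> t.f0))
                .between(Time.seconds(-2), Time.seconds(2))
                .process(new ProcessJoinFunction<Tuple3<String, String, Long>, Tuple3<String, String, Long>, String>() {
                    @Override
                    public void processElement(Tuple3<String, String, Long> left,
                                               Tuple3<String, String, Long> right,
                                               Context ctx,
                                               Collector<String> out) throws Exception {
                        out.collect(left.f0 + "," + left.f1 + "," + right.f1);
                    }
                });

        joined.print();
        env.execute();
    }
}

With these timestamps, user 1 joins (1000 ms vs. 1500 ms lies within the bounds) while user 2 does not (2000 ms vs. 9000 ms is outside them).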

2.8 BroadcastProcessFunction

See section 1.7.

2.9 KeyedBroadcastProcessFunction
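KeyedBroadcastProcessFunction is the variant used when the non-broadcast side of the connect from 1.7 is a KeyedStream; besides the broadcast state it can also use keyed state and timers. Below is a minimal sketch that reuses the userId|action|data and userId|userName|userPhone streams from 1.7; the class name and output format are illustrative.

public class _08_KeyedBroadcastProcessFunction {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // userId | action | data
        DataStream<Tuple3<String, String, String>> operationInfo = env.socketTextStream("192.168.141.131", 8888)
                .map(x -> Tuple3.of(x.split(",")[0], x.split(",")[1], x.split(",")[2]))
                .returns(TypeInformation.of(new TypeHint<Tuple3<String, String, String>>() {}));

        // userId | userName | userPhone
        DataStream<Tuple3<String, String, String>> baseInfo = env.socketTextStream("192.168.141.131", 7777)
                .map(x -> Tuple3.of(x.split(",")[0], x.split(",")[1], x.split(",")[2]))
                .returns(TypeInformation.of(new TypeHint<Tuple3<String, String, String>>() {}));

        MapStateDescriptor<String, Tuple3<String, String, String>> userBaseInfoStateDesc = new MapStateDescriptor<>(
                "user base info",
                TypeInformation.of(String.class),
                TypeInformation.of(new TypeHint<Tuple3<String, String, String>>() {}));

        // key the main stream first, then connect it with the broadcast stream
        SingleOutputStreamOperator<String> processed = operationInfo
                .keyBy(tp3 -> tp3.f0)
                .connect(baseInfo.broadcast(userBaseInfoStateDesc))
                // KeyedBroadcastProcessFunction<key, main-stream type, broadcast type, output>
                .process(new KeyedBroadcastProcessFunction<String, Tuple3<String, String, String>, Tuple3<String, String, String>, String>() {
                    @Override
                    public void processElement(Tuple3<String, String, String> value,
                                               ReadOnlyContext ctx,
                                               Collector<String> out) throws Exception {
                        // keyed state and timers would also be available here
                        Tuple3<String, String, String> base = ctx.getBroadcastState(userBaseInfoStateDesc).get(value.f0);
                        out.collect(value.f0 + " -> " + value.f1 + " / " + (base == null ? "unknown user" : base.f1));
                    }

                    @Override
                    public void processBroadcastElement(Tuple3<String, String, String> value,
                                                        Context ctx,
                                                        Collector<String> out) throws Exception {
                        // store the base info in the broadcast state, keyed by user id
                        ctx.getBroadcastState(userBaseInfoStateDesc).put(value.f0, value);
                    }
                });

        processed.print();
        env.execute();
    }
}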

3. Exercise

package demo.sff.flink.exercise;

import demo.sff.flink.source.Person;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.parquet.ParquetWriterFactory;
import org.apache.flink.formats.parquet.avro.ParquetAvroWriters;
import org.apache.flink.streaming.api.datastream.*;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Random;

/**
 * Create two streams.
 * Stream 1: id | event | count
 *   1,event1,3  2,event1,5  3,event1,4
 *
 * Stream 2: id | gender | city
 *   1,male,beijin  2,female,shanghai
 *
 * Requirements:
 * 1. Expand each Stream 1 record into `count` records, e.g. id=1 becomes 3 records
 *    (1,event1,random1 / 1,event1,random2 / 1,event1,random3) and id=2 becomes 5 records.
 * 2. Join Stream 1 with Stream 2.
 * 3. Unmatched records go to a side output; the rest stay in the main stream.
 * 4. On the main stream, group by gender and take the record with the largest random number.
 * 5. Write the main stream to MySQL.
 * 6. Write the side output to Parquet.
 */
public class Test1 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // create the streams; sockets could be used instead later.
        // stream2 must be written into the broadcast state first, otherwise stream1 cannot be matched
        DataStreamSource<String> stream1 = env.fromElements("1,event1,3", "2,event1,5", "3,event3,4");
        DataStreamSource<String> stream2 = env.fromElements("1,male,beijin", "2,female,shanghai");

        DataStream<Tuple3<String, String, String>> streamOperator1 = stream1
                .map(x -> Tuple3.of(x.split(",")[0], x.split(",")[1], x.split(",")[2]))
                .returns(TypeInformation.of(new TypeHint<Tuple3<String, String, String>>() {}));
        DataStream<Tuple3<String, String, String>> streamOperator2 = stream2
                .map(x -> Tuple3.of(x.split(",")[0], x.split(",")[1], x.split(",")[2]))
                .returns(TypeInformation.of(new TypeHint<Tuple3<String, String, String>>() {}));

        // requirement 1: expand each record `count` times with a random value
        DataStream<Tuple3<String, String, String>> mapDataStream = streamOperator1
                .flatMap(new FlatMapFunction<Tuple3<String, String, String>, Tuple3<String, String, String>>() {
                    @Override
                    public void flatMap(Tuple3<String, String, String> value,
                                        Collector<Tuple3<String, String, String>> out) throws Exception {
                        Integer integer = Integer.valueOf(value.f2);
                        for (int i = 0; i < integer; i++) {
                            int r = new Random().nextInt(100);
                            out.collect(Tuple3.of(value.f0, value.f1, r + ""));
                        }
                    }
                }).returns(TypeInformation.of(new TypeHint<Tuple3<String, String, String>>() {}));
        // mapDataStream.print();

        // requirement 2: broadcast stream2
        MapStateDescriptor<String, Tuple3<String, String, String>> descriptor = new MapStateDescriptor<String, Tuple3<String, String, String>>(
                "userinfo",
                TypeInformation.of(String.class),
                TypeInformation.of(new TypeHint<Tuple3<String, String, String>>() {}));
        BroadcastStream<Tuple3<String, String, String>> tuple3BroadcastStream = streamOperator2.broadcast(descriptor);
        BroadcastConnectedStream<Tuple3<String, String, String>, Tuple3<String, String, String>> tuple3BroadcastConnectedStream = mapDataStream
                .connect(tuple3BroadcastStream);
        SingleOutputStreamOperator<Tuple5<String, String, String, String, String>> processed = tuple3BroadcastConnectedStream
                .process(new BroadcastProcessFunction<Tuple3<String, String, String>, Tuple3<String, String, String>, Tuple5<String, String, String, String, String>>() {
                    @Override
                    public void processElement(Tuple3<String, String, String> value,
                                               BroadcastProcessFunction<Tuple3<String, String, String>, Tuple3<String, String, String>, Tuple5<String, String, String, String, String>>.ReadOnlyContext ctx,
                                               Collector<Tuple5<String, String, String, String, String>> out) throws Exception {
                        ReadOnlyBroadcastState<String, Tuple3<String, String, String>> broadcastState = ctx
                                .getBroadcastState(descriptor);
                        // requirement 3: unmatched records to the side output, the rest to the main stream
                        if (broadcastState == null) {
                            // out.collect(Tuple5.of(value.f0, value.f1, value.f2, null, null));
                            ctx.output(new OutputTag<String>("nojoin", TypeInformation.of(String.class)),
                                    value.f0 + value.f1 + value.f2);
                        } else {
                            Tuple3<String, String, String> stringTuple3 = broadcastState.get(value.f0);
                            if (stringTuple3 == null) {
                                // out.collect(Tuple5.of(value.f0, value.f1, value.f2, null, null));
                                ctx.output(new OutputTag<String>("nojoin", TypeInformation.of(String.class)),
                                        value.f0 + value.f1 + value.f2);
                            } else {
                                out.collect(Tuple5.of(value.f0, value.f1, value.f2, stringTuple3.f1,
                                        stringTuple3.f2));
                            }
                        }
                    }

                    @Override
                    public void processBroadcastElement(Tuple3<String, String, String> value,
                                                        BroadcastProcessFunction<Tuple3<String, String, String>, Tuple3<String, String, String>, Tuple5<String, String, String, String, String>>.Context ctx,
                                                        Collector<Tuple5<String, String, String, String, String>> out) throws Exception {
                        BroadcastState<String, Tuple3<String, String, String>> broadcastState = ctx
                                .getBroadcastState(descriptor);
                        broadcastState.put(value.f0, value);
                    }
                }).returns(TypeInformation.of(new TypeHint<Tuple5<String, String, String, String, String>>() {}));

        // main stream
        processed.print();

        // side output
        DataStream<String> sideOutput = processed
                .getSideOutput(new OutputTag<String>("nojoin", TypeInformation.of(String.class)));
        // sideOutput.print();

        // requirement 4: on the main stream, group by gender and take the record with the largest random number
        SingleOutputStreamOperator<Tuple5<String, String, Integer, String, String>> streamOperator = processed
                .keyBy(x -> x.f3)
                .map(new MapFunction<Tuple5<String, String, String, String, String>, Tuple5<String, String, Integer, String, String>>() {
                    @Override
                    public Tuple5<String, String, Integer, String, String> map(
                            Tuple5<String, String, String, String, String> value) throws Exception {
                        return Tuple5.of(value.f0, value.f1, Integer.valueOf(value.f2), value.f3, value.f4);
                    }
                }).returns(TypeInformation.of(new TypeHint<Tuple5<String, String, Integer, String, String>>() {}));
        SingleOutputStreamOperator<Tuple5<String, String, Integer, String, String>> maxBy = streamOperator
                .keyBy(tp5 -> tp5.f3)
                .maxBy(2);
        maxBy.print();

        // requirement 5: write the main stream to MySQL (not yet verified, to be tested)
        String sql = "insert into testa values (?,?,?,?,?) on duplicate key update a=?,b=?,c=?,d=?,e=?";
        SinkFunction<Tuple5<String, String, Integer, String, String>> jdbcSink = JdbcSink.sink(
                sql,
                new JdbcStatementBuilder<Tuple5<String, String, Integer, String, String>>() {
                    @Override
                    public void accept(PreparedStatement preparedStatement,
                                       Tuple5<String, String, Integer, String, String> tuple5) throws SQLException {
                        // PreparedStatement parameters are 1-based
                        preparedStatement.setString(1, tuple5.f0);
                        preparedStatement.setString(2, tuple5.f1);
                        preparedStatement.setInt(3, tuple5.f2);
                        preparedStatement.setString(4, tuple5.f3);
                        preparedStatement.setString(5, tuple5.f4);
                        preparedStatement.setString(6, tuple5.f0);
                        preparedStatement.setString(7, tuple5.f1);
                        preparedStatement.setInt(8, tuple5.f2);
                        preparedStatement.setString(9, tuple5.f3);
                        preparedStatement.setString(10, tuple5.f4);
                    }
                },
                JdbcExecutionOptions.builder()
                        .withBatchSize(2)   // insert in batches of two records
                        .withMaxRetries(3)  // number of retries on failure
                        .build(),
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                        .withPassword("root")  // jdbc connection info
                        .withUsername("root")
                        .withUrl("jdbc:mysql://192.168.141.131:3306/flinkdemo")
                        .build());
        streamOperator.addSink(jdbcSink);

        // requirement 6: write the side output to Parquet (not yet verified, to be tested)
        ParquetWriterFactory<String> writerFactory = ParquetAvroWriters.forReflectRecord(String.class);
        FileSink<String> build = FileSink.forBulkFormat(new Path("d:/sink"), writerFactory)
                .withBucketAssigner(new DateTimeBucketAssigner<String>())  // bucketing strategy
                .withBucketCheckInterval(5)  // interval of the async bucket creation/check thread
                .withOutputFileConfig(OutputFileConfig.builder()
                        .withPartPrefix("flinkdemo")  // file prefix
                        .withPartSuffix(".txt")       // file suffix
                        .build())
                .build();

        sideOutput.sinkTo(build);

        env.execute();
    }
}

Reposted from: https://blog.csdn.net/weixin_44244088/article/details/131316648
Copyright belongs to the original author, C0oOder; in case of infringement, please contact us for removal.
