Flink Study Notes
Preface: today is my fourth day of learning Flink! I studied physical partitioning; this time it was the first four simple physical partitioners, which I call the "easy partitioning" part!
Tips: I believe I will keep getting better and better. Tomorrow I'll take on the harder partitioners. Keep going!
II. Flink Stream-Batch Unified API Development
3. Physical Partitioning
3.1 Global Partitioner
This partitioner sends all data to a single downstream operator instance (subtask id = 0).
Example: write a Flink program that receives word data from a socket and tags each record with the subtask index, so we can observe how the data is partitioned.
```java
package cn.itcast.day04.partition;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

/**
 * @author lql
 * @time 2024-02-15 22:54:35
 * @description Global partitioning demo
 */
public class GlobalPartitioningDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());

        // socketTextStream is a non-parallel source, so its parallelism is 1
        DataStreamSource<String> lines = env.socketTextStream("node1", 9999);

        // Map each record, appending the index of the subtask that processed it
        SingleOutputStreamOperator<String> mapped = lines.map(new RichMapFunction<String, String>() {
            @Override
            public String map(String value) throws Exception {
                int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
                return value + " : " + indexOfThisSubtask;
            }
        }).setParallelism(3); // run the map operator with parallelism 3

        // global(): send every record to the first instance (subtask 0) of the downstream operator
        DataStream<String> global = mapped.global();

        // Sink that prints each record together with the sink subtask index
        global.addSink(new RichSinkFunction<String>() {
            @Override
            public void invoke(String value, Context context) throws Exception {
                int index = getRuntimeContext().getIndexOfThisSubtask();
                System.out.println(value + "->" + index);
            }
        });

        env.execute();
    }
}
```
Result:
hadoop : 1->0
hadoop : 2->0
hadoop : 0->0
spark : 1->0
spark : 2->0
Summary:
- 1- Data processed by multiple parallel subtasks is all funneled into the first sink partition (subtask 0).
- 2- The data fans out for parallel processing, then converges back into a single place.
- 3- getRuntimeContext() is only available inside Rich functions, which is why the final addSink() is deliberately written with a RichSinkFunction!
- 4- global() fits scenarios where parallel tasks must share one piece of state, such as a global counter (see the sketch below).
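To make bullet 4 concrete, here is a minimal sketch of my own (not from the course code; the class name and input elements are hypothetical): because global() routes every record to sink subtask 0, a plain instance field in the sink can act as a "global" counter without any shared state.

```java
package cn.itcast.day04.partition;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

public class GlobalCounterSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Hypothetical input; in the demo above this would come from the socket source
        DataStream<String> words = env.fromElements("hadoop", "spark", "flink");

        // global() routes every record to sink subtask 0,
        // so a plain field counts all records without shared state
        words.global().addSink(new RichSinkFunction<String>() {
            private long count = 0L; // only one sink instance ever receives data

            @Override
            public void invoke(String value, Context context) throws Exception {
                count++;
                System.out.println("total so far: " + count);
            }
        });

        env.execute("global-counter-sketch");
    }
}
```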
3.2 Shuffle Partitioner
Distributes elements randomly according to a uniform distribution.
Example: write a Flink program that receives word data from a socket and randomly, uniformly distributes each string across the partitions.
```java
package cn.itcast.day04.partition;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

/**
 * @author lql
 * @time 2024-02-15 23:26:49
 * @description Receive word data from a socket and randomly, uniformly distribute each string across the partitions
 */
public class ShufflePartitioningDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());

        // socketTextStream is a non-parallel source, so its parallelism is 1
        DataStreamSource<String> lines = env.socketTextStream("node1", 9999);

        // Map each record, appending the index of the subtask that processed it
        SingleOutputStreamOperator<String> mapped = lines.map(new RichMapFunction<String, String>() {
            @Override
            public String map(String value) throws Exception {
                int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
                return value + " : " + indexOfThisSubtask;
            }
        }).setParallelism(1); // map runs with parallelism 1; the sink keeps the default parallelism

        // shuffle(): redistribute records to the downstream subtasks uniformly at random
        DataStream<String> shuffled = mapped.shuffle();

        shuffled.addSink(new RichSinkFunction<String>() {
            @Override
            public void invoke(String value, Context context) throws Exception {
                int index = getRuntimeContext().getIndexOfThisSubtask();
                System.out.println(value + " -> " + index);
            }
        });

        env.execute();
    }
}
```
Result (no discernible pattern):
hadoop : 0 -> 0
hadoop : 0 -> 4
flink : 0 -> 6
flink : 0 -> 7
Summary:
- 1- shuffle() redistributes records randomly and uniformly across the parallel instances of the downstream operator (see the sketch after this list).
- 2- This distribution suits scenarios with large data volumes: it spreads load across the network and lowers the chance of data skew.
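For intuition, the behavior can be modeled as choosing a uniformly random output channel for each record. The sketch below is my simplified stand-in for the mechanism, not Flink's actual source; the class and method names are hypothetical.

```java
import java.util.Random;

public class ShuffleChannelSketch {
    private static final Random RANDOM = new Random();

    // Pick the downstream channel for one record, uniformly over [0, numberOfChannels)
    static int selectChannel(int numberOfChannels) {
        return RANDOM.nextInt(numberOfChannels);
    }

    public static void main(String[] args) {
        // With 8 downstream subtasks, as in the run above, indices 0..7 appear without pattern
        for (int i = 0; i < 5; i++) {
            System.out.println("record " + i + " -> subtask " + selectChannel(8));
        }
    }
}
```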
3.3 Broadcast Partitioner
Sends each record to every downstream operator instance.
Example: write a Flink program that receives word data from a socket and broadcasts each string to every partition.
```java
package cn.itcast.day04.partition;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

/**
 * @author lql
 * @time 2024-02-15 23:35:59
 * @description Broadcast partitioning demo
 */
public class BroadcastPartitioningDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());

        // socketTextStream is a non-parallel source, so its parallelism is 1
        DataStreamSource<String> lines = env.socketTextStream("node1", 9999);

        // Map each record, appending the index of the subtask that processed it
        SingleOutputStreamOperator<String> mapped = lines.map(new RichMapFunction<String, String>() {
            @Override
            public String map(String value) throws Exception {
                int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
                return value + " : " + indexOfThisSubtask;
            }
        }).setParallelism(1); // map runs with parallelism 1; the sink keeps the default parallelism

        // broadcast(): the upstream operator sends each record to every downstream subtask
        DataStream<String> broadcasted = mapped.broadcast();

        broadcasted.addSink(new RichSinkFunction<String>() {
            @Override
            public void invoke(String value, Context context) throws Exception {
                int index = getRuntimeContext().getIndexOfThisSubtask();
                System.out.println(value + " -> " + index);
            }
        });

        env.execute();
    }
}
```
Result:
hadoop : 0 -> 0
hadoop : 0 -> 2
hadoop : 0 -> 1
hadoop : 0 -> 3
hadoop : 0 -> 4
hadoop : 0 -> 6
hadoop : 0 -> 5
hadoop : 0 -> 7
spark : 0 -> 3
spark : 0 -> 2
spark : 0 -> 6
spark : 0 -> 4
spark : 0 -> 0
spark : 0 -> 1
spark : 0 -> 5
spark : 0 -> 7
Summary:
- Each record is broadcast evenly to all downstream subtasks, so every input word above appears once per sink subtask (see the sketch below).
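Conceptually, broadcast writes each record to every downstream channel, which is why one input word produced 8 output lines with sink parallelism 8. The sketch below is my simplified model of that fan-out, not Flink's actual implementation; the names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

public class BroadcastChannelSketch {
    // Every channel index receives the record, so output volume = input × downstream parallelism
    static List<Integer> selectChannels(int numberOfChannels) {
        List<Integer> all = new ArrayList<>();
        for (int i = 0; i < numberOfChannels; i++) {
            all.add(i);
        }
        return all;
    }

    public static void main(String[] args) {
        // With parallelism 8, one "hadoop" record fans out to subtasks 0..7,
        // matching the 8 output lines per input word seen above
        System.out.println("hadoop -> " + selectChannels(8));
    }
}
```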
3.4 Rebalance Partitioner
通过循环的方式依次发送到下游的task
Example: round-robin data distribution.
```java
package cn.itcast.day04.partition;

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @author lql
 * @time 2024-02-15 23:41:46
 * @description A Flink remedy for data skew: round-robin redistribution
 */
public class RebalanceDemo {
    public static void main(String[] args) throws Exception {
        /*
         * Steps:
         * 1. Build the execution environment
         * 2. Use env.generateSequence to create the parallel sequence 0-100
         * 3. Use filter to keep only the numbers greater than 8
         * 4. Use map with a RichMapFunction to build a tuple of (number, subtask id);
         *    inside a RichMapFunction, getRuntimeContext().getIndexOfThisSubtask()
         *    returns the current subtask index
         * 5. Print and test
         */
        // TODO build the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // TODO use env.generateSequence to create the parallel sequence 0-100
        DataStream<Long> dataSource = env.generateSequence(0, 100);

        // TODO use filter to keep only the numbers greater than 8
        SingleOutputStreamOperator<Long> filteredDataSource = dataSource.filter(new FilterFunction<Long>() {
            @Override
            public boolean filter(Long aLong) throws Exception {
                return aLong > 8;
            }
        });

        // rebalance(): counteract the data skew introduced by the filter
        DataStream<Long> rebalance = filteredDataSource.rebalance();

        // TODO map with a RichMapFunction, pairing each number with the current subtask id
        // This shows which thread handles each of the 92 remaining records and how many each thread gets
        // (the Spark counterpart for inspecting partitions is mapPartitionsWithIndex)
        SingleOutputStreamOperator<Tuple2<Long, Integer>> tuple2MapOperator =
                rebalance.map(new RichMapFunction<Long, Tuple2<Long, Integer>>() {
                    @Override
                    public Tuple2<Long, Integer> map(Long aLong) throws Exception {
                        return Tuple2.of(aLong, getRuntimeContext().getIndexOfThisSubtask());
                    }
                });

        // TODO print and test
        tuple2MapOperator.print();

        env.execute();
    }
}
```
Result (the subtask index cycles round-robin):
0-0
0-1
0-2
0-0
0-1
0-2
Summary:
- 1- Records are sent round-robin to the downstream subtasks (see the sketch after this list).
- 2- This is a common remedy for data skew.
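The round-robin behavior can be modeled as a channel index that cycles through the downstream subtasks. The sketch below is my simplified illustration, not Flink's actual source; the class and method names are hypothetical.

```java
public class RebalanceChannelSketch {
    private int nextChannel = -1;

    // Strict round-robin: each record goes to the next channel in the cycle
    int selectChannel(int numberOfChannels) {
        nextChannel = (nextChannel + 1) % numberOfChannels;
        return nextChannel;
    }

    public static void main(String[] args) {
        RebalanceChannelSketch partitioner = new RebalanceChannelSketch();
        // With 3 downstream subtasks, the indices repeat 0, 1, 2, 0, 1, 2, ...
        // matching the rotation shown in the result above
        for (int i = 0; i < 6; i++) {
            System.out.println("record " + i + " -> subtask " + partitioner.selectChannel(3));
        }
    }
}
```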