Flink学习笔记
前言:今天是学习 flink 的第五天啦! 主要学习了物理分区较难理解的部分,在这个部分的三个分区的学习中, rescale partition 和 forward partition 其原理可以归类 pointwise 模式,其他的 partition 其原理可以归类 all_to_all 模式,而比较有趣的是 custom partitioning,这个可以进行根据值的输入进行自定义分区。
Tips:尼采曾经说过:“每一个不起眼的日子,都是对生命的辜负!” 虽然转码学习之路比起科班同学会更加艰辛,不过我相信只要愿意坚持,多理解多敲代码,多向各位大佬请教,即使一点一滴也是会有收获的,明天也要继续加油!
文章目录
二、Flink 流批一体 API 开发
3. 物理分区
3.5 Rescale Partition
RescalePartitioner,RESCALE分区。基于上下游Operator的并行度,将记录以循环的方式输出到下游Operator的每个实例。
举例1:如图顺序
上游并行度是2,下游是4,则上游一个并行度以循环的方式将记录输出到下游的两个并行度上;上游另一个并行度以循环的方式将记录输出到下游另两个并行度上。
举例2:如图倒序
若上游并行度是4,下游并行度是2,则上游两个并行度将记录输出到下游一个并行度上;上游另两个并行度将记录输出到下游另一个并行度上。
实例:上游并行度是2,下游是4,接收数据
packagecn.itcast.day05.partition;importorg.apache.flink.api.common.functions.RichMapFunction;importorg.apache.flink.configuration.Configuration;importorg.apache.flink.streaming.api.datastream.DataStream;importorg.apache.flink.streaming.api.datastream.DataStreamSource;importorg.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment;importorg.apache.flink.streaming.api.functions.sink.RichSinkFunction;/**
* @author lql
* @time 2024-02-15 23:47:11
* @description TODO:编写Flink程序,接收socket的单词数据,并将每个字符串广播到每个分区。
*/// 组内轮询publicclassRescalePartitioningDemo{publicstaticvoidmain(String[] args)throwsException{StreamExecutionEnvironment env =StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(newConfiguration());//Source是一个非并行的Source//并行度是1DataStreamSource<String> lines = env.socketTextStream("node1",9999);//并行度2SingleOutputStreamOperator<String> mapped = lines.map(newRichMapFunction<String,String>(){@OverridepublicStringmap(String value)throwsException{int indexOfThisSubtask =getRuntimeContext().getIndexOfThisSubtask();return value +" : "+ indexOfThisSubtask;}}).setParallelism(2);//广播,上游的算子将一个数据广播到下游所以的subtaskDataStream<String> shuffled = mapped.rescale();
shuffled.addSink(newRichSinkFunction<String>(){@Overridepublicvoidinvoke(String value,Context context)throwsException{int index =getRuntimeContext().getIndexOfThisSubtask();System.out.println(value +" -> "+ index);}}).setParallelism(4);
env.execute();}}
结果:
hadoop : 0 -> 0
hadoop : 1 -> 2
hadoop : 0 -> 1
hadoop : 1 -> 3
hadoop : 0 -> 0
hadoop : 1 -> 2
hadoop : 0 -> 1
hadoop : 1 -> 3
总结:
- rescale partition 类似于部分数据对应部分 sink,且数据不相冲突。
3.6 Forward Partition
发送到下游对应的第一个task,保证上下游算子并行度一致,即上游算子与下游算子是1:1的关系
实例:编写Flink程序,接收socket的单词数据,并将每个字符串广播到每个分区。
packagecn.itcast.day05.partition;importorg.apache.flink.api.common.functions.RichMapFunction;importorg.apache.flink.configuration.Configuration;importorg.apache.flink.streaming.api.datastream.DataStream;importorg.apache.flink.streaming.api.datastream.DataStreamSource;importorg.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment;importorg.apache.flink.streaming.api.functions.sink.RichSinkFunction;/**
* @author lql
* @time 2024-02-16 22:38:17
* @description TODO:编写Flink程序,接收socket的单词数据,并将每个字符串广播到每个分区
*/publicclassForwardPartitioningDemo{publicstaticvoidmain(String[] args)throwsException{StreamExecutionEnvironment env =StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(newConfiguration());//Source是一个非并行的Source//并行度是1DataStreamSource<String> lines = env.socketTextStream("node1",9999);//并行度2SingleOutputStreamOperator<String> mapped = lines.map(newRichMapFunction<String,String>(){@OverridepublicStringmap(String value)throwsException{int indexOfThisSubtask =getRuntimeContext().getIndexOfThisSubtask();return value +" : "+ indexOfThisSubtask;}}).setParallelism(2);//广播,上游的算子将一个数据广播到下游所以的subtaskDataStream<String> shuffled = mapped.forward();
shuffled.addSink(newRichSinkFunction<String>(){@Overridepublicvoidinvoke(String value,Context context)throwsException{int index =getRuntimeContext().getIndexOfThisSubtask();System.out.println(value +" -> "+ index);}}).setParallelism(2);
env.execute();}}
结果:
flink : 0 -> 0
flink : 1 -> 1
flink : 0 -> 0
flink : 1 -> 1
总结:
- 1- Forward Partition 原理和 Rescale Partition 原理相似,遵循 POINTWISE 模式连接下游算子,而其他 Partition 都采用ALL_TO_ALL模式来连接
- 2- 在上下游的算子没有指定分区器的情况下,如果上下游的算子并行度一致,则使用 ForwardPartitioner,否则使用RebalancePartitioner
- 3- 对于 ForwardPartitioner,必须保证上下游算子并行度一致,否则会抛出异常
拓展:POINTWISE 模式和 ALL_TO_ALL模式
3.7 Custom Partitioning
使用用户定义的 Partitioner 为每个元素选择目标任务
实例:编写Flink程序,接收socket的单词数据,并将每个字符串写入到指定的分区中。
packagecn.itcast.day05.partition;importorg.apache.flink.api.common.functions.Partitioner;importorg.apache.flink.api.common.functions.RichMapFunction;importorg.apache.flink.api.java.tuple.Tuple2;importorg.apache.flink.configuration.Configuration;importorg.apache.flink.streaming.api.datastream.DataStream;importorg.apache.flink.streaming.api.datastream.DataStreamSource;importorg.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment;importorg.apache.flink.streaming.api.functions.sink.RichSinkFunction;/**
* @author lql
* @time 2024-02-16 22:51:31
* @description TODO:编写Flink程序,接收socket的单词数据,并将每个字符串写入到指定的分区中。
*/publicclassCustomPartitioningDemo{publicstaticvoidmain(String[] args)throwsException{StreamExecutionEnvironment env =StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(newConfiguration());//Source是一个非并行的Source//并行度是1DataStreamSource<String> lines = env.socketTextStream("node1",9999);SingleOutputStreamOperator<Tuple2<String,Integer>> mapped = lines.map(newRichMapFunction<String,Tuple2<String,Integer>>(){@OverridepublicTuple2<String,Integer>map(String value)throwsException{int indexOfThisSubtask =getRuntimeContext().getIndexOfThisSubtask();returnTuple2.of(value, indexOfThisSubtask);}});//按照指定的规则进行分区DataStream<Tuple2<String,Integer>> partitioned = mapped.partitionCustom(newPartitioner<String>(){@Overridepublicintpartition(String key,int numPartitions){int res =0;if("spark".equals(key)){
res =1;}elseif("flink".equals(key)){
res =2;}elseif("hadoop".equals(key)){
res =3;}return res;}}, tp -> tp.f0);
partitioned.addSink(newRichSinkFunction<Tuple2<String,Integer>>(){@Overridepublicvoidinvoke(Tuple2<String,Integer> value,Context context)throwsException{int index =getRuntimeContext().getIndexOfThisSubtask();System.out.println(value.f0 +" , 上游 "+ value.f1 +" -> 下游 "+ index);}});
env.execute();}}
结果:
hadoop , 上游 1 -> 下游 3
spark , 上游 2 -> 下游 1
flink , 上游 3 -> 下游 2
总结:
- partitionCustom 可以指定规则进行分区
- tp -> tp.f0 这里指的是 Tuple2 中接受的数据以第一个作为判断标准
版权归原作者 那就学有所成吧(˵¯͒¯͒˵) 所有, 如有侵权,请联系我们删除。