0


flink重温笔记(五):Flink 流批一体 API 开发——物理分区(下)

Flink学习笔记

前言:今天是学习 flink 的第五天啦! 主要学习了物理分区较难理解的部分,在这个部分的三个分区的学习中, rescale partition 和 forward partition 其原理可以归类 pointwise 模式,其他的 partition 其原理可以归类 all_to_all 模式,而比较有趣的是 custom partitioning,这个可以进行根据值的输入进行自定义分区。

Tips:尼采曾经说过:“每一个不起眼的日子,都是对生命的辜负!” 虽然转码学习之路比起科班同学会更加艰辛,不过我相信只要愿意坚持,多理解多敲代码,多向各位大佬请教,即使一点一滴也是会有收获的,明天也要继续加油!

文章目录

二、Flink 流批一体 API 开发

3. 物理分区

3.5 Rescale Partition

RescalePartitioner,RESCALE分区。基于上下游Operator的并行度,将记录以循环的方式输出到下游Operator的每个实例。

举例1:如图顺序

上游并行度是2,下游是4,则上游一个并行度以循环的方式将记录输出到下游的两个并行度上;上游另一个并行度以循环的方式将记录输出到下游另两个并行度上。

举例2:如图倒序

若上游并行度是4,下游并行度是2,则上游两个并行度将记录输出到下游一个并行度上;上游另两个并行度将记录输出到下游另一个并行度上。

在这里插入图片描述

实例:上游并行度是2,下游是4,接收数据

packagecn.itcast.day05.partition;importorg.apache.flink.api.common.functions.RichMapFunction;importorg.apache.flink.configuration.Configuration;importorg.apache.flink.streaming.api.datastream.DataStream;importorg.apache.flink.streaming.api.datastream.DataStreamSource;importorg.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment;importorg.apache.flink.streaming.api.functions.sink.RichSinkFunction;/**
 * @author lql
 * @time 2024-02-15 23:47:11
 * @description TODO:编写Flink程序,接收socket的单词数据,并将每个字符串广播到每个分区。
 */// 组内轮询publicclassRescalePartitioningDemo{publicstaticvoidmain(String[] args)throwsException{StreamExecutionEnvironment env =StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(newConfiguration());//Source是一个非并行的Source//并行度是1DataStreamSource<String> lines = env.socketTextStream("node1",9999);//并行度2SingleOutputStreamOperator<String> mapped = lines.map(newRichMapFunction<String,String>(){@OverridepublicStringmap(String value)throwsException{int indexOfThisSubtask =getRuntimeContext().getIndexOfThisSubtask();return value +" : "+ indexOfThisSubtask;}}).setParallelism(2);//广播,上游的算子将一个数据广播到下游所以的subtaskDataStream<String> shuffled = mapped.rescale();

        shuffled.addSink(newRichSinkFunction<String>(){@Overridepublicvoidinvoke(String value,Context context)throwsException{int index =getRuntimeContext().getIndexOfThisSubtask();System.out.println(value +" -> "+ index);}}).setParallelism(4);

        env.execute();}}

结果:

hadoop : 0 -> 0
hadoop : 1 -> 2
hadoop : 0 -> 1
hadoop : 1 -> 3

hadoop : 0 -> 0
hadoop : 1 -> 2
hadoop : 0 -> 1
hadoop : 1 -> 3

总结:

  • rescale partition 类似于部分数据对应部分 sink,且数据不相冲突。
3.6 Forward Partition

发送到下游对应的第一个task,保证上下游算子并行度一致,即上游算子与下游算子是1:1的关系

在这里插入图片描述

实例:编写Flink程序,接收socket的单词数据,并将每个字符串广播到每个分区。

packagecn.itcast.day05.partition;importorg.apache.flink.api.common.functions.RichMapFunction;importorg.apache.flink.configuration.Configuration;importorg.apache.flink.streaming.api.datastream.DataStream;importorg.apache.flink.streaming.api.datastream.DataStreamSource;importorg.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment;importorg.apache.flink.streaming.api.functions.sink.RichSinkFunction;/**
 * @author lql
 * @time 2024-02-16 22:38:17
 * @description TODO:编写Flink程序,接收socket的单词数据,并将每个字符串广播到每个分区
 */publicclassForwardPartitioningDemo{publicstaticvoidmain(String[] args)throwsException{StreamExecutionEnvironment env =StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(newConfiguration());//Source是一个非并行的Source//并行度是1DataStreamSource<String> lines = env.socketTextStream("node1",9999);//并行度2SingleOutputStreamOperator<String> mapped = lines.map(newRichMapFunction<String,String>(){@OverridepublicStringmap(String value)throwsException{int indexOfThisSubtask =getRuntimeContext().getIndexOfThisSubtask();return value +" : "+ indexOfThisSubtask;}}).setParallelism(2);//广播,上游的算子将一个数据广播到下游所以的subtaskDataStream<String> shuffled = mapped.forward();

        shuffled.addSink(newRichSinkFunction<String>(){@Overridepublicvoidinvoke(String value,Context context)throwsException{int index =getRuntimeContext().getIndexOfThisSubtask();System.out.println(value +" -> "+ index);}}).setParallelism(2);

        env.execute();}}

结果:

flink : 0 -> 0
flink : 1 -> 1
flink : 0 -> 0
flink : 1 -> 1

总结:

  • 1- Forward Partition 原理和 Rescale Partition 原理相似,遵循 POINTWISE 模式连接下游算子,而其他 Partition 都采用ALL_TO_ALL模式来连接
  • 2- 在上下游的算子没有指定分区器的情况下,如果上下游的算子并行度一致,则使用 ForwardPartitioner,否则使用RebalancePartitioner
  • 3- 对于 ForwardPartitioner,必须保证上下游算子并行度一致,否则会抛出异常

拓展:POINTWISE 模式和 ALL_TO_ALL模式

在这里插入图片描述
在这里插入图片描述

3.7 Custom Partitioning

使用用户定义的 Partitioner 为每个元素选择目标任务

实例:编写Flink程序,接收socket的单词数据,并将每个字符串写入到指定的分区中。

packagecn.itcast.day05.partition;importorg.apache.flink.api.common.functions.Partitioner;importorg.apache.flink.api.common.functions.RichMapFunction;importorg.apache.flink.api.java.tuple.Tuple2;importorg.apache.flink.configuration.Configuration;importorg.apache.flink.streaming.api.datastream.DataStream;importorg.apache.flink.streaming.api.datastream.DataStreamSource;importorg.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment;importorg.apache.flink.streaming.api.functions.sink.RichSinkFunction;/**
 * @author lql
 * @time 2024-02-16 22:51:31
 * @description TODO:编写Flink程序,接收socket的单词数据,并将每个字符串写入到指定的分区中。
 */publicclassCustomPartitioningDemo{publicstaticvoidmain(String[] args)throwsException{StreamExecutionEnvironment env =StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(newConfiguration());//Source是一个非并行的Source//并行度是1DataStreamSource<String> lines = env.socketTextStream("node1",9999);SingleOutputStreamOperator<Tuple2<String,Integer>> mapped = lines.map(newRichMapFunction<String,Tuple2<String,Integer>>(){@OverridepublicTuple2<String,Integer>map(String value)throwsException{int indexOfThisSubtask =getRuntimeContext().getIndexOfThisSubtask();returnTuple2.of(value, indexOfThisSubtask);}});//按照指定的规则进行分区DataStream<Tuple2<String,Integer>> partitioned = mapped.partitionCustom(newPartitioner<String>(){@Overridepublicintpartition(String key,int numPartitions){int res =0;if("spark".equals(key)){
                    res =1;}elseif("flink".equals(key)){
                    res =2;}elseif("hadoop".equals(key)){
                    res =3;}return res;}}, tp -> tp.f0);

        partitioned.addSink(newRichSinkFunction<Tuple2<String,Integer>>(){@Overridepublicvoidinvoke(Tuple2<String,Integer> value,Context context)throwsException{int index =getRuntimeContext().getIndexOfThisSubtask();System.out.println(value.f0 +" , 上游 "+ value.f1 +" -> 下游 "+ index);}});
        env.execute();}}

结果

hadoop , 上游 1 -> 下游 3
spark , 上游 2 -> 下游 1
flink , 上游 3 -> 下游 2

总结

  • partitionCustom 可以指定规则进行分区
  • tp -> tp.f0 这里指的是 Tuple2 中接受的数据以第一个作为判断标准
标签: flink 笔记 大数据

本文转载自: https://blog.csdn.net/m0_60732994/article/details/136133761
版权归原作者 那就学有所成吧(˵¯͒¯͒˵) 所有, 如有侵权,请联系我们删除。

“flink重温笔记(五):Flink 流批一体 API 开发——物理分区(下)”的评论:

还没有评论