0


flink重温笔记(四):Flink 流批一体 API 开发——物理分区(上)

Flink学习笔记

前言:今天是学习flink的第四天啦!学习了物理分区的知识点,这一次学习了前4个简单的物理分区,称之为简单分区篇!
Tips:我相信自己会越来会好的,明天攻克困难分区篇,加油!

二、Flink 流批一体 API 开发

3. 物理分区

3.1 Global Partitioner

该分区器会将所有的数据都发送到下游的某个算子实例(subtask id = 0)

在这里插入图片描述

实例:编写Flink程序,接收socket的单词数据,以进程标记查看分区数据情况。

packagecn.itcast.day04.partition;importorg.apache.flink.api.common.functions.RichMapFunction;importorg.apache.flink.configuration.Configuration;importorg.apache.flink.streaming.api.datastream.DataStream;importorg.apache.flink.streaming.api.datastream.DataStreamSource;importorg.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment;importorg.apache.flink.streaming.api.functions.sink.RichSinkFunction;/**
 * @author lql
 * @time 2024-02-15 22:54:35
 * @description TODO
 */publicclassGlobalPartitioningDemo{publicstaticvoidmain(String[] args)throwsException{StreamExecutionEnvironment env =StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(newConfiguration());//Source是一个非并行的Source//并行度是1DataStreamSource<String> lines = env.socketTextStream("node1",9999);// 对每个输入的数据进行映射处理,给每个单词添加上一个字符串以及当前所在的子任务编号SingleOutputStreamOperator<String> mapped = lines.map(newRichMapFunction<String,String>(){@OverridepublicStringmap(String value)throwsException{int indexOfThisSubtask =getRuntimeContext().getIndexOfThisSubtask();return value +" : "+ indexOfThisSubtask;}}).setParallelism(3);// 针对算子将并行度设置为 3;// 对数据流进行 global,将其随机均匀地划分到每个分区中DataStream<String> global = mapped.global();// 定义一个 sink 函数,输出每个单词和所在的子任务编号
        global.addSink(newRichSinkFunction<String>(){@Overridepublicvoidinvoke(String value,Context context)throwsException{int index =getRuntimeContext().getIndexOfThisSubtask();System.out.println(value +"->"+ index);}});

        env.execute();}}

结果:

hadoop : 1->0
hadoop : 2->0
hadoop : 0->0
spark : 1->0
spark : 2->0

总结:

  • 1- 多个进程处理的数据,汇总到 sink 第一个分区第一个进程
  • 2- 数据多出梳理,合并一处的现象
  • 3- getRuntimeContext()方法在 Rich Function 中,最后的 addSink()用心良苦!
  • 4- 并行任务之间共享相同状态的场景,如全局计数器等
3.2 Shuffer Partition

根据均匀分布随机划分元素。

在这里插入图片描述

实例:编写Flink程序,接收socket的单词数据,并将每个字符串均匀的随机划分到每个分区。

packagecn.itcast.day04.partition;importorg.apache.flink.api.common.functions.RichMapFunction;importorg.apache.flink.configuration.Configuration;importorg.apache.flink.streaming.api.datastream.DataStream;importorg.apache.flink.streaming.api.datastream.DataStreamSource;importorg.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment;importorg.apache.flink.streaming.api.functions.sink.RichSinkFunction;/**
 * @author lql
 * @time 2024-02-15 23:26:49
 * @description TODO:编写Flink程序,接收socket的单词数据,并将每个字符串均匀的随机划分到每个分区
 */publicclassShufflePartitioningDemo{publicstaticvoidmain(String[] args)throwsException{StreamExecutionEnvironment env =StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(newConfiguration());//Source是一个非并行的Source//并行度是1DataStreamSource<String> lines = env.socketTextStream("node1",9999);//并行度2SingleOutputStreamOperator<String> mapped = lines.map(newRichMapFunction<String,String>(){@OverridepublicStringmap(String value)throwsException{int indexOfThisSubtask =getRuntimeContext().getIndexOfThisSubtask();return value +" : "+ indexOfThisSubtask;}}).setParallelism(1);//shuffle!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!DataStream<String> shuffled = mapped.shuffle();

        shuffled.addSink(newRichSinkFunction<String>(){@Overridepublicvoidinvoke(String value,Context context)throwsException{int index =getRuntimeContext().getIndexOfThisSubtask();System.out.println(value +" -> "+ index);}});
        env.execute();}}

结果:

结果现象:(没有规律)
           hadoop : 0 -> 0
           hadoop : 0 -> 4
           flink  : 0 -> 6
           flink  : 0 -> 7

总结:

  • 1- 它将数据均匀地分配到下游任务的每个并行实例中,然后再对每个并行任务的数据进行分区
  • 2- 这种分发方式适用于数据量比较大的场景,可以减少网络传输压力和降低数据倾斜的概率。
3.3 Broadcast Partition

发送到下游所有的算子实例

在这里插入图片描述

实例:编写Flink程序,接收socket的单词数据,并将每个字符串广播到每个分区。

packagecn.itcast.day04.partition;importorg.apache.flink.api.common.functions.RichMapFunction;importorg.apache.flink.configuration.Configuration;importorg.apache.flink.streaming.api.datastream.DataStream;importorg.apache.flink.streaming.api.datastream.DataStreamSource;importorg.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment;importorg.apache.flink.streaming.api.functions.sink.RichSinkFunction;/**
 * @author lql
 * @time 2024-02-15 23:35:59
 * @description TODO
 */publicclassBroadcastPartitioningDemo{publicstaticvoidmain(String[] args)throwsException{StreamExecutionEnvironment env =StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(newConfiguration());//Source是一个非并行的Source//并行度是1DataStreamSource<String> lines = env.socketTextStream("node1",9999);//并行度2SingleOutputStreamOperator<String> mapped = lines.map(newRichMapFunction<String,String>(){@OverridepublicStringmap(String value)throwsException{int indexOfThisSubtask =getRuntimeContext().getIndexOfThisSubtask();return value +" : "+ indexOfThisSubtask;}}).setParallelism(1);//广播,上游的算子将一个数据广播到下游所以的subtaskDataStream<String> shuffled = mapped.broadcast();

        shuffled.addSink(newRichSinkFunction<String>(){@Overridepublicvoidinvoke(String value,Context context)throwsException{int index =getRuntimeContext().getIndexOfThisSubtask();System.out.println(value +" -> "+ index);}});

        env.execute();}}

结果:

hadoop : 0 -> 0
hadoop : 0 -> 2
hadoop : 0 -> 1
hadoop : 0 -> 3
hadoop : 0 -> 4
hadoop : 0 -> 6
hadoop : 0 -> 5
hadoop : 0 -> 7
spark : 0 -> 3
spark : 0 -> 2
spark : 0 -> 6
spark : 0 -> 4
spark : 0 -> 0
spark : 0 -> 1
spark : 0 -> 5
spark : 0 -> 7

总结:

  • 均匀广播数据
3.4 Rebalance Partition

通过循环的方式依次发送到下游的task

在这里插入图片描述

实例:轮询发送数据

packagecn.itcast.day04.partition;importorg.apache.flink.api.common.functions.FilterFunction;importorg.apache.flink.api.common.functions.RichMapFunction;importorg.apache.flink.api.java.tuple.Tuple2;importorg.apache.flink.streaming.api.datastream.DataStream;importorg.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment;/**
 * @author lql
 * @time 2024-02-15 23:41:46
 * @description TODO: flink的数据倾斜解决方案:轮询发送(当设置并行度为1时)
 */publicclassRebalanceDemo{publicstaticvoidmain(String[] args)throwsException{/**
         * 构建批处理运行环境
         * 使用 env.generateSequence 创建0-100的并行数据
         * 使用 fiter 过滤出来 大于8 的数字
         * 使用map操作传入 RichMapFunction ,将当前子任务的ID和数字构建成一个元组
         * 在RichMapFunction中可以使用 getRuntimeContext.getIndexOfThisSubtask 获取子任务序号
         * 打印测试
         *///TODO 构建批处理运行环境StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();//TODO 使用 env.generateSequence 创建0-100的并行数据DataStream<Long> dataSource = env.generateSequence(0,100);//TODO 使用 fiter 过滤出来 大于8 的数字SingleOutputStreamOperator<Long> filteredDataSource = dataSource.filter(newFilterFunction<Long>(){@Overridepublicbooleanfilter(Long aLong)throwsException{return aLong >8;}});//解决数据倾斜的问题DataStream<Long> rebalance = filteredDataSource.rebalance();//TODO 使用map操作传入 RichMapFunction ,将当前子任务的ID和数字构建成一个元组//查看92条数据分别被哪些线程处理的,可以看到每个线程处理的数据条数//spark中查看数据属于哪个分区使用哪个函数?mapPartitionWithIndex//TODO 在RichMapFunction中可以使用 getRuntimeContext.getIndexOfThisSubtask 获取子任务序号SingleOutputStreamOperator<Tuple2<Long,Integer>> tuple2MapOperator = rebalance.map(newRichMapFunction<Long,Tuple2<Long,Integer>>(){@OverridepublicTuple2<Long,Integer>map(Long aLong)throwsException{returnTuple2.of(aLong,getRuntimeContext().getIndexOfThisSubtask());}});//TODO 打印测试
        tuple2MapOperator.print();

        env.execute();}}

结果:

 * 0-0
 * 0-1
 * 0-2
 * 0-0
 * 0-1
 * 0-2

总结:

  • 1- 轮询发送数据
  • 2- 解决数据倾斜问题
标签: flink 笔记 大数据

本文转载自: https://blog.csdn.net/m0_60732994/article/details/136124820
版权归原作者 那就学有所成吧(˵¯͒¯͒˵) 所有, 如有侵权,请联系我们删除。

“flink重温笔记(四):Flink 流批一体 API 开发——物理分区(上)”的评论:

还没有评论