0


【Flink】详解Flink的八种分区

简介

Flink是一个流处理框架,一个Flink-Job由多个Task/算子构成,逻辑层面构成一个链条,同时Flink支持并行操作,每一个并行度可以理解为一个数据管道称之为SubTask。我们画图来看一下:

Flink-job

数据会在多个算子的SubTask之间相互传递,算子之间的并行度可能是不同的,这样就产生了数据分区问题,其核心问题在于上游的某个SubTask的数据该发送到下游的哪一个SubTask中。为了解决分区相关问题,Flink提供了一系列分区算子,下面将详细为大家介绍分区算子和相关的分区器。

分区算子

Flink一共有6种(rescale和rebalance都是轮询算子)或者7种分区算子:

  • shuffle :调用shuffle方法将会随机分配,总体上服从均匀分布;
  • rebalance:调用rebalance方法将会轮询分配,对所有的并⾏⼦任务进⾏轮询分配,可能会导致TM之间的数据交换;
  • rescale:调用rescale方法将会以组为单位轮训分配,而不是整体进行轮训,为了避免TM之间的数据交互;
  • broadcast:调用broadcast方法将数据流广播给所有的下游子任务;
  • global:调用global方法将会进行全局分区,将上游所有数据发送到下游第一个分区中;
  • keyby:调用keyby方法将会按键分区。
  • 自定义规则:自定义数据分发策略。代表算子为partitionCustom。

分区器

概述

每一个分区算子的底层实际上对应一个分区器,一共8个分区器

  • GlobalPartitioner
  • ShufflePartitioner
  • RebalancePartitioner
  • RescalePartitioner
  • BroadcastPartitioner
  • ForwardPartitioner
  • KeyGroupStreamPartitioner
  • CustomPartitionerWrapper

各个分区器的继承关系如下:

在这里插入图片描述

接下来将详细介绍每一个分区算子和对应的分区器。

ChannelSelector

ChannelSelector是分区器共同实现的接口,定义分区器的基本行为。

publicinterfaceChannelSelector<TextendsIOReadableWritable>{// 初始化ChannelSelector,传入的参数为下游channel的数量voidsetup(int numberOfChannels);// 返回选择的channel索引编号,这个方法决定的上游的数据需要写入到哪个channel中// 这个方法的Partitioner子类重点需要实现的方法// 对于broadcast广播类型算子,不需要实现这个方法// 尽管broadcast不需要实现这个方法,但是还是重写了方法,throw new UnsupportedOperationException// 传入的参数为记录数据流中的元素,该方法需要根据元素来推断出需要发送到的下游channelintselectChannel(T record);// 返回是否为广播类型booleanisBroadcast();}

StreamPartitioner

StreamPartitioner

抽象类实现了

StreamPartitioner

接口,它的代码如下所示:

publicabstractclassStreamPartitioner<T>implementsChannelSelector<SerializationDelegate<StreamRecord<T>>>,Serializable{privatestaticfinallong serialVersionUID =1L;// 下游的channel数量protectedint numberOfChannels;// 初始化的时候就知道下游的channel数量@Overridepublicvoidsetup(int numberOfChannels){this.numberOfChannels = numberOfChannels;}// 肯定不是广播类型@OverridepublicbooleanisBroadcast(){returnfalse;}publicabstractStreamPartitioner<T>copy();@Overridepublicbooleanequals(Object o){if(this== o){returntrue;}if(o ==null||getClass()!= o.getClass()){returnfalse;}finalStreamPartitioner<?> that =(StreamPartitioner<?>) o;return numberOfChannels == that.numberOfChannels;}@OverridepublicinthashCode(){returnObjects.hash(numberOfChannels);}// 决定了作业恢复时候上游遇到扩缩容的话,需要处理哪些上游状态保存的数据publicSubtaskStateMappergetUpstreamSubtaskStateMapper(){returnSubtaskStateMapper.ARBITRARY;}// 决定了作业恢复时候下游遇到扩缩容的话,需要处理哪些下游状态保存的数据publicabstractSubtaskStateMappergetDownstreamSubtaskStateMapper();// 该方法定义了上下游之间的关系类型,如果返回True,表示上下游SubTask之间有明确的一一对应关系,如果返回False代表上下游SubTask之间没有明确的对应关系publicabstractbooleanisPointwise();}

ShufflePartitioner

@PublicEvolvingpublicDataStream<T>shuffle(){returnsetConnectionType(newShufflePartitioner<T>());}

可以看到

shuffle

算子对应的分区器是【ShufflePartitioner】。

publicclassShufflePartitioner<T>extendsStreamPartitioner<T>{privatestaticfinallong serialVersionUID =1L;privateRandom random =newRandom();// 重要// 随机返回一个下游Channel,由于random.nextInt符合均匀分布,所以shuffle的数据分布也符合均匀分布@OverridepublicintselectChannel(SerializationDelegate<StreamRecord<T>> record){return random.nextInt(numberOfChannels);}@OverridepublicSubtaskStateMappergetDownstreamSubtaskStateMapper(){returnSubtaskStateMapper.ROUND_ROBIN;}@OverridepublicStreamPartitioner<T>copy(){returnnewShufflePartitioner<T>();}// ShufflePartitioner上下游Subtask之间没有明确对应关系@OverridepublicbooleanisPointwise(){returnfalse;}@OverridepublicStringtoString(){return"SHUFFLE";}}

图例

shuffle

GlobalPartitioner

publicDataStream<T>global(){returnsetConnectionType(newGlobalPartitioner<T>());}

可以看到

global

对应的分区器是【GlobalPartitioner】。

publicclassGlobalPartitioner<T>extendsStreamPartitioner<T>{privatestaticfinallong serialVersionUID =1L;// 数据永远发往下游第一个SubTask。@OverridepublicintselectChannel(SerializationDelegate<StreamRecord<T>> record){return0;}@OverridepublicStreamPartitioner<T>copy(){returnthis;}// 恢复任务的时候将会恢复到第一个任务。@OverridepublicSubtaskStateMappergetDownstreamSubtaskStateMapper(){returnSubtaskStateMapper.FIRST;}// ShufflePartitioner上下游Subtask之间没有明确对应关系@OverridepublicbooleanisPointwise(){returnfalse;}@OverridepublicStringtoString(){return"GLOBAL";}}

图例

在这里插入图片描述

ForwardPartitioner

publicclassForwardPartitioner<T>extendsStreamPartitioner<T>{privatestaticfinallong serialVersionUID =1L;// 还是发往下游第一个SubTask,不同的是这里的下游SubTask是在本地的。@OverridepublicintselectChannel(SerializationDelegate<StreamRecord<T>> record){return0;}publicStreamPartitioner<T>copy(){returnthis;}// 上下游SubTask是一一对应的,如果上下游算子并行度不一致就会报错@OverridepublicbooleanisPointwise(){returntrue;}@OverridepublicStringtoString(){return"FORWARD";}@OverridepublicSubtaskStateMappergetDownstreamSubtaskStateMapper(){returnSubtaskStateMapper.UNSUPPORTED;}@OverridepublicSubtaskStateMappergetUpstreamSubtaskStateMapper(){returnSubtaskStateMapper.UNSUPPORTED;}}
ForwardPartitioner

StreamGraph

addEdgeInternal

方法中自动创建(生成StreamGraph的过程),代码片段如下所示:

// ...if(partitioner ==null&& upstreamNode.getParallelism()== downstreamNode.getParallelism()){// 只有在上游和下游的并行度相同且没有指定相关分区器的时候,才会使用ForwardPartitioner
    partitioner =newForwardPartitioner<Object>();}elseif(partitioner ==null){// 否 则使用RebalancePartitioner
    partitioner =newRebalancePartitioner<Object>();}// 这里还会再次检测上游和下游的并行度是否一致// 防止用户强行指定使用ForwardPartitioner时候上下游的并行度不一致if(partitioner instanceofForwardPartitioner){if(upstreamNode.getParallelism()!= downstreamNode.getParallelism()){thrownewUnsupportedOperationException("Forward partitioning does not allow "+"change of parallelism. Upstream operation: "+ upstreamNode
                        +" parallelism: "+ upstreamNode.getParallelism()+", downstream operation: "+ downstreamNode
                        +" parallelism: "+ downstreamNode.getParallelism()+" You must use another partitioning strategy, such as broadcast, rebalance, shuffle or global.");}}// ...

或者调用

forward

算子创建,这个方法基本不使用。

publicDataStream<T>forward(){returnsetConnectionType(newForwardPartitioner<T>());}

图例

在这里插入图片描述

RebalancePartitioner

publicDataStream<T>rebalance(){returnsetConnectionType(newRebalancePartitioner<T>());}

可以看到

rebalance

对应的分区器是【RebalancePartitioner】。

publicclassRebalancePartitioner<T>extendsStreamPartitioner<T>{privatestaticfinallong serialVersionUID =1L;// 记录要接受数据的下游Channel编号privateint nextChannelToSendTo;@Overridepublicvoidsetup(int numberOfChannels){super.setup(numberOfChannels);

        nextChannelToSendTo =ThreadLocalRandom.current().nextInt(numberOfChannels);}// 采用取余的方式找出发送的下游channel@OverridepublicintselectChannel(SerializationDelegate<StreamRecord<T>> record){
        nextChannelToSendTo =(nextChannelToSendTo +1)% numberOfChannels;return nextChannelToSendTo;}// 恢复的时候将保存数据轮询发送@OverridepublicSubtaskStateMappergetDownstreamSubtaskStateMapper(){returnSubtaskStateMapper.ROUND_ROBIN;}publicStreamPartitioner<T>copy(){returnthis;}// 上下游SubTask之间没有意义对应关系@OverridepublicbooleanisPointwise(){returnfalse;}@OverridepublicStringtoString(){return"REBALANCE";}}

图例

在这里插入图片描述

RescalePartitioner

publicDataStream<T>rescale(){returnsetConnectionType(newRescalePartitioner<T>());}

可以看到

rescale

对应的分区器是【RescalePartitioner】。跟

rebalance

不同,例如上游并行度是2,下游是4,则上游一个并行度以循环的方式将记录输出到下游的两个并行度上;上游另一个并行度以循环的方式将记录输出到下游另两个并行度上。如果上游并行度是4,下游并行度是2,则上游两个并行度将记录输出到下游一个并行度上;上游另两个并行度将记录输出到下游另一个并行度上。(可以理解是一种负载均衡的轮询

publicclassRescalePartitioner<T>extendsStreamPartitioner<T>{privatestaticfinallong serialVersionUID =1L;privateint nextChannelToSendTo =-1;// 采用的方式和rebalance一致,都是轮询的策略@OverridepublicintselectChannel(SerializationDelegate<StreamRecord<T>> record){if(++nextChannelToSendTo >= numberOfChannels){
            nextChannelToSendTo =0;}return nextChannelToSendTo;}// 恢复的时候不支持扩缩容,因为原本的对应关系已经被破坏了@OverridepublicSubtaskStateMappergetDownstreamSubtaskStateMapper(){returnSubtaskStateMapper.UNSUPPORTED;}// 恢复的时候不支持扩缩容,因为原本的对应关系已经被破坏了@OverridepublicSubtaskStateMappergetUpstreamSubtaskStateMapper(){returnSubtaskStateMapper.UNSUPPORTED;}publicStreamPartitioner<T>copy(){returnthis;}@OverridepublicStringtoString(){return"RESCALE";}// 这是有一一对应关系的分区方式@OverridepublicbooleanisPointwise(){returntrue;}}

图例

在这里插入图片描述

KeyGroupPartitioner

public<K>KeyedStream<T,K>keyBy(KeySelector<T,K> key){Preconditions.checkNotNull(key);returnnewKeyedStream<>(this,clean(key));}// 调用keyby返回一个KeyedStream// 在KeyedStream底层用一个PartitionTransformation包装了KeyGroupStreamPartitioner(键提取器,和默认最大键组数)// publicKeyedStream(DataStream<T> dataStream,KeySelector<T, KEY> keySelector,TypeInformation<KEY> keyType){this(
                dataStream,newPartitionTransformation<>(
                        dataStream.getTransformation(),newKeyGroupStreamPartitioner<>(
                                keySelector,StreamGraphGenerator.DEFAULT_LOWER_BOUND_MAX_PARALLELISM)),
                keySelector,
                keyType);}

以下是【KeyGroupStreamPartitioner】的源码分析

publicclassKeyGroupStreamPartitioner<T,K>extendsStreamPartitioner<T>implementsConfigurableStreamPartitioner{privatestaticfinallong serialVersionUID =1L;privatefinalKeySelector<T,K> keySelector;privateint maxParallelism;@OverridepublicintselectChannel(SerializationDelegate<StreamRecord<T>> record){K key;try{// 通过keySelector获取键
            key = keySelector.getKey(record.getInstance().getValue());}catch(Exception e){thrownewRuntimeException("Could not extract key from "+ record.getInstance().getValue(), e);}// returnKeyGroupRangeAssignment.assignKeyToParallelOperator(
                key, maxParallelism, numberOfChannels);}@OverridepublicSubtaskStateMappergetDownstreamSubtaskStateMapper(){returnSubtaskStateMapper.RANGE;}// 上下游SubTask没有一一对应关系@OverridepublicbooleanisPointwise(){returnfalse;}// 这里是检查是否配置了最大并行度(最大建组数),如果有配置则替代默认值@Overridepublicvoidconfigure(int maxParallelism){KeyGroupRangeAssignment.checkParallelismPreconditions(maxParallelism);this.maxParallelism = maxParallelism;}}// 包装了一层检查一下键是否是null// key:键;// maxParallelis:支持的最大并行度,也就是键组的数量// parallelism:当前并行度publicstaticintassignKeyToParallelOperator(Object key,int maxParallelism,int parallelism){Preconditions.checkNotNull(key,"Assigned key must not be null!");returncomputeOperatorIndexForKeyGroup(maxParallelism, parallelism,assignToKeyGroup(key, maxParallelism));}// 分配键组// key:键;// maxParallelis:支持的最大并行度,也就是键组的数量publicstaticintassignToKeyGroup(Object key,int maxParallelism){Preconditions.checkNotNull(key,"Assigned key must not be null!");returncomputeKeyGroupForKeyHash(key.hashCode(), maxParallelism);}// 通过键组ID*当前并行度/最大键组数量默认128来分配数据流向的channel// maxParallelis:支持的最大并行度,也就是键组的数量// parallelism:当前并行度// keyGroupId:键组IDpublicstaticintcomputeOperatorIndexForKeyGroup(int maxParallelism,int parallelism,int keyGroupId){return keyGroupId * parallelism / maxParallelism;}

图例

在这里插入图片描述

Flink如何使用分区器

Flink通过

RecordWriter

向下游写入输入。

RecordWriter

通过

RecordWriterBuilder

创建。

publicRecordWriter<T>build(ResultPartitionWriter writer){if(selector.isBroadcast()){returnnewBroadcastRecordWriter<>(writer, timeout, taskName);}else{returnnewChannelSelectorRecordWriter<>(writer, selector, timeout, taskName);}}

build

方法中会调用【selector】的

isBroadcast

方法,如果是广播类型,则创建【BroadcastRecordWriter】对象来写数据,否则创建【ChannelSelectorRecordWriter】对象来写数据。

以下是【BroadcastRecordWriter】对象的源码分析:

publicfinalclassBroadcastRecordWriter<TextendsIOReadableWritable>extendsRecordWriter<T>{

    broadcastEmit方法
    // writer都是调用emit方法,在BroadcastRecordWriter中进行了包装,实质调用的是broadcastEmit方法@Overridepublicvoidemit(T record)throwsIOException{broadcastEmit(record);}@OverridepublicvoidbroadcastEmit(T record)throwsIOException{// 检查checkErroneous();// 先使用序列化器将数据序列化,然后进行广播
        targetPartition.broadcastRecord(serializeRecord(serializer, record));if(flushAlways){flushAll();}}}

以下是【ChannelSelectorRecordWriter】对象源码分析:

publicfinalclassChannelSelectorRecordWriter<TextendsIOReadableWritable>extendsRecordWriter<T>{privatefinalChannelSelector<T> channelSelector;@Overridepublicvoidemit(T record)throwsIOException{// 分区器根据当前记录计算出下游Subtask的索引,然后发送emit(record, channelSelector.selectChannel(record));}protectedvoidemit(T record,int targetSubpartition)throwsIOException{checkErroneous();// 先进行序列化操作// targetSubpartition就是上一步中分区器计算的SubTask索引
        targetPartition.emitRecord(serializeRecord(serializer, record), targetSubpartition);if(flushAlways){
            targetPartition.flush(targetSubpartition);}}}

总结

  1. Flink本身提供了多种分区API,在底层使用的都是分区器,Flink一般提供了7种分区器;
  2. 按键分区本质上是按键组分区,通过分配键组的方式分配键;
  3. rescale本地轮流分配)和rebalance轮流分配)有区别,前者考虑了TM之间数据传输的问题,可以理解是一种软负载均衡的轮询;

往期回顾

  1. 【Flink】浅谈Flink背压问题(1)
  2. 【分布式】浅谈CAP、BASE理论(1)

文中难免会出现一些描述不当之处(尽管我已反复检查多次),欢迎在留言区指正,列表相关的知识点也可进行分享。


本文转载自: https://blog.csdn.net/pcx171/article/details/128767822
版权归原作者 小猪猪家的大猪猪 所有, 如有侵权,请联系我们删除。

“【Flink】详解Flink的八种分区”的评论:

还没有评论