Flink On Yarn 模式
基于
Yarn
层面的架构类似 **
Spark on Yarn
模式**,都是由
Client
提交
App
到
RM
上面去运行,然后 **
RM
分配第一个
container
去运行
AM
,然后由
AM
去负责资源的监督和管理**。需要说明的是,
Flink
的
Yarn
模式更加类似
Spark on Yarn
的
cluster
模式,在
cluster
模式中,
dirver
将作为
AM
中的一个线程去运行。**
Flink on Yarn
模式也是会将
JobManager
启动在
container
里面**,去做个
driver
类似的任务调度和分配,**
Yarn AM
与
Flink JobManager
在同一个
Container
中**,这样
AM
可以知道
Flink JobManager
的地址,从而
AM
可以申请
Container
去启动
Flink TaskManager
。待
Flink
成功运行在
Yarn
集群上,
Flink Yarn Client
就可以提交
Flink Job
到
Flink JobManager
,并进行后续的映射、调度和计算处理。
Fink on Yarn 的缺陷
【1】资源分配是静态的,一个作业需要在启动时获取所需的资源并且在它的生命周期里一直持有这些资源。这导致了作业不能随负载变化而动态调整,在负载下降时无法归还空闲的资源,在负载上升时也无法动态扩展。
【2】**
On-Yarn
模式**下,所有的
container
都是固定大小的,导致无法根据作业需求来调整
container
的结构。譬如
CPU
密集的作业或需要更多的核,但不需要太多内存,固定结构的
container
会导致内存被浪费。
【3】与容器管理基础设施的交互比较笨拙,需要两个步骤来启动
Flink
作业:1.启动
Flink
守护进程;2.提交作业。如果作业被容器化并且将作业部署作为容器部署的一部分,那么将不再需要步骤2。
【4】
On-Yarn
模式下,作业管理页面会在作业完成后消失不可访问。
【5】
Flink
推荐 **
per job clusters
** 的部署方式,但是又支持可以在一个集群上运行多个作业的
session
模式,令人疑惑。
在
Flink
版本
1.5
中引入了
Dispatcher
,
Dispatcher
是在新设计里引入的一个新概念。
Dispatcher
会从
Client
端接受作业提交请求并代表它在集群管理器上启动作业。引入
Dispatcher
的原因主要有两点:
【1】一些集群管理器需要一个中心化的作业生成和监控实例;
【2】能够实现
Standalone
模式下
JobManager
的角色,且等待作业提交。在一些案例中,
Dispatcher
是可选的
Yarn
或者不兼容的
kubernetes
。
资源调度模型重构下的 Flink On Yarn 模式
客户端提交
JobGraph
以及依赖
jar
包到
YarnResourceManager
,接着
Yarn ResourceManager
分配第一个
container
以此来启动
AppMaster
,
Application Master
中会启动一个
FlinkResourceManager
以及
JobManager
,
JobManager
会根据
JobGraph
生成的
ExecutionGraph
以及物理执行计划向
FlinkResourceManager
申请
slot
,
FlinkResoourceManager
会管理这些
slot
以及请求,如果没有可用
slot
就向
Yarn
的
ResourceManager
申请
container
,
container
启动以后会注册到
FlinkResourceManager
,最后
JobManager
会将
subTask deploy
到对应
container
的
slot
中去。
在有
Dispatcher
的模式下:会增加一个过程,就是
Client
会直接通过
HTTP Server
的方式,然后用
Dispatcher
将这个任务提交到
Yarn ResourceManager
中。
新框架具有四大优势,详情如下:
【1】
client
直接在
Yarn
上启动作业,而不需要先启动一个集群然后再提交作业到集群。因此
client
再提交作业后可以马上返回。
【2】所有的用户依赖库和配置文件都被直接放在应用的
classpath
,而不是用动态的用户代码
classloader
去加载。
【3】
container
在需要时才请求,不再使用时会被释放。
【4】“需要时申请”的
container
分配方式允许不同算子使用不同
profile
(
CPU
和内存结构)的
container
。
新的资源调度框架下 single cluster job on Yarn 流程介绍
**
single cluster job on Yarn
模式**涉及三个实例对象:
**【1】
clifrontend
:**
Invoke App code
;生成
StreamGraph
,然后转化为
JobGraph
;
**【2】
YarnJobClusterEntrypoint(Master)
:** 依次启动
YarnResourceManager
、
MinDispatcher
、
JobManagerRunner
三者都服从分布式协同一致的策略;
JobManagerRunner
将
JobGraph
转化为
ExecutionGraph
,然后转化为物理执行任务
Execution
,然后进行
deploy
,
deploy
过程会向
YarnResourceManager
请求
slot
,如果有直接
deploy
到对应的
YarnTaskExecutiontor
的
slot
里面,没有则向
Yarn
的
ResourceManager
申请,带
container
启动以后
deploy
。
**【3】
YarnTaskExecutorRunner (slave)
:** 负责接收
subTask
,并运行。
整个任务运行代码调用流程如下图
**
subTask
在执行时是怎么运行的?**
**调用
StreamTask
的
invoke
方法,执行步骤如下:**
【1】
initializeState()
即
operator
的
initializeState()
;
【2】
openAllOperators()
即
operator
的
open()
方法;
【3】最后调用
run
方法来进行真正的任务处理;
我们来看下
flatMap
对应的
OneInputStreamTask
的
run
方法具体是怎么处理的。
@Overrideprotectedvoidrun()throwsException{// 在堆栈上缓存处理器引用,使代码更易于JITfinalStreamInputProcessor<IN> inputProcessor =this.inputProcessor;while(running && inputProcessor.processInput()){// 所有的工作都发生在“processInput”方法中}}
最终是调用
StreamInputProcessor
的
processInput()
做数据的处理,这里面包含用户的处理逻辑。
publicbooleanprocessInput()throwsException{if(isFinished){returnfalse;}if(numRecordsIn ==null){try{
numRecordsIn =((OperatorMetricGroup) streamOperator.getMetricGroup()).getIOMetricGroup().getNumRecordsInCounter();}catch(Exception e){LOG.warn("An exception occurred during the metrics setup.", e);
numRecordsIn =newSimpleCounter();}}while(true){if(currentRecordDeserializer !=null){DeserializationResult result = currentRecordDeserializer.getNextRecord(deserializationDelegate);if(result.isBufferConsumed()){
currentRecordDeserializer.getCurrentBuffer().recycleBuffer();
currentRecordDeserializer =null;}if(result.isFullRecord()){StreamElement recordOrMark = deserializationDelegate.getInstance();//处理watermarkif(recordOrMark.isWatermark()){// handle watermark//watermark处理逻辑,这里可能引起timer的trigger
statusWatermarkValve.inputWatermark(recordOrMark.asWatermark(), currentChannel);continue;}elseif(recordOrMark.isStreamStatus()){// handle stream status
statusWatermarkValve.inputStreamStatus(recordOrMark.asStreamStatus(), currentChannel);continue;//处理latency watermark}elseif(recordOrMark.isLatencyMarker()){// handle latency markersynchronized(lock){
streamOperator.processLatencyMarker(recordOrMark.asLatencyMarker());}continue;}else{//用户的真正的代码逻辑// now we can do the actual processingStreamRecord<IN> record = recordOrMark.asRecord();synchronized(lock){
numRecordsIn.inc();
streamOperator.setKeyContextElement1(record);//处理数据
streamOperator.processElement(record);}returntrue;}}}//这里会进行checkpoint barrier的判断和对齐,以及不同partition 里面checkpoint barrier不一致时候的,数据buffer,finalBufferOrEvent bufferOrEvent = barrierHandler.getNextNonBlocked();if(bufferOrEvent !=null){if(bufferOrEvent.isBuffer()){
currentChannel = bufferOrEvent.getChannelIndex();
currentRecordDeserializer = recordDeserializers[currentChannel];
currentRecordDeserializer.setNextBuffer(bufferOrEvent.getBuffer());}else{// Event receivedfinalAbstractEvent event = bufferOrEvent.getEvent();if(event.getClass()!=EndOfPartitionEvent.class){thrownewIOException("Unexpected event: "+ event);}}}else{
isFinished =true;if(!barrierHandler.isEmpty()){thrownewIllegalStateException("Trailing data in checkpoint barrier handler.");}returnfalse;}}}
streamOperator.processElement(record)
最终会调用用户的代码处理逻辑,假如
operator
是
StreamFlatMap
的话。
@OverridepublicvoidprocessElement(StreamRecord<IN> element)throwsException{
collector.setTimestamp(element);
userFunction.flatMap(element.getValue(), collector);//用户代码}
版权归原作者 程序猿进阶 所有, 如有侵权,请联系我们删除。