0


Flink Job 执行流程

Flink On Yarn 模式

在这里插入图片描述

基于

Yarn

层面的架构类似 **

Spark on Yarn

模式**,都是由

Client

提交

App

RM

上面去运行,然后 **

RM

分配第一个

container

去运行

AM

,然后由

AM

去负责资源的监督和管理**。需要说明的是,

Flink

Yarn

模式更加类似

Spark on Yarn

cluster

模式,在

cluster

模式中,

dirver

将作为

AM

中的一个线程去运行。**

Flink on Yarn

模式也是会将

JobManager

启动在

container

里面**,去做个

driver

类似的任务调度和分配,**

Yarn AM

Flink JobManager

在同一个

Container

中**,这样

AM

可以知道

Flink JobManager

的地址,从而

AM

可以申请

Container

去启动

Flink TaskManager

。待

Flink

成功运行在

Yarn

集群上,

Flink Yarn Client

就可以提交

Flink Job

Flink JobManager

,并进行后续的映射、调度和计算处理。

Fink on Yarn 的缺陷

【1】资源分配是静态的,一个作业需要在启动时获取所需的资源并且在它的生命周期里一直持有这些资源。这导致了作业不能随负载变化而动态调整,在负载下降时无法归还空闲的资源,在负载上升时也无法动态扩展。
【2】**

On-Yarn

模式**下,所有的

container

都是固定大小的,导致无法根据作业需求来调整

container

的结构。譬如

CPU

密集的作业或需要更多的核,但不需要太多内存,固定结构的

container

会导致内存被浪费。
【3】与容器管理基础设施的交互比较笨拙,需要两个步骤来启动

Flink

作业:1.启动

Flink

守护进程;2.提交作业。如果作业被容器化并且将作业部署作为容器部署的一部分,那么将不再需要步骤2。
【4】

On-Yarn

模式下,作业管理页面会在作业完成后消失不可访问。
【5】

Flink

推荐 **

per job clusters

** 的部署方式,但是又支持可以在一个集群上运行多个作业的

session

模式,令人疑惑。

Flink

版本

1.5

中引入了

Dispatcher

Dispatcher

是在新设计里引入的一个新概念。

Dispatcher

会从

Client

端接受作业提交请求并代表它在集群管理器上启动作业。引入

Dispatcher

的原因主要有两点:
【1】一些集群管理器需要一个中心化的作业生成和监控实例;
【2】能够实现

Standalone

模式下

JobManager

的角色,且等待作业提交。在一些案例中,

Dispatcher

是可选的

Yarn

或者不兼容的

kubernetes

资源调度模型重构下的 Flink On Yarn 模式

[点击并拖拽以移动] ​

客户端提交

JobGraph

以及依赖

jar

包到

YarnResourceManager

,接着

Yarn ResourceManager

分配第一个

container

以此来启动

AppMaster

Application Master

中会启动一个

FlinkResourceManager

以及

JobManager

JobManager

会根据

JobGraph

生成的

ExecutionGraph

以及物理执行计划向

FlinkResourceManager

申请

slot

FlinkResoourceManager

会管理这些

slot

以及请求,如果没有可用

slot

就向

Yarn

ResourceManager

申请

container

container

启动以后会注册到

FlinkResourceManager

,最后

JobManager

会将

subTask deploy

到对应

container

slot

中去。
[点击并拖拽以移动] ​

在有

Dispatcher

的模式下:会增加一个过程,就是

Client

会直接通过

HTTP Server

的方式,然后用

Dispatcher

将这个任务提交到

Yarn ResourceManager

中。

新框架具有四大优势,详情如下:
【1】

client

直接在

Yarn

上启动作业,而不需要先启动一个集群然后再提交作业到集群。因此

client

再提交作业后可以马上返回。
【2】所有的用户依赖库和配置文件都被直接放在应用的

classpath

,而不是用动态的用户代码

classloader

去加载。
【3】

container

在需要时才请求,不再使用时会被释放。
【4】“需要时申请”的

container

分配方式允许不同算子使用不同

profile

(

CPU

和内存结构)的

container

新的资源调度框架下 single cluster job on Yarn 流程介绍

[点击并拖拽以移动] ​

**

single cluster job on Yarn

模式**涉及三个实例对象:
**【1】

clifrontend

:**

Invoke App code

;生成

StreamGraph

,然后转化为

JobGraph


**【2】

YarnJobClusterEntrypoint(Master)

:** 依次启动

YarnResourceManager

MinDispatcher

JobManagerRunner

三者都服从分布式协同一致的策略;

JobManagerRunner

JobGraph

转化为

ExecutionGraph

,然后转化为物理执行任务

Execution

,然后进行

deploy

deploy

过程会向

YarnResourceManager

请求

slot

,如果有直接

deploy

到对应的

YarnTaskExecutiontor

slot

里面,没有则向

Yarn

ResourceManager

申请,带

container

启动以后

deploy


**【3】

YarnTaskExecutorRunner (slave)

:** 负责接收

subTask

,并运行。

整个任务运行代码调用流程如下图

[点击并拖拽以移动] ​

**

subTask

在执行时是怎么运行的?**

**调用

StreamTask

invoke

方法,执行步骤如下:**
【1】

initializeState()

operator

initializeState()


【2】

openAllOperators()

operator

open()

方法;
【3】最后调用

run

方法来进行真正的任务处理;

我们来看下

flatMap

对应的

OneInputStreamTask

run

方法具体是怎么处理的。

@Overrideprotectedvoidrun()throwsException{// 在堆栈上缓存处理器引用,使代码更易于JITfinalStreamInputProcessor<IN> inputProcessor =this.inputProcessor;while(running && inputProcessor.processInput()){// 所有的工作都发生在“processInput”方法中}}

最终是调用

StreamInputProcessor

processInput()

做数据的处理,这里面包含用户的处理逻辑。

publicbooleanprocessInput()throwsException{if(isFinished){returnfalse;}if(numRecordsIn ==null){try{
            numRecordsIn =((OperatorMetricGroup) streamOperator.getMetricGroup()).getIOMetricGroup().getNumRecordsInCounter();}catch(Exception e){LOG.warn("An exception occurred during the metrics setup.", e);
           numRecordsIn =newSimpleCounter();}}while(true){if(currentRecordDeserializer !=null){DeserializationResult result = currentRecordDeserializer.getNextRecord(deserializationDelegate);if(result.isBufferConsumed()){
               currentRecordDeserializer.getCurrentBuffer().recycleBuffer();
               currentRecordDeserializer =null;}if(result.isFullRecord()){StreamElement recordOrMark = deserializationDelegate.getInstance();//处理watermarkif(recordOrMark.isWatermark()){// handle watermark//watermark处理逻辑,这里可能引起timer的trigger
                   statusWatermarkValve.inputWatermark(recordOrMark.asWatermark(), currentChannel);continue;}elseif(recordOrMark.isStreamStatus()){// handle stream status
                   statusWatermarkValve.inputStreamStatus(recordOrMark.asStreamStatus(), currentChannel);continue;//处理latency watermark}elseif(recordOrMark.isLatencyMarker()){// handle latency markersynchronized(lock){
                       streamOperator.processLatencyMarker(recordOrMark.asLatencyMarker());}continue;}else{//用户的真正的代码逻辑// now we can do the actual processingStreamRecord<IN> record = recordOrMark.asRecord();synchronized(lock){
                       numRecordsIn.inc();
                       streamOperator.setKeyContextElement1(record);//处理数据
                       streamOperator.processElement(record);}returntrue;}}}//这里会进行checkpoint barrier的判断和对齐,以及不同partition 里面checkpoint barrier不一致时候的,数据buffer,finalBufferOrEvent bufferOrEvent = barrierHandler.getNextNonBlocked();if(bufferOrEvent !=null){if(bufferOrEvent.isBuffer()){
               currentChannel = bufferOrEvent.getChannelIndex();
               currentRecordDeserializer = recordDeserializers[currentChannel];
               currentRecordDeserializer.setNextBuffer(bufferOrEvent.getBuffer());}else{// Event receivedfinalAbstractEvent event = bufferOrEvent.getEvent();if(event.getClass()!=EndOfPartitionEvent.class){thrownewIOException("Unexpected event: "+ event);}}}else{
           isFinished =true;if(!barrierHandler.isEmpty()){thrownewIllegalStateException("Trailing data in checkpoint barrier handler.");}returnfalse;}}}
streamOperator.processElement(record)

最终会调用用户的代码处理逻辑,假如

operator

StreamFlatMap

的话。

@OverridepublicvoidprocessElement(StreamRecord<IN> element)throwsException{
    collector.setTimestamp(element);
    userFunction.flatMap(element.getValue(), collector);//用户代码}
标签: flink 大数据 java

本文转载自: https://blog.csdn.net/zhengzhaoyang122/article/details/135257550
版权归原作者 程序猿进阶 所有, 如有侵权,请联系我们删除。

“Flink Job 执行流程”的评论:

还没有评论