0


Flink在Yarn模式部署和命令

     独立(Standalone)模式由 Flink 自身提供资源,无需其他框架,这种方式降低了和其他 

第三方资源框架的耦合性,独立性非常强。但我们知道,Flink 是大数据计算框架,不是资源

调度框架,这并不是它的强项;所以还是应该让专业的框架做专业的事,和其他资源调度框架

集成更靠谱。而在目前大数据生态中,国内应用最为广泛的资源管理平台就是 YARN 了。所

以接下来我们就将学习,在强大的 YARN 平台上 Flink 是如何集成部署的。

    整体来说,YARN 上部署的过程是:客户端把 Flink 应用提交给 Yarn 的 ResourceManager, 

Yarn 的 ResourceManager 会向 Yarn 的 NodeManager 申请容器。在这些容器上,Flink 会部署

JobManager 和 TaskManager 的实例,从而启动集群。Flink 会根据运行在 JobManger 上的作业

所需要的 Slot 数量动态分配 TaskManager 资源。

1、相关环境配置

     在 Flink1.8.0 之前的版本,想要以 YARN 模式部署 Flink 任务时,需要 Flink 是有 Hadoop 

支持的。从 Flink 1.8 版本开始,不再提供基于 Hadoop 编译的安装包,若需要 Hadoop 的环境

支持,需要自行在官网下载 Hadoop 相关版本的组件 flink-shaded-hadoop-2-uber-2.7.5-10.0.jar,

并将该组件上传至 Flink 的 lib 目录下。在 Flink 1.11.0 版本之后,增加了很多重要新特性,其

中就包括增加了对Hadoop3.0.0以及更高版本Hadoop的支持,不再提供“flink-shaded-hadoop-*”

jar 包,而是通过配置环境变量完成与 YARN 集群的对接。

     在将 Flink 任务部署至 YARN 集群之前,需要确认集群是否安装有 Hadoop,保证 Hadoop 

版本至少在 2.2 以上,并且集群中安装有 HDFS 服务。

具体配置步骤如下:

(1)按照 3.1 节所述,下载并解压安装包,并将解压后的安装包重命名为 flink-1.13.0-yarn,

本节的相关操作都将默认在此安装路径下执行。

(2)配置环境变量,增加环境变量配置如下:

$ sudo vim /etc/profile.d/my_env.sh

HADOOP_HOME=/opt/module/hadoop-2.7.5

export PATH=$PATH:$HADOOP_HOME/bin:$HADOOP_HOME/sbin

export HADOOP_CONF_DIR=${HADOOP_HOME}/etc/hadoop

export HADOOP_CLASSPATH=hadoop classpath

这里必须保证设置了环境变量 HADOOP_CLASSPATH。

(3)启动 Hadoop 集群,包括 HDFS 和 YARN。

[atguigu@hadoop102 ~]$ start-dfs.sh

[atguigu@hadoop103 ~]$ start-yarn.sh

分别在 3 台节点服务器查看进程启动情况。

[atguigu@hadoop102 ~]$ jps

5190 Jps

5062 NodeManager

4408 NameNode

4589 DataNode

[atguigu@hadoop103 ~]$ jps

5425 Jps

4680 ResourceManager

5241 NodeManager

4447 DataNode

[atguigu@hadoop104 ~]$ jps

4731 NodeManager

4333 DataNode

4861 Jps

4478 SecondaryNameNode

(4)进入 conf 目录,修改 flink-conf.yaml 文件,修改以下配置,这些配置项的含义在进

行 Standalone 模式配置的时候进行过讲解,若在提交命令中不特定指明,这些配置将作为默认

配置。

$ cd /opt/module/flink-1.13.0-yarn/conf/

$ vim flink-conf.yaml

jobmanager.memory.process.size: 1600m

taskmanager.memory.process.size: 1728m

taskmanager.numberOfTaskSlots: 8

parallelism.default: 1

2、会话模式部署

YARN 的会话模式与独立集群略有不同,需要首先申请一个 YARN 会话(YARN session)

来启动 Flink 集群。具体步骤如下:

1. 启动集群

(1)启动 hadoop 集群(HDFS, YARN)。

(2)执行脚本命令向 YARN 集群申请资源,开启一个 YARN 会话,启动 Flink 集群。

$ bin/yarn-session.sh -nm test

可用参数解读:

⚫ -d:分离模式,如果你不想让 Flink YARN 客户端一直前台运行,可以使用这个参数,

4344

即使关掉当前对话窗口,YARN session 也可以后台运行。

⚫ -jm(--jobManagerMemory):配置 JobManager 所需内存,默认单位 MB。

⚫ -nm(--name):配置在 YARN UI 界面上显示的任务名。

⚫ -qu(--queue):指定 YARN 队列名。

⚫ -tm(--taskManager):配置每个 TaskManager 所使用内存。

注意:Flink1.11.0 版本不再使用-n 参数和-s 参数分别指定 TaskManager 数量和 slot 数量,

YARN 会按照需求动态分配 TaskManager 和 slot。所以从这个意义上讲,YARN 的会话模式也

不会把集群资源固定,同样是动态分配的。

YARN Session 启动之后会给出一个 web UI 地址以及一个 YARN application ID,如下所示,

用户可以通过 web UI 或者命令行两种方式提交作业。

2. 提交作业

(1)通过 Web UI 提交作业

这种方式比较简单,与上文所述 Standalone 部署模式基本相同。

(2)通过命令行提交作业

① 将 Standalone 模式讲解中打包好的任务运行 JAR 包上传至集群

② 执行以下命令将该任务提交到已经开启的 Yarn-Session 中运行。

$ bin/flink run

-c com.atguigu.wc.StreamWordCount FlinkTutorial-1.0-SNAPSHOT.jar

客户端可以自行确定 JobManager 的地址,也可以通过-m 或者-jobmanager 参数指定

JobManager 的地址,JobManager 的地址在 YARN Session 的启动页面中可以找到。

③ 任务提交成功后,可在 YARN 的 Web UI 界面查看运行情况。

如图 3-14 所示,从图中可以看到我们创建的 Yarn-Session 实际上是一个 Yarn 的

Application,并且有唯一的 Application ID。

④也可以通过 Flink 的 Web UI 页面查看提交任务的运行情况,如图 3-15 所示。

3、单作业模式部署

在 YARN 环境中,由于有了外部平台做资源调度,所以我们也可以直接向 YARN 提交一

个单独的作业,从而启动一个 Flink 集群。

(1)执行命令提交作业。

$ bin/flink run -d -t yarn-per-job -c com.atguigu.wc.StreamWordCount

FlinkTutorial-1.0-SNAPSHOT.jar

早期版本也有另一种写法:

$ bin/flink run

-m yarn-cluster

-c

com.atguigu.wc.StreamWordCount

FlinkTutorial-1.0-SNAPSHOT.jar

注意这里是通过参数-m yarn-cluster 指定向 YARN 集群提交任务。

(2)在 YARN 的 ResourceManager 界面查看执行情况,如图 3-16 所示

点击可以打开 Flink Web UI 页面进行监控,如图 3-17 所示:

(3)可以使用命令行查看或取消作业,命令如下。

$ ./bin/flink list -t yarn-per-job -Dyarn.application.id=application_XXXX_YY

$ ./bin/flink cancel -t yarn-per-job -Dyarn.application.id=application_XXXX_YY

<jobId>

这里的 application_XXXX_YY 是当前应用的 ID,<jobId>是作业的 ID。注意如果取消作

业,整个 Flink 集群也会停掉。

4、应用模式部署

应用模式同样非常简单,与单作业模式类似,直接执行 flink run-application 命令即可。

(1)执行命令提交作业。

$ bin/flink run-application -t yarn-application -c com.atguigu.wc.StreamWordCount

FlinkTutorial-1.0-SNAPSHOT.jar

(2)在命令行中查看或取消作业。

$ ./bin/flink list -t yarn-application -Dyarn.application.id=application_XXXX_YY

$ ./bin/flink cancel

-t yarn-application

-Dyarn.application.id=application_XXXX_YY <jobId>

(3)也可以通过 yarn.provided.lib.dirs 配置选项指定位置,将 jar 上传到远程。

$ ./bin/flink run-application

-t yarn-application

-Dyarn.provided.lib.dirs="hdfs://myhdfs/my-remote-flink-dist-dir"

hdfs://myhdfs/jars/my-application.jar

这种方式下 jar 可以预先上传到 HDFS,而不需要单独发送到集群,这就使得作业提交更

加轻量了。

5、高可用

   YARN 模式的高可用和独立模式(Standalone)的高可用原理不一样。 Standalone 模式中, 同时启动多个 JobManager, 一个为“领导者”(leader),其他为“后备” standby), 当 leader 挂了, 其他的才会有一个成为 leader。 而 YARN 的高可用是只启动一个 Jobmanager, 当这个 Jobmanager 挂了之后, YARN 会再次 启动一个, 所以其实是利用的 YARN 的重试次数来实现的高可用。 

(1)在 yarn-site.xml 中配置。

<property>

<name>yarn.resourcemanager.am.max-attempts</name>

<value>4</value>

<description>

The maximum number of application master execution attempts.

</description> </property>

注意: 配置完不要忘记分发, 和重启 YARN。

(2)在 flink-conf.yaml 中配置。

yarn.application-attempts: 3

high-availability: zookeeper

high-availability.storageDir: hdfs://hadoop102:9820/flink/yarn/ha

high-availability.zookeeper.quorum:

hadoop102:2181,hadoop103:2181,hadoop104:2181

high-availability.zookeeper.path.root: /flink-yarn

(3)启动 yarn-session。

(4)杀死 JobManager, 查看复活情况。

注意: yarn-site.xml 中配置的是 JobManager 重启次数的上限, flink-conf.xml 中的次数应该

小于这个值。

标签: flink java 大数据

本文转载自: https://blog.csdn.net/weixin_40659514/article/details/127509866
版权归原作者 weixin_lss 所有, 如有侵权,请联系我们删除。

“Flink在Yarn模式部署和命令”的评论:

还没有评论