0


Flink 本地单机/Standalone集群/YARN模式集群搭建

准备工作

本文简述

Flink

Linux

中安装步骤,和示例程序的运行。需要安装

JDK1.8

及以上版本。

下载地址:下载

Flink

的二进制包

在这里插入图片描述
点进去后,选择如下链接:
在这里插入图片描述
解压

flink-1.10.1-bin-scala_2.12.tgz

,我这里解压到

soft

目录

[root@hadoop1 softpackage]# tar -zxvf flink-1.10.1-bin-scala_2.12.tgz -C../soft/

单节点安装

解压后进入

Flink

bin

目录执行如下脚本即可

[root@hadoop1 bin]# ./start-cluster.sh 
 Startingcluster.
 Starting standalonesession daemon on host hadoop1.
 Starting taskexecutor daemon on host hadoop1.

进入

Flink

页面看看,如果没有修改配置中的端口,默认是

8081

在这里插入图片描述## 集群安装

集群安装分为以下几步:(注意:

hadoopx

都是我配置了

/etc/hosts

域名的)

bin

【1】将

hadoop1

中解压的

Flink

分发到其他机器上,同时我也配置了免密登录

SSH

(也可以手动复制

low

)。

[root@hadoop1 soft]# xsync flink-1.10.1

执行完后,我们就可以在

hadoop2

hadoop3

中看到

flink

在这里插入图片描述
【2】选择

hadoop1

作为

master

节点,然后修改所有机器

conf/flink-conf.yaml

(修改

hadoop1

分发即可)

jobmanager.rpc.address

密钥以指向您的主节点。您还应该通过设置

jobmanager.heap.size和taskmanager.memory.process.size

键来定义允许

Flink

在每个节点上分配的最大主内存量。这些值以

MB

为单位。如果某些工作节点有更多的主内存要分配给

Flink

系统,则可以通过在这些特定节点上设置

taskmanager.memory.process.size或taskmanager.memory.flink.size

conf / flink-conf.yaml

中覆盖默认值。

jobmanager.rpc.address = master主机名

【3】修改

master

conf/slaves

提供集群中所有节点的列表,这些列表将用作工作节点。我的是

hadoop2

hadoop3

。类似于

HDFS

配置,编辑文件

conf / slaves

并输入每个辅助节点的

IP

/主机名。每个工作节点稍后都将运行

TaskManager

hadoop2
hadoop3

以上示例说明了具有三个节点(主机名

hadoop1

作为

master

hadoop2

hadoop3

作为

worker

)的设置,并显示了配置文件的内容。

Flink

目录必须在同一路径下的每个工作线程上都可用。您可以使用共享的

NFS

(网络文件系统)目录,也可以将整个

Flink

目录复制到每个工作节点。特别是:
1、每个

JobManager

的可用内存量

jobmanager.heap.size


2、每个

TaskManager

的可用内存量(

taskmanager.memory.process.size

并查看内存设置指南);
3、每台计算机可用的

CPU

数(

taskmanager.numberOfTaskSlots

);
4、集群中的

CPU

总数(

parallelism.default

);
5、临时目录(

io.tmp.dirs

);
【4】在

master

上启动集群(第一行)以及执行结果。下面的脚本在本地节点上启动

JobManager

,并通过

SSH

连接到

slaves

文件中列出的所有辅助节点,以在每个节点上启动

TaskManager

。现在,您的 Flink系统已启动并正在运行。现在,在本地节点上运行的

JobManager

将在配置的

RPC

端口上接受作业。要停止

Flink

,还有一个

stop-cluster.sh

脚本。

[root@hadoop1 flink-1.10.1]# bin/start-cluster.sh 
 Startingcluster.
 Starting standalonesession daemon on host hadoop1.
 Starting taskexecutor daemon on host hadoop2.
 Starting taskexecutor daemon on host hadoop3.

【5】

Flink

界面展示 :进入

8081

端口,例如:

http://hadoop1:8081/ 

或者通过

jps

命令查看服务也可行。
在这里插入图片描述

Standalone

集群架构展示:

client

客户端提交任务给

JobManager

JobManager

负责

Flink

集群计算资源管理,并分发任务给

TaskManager

执行,

TaskManager

定期向

JobManager

汇报状态。
在这里插入图片描述

运行 flink示例程序

批处理示例:提交

Flink

的批处理

examples

程序:也可以在页面中进行提交,但是作为一名

NB

的程序员就使用命令

[root@hadoop1 flink-1.10.1]# bin/flink run examples/batch/WordCount.jar

执行上面的命令后,就会显示如下信息,这是

Flink

提供的

examples

下的批处理例子程序,统计单词个数。

[root@hadoop1 flink-1.10.1]# bin/flink run examples/batch/WordCount.jar
ExecutingWordCount example withdefault input data set.
Use--input tospecify file input.
Printing result tostdout.Use--output tospecify output path.
Job has been submitted withJobID99f4c579947a66884ec269ddf5f5b0ed
Program execution finished
JobwithJobID99f4c579947a66884ec269ddf5f5b0ed has finished.
JobRuntime:795 ms
AccumulatorResults:- b70332353f355cf0464b0eba21f61075 (java.util.ArrayList)[170 elements](a,5)(action,1)(after,1)(against,1)(all,2)(and,12)(arms,1)(arrows,1)(awry,1)(ay,1)(bare,1)(be,4)(bear,3)(bodkin,1)(bourn,1)(but,1)(by,2)(calamity,1)(cast,1)(coil,1)(come,1)(conscience,1)(consummation,1)(contumely,1)(country,1)(cowards,1)(currents,1)......

得到结果,这里统计的是默认的数据集,可以通过

--input --output

指定输入输出。我们可以在页面中查看运行的情况:
在这里插入图片描述流处理示例:启动

nc

服务器:

[root@hadoop1 flink-1.10.1]# nc -lk 9000

提交

Flink

的批处理

examples

程序:

[root@hadoop1 flink-1.10.1]# bin/flink run examples/streaming/SocketWindowWordCount.jar  --hostname hadoop1  --port 9000

这是

Flink

提供的

examples

下的流处理例子程序,接收

socket

数据传入,统计单词个数。在

nc

端随意写入单词

[root@hadoop1 flink-1.10.1]# nc -lk 9000
 g
 s

进入

slave

节点(

hadoop2

hadoop3

),进入

Flink

安装目录输入如下命令,查看实时数据变化

[root@hadoop2 flink-1.10.1]# tail -f log/flink-*-taskexecutor-*.out
s :1:2
w :1
d :1
g :1
d :1

停止

Flink
[root@hadoop1 flink-1.10.1]# bin/stop-cluster.sh

Flink

web

中查看运行的

job

在这里插入图片描述

将 JobManager / TaskManager 实例添加到集群(扩展)

您可以使用

bin/jobmanager.sh

bin/taskmanager.sh

脚本将

JobManager

TaskManager

实例添加到正在运行的集群中。添加

JobManager

(确保在要启动/停止相应实例的主机上调用这些脚本)

[root@hadoop1 flink-1.10.1]# bin/jobmanager.sh ((start|start-foreground)[host][webui-port])|stop|stop-all

添加任务管理器

[root@hadoop1 flink-1.10.1]# bin/taskmanager.sh start|start-foreground|stop|stop-all

YARN模式

在企业中,经常需要将

Flink

集群部署到

YARN

,因为可以使用

YARN

来管理所有计算资源。而且

Spark

程序也可以部署到

YARN

上。

CliFrontend

是所有

job

的入口类,通过解析传递的参数(

jar

包,

mainClass

等),读取

flink

的环境,配置信息等,封装成

PackagedProgram

,最终通过

ClusterClient

提交给

Flink

集群。

Flink

运行在

YARN

上,提供了两种方式:
第一种使用

yarn-session

模式来快速提交作业到

YARN

集群。如下,在

Yarn

中初始化一个

flink

集群,开辟指定的资源,以后提交任务都向这里提交,这个

flink

集群会常驻在

Yarn

集群中,除非手动停止。共享

Dispatcher

ResourceManager

,共享资源。有大量的小作业,适合使用这种方式;
在这里插入图片描述

YarnSessionClusterEntrypoint

Flink

Yarn

上的线程。

ApplicationMaster

JobManager

YarnTaskExecutorRunner

负责接收

subTask

并运行,是

TaskManager


【1】修改

Hadoop

etc/hadoop/yarn-site.xml

,添加该配置表示内存超过分配值,是否将任务杀掉。默认为

true

。运行

Flink

程序,很容易超过分配的内存。

<property><name>yarn.nodemanager.vmem-check-enabled</name><value>false</value></property>

【2】 添加环境变量

//查看是否配置HADOOP_CONF_DIR,我这里没有配置输出为空[root@hadoop1 hadoop-2.7.2]# echo $HADOOP_CONF_DIR//在系统变量中添加 HADOOP_CONF_DIR[root@hadoop1 hadoop-2.7.2]# vim /etc/profile
//添加如下内容,wq保存退出
export HADOOP_CONF_DIR=$HADOOP_HOME/conf///刷新 /etc/profile[root@hadoop1 hadoop-2.7.2]# source /etc/profile

//重新查看是否配置HADOOP_CONF_DIR[root@hadoop1 hadoop-2.7.2]# echo $HADOOP_CONF_DIR/opt/module/hadoop-2.7.2/conf/

【3】启动

HDFS

YARN

集群。通过

jps

查看启动状况。关闭

flink

的其他集群。

[root@hadoop1 hadoop-2.7.2]# sbin/start-all.sh
[root@hadoop2 hadoop-2.7.2]# jps
10642NodeManager11093Jps10838ResourceManager10535DataNode10168TaskManagerRunner

【4】将官方指定Pre-bundled Hadoop 2.7.5包放到

flink

lib

目录下。使用

yarn-session

模式提交作业
在这里插入图片描述
使用

Flink

中的

yarn-session

yarn

客户端),会启动两个必要服务

JobManager

TaskManagers


客户端通过

yarn-session

提交作业;

yarn-session

会一直启动,不停地接收客户端提交的作用。

-n 表示申请2个容器
-s 表示每个容器启动多少个slot
-tm 表示每个TaskManager申请800M内存
-nm yarn 的 appName,
-d detached表示以后台程序方式运行

如下表示启动一个

yarn session

集群,每个

JM

1G

TM

的内存是

1G

[root@hadoop1 flink-1.10.1]# bin/yarn-session.sh -n 2 -jm 1024m -tm 1024m -d

客户端默认是

attach

模式,不会退出 。可以

ctrl+c

退出,然后再通过如下命令连上来。或者启动的时候用

-d

则为

detached

模式

./bin/yarn-session.sh -id application_1594027553009_0001(这个id来自下面hadoop集群)

在这里插入图片描述

Yarn

上显示为

Flink session cluster

,一致处于运行状态。
在这里插入图片描述点击

ApplicationMaster

就会进入

Flink

集群
在这里插入图片描述启动命令行中也会显示如下的

JobManager

启动的

Web

界面

JobManagerWebInterface: http://hadoop1:34431

在这里插入图片描述

然后我们可以通过

jps

来看下当前的进程,其中

YarnSessionClusterEntrypoint

就是我们

Yarn Session

的分布式集群。

[root@hadoop1 flink-1.10.1]# jps
69923NodeManager81267Jps69394NameNode69531DataNode80571FlinkYarnSessionCli80765YarnSessionClusterEntrypoint
/tmp

下生成了一个文件

Flink

应用部署到

Flink On Yarn 之 session

方式中。

[root@hadoop1 flink-1.10.1]# bin/flink run -d examples/streaming/WordCount.jar 

在这里插入图片描述
查看运行结果:
在这里插入图片描述

Flink On Yarn

session

部署方式集群停止:关闭

Yarn

就会关闭

Flink

集群。。。

第二种模式:使用

Per-JOBYarn

分离模式(与当前客户端无关,当客户端提交完任务就结束,不用等到

Flink

应用执行完毕)提交作业:每次提交都会创建一个新的

flink

集群,任务之间相互独立,互不影响,方便管理。任务执行完成之后创建的集群也会消失。 直接提交任务给

YARN

,独享

Dispatcher

ResourceManager

。按需要申请资源。适合执行时间较长的大作业。
在这里插入图片描述

AM

启动类是

YarnJobClusterEntrypoint

YarnTaskExecutorRunner

负责接收

subTask

,就是

TaskManager

。需要打开

hadoop

yarn

分布式集群。不需要启动

flink

分布式集群,它会自动启动

flink

分布式集群。

[root@hadoop1 flink-1.10.1]# bin/flink run -m yarn-cluster -d ./examples/streaming/WordCount.jar
2020-07-1303:21:50,479WARNorg.apache.flink.yarn.cli.FlinkYarnSessionCli-The configuration directory ('/usr/local/soft/flink-1.10.1/conf') already contains a LOG4J config file.If you want touse logback, then please delete or rename the log configuration file.2020-07-1303:21:50,479WARNorg.apache.flink.yarn.cli.FlinkYarnSessionCli-The configuration directory ('/usr/local/soft/flink-1.10.1/conf') already contains a LOG4J config file.If you want touse logback, then please delete or rename the log configuration file.
ExecutingWordCount example withdefault input data set.
Use--input tospecify file input.
Printing result tostdout.Use--output tospecify output path.2020-07-1303:21:50,707INFOorg.apache.hadoop.yarn.client.RMProxy-ConnectingtoResourceManager at hadoop2/192.168.52.129:80322020-07-1303:21:50,791INFOorg.apache.flink.yarn.YarnClusterDescriptor-No path for the flink jar passed. Using the location of classorg.apache.flink.yarn.YarnClusterDescriptortolocate the jar
2020-07-1303:21:50,928WARNorg.apache.flink.yarn.YarnClusterDescriptor-Neither the HADOOP_CONF_DIR nor the YARN_CONF_DIR environment variable is set. TheFlinkYARNClient needs one of these tobe set toproperly load the Hadoop configuration for accessing YARN.2020-07-1303:21:51,001INFOorg.apache.flink.yarn.YarnClusterDescriptor-Cluster specification:ClusterSpecification{masterMemoryMB=1024, taskManagerMemoryMB=1728, slotsPerTaskManager=1}2020-07-1303:21:53,906INFOorg.apache.flink.yarn.YarnClusterDescriptor
-yn

yarncontainer

表示

TaskManager

的个数;

-yqu

yarnqueue

指定

yarn

的队列;

-ys

yarnslots

每一个

TaskManager

对应的

slot

个数;

上传成功之后,我们可以在

Hadoop

的图形化界面:http://hadoop2:8088/cluster/apps 中看到当前任务的信息;
在这里插入图片描述

标签: flink 大数据

本文转载自: https://blog.csdn.net/zhengzhaoyang122/article/details/134902557
版权归原作者 程序猿进阶 所有, 如有侵权,请联系我们删除。

“Flink 本地单机/Standalone集群/YARN模式集群搭建”的评论:

还没有评论