准备工作
本文简述
Flink
在
Linux
中安装步骤,和示例程序的运行。需要安装
JDK1.8
及以上版本。
下载地址:下载
Flink
的二进制包
点进去后,选择如下链接:
解压
flink-1.10.1-bin-scala_2.12.tgz
,我这里解压到
soft
目录
[root@hadoop1 softpackage]# tar -zxvf flink-1.10.1-bin-scala_2.12.tgz -C../soft/
单节点安装
解压后进入
Flink
的
bin
目录执行如下脚本即可
[root@hadoop1 bin]# ./start-cluster.sh
Startingcluster.
Starting standalonesession daemon on host hadoop1.
Starting taskexecutor daemon on host hadoop1.
进入
Flink
页面看看,如果没有修改配置中的端口,默认是
8081
## 集群安装
集群安装分为以下几步:(注意:
hadoopx
都是我配置了
/etc/hosts
域名的)
bin
【1】将
hadoop1
中解压的
Flink
分发到其他机器上,同时我也配置了免密登录
SSH
(也可以手动复制
low
)。
[root@hadoop1 soft]# xsync flink-1.10.1
执行完后,我们就可以在
hadoop2
和
hadoop3
中看到
flink
【2】选择
hadoop1
作为
master
节点,然后修改所有机器
conf/flink-conf.yaml
(修改
hadoop1
分发即可)
jobmanager.rpc.address
密钥以指向您的主节点。您还应该通过设置
jobmanager.heap.size和taskmanager.memory.process.size
键来定义允许
Flink
在每个节点上分配的最大主内存量。这些值以
MB
为单位。如果某些工作节点有更多的主内存要分配给
Flink
系统,则可以通过在这些特定节点上设置
taskmanager.memory.process.size或taskmanager.memory.flink.size
在
conf / flink-conf.yaml
中覆盖默认值。
jobmanager.rpc.address = master主机名
【3】修改
master
的
conf/slaves
提供集群中所有节点的列表,这些列表将用作工作节点。我的是
hadoop2
和
hadoop3
。类似于
HDFS
配置,编辑文件
conf / slaves
并输入每个辅助节点的
IP
/主机名。每个工作节点稍后都将运行
TaskManager
。
hadoop2
hadoop3
以上示例说明了具有三个节点(主机名
hadoop1
作为
master
,
hadoop2
和
hadoop3
作为
worker
)的设置,并显示了配置文件的内容。
Flink
目录必须在同一路径下的每个工作线程上都可用。您可以使用共享的
NFS
(网络文件系统)目录,也可以将整个
Flink
目录复制到每个工作节点。特别是:
1、每个
JobManager
的可用内存量
jobmanager.heap.size
;
2、每个
TaskManager
的可用内存量(
taskmanager.memory.process.size
并查看内存设置指南);
3、每台计算机可用的
CPU
数(
taskmanager.numberOfTaskSlots
);
4、集群中的
CPU
总数(
parallelism.default
);
5、临时目录(
io.tmp.dirs
);
【4】在
master
上启动集群(第一行)以及执行结果。下面的脚本在本地节点上启动
JobManager
,并通过
SSH
连接到
slaves
文件中列出的所有辅助节点,以在每个节点上启动
TaskManager
。现在,您的 Flink系统已启动并正在运行。现在,在本地节点上运行的
JobManager
将在配置的
RPC
端口上接受作业。要停止
Flink
,还有一个
stop-cluster.sh
脚本。
[root@hadoop1 flink-1.10.1]# bin/start-cluster.sh
Startingcluster.
Starting standalonesession daemon on host hadoop1.
Starting taskexecutor daemon on host hadoop2.
Starting taskexecutor daemon on host hadoop3.
【5】
Flink
界面展示 :进入
8081
端口,例如:
http://hadoop1:8081/
或者通过
jps
命令查看服务也可行。
Standalone
集群架构展示:
client
客户端提交任务给
JobManager
,
JobManager
负责
Flink
集群计算资源管理,并分发任务给
TaskManager
执行,
TaskManager
定期向
JobManager
汇报状态。
运行 flink示例程序
批处理示例:提交
Flink
的批处理
examples
程序:也可以在页面中进行提交,但是作为一名
NB
的程序员就使用命令
[root@hadoop1 flink-1.10.1]# bin/flink run examples/batch/WordCount.jar
执行上面的命令后,就会显示如下信息,这是
Flink
提供的
examples
下的批处理例子程序,统计单词个数。
[root@hadoop1 flink-1.10.1]# bin/flink run examples/batch/WordCount.jar
ExecutingWordCount example withdefault input data set.
Use--input tospecify file input.
Printing result tostdout.Use--output tospecify output path.
Job has been submitted withJobID99f4c579947a66884ec269ddf5f5b0ed
Program execution finished
JobwithJobID99f4c579947a66884ec269ddf5f5b0ed has finished.
JobRuntime:795 ms
AccumulatorResults:- b70332353f355cf0464b0eba21f61075 (java.util.ArrayList)[170 elements](a,5)(action,1)(after,1)(against,1)(all,2)(and,12)(arms,1)(arrows,1)(awry,1)(ay,1)(bare,1)(be,4)(bear,3)(bodkin,1)(bourn,1)(but,1)(by,2)(calamity,1)(cast,1)(coil,1)(come,1)(conscience,1)(consummation,1)(contumely,1)(country,1)(cowards,1)(currents,1)......
得到结果,这里统计的是默认的数据集,可以通过
--input --output
指定输入输出。我们可以在页面中查看运行的情况:
流处理示例:启动
nc
服务器:
[root@hadoop1 flink-1.10.1]# nc -lk 9000
提交
Flink
的批处理
examples
程序:
[root@hadoop1 flink-1.10.1]# bin/flink run examples/streaming/SocketWindowWordCount.jar --hostname hadoop1 --port 9000
这是
Flink
提供的
examples
下的流处理例子程序,接收
socket
数据传入,统计单词个数。在
nc
端随意写入单词
[root@hadoop1 flink-1.10.1]# nc -lk 9000
g
s
进入
slave
节点(
hadoop2
,
hadoop3
),进入
Flink
安装目录输入如下命令,查看实时数据变化
[root@hadoop2 flink-1.10.1]# tail -f log/flink-*-taskexecutor-*.out
s :1:2
w :1
d :1
g :1
d :1
停止
Flink
[root@hadoop1 flink-1.10.1]# bin/stop-cluster.sh
在
Flink
的
web
中查看运行的
job
将 JobManager / TaskManager 实例添加到集群(扩展)
您可以使用
bin/jobmanager.sh
和
bin/taskmanager.sh
脚本将
JobManager
和
TaskManager
实例添加到正在运行的集群中。添加
JobManager
(确保在要启动/停止相应实例的主机上调用这些脚本)
[root@hadoop1 flink-1.10.1]# bin/jobmanager.sh ((start|start-foreground)[host][webui-port])|stop|stop-all
添加任务管理器
[root@hadoop1 flink-1.10.1]# bin/taskmanager.sh start|start-foreground|stop|stop-all
YARN模式
在企业中,经常需要将
Flink
集群部署到
YARN
,因为可以使用
YARN
来管理所有计算资源。而且
Spark
程序也可以部署到
YARN
上。
CliFrontend
是所有
job
的入口类,通过解析传递的参数(
jar
包,
mainClass
等),读取
flink
的环境,配置信息等,封装成
PackagedProgram
,最终通过
ClusterClient
提交给
Flink
集群。
Flink
运行在
YARN
上,提供了两种方式:
第一种使用
yarn-session
模式来快速提交作业到
YARN
集群。如下,在
Yarn
中初始化一个
flink
集群,开辟指定的资源,以后提交任务都向这里提交,这个
flink
集群会常驻在
Yarn
集群中,除非手动停止。共享
Dispatcher
与
ResourceManager
,共享资源。有大量的小作业,适合使用这种方式;
YarnSessionClusterEntrypoint
是
Flink
在
Yarn
上的线程。
ApplicationMaster
是
JobManager
。
YarnTaskExecutorRunner
负责接收
subTask
并运行,是
TaskManager
。
【1】修改
Hadoop
的
etc/hadoop/yarn-site.xml
,添加该配置表示内存超过分配值,是否将任务杀掉。默认为
true
。运行
Flink
程序,很容易超过分配的内存。
<property><name>yarn.nodemanager.vmem-check-enabled</name><value>false</value></property>
【2】 添加环境变量
//查看是否配置HADOOP_CONF_DIR,我这里没有配置输出为空[root@hadoop1 hadoop-2.7.2]# echo $HADOOP_CONF_DIR//在系统变量中添加 HADOOP_CONF_DIR[root@hadoop1 hadoop-2.7.2]# vim /etc/profile
//添加如下内容,wq保存退出
export HADOOP_CONF_DIR=$HADOOP_HOME/conf///刷新 /etc/profile[root@hadoop1 hadoop-2.7.2]# source /etc/profile
//重新查看是否配置HADOOP_CONF_DIR[root@hadoop1 hadoop-2.7.2]# echo $HADOOP_CONF_DIR/opt/module/hadoop-2.7.2/conf/
【3】启动
HDFS
、
YARN
集群。通过
jps
查看启动状况。关闭
flink
的其他集群。
[root@hadoop1 hadoop-2.7.2]# sbin/start-all.sh
[root@hadoop2 hadoop-2.7.2]# jps
10642NodeManager11093Jps10838ResourceManager10535DataNode10168TaskManagerRunner
【4】将官方指定Pre-bundled Hadoop 2.7.5包放到
flink
的
lib
目录下。使用
yarn-session
模式提交作业
使用
Flink
中的
yarn-session
(
yarn
客户端),会启动两个必要服务
JobManager
和
TaskManagers
;
客户端通过
yarn-session
提交作业;
yarn-session
会一直启动,不停地接收客户端提交的作用。
-n 表示申请2个容器
-s 表示每个容器启动多少个slot
-tm 表示每个TaskManager申请800M内存
-nm yarn 的 appName,
-d detached表示以后台程序方式运行
如下表示启动一个
yarn session
集群,每个
JM
为
1G
,
TM
的内存是
1G
。
[root@hadoop1 flink-1.10.1]# bin/yarn-session.sh -n 2 -jm 1024m -tm 1024m -d
客户端默认是
attach
模式,不会退出 。可以
ctrl+c
退出,然后再通过如下命令连上来。或者启动的时候用
-d
则为
detached
模式
./bin/yarn-session.sh -id application_1594027553009_0001(这个id来自下面hadoop集群)
Yarn
上显示为
Flink session cluster
,一致处于运行状态。
点击
ApplicationMaster
就会进入
Flink
集群
启动命令行中也会显示如下的
JobManager
启动的
Web
界面
JobManagerWebInterface: http://hadoop1:34431
然后我们可以通过
jps
来看下当前的进程,其中
YarnSessionClusterEntrypoint
就是我们
Yarn Session
的分布式集群。
[root@hadoop1 flink-1.10.1]# jps
69923NodeManager81267Jps69394NameNode69531DataNode80571FlinkYarnSessionCli80765YarnSessionClusterEntrypoint
/tmp
下生成了一个文件
将
Flink
应用部署到
Flink On Yarn 之 session
方式中。
[root@hadoop1 flink-1.10.1]# bin/flink run -d examples/streaming/WordCount.jar
查看运行结果:
Flink On Yarn
之
session
部署方式集群停止:关闭
Yarn
就会关闭
Flink
集群。。。
第二种模式:使用
Per-JOBYarn
分离模式(与当前客户端无关,当客户端提交完任务就结束,不用等到
Flink
应用执行完毕)提交作业:每次提交都会创建一个新的
flink
集群,任务之间相互独立,互不影响,方便管理。任务执行完成之后创建的集群也会消失。 直接提交任务给
YARN
,独享
Dispatcher
与
ResourceManager
。按需要申请资源。适合执行时间较长的大作业。
AM
启动类是
YarnJobClusterEntrypoint
。
YarnTaskExecutorRunner
负责接收
subTask
,就是
TaskManager
。需要打开
hadoop
和
yarn
分布式集群。不需要启动
flink
分布式集群,它会自动启动
flink
分布式集群。
[root@hadoop1 flink-1.10.1]# bin/flink run -m yarn-cluster -d ./examples/streaming/WordCount.jar
2020-07-1303:21:50,479WARNorg.apache.flink.yarn.cli.FlinkYarnSessionCli-The configuration directory ('/usr/local/soft/flink-1.10.1/conf') already contains a LOG4J config file.If you want touse logback, then please delete or rename the log configuration file.2020-07-1303:21:50,479WARNorg.apache.flink.yarn.cli.FlinkYarnSessionCli-The configuration directory ('/usr/local/soft/flink-1.10.1/conf') already contains a LOG4J config file.If you want touse logback, then please delete or rename the log configuration file.
ExecutingWordCount example withdefault input data set.
Use--input tospecify file input.
Printing result tostdout.Use--output tospecify output path.2020-07-1303:21:50,707INFOorg.apache.hadoop.yarn.client.RMProxy-ConnectingtoResourceManager at hadoop2/192.168.52.129:80322020-07-1303:21:50,791INFOorg.apache.flink.yarn.YarnClusterDescriptor-No path for the flink jar passed. Using the location of classorg.apache.flink.yarn.YarnClusterDescriptortolocate the jar
2020-07-1303:21:50,928WARNorg.apache.flink.yarn.YarnClusterDescriptor-Neither the HADOOP_CONF_DIR nor the YARN_CONF_DIR environment variable is set. TheFlinkYARNClient needs one of these tobe set toproperly load the Hadoop configuration for accessing YARN.2020-07-1303:21:51,001INFOorg.apache.flink.yarn.YarnClusterDescriptor-Cluster specification:ClusterSpecification{masterMemoryMB=1024, taskManagerMemoryMB=1728, slotsPerTaskManager=1}2020-07-1303:21:53,906INFOorg.apache.flink.yarn.YarnClusterDescriptor
-yn
:
yarncontainer
表示
TaskManager
的个数;
-yqu
:
yarnqueue
指定
yarn
的队列;
-ys
:
yarnslots
每一个
TaskManager
对应的
slot
个数;
上传成功之后,我们可以在
Hadoop
的图形化界面:http://hadoop2:8088/cluster/apps 中看到当前任务的信息;
版权归原作者 程序猿进阶 所有, 如有侵权,请联系我们删除。