Flink集群搭建
⚠申明: 未经许可,禁止以任何形式转载,若要引用,请标注链接地址。 全文共计3696字,阅读大概需要3分钟
🌈更多学习内容, 欢迎👏关注👀【文末】我的个人微信公众号:不懂开发的程序猿
个人网站:https://jerry-jy.co/
一、实验目的
掌握Flink集群搭建的过程。
掌握Flink集群的启动、停止、提交作业
二、实验内容
1、搭建Flink集群
2、集群的启动,停止、提交作业
三、实验原理
Flink提交作业和执行任务,需要几个关键组件:
- 客户端(Client) : 代码由客户端获取并做转换,之后提交给JobManger
- JobManager就是Flink集群里的“管事人”,对作业进行中央调度管理;而它获取到要执行的作业后,会进一步处理转换,然后分发任务给众多的TaskManager 。
- TaskManager,就是真正“干活的人”,数据的处理操作都是它们来做的。
四、实验环境
硬件:x86_64 CentOS 7.5 服务器
软件:JDK1.8,Flink-1.17.1,Hadoop-3.3.3,IntelliJ Idea-2022
五、实验步骤
5.1 集群启动
0)集群规划
安装部署
1)下载并解压安装包
(1)下载安装包
flink-1.17.1-bin-scala_2.12.tgz
,将该jar包上传到hadoop102节点服务器的
/opt/software
路径上。
安装包下载地址:https://flink.apache.org/zh/downloads/
(2)在/opt/software路径上解压
flink-1.17.0-bin-scala_2.12.tgz
到
/opt/module
路径上。
[root@hadoop102 software]$ tar-zxvf flink-1.17.0-bin-scala_2.12.tgz -C /opt/module/
2)修改集群配置
(1)进入conf路径,修改
flink-conf.yaml
文件,指定hadoop102节点服务器为
JobManager
[root@hadoop102 conf]$ vim flink-conf.yaml
修改如下内容:
# JobManager节点地址.
jobmanager.rpc.address: hadoop102
jobmanager.bind-host: 0.0.0.0
rest.address: hadoop102
rest.bind-address: 0.0.0.0
# TaskManager节点地址.需要配置为当前机器名
taskmanager.bind-host: 0.0.0.0
taskmanager.host: hadoop102
(2)修改workers文件,指定hadoop102、hadoop103和hadoop104为TaskManager
[root@hadoop102 conf]$ vim workers
修改如下内容:
hadoop102
hadoop103
hadoop104
(3)修改masters文件
[root@hadoop102 conf]$ vim masters
修改如下内容:
hadoop102:8081
(4)另外,在
flink-conf.yaml
文件中还可以对集群中的
JobManager
和
TaskManager
组件进行优化配置,主要配置项如下:
- jobmanager.memory.process.size:对JobManager进程可使用到的全部内存进行配置,包括JVM元空间和其他开销,默认为1600M,可以根据集群规模进行适当调整。
- taskmanager.memory.process.size:对TaskManager进程可使用到的全部内存进行配置,包括JVM元空间和其他开销,默认为1728M,可以根据集群规模进行适当调整。
- taskmanager.numberOfTaskSlots:对每个TaskManager能够分配的Slot数量进行配置,默认为1,可根据TaskManager所在的机器能够提供给Flink的CPU数量决定。所谓Slot就是TaskManager中具体运行一个任务所分配的计算资源。
- parallelism.default:Flink任务执行的并行度,默认为1。优先级低于代码中进行的并行度配置和任务提交时使用参数指定的并行度数量。
3)分发安装目录
(1)配置修改完毕后,将Flink安装目录发给另外两个节点服务器。
[root@hadoop102module]$ xsync flink-1.17.1/
(2)修改hadoop103的
taskmanager.host
[root@hadoop103 conf]$ vim flink-conf.yaml
修改如下内容:
# TaskManager节点地址.需要配置为当前机器名
taskmanager.host: hadoop103
(3)修改hadoop104的 taskmanager.host
[root@hadoop104 conf]$ vim flink-conf.yaml
修改如下内容:
# TaskManager节点地址.需要配置为当前机器名
taskmanager.host: hadoop104
4)启动集群
(1)在hadoop102节点服务器上执行
start-cluster.sh
启动Flink集群:
[root@hadoop102 flink-1.17.1]# bin/start-cluster.sh
(2)查看进程情况:
[root@hadoop102 flink-1.17.1]# jpsall
=============== hadoop102 ===============3024Jps2467StandaloneSessionClusterEntrypoint2869TaskManagerRunner=============== hadoop103 ===============2059TaskManagerRunner2159Jps=============== hadoop104 ===============2038TaskManagerRunner2138Jps
5)访问Web UI
启动成功后,同样可以访问
http://hadoop102:8081
对flink集群和任务进行监控管理。
这里可以明显看到,当前集群的TaskManager数量为3;由于默认每个TaskManager的Slot数量为1,所以总Slot数和可用Slot数都为3。
5.2 向集群提交作业
在上一节中,我们已经编写读取socket发送的单词并统计单词的个数程序案例。
本节我们将以该程序为例,演示如何将任务提交到集群中进行执行。具体步骤如下。
1)环境准备
在hadoop102中执行以下命令启动
netcat
。
[root@hadoop102 flink-1.17.1]# nc -lk 7777
2)程序打包
(1)在我们编写的Flink入门程序的
pom.xml
文件中添加打包插件的配置,具体如下:
<build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><version>3.2.4</version><executions><execution><phase>package</phase><goals><goal>shade</goal></goals><configuration><artifactSet><excludes><exclude>com.google.code.findbugs:jsr305</exclude><exclude>org.slf4j:*</exclude><exclude>log4j:*</exclude></excludes></artifactSet><filters><filter><!-- Do not copy the signatures in the META-INF folder.
Otherwise, this might cause SecurityExceptions when using the JAR. --><artifact>*:*</artifact><excludes><exclude>META-INF/*.SF</exclude><exclude>META-INF/*.DSA</exclude><exclude>META-INF/*.RSA</exclude></excludes></filter></filters><transformerscombine.children="append"><transformerimplementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"></transformer></transformers></configuration></execution></executions></plugin></plugins></build>
(2)插件配置完毕后,可以使用IDEA的Maven工具执行
clean
package
命令,出现如下提示即表示打包成功。
打包完成后,在target目录下即可找到所需JAR包,JAR包会有两个,
FlinkTutorial-1.0-SNAPSHOT.jar
和
FlinkTutorial-1.0-SNAPSHOT-jar-with-dependencies.jar
,因为集群中已经具备任务运行所需的所有依赖,所以建议使用
FlinkTutorial-1.0-SNAPSHOT.jar
。
3)在Web UI上提交作业
(1)任务打包完成后,我们打开Flink的WEB UI页面,在右侧导航栏点击“Submit New Job”,然后点击按钮“+ Add New”,选择要上传运行的JAR包,如下图所示。
JAR包上传完成,如下图所示:
(2)点击该JAR包,出现任务配置页面,进行相应配置。
主要配置程序入口主类的全类名,任务运行的并行度,任务运行所需的配置参数和保存点路径等,如下图所示,配置完成后,即可点击按钮“Submit”,将任务提交到集群运行。
(3)任务提交成功之后,可点击左侧导航栏的“Running Jobs”查看程序运行列表情况。
(4)测试
①启动netcat
[root@hadoop102 flink-1.17.1]$ nc -lk 7777
②在socket端口中输入如下
hello java
hello flink
③先点击Task Manager,然后点击右侧的192.168.10.104服务器节点
③点击Stdout,就可以看到hello单词的统计
(4)点击该任务,可以查看任务运行的具体情况,也可以通过点击“Cancel Job”结束任务运行。
5.3 命令行提交作业
除了通过WEB UI界面提交任务之外,也可以直接通过命令行来提交任务。
这里为方便起见,我们可以先把jar包直接上传到目录flink-1.17.1下
(1)首先需要启动集群。
[root@hadoop102 flink-1.17.1]$ bin/start-cluster.sh
(2)在hadoop102中执行以下命令启动netcat。
[root@hadoop102 flink-1.17.0]$ nc -lk 7777
(3)将flink程序运行jar包上传到
/opt/module/flink-1.17.1
路径。
(4)进入到flink的安装路径下,在命令行使用
flink run
命令提交作业。
[root@hadoop102 flink-1.17.1]$ bin/flink run -m hadoop102:8081-c com.atguigu.wc.SocketStreamWordCount./FlinkTutorial-1.0-SNAPSHOT.jar
这里的参数 -m指定了提交到的JobManager,-c指定了入口类。
(5)在浏览器中打开Web UI,
http://hadoop102:8081
查看应用执行情况。
用netcat输入数据,可以在TaskManager的标准输出(Stdout)看到对应的统计结果。
–end–
版权归原作者 不懂开发的程序猿 所有, 如有侵权,请联系我们删除。