0


Flink 的安装与基础编程

Flink 的安装与基础编程

实验背景

Flink是一种具有代表性的开源流处理架构,具有十分强大的功能,它实现了
Google Datatlow 流计算模型,是一种兼具高吞吐、低延迟和高性能的实时
流计算框架,并旦同时支 持批处理和流处理。Flink
的主要特性包括批流一体化、精密的状态管理、事件时间支持以
及精确一次的状态一致性保障等。

Flink 不仅可以运行在包括YARN、 Mesos、Kubernetes
等在内的多种资源管理框架上,还支持在裸机集群上独立部署。

实验目的

掌握Flink的安装以及基本编程方法。

实验环境

VirtualBox 6.1.14, Ubuntu 16.04,java8

实验任务及完成过程

安装Flink,并以WordCount程序为实例介绍Fink编程方法

本地模式下安装Flink

检查配置环境

为了运行Flink,需提前安装好 Java 8 或者 Java 11。 可以通过以下命令来检查
Java 是否已经安装正确,如果没有的话,需要先安装 JDK。:
(1)检查java版本

java -version

在这里插入图片描述

(2)在官方下载地址下载对应版本:

官网(https://flink.apache.org/downloads.html)
下载flink.1binscala_2.11.tgz

在这里插入图片描述

(3)下载完成后通过WinSCP将电脑本地本地导入到虚拟机的下载目录下。

在这里插入图片描述

(4)在虚拟机的命令行使用如下命令对安装文件进行解压缩:

cd 下载
sudo tar -zxvf flink-1.9.1-bin-scala_2.11.tgz -C /usr/local
\begin{lstlisting}[language={[ANSI]C}]

配置Flink

(5)修改目录名,并设置权限:

cd /usr/local
sudo mv flink-1.9.1 flink
sudo chown -R hadoop:hadoop flink

(6)由于我们此次环境为本地模式,Flink是可以开箱直接用的,如果要修改java
运行环境,可以修改/usr/local/flink/conf/flink-conf.yaml文件中的env.java.home参数
,设置为本地java的绝对路径。

使用如下命令添加环境变量:

vim ~/.bashrc

(7)在.bashrc文件中添加如下内容:

export FLNK_HOME=/usr/local/flink
export PATH=$FLINK_HOME/bin:$PATH

(8)保存并退出.bashrc文件,然后执行如下命令让配置文件生效:

source ~/.bashrc

启动与判断运行情况

(9)使用如下命令启动Flink:

cd /usr/local/flink
./bin/start-cluster.sh

(10)使用jps命令查看进程:

jps
6146 Jps
5673 StandaloneSessionClusterEntrypoint
6107 TaskManagerRunner

如果能看到TaskManagerRunner和StandaloneSessionClusterEntrypoint
这两个进程,就说明启动成功。

在这里插入图片描述

或者我们可以通过Web前段来判断是否启动成功,可以在浏览器
中输入http://localhost:8081网址来访问。

在这里插入图片描述

Flink实例运行(单机)

Flink安装包自带了测试样例,这里可以运行WordCount样例程序
测试Flink的运行效果,具体命令如下:

cd /usr/local/flink/bin
./flink run /usr/local/flink/examples/batch/WordCount.jar

在这里插入图片描述

我们运行得出效果如上图所示,统计词频WordCount。

编程实现WordCount程序

编写WordCount程序主要包括以下四个步揍:

(1)安装Maven

(2)编写代码

(3)使用Maven打包java程序

(4)通过flink run命令运行程序

安装Maven

Ubuntu中没有自带安装Maven,需要手动安装Maven。
可以访问Maven官网下载安装文件,下载地址如下:

https://downloads.apache.org/maven/maven-3/3.6.3/binaries/apache-maven-3.6.3-bin.zip

在这里插入图片描述

下载Maven安装文件后,保存到 /Downloads目录下。可以选择
安装在/usr/local/maven目录中,命令如下:

sudo unzip ~/下载/apache-maven-3.6.3-bin.zip -d /usr/local
cd /usr/local
sudo mv apache-maven-3.6.3/ ./maven
sudo chown -R hadoop ./maven

编写代码

在Linux终端中执行如下命令,在用户主文件夹下创建一个文件夹
flinkapp作为应用程序跟目录:

cd ~ #进入用户主文件夹
mkdir -p ./flinkapp/src/main/java

使用vim编辑器在./flinkapp/src/main/java 目录下
建立3个代码文件,即WordCountData.java、WordCountTokenizer.java、WordCount.java

WordCountData.java用于提供原始数据,内容如下:

package cn.edu.xmu;
  import org.apache.flink.api.java.DataSet;
  import org.apache.flink.api.java.ExecutionEnvironment;
  
  public class WordCountData {
      public static final String[] WORDS=new String[]{"To be, or not to be,--that is the question:--", "Whether \'tis nobler in the mind to suffer", "The slings and arrows of outrageous fortune", "Or to take arms against a sea of troubles,", "And by opposing end them?--To die,--to sleep,--", "No more; and by a sleep to say we end", "The heartache, and the thousand natural shocks", "That flesh is heir to,--\'tis a consummation", "Devoutly to be wish\'d. To die,--to sleep;--", "To sleep! perchance to dream:--ay, there\'s the rub;", "For in that sleep of death what dreams may come,", "When we have shuffled off this mortal coil,", "Must give us pause: there\'s the respect", "That makes calamity of so long life;", "For who would bear the whips and scorns of time,", "The oppressor\'s wrong, the proud man\'s contumely,", "The pangs of despis\'d love, the law\'s delay,", "The insolence of office, and the spurns", "That patient merit of the unworthy takes,", "When he himself might his quietus make", "With a bare bodkin? who would these fardels bear,", "To grunt and sweat under a weary life,", "But that the dread of something after death,--", "The undiscover\'d country, from whose bourn", "No traveller returns,--puzzles the will,", "And makes us rather bear those ills we have", "Than fly to others that we know not of?", "Thus conscience does make cowards of us all;", "And thus the native hue of resolution", "Is sicklied o\'er with the pale cast of thought;", "And enterprises of great pith and moment,", "With this regard, their currents turn awry,", "And lose the name of action.--Soft you now!", "The fair Ophelia!--Nymph, in thy orisons", "Be all my sins remember\'d."};
      public WordCountData() {
      }
      public static DataSet<String> getDefaultTextLineDataset(ExecutionEnvironment env){
          return env.fromElements(WORDS);
      }
  }

WordCountTokenizer.java用于切分句子,内容如下:

package cn.edu.xmu;
  import org.apache.flink.api.common.functions.FlatMapFunction;
  import org.apache.flink.api.java.tuple.Tuple2;
  import org.apache.flink.util.Collector;
  
  
  public class WordCountTokenizer implements FlatMapFunction<String, Tuple2<String,Integer>>{
  
      public WordCountTokenizer(){}
  
  
      public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
          String[] tokens = value.toLowerCase().split("\\W+");
          int len = tokens.length;
  
          for(int i = 0; i<len;i++){
              String tmp = tokens[i];
              if(tmp.length()>0){
                  out.collect(new Tuple2<String, Integer>(tmp,Integer.valueOf(1)));
              }
          }
      }
  }

WordCount.java提供主函数,内容如下:

package cn.edu.xmu;
  import org.apache.flink.api.java.DataSet;
  import org.apache.flink.api.java.ExecutionEnvironment;
  import org.apache.flink.api.java.operators.AggregateOperator;
  import org.apache.flink.api.java.utils.ParameterTool;
  
  
  public class WordCount {
  
      public WordCount(){}
  
      public static void main(String[] args) throws Exception {
          ParameterTool params = ParameterTool.fromArgs(args);
          ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
          env.getConfig().setGlobalJobParameters(params);
          Object text;
          //如果没有指定输入路径,则默认使用WordCountData中提供的数据
          if(params.has("input")){
              text = env.readTextFile(params.get("input"));
          }else{
              System.out.println("Executing WordCount example with default input data set.");
              System.out.println("Use -- input to specify file input.");
              text = WordCountData.getDefaultTextLineDataset(env);
          }
  
          AggregateOperator counts = ((DataSet)text).flatMap(new WordCountTokenizer()).groupBy(new int[]{0}).sum(1);
          //如果没有指定输出,则默认打印到控制台
          if(params.has("output")){
              counts.writeAsCsv(params.get("output"),"\n", " ");
              env.execute();
          }else{
              System.out.println("Printing result to stdout. Use --output to specify output path.");
              counts.print();
          }
  
      }
  }

该程序依赖Flink Java API,因此,需要通过Maven进行编译打包
。需要新建文件pom.xml,在pom.xml文件中添加如下内容,用来声明该独立程序的信息以及
与Flink的依赖关系:

<project>
  <groupId>cn.edu.xmu</groupId>
  <artifactId>simple-project</artifactId>
  <modelVersion>4.0.0</modelVersion>
  <name>Simple Project</name>
  <packaging>jar</packaging>
  <version>1.0</version>
  <repositories>
      <repository>
          <id>jboss</id>
          <name>JBoss Repository</name>
          <url>http://repository.jboss.com/maven2/</url>
      </repository>
  </repositories>
  <dependencies>
      <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-java</artifactId>
          <version>1.9.1</version>
      </dependency>
      <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-streaming-java_2.11</artifactId>
<version>1.9.1</version>
      </dependency>
      <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-clients_2.11</artifactId>
          <version>1.9.1</version>
      </dependency>
  </dependencies>
</project>

使用Maven打包java程序

为了保证Maven能正常运行,先执行如下命令检查整个应用程序的文件结构:

cd ~/flinkapp
find .

文件结构应该是类似如下内容:

../src
./src/main
./src/main/java
./src/main/java/WordCount.java
./src/main/java/WordCountTokenizer.java
./src/main/java/WordCountData.java
./pom.xml
  

接下来,可以通过如下代码将整个应用程序打包成JAR包:

cd ~/flinkapp    #一定把这个目录设置为当前目录
/usr/local/maven/bin/mvn package

如果屏幕返回信息中包含BUILD SUCCESS,则说明生成JAR包成功。

在这里插入图片描述

通过flink run 命令运行程序

可以将生成的JAR包通过flink run命令提交到Flink中运行
(启动flink的状态):

/usr/local/flink/bin/flink run --class cn.edu.xmu.WordCount ~/flinkapp/target/simple-project-1.0.jar

执行成功后,可以看到词频统计结果:

在这里插入图片描述

集群模式下Flink的安装与配置

Flink安装前准备工作

使用WinSCP将Flink安装包上传到虚拟机中,
可以去国内镜像下载,也可以去apache flink官网下载
flink.0binscala_2.11.tgz

Flink安装操作步骤:

、解压缩文件

tar -zxf flink-1.10.0-bin-scala_2.11.tgz -C ../soft/

、为了方便以后使用,将解压缩后的文件夹修改名字

mv flink-1.10.0/ flink

、选择性配置环境变量

vim /etc/profile
export FLINK_HOME=/opt/soft/flink
export PATH=$PATH:$FLINK_HOME/bin

、进入flink的conf目录下,修改配置文件

vim flink-conf.yaml(已注明行号)
33 jobmanager.rpc.address: slave1

、修改slaves,将集群中的另外两台添加进去

slave2
slave3

、分发文件到集群

xsync flink/
xsync /etc/profile

、启动flink 并进入到flink的bin目录下

cd /opt/soft/flink/bin
./start-cluster.sh

打开浏览器,输入slave1:8081 即可看到flink的web可视化页面。

在这里插入图片描述

测试集群模式下Flink:

1.flink集群的开启

start-cluster.sh

2.jps查看一下进程: 分别可以看到JobManager和TaskManager的2个进程

[root@slave1 bin]$ jps
3876 StandaloneSessionClusterEntrypoint
[hadoop@slave2 ~]$ jps
3544 TaskManagerRunner

3.运行如下案例代码

Scala> val text = benv.fromElements(
        "To be, or not to be,--that is the question:--",
        "Whether 'tis nobler in the mind to suffer",
        "The slings and arrows of outrageous fortune",
        "Or to take arms against a sea of troubles,")
      Scala> val counts = text
          .flatMap { _.toLowerCase.split("\\W+") }
          .map { (_, 1) }.groupBy(0).sum(1)
      Scala> counts.print()

4.结果展示

在这里插入图片描述

5.停止flink集群

stop-cluster.sh

总结

Apache Flink是一个分布式处理引擎,用于对无界和有界数据流进行有状态计算。
Flink
以数据并行和流水线方式执行任意流数据程序,它的流水线运行时系统可以执行批
处理和流处理程序。此外,Flink 在运行时本身也支持迭代算法的执行。

本文简要介绍了(单机与集群模式)Flink的安装以及基本编程方法,与如何通过
使用Maven打包java程序后再通过虚拟机用flink语言调包使用。

标签: flink java 大数据

本文转载自: https://blog.csdn.net/weixin_52439751/article/details/129398017
版权归原作者 两个肚皮 所有, 如有侵权,请联系我们删除。

“Flink 的安装与基础编程”的评论:

还没有评论