0


创建第一个 Flink 项目

一、运行环境介绍

Flink

执行环境主要分为本地环境和集群环境,本地环境主要为了方便用户编写和调试代码使用,而集群环境则被用于正式环境中,可以借助

Hadoop Yarn

k8s

Mesos

等不同的资源管理器部署自己的应用。

环境依赖:
【1】

JDK

环境:

Flink

核心模块均使用 Java开发,所以运行环境需要依赖

JDK

JDK

版本需要保证在

1.8

以上。
【2】

Maven

编译环境:

Flink

的源代码目前仅支持通过 Maven进行编译,所以如果需要对源代码进行编译,或通过

IDE

开发

Flink Application

,则建议使用

Maven

作为项目工程编译方式。需要注意的是,

Flink

程序需要

Maven

的版本在

3.0.4

及以上,否则项目编译可能会出问题,建议用户根据要求进行环境的搭建。
【3】

IDEA

:需要安装

scala

插件以及

scala

环境等;

二、Flink项目 Scala版 DataSet 有界流

需求:同进文件文件中的单词出现的次数;

【1】创建

Maven

项目,

pom.xml

文件中配置如下依赖

<dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-scala_2.12</artifactId><version>1.10.0</version></dependency><!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-scala --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-scala_2.12</artifactId><version>1.10.0</version></dependency></dependencies><build><plugins><!-- 该插件用于将Scala代码编译成class文件 --><plugin><groupId>net.alchim31.maven</groupId><artifactId>scala-maven-plugin</artifactId><version>3.4.6</version><executions><execution><goals><!--声明绑定到 maven 的compile阶段--><goal>compile</goal></goals></execution></executions></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-assembly-plugin</artifactId><version>3.0.0</version><configuration><descriptorRefs><descriptorRef>jar-with-dependencies</descriptorRef></descriptorRefs></configuration><executions><execution><id>make-assembly</id><phase>package</phase><goals><goal>single</goal></goals></execution></executions></plugin></plugins></build>

【2】

resource

目录中添加需要进行统计的文件文件及内容
[点击并拖拽以移动] ​

【3】

WordCount.java

文件内容如下,需要注意隐私转换问题,需要引入

scala._
importorg.apache.flink.api.scala._

/**
* @Description 批处理 word count
* @Author zhengzhaoxiang
* @Date 2020/7/12 18:55
* @Param
* @Return
*/
object WordCount{
  def main(args:Array[String]):Unit={//创建一个批处理的执行环境
    val env:ExecutionEnvironment=ExecutionEnvironment.getExecutionEnvironment
    //从文件中读取数据var inputDateSet:DataSet[String]= env.readTextFile("E:\\Project\\flink\\src\\main\\resources\\wordcount.txt")//基于Dataset 做转换,首先按空格打散,然后按照 word作为key做group by
    val resultDataSet:DataSet[(String,Int)]= inputDateSet
      .flatMap(_.split(" "))//分词得到所有 word构成的数据集.map((_,1))//_表示当前 word 转换成一个二元组(word,count).groupBy(0)//以二元组中第一个元素作为key.sum(1)//1表示聚合二元组的第二个元素的值//打印输出
    resultDataSet.print()}}

【4】统计结果展示:
[点击并拖拽以移动] ​

三、Flink项目 Scala版 DataStream 无界流

【1】

StreamWordCount.java

文件内容如下

packagecom.zzx.flinkimportorg.apache.flink.streaming.api.scala._

object StreamWordCount{
 def main(args:Array[String]):Unit={// 创建一个流处理执行环境
   val env:StreamExecutionEnvironment=StreamExecutionEnvironment.getExecutionEnvironment
   // 接受 socket 文本流
   val inputDataStream:DataStream[String]= env.socketTextStream("hadoop1",6666);//定义转换操作 word count
   val resultDataStream:DataStream[(String,Int)]= inputDataStream
     .flatMap(_.split(" "))//以空格分词,得到所有的 word.filter(_.nonEmpty).map((_,1))//转换成 word count 二元组.keyBy(0)//按照第一个元素分组.sum(1)//按照第二个元素求和

   resultDataStream.print()//上面的只是定义了处理流程,同时定义一个名称。不会让任务结束
   env.execute("stream word count word")}}

【2】我这里在

Hadoop1

中通过

nc -lk xxx

打开一个

socket

通信
点击并拖拽以移动​

【3】查看

IDEA

输出统计内容如下:输出

word

的顺序不是按照输入的顺序,是因为它有并行度(多线程)是并行执行的。最前面的数字是并行子任务的编号类似线程号。最大的数字其实跟你cpu核数是息息相关的。这个并行度也可以通过

env.setParallelism

进行设置。我们也可以给每一个任务(算子)设置不同的并行度;
[点击并拖拽以移动] ​

【4】当我们需要将

Java

文件打包上传到

Flink

的时候,这里的

host

port

可以从参数中进行获取,代码修改如下:

packagecom.zzx.flinkimportorg.apache.flink.api.java.utils.ParameterToolimportorg.apache.flink.streaming.api.scala._

object StreamWordCount{
 def main(args:Array[String]):Unit={// 创建一个流处理执行环境
   val env:StreamExecutionEnvironment=StreamExecutionEnvironment.getExecutionEnvironment
   // 接受 socket 文本流  hostname:prot 从程序运行参数中读取
   val params:ParameterTool=ParameterTool.fromArgs(args);
   val hostname:String= params.get("host");
   val port:Int= params.getInt("port");
   val inputDataStream:DataStream[String]= env.socketTextStream(hostname,port);//定义转换操作 word count
   val resultDataStream:DataStream[(String,Int)]= inputDataStream
     .flatMap(_.split(" "))//以空格分词,得到所有的 word.filter(_.nonEmpty).map((_,1))//转换成 word count 二元组.keyBy(0)//按照第一个元素分组.sum(1)//按照第二个元素求和

   resultDataStream.print()//上面的只是定义了处理流程,同时定义一个名称。不会让任务结束
   env.execute("stream word count word")}}
标签: flink 大数据 算法

本文转载自: https://blog.csdn.net/zhengzhaoyang122/article/details/134866997
版权归原作者 程序猿进阶 所有, 如有侵权,请联系我们删除。

“创建第一个 Flink 项目”的评论:

还没有评论