前言
Flink 做为一款流式计算框架,它可用来做批处理,即处理静态的数据集、历史的数据集;也可以用来做流处理,即实时的处理些实时数据流,实时的产生数据流结果,只要数据源源不断的过来,Flink 就能够一直计算下去。详细的介绍我本篇就不做阐述了,感兴趣的同学可以回复往期的文章:Flink 系列二 Flink 状态化流处理概述,Flink 系列一 开发机 安装。本篇作为 Flink 系列的第三篇,咱们尝试在本地安装和实操 Flink。
1、安装flink环境
首先需要在你的本地环境安装apache-flink,执行如下命令即可,若采用docker安装更加方便。
brew install apache-flink
2、在idea中创建flink的第一个demo
2.1、执行如下maven命令
执行如下命令创建工程。这个命令的作用是使用Maven构建一个基于Apache Flink的Java快速启动项目模板。执行完后会下载对应的依赖包。
mvn archetype:generate \
-DarchetypeGroupId=org.apache.flink \
-DarchetypeArtifactId=flink-quickstart-java \
-DarchetypeVersion=1.8.0 \
-DarchetypeCatalog=local
解释一下具体含义:
mvn
是Maven的命令行工具。archetype:generate
表示使用原型机模板生成一个新项目。-DarchetypeGroupId
指定了项目模板的组ID,即Apache Flink团队为项目提供的默认模板组ID。-DarchetypeArtifactId
指定了项目模板的Artifact ID,即Apache Flink团队为项目提供的默认模板Artifact ID。-DarchetypeVersion
指定了项目模板的版本号。-DarchetypeCatalog
指定了本地的模板目录。- 反斜杠()是命令的折行符,它表示这个命令是连续的,但是出于格式上的考虑需要分成多行。
2.2、填写'groupId'、'artifactId'、'version'、'package'
Define value for property 'groupId': com.lly.flink.java
Define value for property 'artifactId': flink-traning
Define value for property 'version' 1.0-SNAPSHOT: : 1.0.0
Define value for property 'package' com.lly.flink.java: :
Confirm properties configuration:
groupId: com.lly.flink.java
artifactId: flink-traning
version: 1.0.0
package: com.lly.flink.java
2.3、选择Yes即可生成创建好的工程
特别注意,这里一定要选择 “Y”,保证项目顺利生产。
Y: : Y
[INFO] ----------------------------------------------------------------------------
[INFO] Using following parameters for creating project from Archetype: flink-quickstart-java:1.8.0
[INFO] ----------------------------------------------------------------------------
[INFO] Parameter: groupId, Value: com.lly.flink.java
[INFO] Parameter: artifactId, Value: flink-traning
[INFO] Parameter: version, Value: 1.0.0
[INFO] Parameter: package, Value: com.lly.flink.java
[INFO] Parameter: packageInPathFormat, Value: com/lly/flink/java
[INFO] Parameter: package, Value: com.lly.flink.java
[INFO] Parameter: version, Value: 1.0.0
[INFO] Parameter: groupId, Value: com.lly.flink.java
[INFO] Parameter: artifactId, Value: flink-traning
[WARNING] CP Don't override file /Users/liluyang/flink-traning/src/main/resources
[INFO] Project created from Archetype in dir: /Users/liluyang/flink-traning
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 01:17 min
[INFO] Finished at: 2020-11-05T12:42:42+08:00
[INFO] ------------------------------------------------------------------------
3、开发第一个flink程序
3.1、开发一个简单的统计程序
package com.lly.flink.java;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
/**
* @author lly
* @date 2020-11-05
**/
public class SocketTextStreamWordCount {
public static void main(String[] args) throws Exception {
//参数检查
if (args.length != 2) {
System.err.println("USAGE:\nSocketTextStreamWordCount <hostname> <port>");
return;
}
String hostname = args[0];
Integer port = Integer.parseInt(args[1]);
// set up the streaming execution environment
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//获取数据
DataStreamSource<String> stream = env.socketTextStream(hostname, port);
//计数
SingleOutputStreamOperator<Tuple2<String, Integer>> sum = stream.flatMap(new LineSplitter())
.keyBy(0)
.sum(1);
sum.print();
env.execute("Java WordCount from SocketTextStream Example");
}
public static final class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
@Override
public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) {
String[] tokens = s.toLowerCase().split("\\W+");
for (String token : tokens) {
if (token.length() > 0) {
collector.collect(new Tuple2<String, Integer>(token, 1));
}
}
}
}
}
我这里简单解释一些这段代码,希望刚开始学习的同学可以理解的更深刻。这段代码使用 Flink 实现了通过网络流读取数据,统计单词出现次数的功能。具体实现细节如下:
- 声明一个
SocketTextStreamWordCount
类,定义该类的main
方法作为程序的入口。 - 在
main
方法中,首先对传入的命令行参数进行检查,如果参数个数不为 2,则输出使用说明后直接返回。 - 然后获取主机地址和端口号,用于后续建立套接字连接。
- 接着创建 Flink 流处理的环境对象
StreamExecutionEnvironment
,用于设置执行环境和创建数据流。 - 调用
socketTextStream
方法,获取一个DataStreamSource<String>
对象,用于从套接字连接中获取数据流。 - 对获取的数据流进行
flatMap
操作,使用LineSplitter
类作为转换器将每行文本数据切分成单词,并将单词转化为"单词,1"
的元组格式,用于后续统计。 - 对转换后的数据流使用
keyBy
方法,按照第一个字段(即单词)进行分组。 - 对分组后的数据使用
sum
方法,对第二个字段(即出现次数)进行求和,返回一个SingleOutputStreamOperator<Tuple2<String, Integer>>
类型的结果流。 - 最后调用
print
方法,将结果打印输出到控制台。 - 最后调用
execute
方法,传入一个字符串 "Java WordCount from SocketTextStream Example" 作为任务名称,开始执行整个 Flink 应用程序。 - 声明了一个静态内部类
LineSplitter
,实现了 Flink 的FlatMapFunction
接口,并重写了flatMap
方法。该方法将输入的文本行按照非单词字符(如空格、逗号等)进行切分,并将每个单词转化为一个元组,其中第一个字段为单词,第二个字段为 1,表示该单词出现了 1 次。
3.2、直接编译得到jar包
4、启动环境
4.1、启动已经下载好的flink环境
flink run -c 业务类包路径 jar包路径 IP 端口 示例
flink run -c 业务类包路径 jar包路径 IP 端口
示例:
flink run -c com.lly.flink.SocketTextStreamWordCount /Users/liluyang/flink-traning/target/original-flink-traning-1.0.0.jar 127.0.0.1 9000
启动成功之后会生成Job ID
Job has been submitted with JobID b04bad9f4c05efd67344179ee676b513
启动成功之后访问:http://localhost:8081/就可以直接当问flink的的操作后台,操作后台可以直观的看到Job的执行情况和基本的操作
4.2、创建一个服务端的Tcp 监听
创建一个server监听并接受链接
nc -l 9000
4.3、打开计算日志
cd /usr/local/Cellar/apache-flink/1.10.0/libexec/log
4.4、在建立nc监听端口中输入text
liluyang@liluyangdeMacBook-Pro ~ % nc -l 9000
cda
cda
dsas
assgasg
nihao
nihao
nihao
nihao
1
1
1
1
1
1
1
4.5、在输出日志中就有统计
liluyang@liluyangdeMacBook-Pro log % tail -100f flink-liluyang-taskexecutor-0-liluyangdeMacBook-Pro.local.out
(cda,1)
(cda,2)
(dsas,1)
(assgasg,1)
(nihao,1)
(nihao,2)
(nihao,3)
(nihao,4)
(1,1)
(1,2)
(1,3)
(1,4)
(1,5)
(1,6)
至此:Mac 电脑上安装 Flink,并且运行它。接着通过一个简单的 Flink 程序来介绍如何构建及运行Flink 程序。
版权归原作者 阳仔的屁仔 所有, 如有侵权,请联系我们删除。