0


docker 搭建 flink 并上传任务

文章目录

一、docker 搭建 flink

1、选择合适的 flink 版本

docker 安装就不介绍了,去 dockerHub 搜索 flink 镜像,选择合适的版本安装 https://hub.docker.com/_/flink/tags

使用 docker 命令

docker pull flink: 1.16.0-scala_2.12-java8

拉去镜像
在这里插入图片描述

1.16.0-scala_2.12-java8 镜像版本说明,flink 1.16.0,flink 内置 scala 版本 2.12,Java 版本 8

建议先简单启动 flink 容器 JobManager、TaskManager 两个容器将配置文件复制出来方便挂载

# 创建 docker 网络,方便 JobManager 和 TaskManager 内部访问docker network create flink-network

# 创建 JobManager docker run \-itd\--name=jobmanager \--publish8081:8081 \--network flink-network \--envFLINK_PROPERTIES="jobmanager.rpc.address: jobmanager"\
  flink:1.16.0-scala_2.12-java8 jobmanager 
  
# 创建 TaskManager docker run \-itd\--name=taskmanager \--network flink-network \--envFLINK_PROPERTIES="jobmanager.rpc.address: jobmanager"\
  flink:1.16.0-scala_2.12-java8 taskmanager 

启动成功
在这里插入图片描述
访问 8081 端口如下
在这里插入图片描述
copy 配置文件

# jobmanager 容器dockercp jobmanager:/opt/flink/conf ./JobManager/
# taskmanager 容器dockercp taskmanager:/opt/flink/conf ./TaskManager/
2、重新创建 JobManager、TaskManager 容器并挂载配置文件

修改 JobManager/conf/flink-conf.yaml web 端口号为 18081
在这里插入图片描述

修改 TaskManager/conf/flink-conf.yaml 容器任务槽为 5
在这里插入图片描述
启动容器挂载配置文件

# 启动 jobmanager   docker run -itd-v /root/docker/flink/JobManager/conf/:/opt/flink/conf/ --name=jobmanager --publish18081:18081 --envFLINK_PROPERTIES="jobmanager.rpc.address: jobmanager"--network flink-network flink:1.16.0-scala_2.12-java8 jobmanager
# 启动 taskmanager   docker run -itd-v /root/docker/flink/TaskManager/conf/:/opt/flink/conf/ --name=taskmanager --network flink-network --envFLINK_PROPERTIES="jobmanager.rpc.address: jobmanager"  flink:1.16.0-scala_2.12-java8 taskmanager

参数解释

  • FLINK_PROPERTIES=“jobmanager.rpc.address: jobmanager” rpc 地址,必须设置,负责 jobmanager 和 taskmanager 的 rpc 地址都是随机生成,会连接不上,当然你也可以在直接修改配置文件 flink-conf.yaml

如下两个容器启动成功,可以看到 web 端口为 18081,taskmanager 启动一个,包含 5 个任务槽
在这里插入图片描述

二、flink 简单示例

官网参考地址:https://nightlies.apache.org/flink/flink-docs-release-1.16/zh/docs/dev/configuration/overview/#getting-started

1、创建项目架构

使用 maven 命令指定原型 Flink Maven Archetype 快速创建一个包含了必要依赖的 Flink 程序骨架,自定义项目 groupId、artifactId、package 等信息

mvn archetype:generate ^
  -DarchetypeGroupId=org.apache.flink ^
  -DarchetypeArtifactId=flink-quickstart-java ^
  -DarchetypeVersion=1.16.0    ^
  -DgroupId=com.ye ^
  -DartifactId=flink-study ^
  -Dversion=0.1 ^
  -Dpackage=com.ye ^
  -DinteractiveMode=false

下载成功打开项目目录

在这里插入图片描述
如下:注意运行需要设置启动参数,否则启动会找不到类,因为 pom.xml 文件 flink 相关包都添加了

<scope>provided</scope>

表示只用于生产环境,另一种方法就是将

<scope>provided</scope>

修改为

<scope>runtime</scope>

在这里插入图片描述

流处理和批处理在 flink 低版本(貌似1.12)需要区分,目前都使用流处理写法

2、批处理简单示例

下面代码用来统计单词出现的的次数

publicclassDataBatchJob{/* 下面示例统计单词出现的次数 */publicstaticvoidmain(String[] args)throwsException{// 获取 flink 环境finalStreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();// 添加数据源DataStreamSource<String> streamSource = env.fromElements("hello world","hello flink","flink","hello","world");// 对传入的流数据分组SingleOutputStreamOperator<Tuple2<String,Integer>> streamOperator = streamSource.flatMap(newFlatMapFunction<String,Tuple2<String,Integer>>(){// value 传入的数据,out// Tuple2 二元组// out 传出的值@OverridepublicvoidflatMap(String value,Collector<Tuple2<String,Integer>> out)throwsException{String[] split = value.split(" ");for(String s : split){
                    out.collect(Tuple2.of(s,1));}}});// 按二元组的第 0 个位置分组KeyedStream<Tuple2<String,Integer>,Tuple> keyBy = streamOperator.keyBy(0);// 按二元组的第 1 个位置求和SingleOutputStreamOperator<Tuple2<String,Integer>> sum = keyBy.sum(1);
        sum.print();
        env.execute("统计单词出现的次数");}}

执行结果如下
在这里插入图片描述
上传 flink 集群

3、流处理简单示例

下面示例通过 socket 文本源,对输入的大于 500 和小于 500 的分别求和

publicclassDataStreamJob{privatestaticfinalLogger logger =LoggerFactory.getLogger(DataStreamJob.class);/* 下面示例对大于 500 和小于 500 的分别求和 */publicstaticvoidmain(String[] args)throwsException{// 获取 flink 环境finalStreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();// 添加 socket 文本流数据源//DataStreamSource<String> streamSource = env.fromElements("200", "100", "6000", "500", "2000", "300", "1500", "900");DataStreamSource<String> streamSource = env.socketTextStream("127.0.0.1",7777);// 对大于 500 和小于 500 进行分组KeyedStream<String,String> stringKeyedStream = streamSource.keyBy(newKeySelector<String,String>(){@OverridepublicStringgetKey(String s)throwsException{int i =Integer.parseInt(s);return i >500?"ge":"lt";}});// 开 10 秒滚动窗口,每 10 秒为一批数据 【00:00:00 ~ 00:00:10)、【00:00:10 ~ 00:00:20)左闭右开区间WindowedStream<String,String,TimeWindow> windowedStream = stringKeyedStream.window(TumblingProcessingTimeWindows.of(Time.seconds(10)));// 窗口处理函数,泛型 String, Integer, String, TimeWindow 依次对应 输入类型、输出类型、 KEY类型(即keyBy 返回的类型), 窗口SingleOutputStreamOperator<Integer> outputStreamOperator = windowedStream.process(newProcessWindowFunction<String,Integer,String,TimeWindow>(){/*
            * key: 分组的 key
            * context: 上下文信息
            * elements: 传过来的一批数据
            * out: 数据输出
            * */@Overridepublicvoidprocess(String key,ProcessWindowFunction<String,Integer,String,TimeWindow>.Context context,Iterable<String> elements,Collector<Integer> out)throwsException{System.out.println(key);AtomicInteger sum =newAtomicInteger();
                elements.forEach(item -> sum.addAndGet(Integer.parseInt(item)));
                out.collect(sum.get());}});// 输出
        outputStreamOperator.print();
        env.execute("分组求和");}}

在 window 或 Linux 开启 Socket 文本流测试
在这里插入图片描述

4、上传 flink 集群

打包项目:可以在 pom.xml 修改启动类,也可以在命令启动或者 ui 界面上传设置启动类参数
在这里插入图片描述

①、UI 界面提交任务

使用 ui 界面上传 jar 到 flink 集群,点击 submit 运行

在这里插入图片描述

②、命令提交任务
# 如果集群( 即JobManager) 在当前服务器可以使用如下命令
    $ bin/flink run -Dexecution.runtime-mode=BATCH <jarFile># 如果集群( 即JobManager) 不在当前服务器,在 TaskManager 服务器提交作业可以使用如下命令# -m 指定 JobManager 服务器地址# -c 指定作业入口程序# -p 指定并行度
    $ bin/flink run -m192.168.1.1:8081 -c com.ye.StreamWordCount -p2<jarFile># 撤销任务    
    $ bin/flink cancle <jobId>
5、web-ui 提交查看撤销任务

批处理运行完成在这里插入图片描述
流处理正在运行
在这里插入图片描述

三、待解决

使用 docker 启动的 flink 集群发现 UI 界面的 stdout 没有 print 输出
在这里插入图片描述

标签: docker flink java

本文转载自: https://blog.csdn.net/qq_41538097/article/details/129113866
版权归原作者 不懂一休 所有, 如有侵权,请联系我们删除。

“docker 搭建 flink 并上传任务”的评论:

还没有评论