Flink任务开发:从代码编写到集群提交
一、引言
Apache Flink是一个分布式流批一体化处理引擎,在大数据处理领域应用广泛。本文将详细介绍如何开发Flink任务,包括使用DataStream API进行编码、打包并提交到集群上运行,以及提交任务的两种方式。
二、Flink编码步骤/模型
Flink任务的开发通常遵循以下几个步骤:
- env - 准备环境:创建
StreamExecutionEnvironment
对象,这是Flink程序的基础环境,用于设置运行时参数、获取数据源等操作。例如:
StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();
可以设置运行模式,如
STREAMING
(流处理,默认模式)、
BATCH
(批处理)或
AUTOMATIC
(根据数据自动判断)。
env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
还可以获取或设置任务的并行度,并行度决定了任务在集群中执行的并行程度。
// 设置并行度为2
env.setParallelism(2);// 获取系统的并行度int parallelism = env.getParallelism();
- source - 加载数据:从各种数据源读取数据,如文件、Kafka、Socket等。这里以从元素列表读取数据为例:
DataStream<String> dataStream01 = env.fromElements("spark flink kafka","spark sqoop flink","kakfa hadoop flink");
也可以从文件读取数据,并且可以通过外部参数指定文件路径:
DataStream<String> dataStream =null;if(args.length!=0){String path;ParameterTool parameterTool =ParameterTool.fromArgs(args);if(parameterTool.has("input")){
path = parameterTool.get("input");}else{
path = args[0];}
dataStream = env.readTextFile(path);}else{
dataStream = env.fromElements("spark flink kafka","spark sqoop flink","kakfa hadoop flink");}
- transformation - 数据处理转换:对读取的数据进行各种转换操作,如
map
、flatMap
、filter
、keyBy
、sum
等。例如:
DataStream<String> flatMapStream = dataStream01.flatMap(newFlatMapFunction<String,String>(){@OverridepublicvoidflatMap(String line,Collector<String> collector)throwsException{String[] arr = line.split(" ");for(String word : arr){
collector.collect(word);}}});DataStream<Tuple2<String,Integer>> mapStream = flatMapStream.map(newMapFunction<String,Tuple2<String,Integer>>(){@OverridepublicTuple2<String,Integer>map(String word)throwsException{returnTuple2.of(word,1);}});DataStream<Tuple2<String,Integer>> sumResult = mapStream.keyBy(newKeySelector<Tuple2<String,Integer>,String>(){@OverridepublicStringgetKey(Tuple2<String,Integer> tuple2)throwsException{return tuple2.f0;}}).sum(1);
也可以使用Lambda表达式进行简洁的函数式编程,但使用Lambda表达式后需要添加
returns
指定返回类型,否则可能报错。例如:
DataStream<String> wordsDS = dataStream.flatMap((String value,Collector<String> out)->{String[] words = value.split(" ");for(String word : words){
out.collect(word);}}).returns(Types.STRING);DataStream<Tuple2<String,Integer>> wordAndOneDS = wordsDS.map((String value)->Tuple2.of(value,1)).returns(Types.TUPLE(Types.STRING,Types.INT));KeyedStream<Tuple2<String,Integer>,String> keyedDS = wordAndOneDS.keyBy((Tuple2<String,Integer> value)-> value.f0);SingleOutputStreamOperator<Tuple2<String,Integer>> result = keyedDS.sum(1);
- sink - 数据输出:将处理后的数据输出到各种目标,如文件、Kafka、控制台等。这里以打印到控制台为例:
sumResult.print();
- execute - 执行:启动Flink任务的执行。
env.execute();
三、DataStream API开发
- 添加依赖:在Maven项目中添加以下依赖:
<properties><flink.version>1.13.6</flink.version></properties><dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_2.11</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_2.11</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java-bridge_2.11</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner-blink_2.11</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-shaded-hadoop-2-uber</artifactId><version>2.7.5-10.0</version></dependency><dependency><groupId>log4j</groupId><artifactId>log4j</artifactId><version>1.2.17</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.24</version></dependency></dependencies><build><extensions><extension><groupId>org.apache.maven.wagon</groupId><artifactId>wagon-ssh</artifactId><version>2.8</version></extension></extensions><plugins><plugin><groupId>org.codehaus.mojo</groupId><artifactId>wagon-maven-plugin</artifactId><version>1.0</version><configuration><!--上传的本地jar的位置--><fromFile>target/${project.build.finalName}.jar</fromFile><!--远程拷贝的地址--><url>scp://root:root@bigdata01:/opt/app</url></configuration></plugin></plugins></build>
- 编写代码:按照上述编码步骤编写具体的业务逻辑代码,如WordCount示例代码。
四、打包、上传与提交任务
- 打包:使用Maven命令
mvn clean package
对项目进行打包,生成可执行的JAR文件。 - 上传:通过配置
wagon-maven-plugin
,可以在打包时将JAR文件上传到指定的远程服务器路径。 - 提交任务: - 以UI的方式递交:在Flink集群的Web界面中,上传打包好的JAR文件,并配置相关参数,如主类名、运行参数等,然后启动任务。- 以命令的方式递交: - 带有自定义
input
参数的提交方式:
flink run -c com.bigdata.day01.WordCount02 /opt/app/FlinkDemo-1.0-SNAPSHOT.jar --input /home/wc.txt
- 带有运行模式的提交方式:
flink run -Dexecution.runtime-mode=BATCH -c com.bigdata.day01.WordCount02 /opt/app/FlinkDemo-1.0-SNAPSHOT.jar --input hdfs://bigdata01:9820/home/wc.txt
五、总结
通过本文的介绍,我们了解了Flink任务开发的基本流程,包括使用DataStream API进行编码、打包上传以及提交任务的两种方式。在实际应用中,可以根据具体的业务需求,灵活运用Flink的各种功能和特性,构建高效、可靠的大数据处理应用。同时,需要注意Flink版本的兼容性以及相关依赖的管理,以确保任务的顺利开发和运行。
版权归原作者 自节码 所有, 如有侵权,请联系我们删除。