0


Flink任务开发:从代码编写到集群提交

Flink任务开发:从代码编写到集群提交

一、引言

Apache Flink是一个分布式流批一体化处理引擎,在大数据处理领域应用广泛。本文将详细介绍如何开发Flink任务,包括使用DataStream API进行编码、打包并提交到集群上运行,以及提交任务的两种方式。

二、Flink编码步骤/模型

Flink任务的开发通常遵循以下几个步骤:

  1. env - 准备环境:创建StreamExecutionEnvironment对象,这是Flink程序的基础环境,用于设置运行时参数、获取数据源等操作。例如:
StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();

可以设置运行模式,如

STREAMING

(流处理,默认模式)、

BATCH

(批处理)或

AUTOMATIC

(根据数据自动判断)。

env.setRuntimeMode(RuntimeExecutionMode.STREAMING);

还可以获取或设置任务的并行度,并行度决定了任务在集群中执行的并行程度。

// 设置并行度为2
env.setParallelism(2);// 获取系统的并行度int parallelism = env.getParallelism();
  1. source - 加载数据:从各种数据源读取数据,如文件、Kafka、Socket等。这里以从元素列表读取数据为例:
DataStream<String> dataStream01 = env.fromElements("spark flink kafka","spark sqoop flink","kakfa hadoop flink");

也可以从文件读取数据,并且可以通过外部参数指定文件路径:

DataStream<String> dataStream =null;if(args.length!=0){String path;ParameterTool parameterTool =ParameterTool.fromArgs(args);if(parameterTool.has("input")){
        path = parameterTool.get("input");}else{
        path = args[0];}
    dataStream = env.readTextFile(path);}else{
    dataStream = env.fromElements("spark flink kafka","spark sqoop flink","kakfa hadoop flink");}
  1. transformation - 数据处理转换:对读取的数据进行各种转换操作,如mapflatMapfilterkeyBysum等。例如:
DataStream<String> flatMapStream = dataStream01.flatMap(newFlatMapFunction<String,String>(){@OverridepublicvoidflatMap(String line,Collector<String> collector)throwsException{String[] arr = line.split(" ");for(String word : arr){
            collector.collect(word);}}});DataStream<Tuple2<String,Integer>> mapStream = flatMapStream.map(newMapFunction<String,Tuple2<String,Integer>>(){@OverridepublicTuple2<String,Integer>map(String word)throwsException{returnTuple2.of(word,1);}});DataStream<Tuple2<String,Integer>> sumResult = mapStream.keyBy(newKeySelector<Tuple2<String,Integer>,String>(){@OverridepublicStringgetKey(Tuple2<String,Integer> tuple2)throwsException{return tuple2.f0;}}).sum(1);

也可以使用Lambda表达式进行简洁的函数式编程,但使用Lambda表达式后需要添加

returns

指定返回类型,否则可能报错。例如:

DataStream<String> wordsDS = dataStream.flatMap((String value,Collector<String> out)->{String[] words = value.split(" ");for(String word : words){
            out.collect(word);}}).returns(Types.STRING);DataStream<Tuple2<String,Integer>> wordAndOneDS = wordsDS.map((String value)->Tuple2.of(value,1)).returns(Types.TUPLE(Types.STRING,Types.INT));KeyedStream<Tuple2<String,Integer>,String> keyedDS = wordAndOneDS.keyBy((Tuple2<String,Integer> value)-> value.f0);SingleOutputStreamOperator<Tuple2<String,Integer>> result = keyedDS.sum(1);
  1. sink - 数据输出:将处理后的数据输出到各种目标,如文件、Kafka、控制台等。这里以打印到控制台为例:
sumResult.print();
  1. execute - 执行:启动Flink任务的执行。
env.execute();

三、DataStream API开发

  1. 添加依赖:在Maven项目中添加以下依赖:
<properties><flink.version>1.13.6</flink.version></properties><dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_2.11</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_2.11</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java-bridge_2.11</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner-blink_2.11</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-shaded-hadoop-2-uber</artifactId><version>2.7.5-10.0</version></dependency><dependency><groupId>log4j</groupId><artifactId>log4j</artifactId><version>1.2.17</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.24</version></dependency></dependencies><build><extensions><extension><groupId>org.apache.maven.wagon</groupId><artifactId>wagon-ssh</artifactId><version>2.8</version></extension></extensions><plugins><plugin><groupId>org.codehaus.mojo</groupId><artifactId>wagon-maven-plugin</artifactId><version>1.0</version><configuration><!--上传的本地jar的位置--><fromFile>target/${project.build.finalName}.jar</fromFile><!--远程拷贝的地址--><url>scp://root:root@bigdata01:/opt/app</url></configuration></plugin></plugins></build>
  1. 编写代码:按照上述编码步骤编写具体的业务逻辑代码,如WordCount示例代码。

四、打包、上传与提交任务

  1. 打包:使用Maven命令mvn clean package对项目进行打包,生成可执行的JAR文件。
  2. 上传:通过配置wagon-maven-plugin,可以在打包时将JAR文件上传到指定的远程服务器路径。
  3. 提交任务: - 以UI的方式递交:在Flink集群的Web界面中,上传打包好的JAR文件,并配置相关参数,如主类名、运行参数等,然后启动任务。- 以命令的方式递交: - 带有自定义input参数的提交方式:
flink run -c com.bigdata.day01.WordCount02 /opt/app/FlinkDemo-1.0-SNAPSHOT.jar --input /home/wc.txt
    - 带有运行模式的提交方式:
flink run -Dexecution.runtime-mode=BATCH -c com.bigdata.day01.WordCount02 /opt/app/FlinkDemo-1.0-SNAPSHOT.jar --input hdfs://bigdata01:9820/home/wc.txt

五、总结

通过本文的介绍,我们了解了Flink任务开发的基本流程,包括使用DataStream API进行编码、打包上传以及提交任务的两种方式。在实际应用中,可以根据具体的业务需求,灵活运用Flink的各种功能和特性,构建高效、可靠的大数据处理应用。同时,需要注意Flink版本的兼容性以及相关依赖的管理,以确保任务的顺利开发和运行。

标签: flink linq c#

本文转载自: https://blog.csdn.net/qq_68076599/article/details/143991395
版权归原作者 自节码 所有, 如有侵权,请联系我们删除。

“Flink任务开发:从代码编写到集群提交”的评论:

还没有评论