Submitting a Flink job from Java code
1. Add the dependency to the pom file
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_2.12</artifactId>
    <version>1.13.0</version>
</dependency>
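2. Creating the PackagedProgram
2.1 PackagedProgram.newBuilder
A PackagedProgram can be created through its builder. Some required information has to be set, such as the main class (entry point) and the jar path.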
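val packagedProgram: PackagedProgram = PackagedProgram.newBuilder()
  // entry point (main class) of your Flink program
  .setEntryPointClassName("main class of your Flink program")
  // jar file of your Flink program
  .setJarFile(jarFile)
  // savepoint settings
  .setSavepointRestoreSettings(SavepointRestoreSettings.none())
  // arguments your Flink program may need
  .setArguments("arguments your Flink program may need", "1", "2", "3")
  .build()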
2.2 SavepointRestoreSettings
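This class holds the savepoint settings. An instance can be created directly through the static methods of the class.
// no savepoint
SavepointRestoreSettings.none()
// restore from a savepoint path
SavepointRestoreSettings.forPath(path, allowNonRestoredState)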
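The two factory methods above could be wrapped in a small helper that picks one depending on whether a savepoint path was supplied. This is a minimal Java sketch and not part of the original post: the class name `SavepointSettingsSketch` and the method `fromPath` are hypothetical, and the Flink calls are assumed to be those of `org.apache.flink.runtime.jobgraph.SavepointRestoreSettings` in Flink 1.13.

```java
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;

// Hypothetical helper, for illustration only.
public final class SavepointSettingsSketch {

    private SavepointSettingsSketch() {}

    // Returns none() when no path is given, otherwise forPath(path, allowNonRestoredState).
    public static SavepointRestoreSettings fromPath(
            String savepointPath, boolean allowNonRestoredState) {
        if (savepointPath == null || savepointPath.trim().isEmpty()) {
            return SavepointRestoreSettings.none();
        }
        return SavepointRestoreSettings.forPath(savepointPath, allowNonRestoredState);
    }
}
```

The result can be passed straight to `setSavepointRestoreSettings` on the PackagedProgram builder.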
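3. Creating the JobGraph with PackagedProgramUtils
3.1 createJobGraph source code
This class has two static methods that create a JobGraph directly. The difference is that one takes a specified JobID and the other generates a random JobID.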
/**
 * Creates a {@link JobGraph} with a specified {@link JobID} from the given {@link
 * PackagedProgram}.
 *
 * @param packagedProgram to extract the JobGraph from
 * @param configuration to use for the optimizer and job graph generator
 * @param defaultParallelism for the JobGraph
 * @param jobID the pre-generated job id
 * @return JobGraph extracted from the PackagedProgram
 * @throws ProgramInvocationException if the JobGraph generation failed
 */
public static JobGraph createJobGraph(
        PackagedProgram packagedProgram,
        Configuration configuration,
        int defaultParallelism,
        @Nullable JobID jobID,
        boolean suppressOutput)
        throws ProgramInvocationException {
    final Pipeline pipeline =
            getPipelineFromProgram(
                    packagedProgram, configuration, defaultParallelism, suppressOutput);
    final JobGraph jobGraph =
            FlinkPipelineTranslationUtil.getJobGraphUnderUserClassLoader(
                    packagedProgram.getUserCodeClassLoader(),
                    pipeline,
                    configuration,
                    defaultParallelism);
    if (jobID != null) {
        jobGraph.setJobID(jobID);
    }
    jobGraph.addJars(packagedProgram.getJobJarAndDependencies());
    jobGraph.setClasspaths(packagedProgram.getClasspaths());
    jobGraph.setSavepointRestoreSettings(packagedProgram.getSavepointSettings());
    return jobGraph;
}
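3.2 Creating the JobGraph
packagedProgram: the PackagedProgram created in section 2
configuration: the Flink configuration
parallelism: the default parallelism; a default value is fine
jobId: the JobID to assign; if null, the JobGraph keeps a randomly generated JobID
suppressOutput: boolean; if true, stdout/stderr output is suppressed while the JobGraph is being created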
val jobGraph: JobGraph = PackagedProgramUtils.createJobGraph(
  packagedProgram,
  flinkConfig,
  parallelism,
  null,
  false)
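The configuration can also be obtained by loading the local flink-conf.yaml; passing in the Flink installation path is enough.
// requires: import scala.util.Try
private def getFlinkDefaultConfiguration(flinkHome: String): Configuration = {
  // fall back to an empty Configuration if the conf directory cannot be loaded
  Try(GlobalConfiguration.loadConfiguration(s"$flinkHome/conf"))
    .getOrElse(new Configuration())
}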
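For readers working in Java rather than Scala, the same fallback logic might look like the sketch below. The class name `FlinkConfigLoaderSketch` and the method `load` are hypothetical. It assumes that `GlobalConfiguration.loadConfiguration(String)` throws a runtime exception when the directory or flink-conf.yaml is missing, which is the behavior I would expect from Flink 1.13.

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;

// Hypothetical helper, for illustration only.
public final class FlinkConfigLoaderSketch {

    private FlinkConfigLoaderSketch() {}

    // Loads <flinkHome>/conf/flink-conf.yaml, or returns an empty Configuration on failure.
    public static Configuration load(String flinkHome) {
        try {
            return GlobalConfiguration.loadConfiguration(flinkHome + "/conf");
        } catch (RuntimeException e) {
            // Assumption: a missing directory or file surfaces as a runtime exception.
            return new Configuration();
        }
    }
}
```

Silently falling back to an empty Configuration mirrors the Scala version; in production code it may be preferable to log the failure or rethrow it.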
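4. Submitting the job to a remote cluster
JobGraph jobGraph = PackagedProgramUtils.createJobGraph(program, configuration, parallelism, false);
// client is a ClusterClient connected to the target cluster; its creation is not shown here
CompletableFuture<JobID> result = client.submitJob(jobGraph);
// block until the submission has completed
JobID jobId = result.get();
System.out.println("Submission complete");
System.out.println("jobId: " + jobId.toString());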
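The snippet above does not show where `client` comes from. One possibility, sketched here under assumptions, is a `RestClusterClient` pointed at the REST endpoint of a standalone cluster. The class `RemoteSubmitSketch`, its methods, the 60-second timeout, and the host and port values are all hypothetical. The Flink classes and option keys (`RestOptions.ADDRESS`, `RestOptions.PORT`, `StandaloneClusterId`) are assumed to be those shipped with flink-clients 1.13, so check them against your version.

```java
import java.io.File;
import java.util.concurrent.TimeUnit;

import org.apache.flink.api.common.JobID;
import org.apache.flink.client.deployment.StandaloneClusterId;
import org.apache.flink.client.program.PackagedProgram;
import org.apache.flink.client.program.PackagedProgramUtils;
import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;

// Hypothetical end-to-end sketch, for illustration only.
public final class RemoteSubmitSketch {

    private RemoteSubmitSketch() {}

    // Builds a Configuration that points the REST client at the JobManager.
    public static Configuration clusterConfig(String host, int restPort) {
        Configuration configuration = new Configuration();
        configuration.setString(RestOptions.ADDRESS, host);
        configuration.setInteger(RestOptions.PORT, restPort);
        return configuration;
    }

    // Packages the jar, creates the JobGraph and submits it; returns the JobID.
    public static JobID submit(
            Configuration configuration,
            File jarFile,
            String mainClass,
            int parallelism,
            String... args) throws Exception {
        PackagedProgram program = PackagedProgram.newBuilder()
                .setJarFile(jarFile)
                .setEntryPointClassName(mainClass)
                .setSavepointRestoreSettings(SavepointRestoreSettings.none())
                .setArguments(args)
                .build();
        // The client is closed automatically once submission has finished.
        try (RestClusterClient<StandaloneClusterId> client =
                new RestClusterClient<>(configuration, StandaloneClusterId.getInstance())) {
            JobGraph jobGraph = PackagedProgramUtils.createJobGraph(
                    program, configuration, parallelism, false);
            // Wait for the submission with a timeout instead of blocking forever.
            return client.submitJob(jobGraph).get(60, TimeUnit.SECONDS);
        }
    }
}
```

A call might look like `RemoteSubmitSketch.submit(RemoteSubmitSketch.clusterConfig("localhost", 8081), new File("/path/to/job.jar"), "com.example.MyJob", 1)`, where the jar path and class name are placeholders.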
Reference: https://blog.csdn.net/czladamling/article/details/125204087
Reference: https://blog.csdn.net/pingweicheng/article/details/118223041
Copyright belongs to the original author, chang2p. In case of infringement, please contact us for removal.