

Submitting a job to a remote Flink cluster from Java code

Submitting Flink jobs from Java code

1. Add the dependency to the pom file

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_2.12</artifactId>
    <version>1.13.0</version>
</dependency>
2. Creating the PackagedProgram
2.1 PackagedProgram.newBuilder

A PackagedProgram is created through its builder. On the builder you set the required information, such as the main class and the path of the JAR.

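val packagedProgram: PackagedProgram = PackagedProgram.newBuilder()
  // main (entry point) class of your Flink program
  .setEntryPointClassName("<fully qualified main class of your Flink program>")
  // your Flink program JAR, as a java.io.File
  .setJarFile(jarFile)
  // savepoint settings
  .setSavepointRestoreSettings(SavepointRestoreSettings.none())
  // arguments your Flink program may need
  .setArguments("<argument your Flink program needs>", "1", "2", "3")
  .build()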
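Before you call the builder, it can help to check that the JAR exists and to see which class will actually serve as the entry point. The sketch below uses only the JDK, not Flink. It relies on an assumption drawn from my reading of Flink's PackagedProgram: when setEntryPointClassName is not called, Flink takes the class from the JAR manifest and checks a program-class attribute before Main-Class. Verify this against your Flink version. The class name EntryPointResolver is hypothetical.

```java
import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.jar.Attributes;
import java.util.jar.JarFile;
import java.util.jar.Manifest;

/** Plain-JDK pre-flight check before PackagedProgram.newBuilder() (hypothetical helper, not Flink code). */
final class EntryPointResolver {

    // Manifest attribute that Flink is assumed to check before Main-Class.
    static final String PROGRAM_CLASS = "program-class";

    /** Returns the explicit entry point if given, otherwise the class named in the manifest. */
    static String resolve(String explicitEntryPoint, Manifest manifest) {
        if (explicitEntryPoint != null && !explicitEntryPoint.isEmpty()) {
            return explicitEntryPoint;
        }
        if (manifest != null) {
            Attributes attrs = manifest.getMainAttributes();
            String programClass = attrs.getValue(PROGRAM_CLASS);
            if (programClass != null) {
                return programClass;
            }
            String mainClass = attrs.getValue(Attributes.Name.MAIN_CLASS);
            if (mainClass != null) {
                return mainClass;
            }
        }
        throw new IllegalArgumentException(
                "No entry point: call setEntryPointClassName or add Main-Class to the manifest");
    }

    /** Reads the manifest of the JAR you will pass to setJarFile; returns null if it has none. */
    static Manifest readManifest(File jarFile) {
        if (!jarFile.isFile()) {
            throw new IllegalArgumentException("JAR not found: " + jarFile);
        }
        try (JarFile jar = new JarFile(jarFile)) {
            return jar.getManifest();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

For example, EntryPointResolver.resolve(null, EntryPointResolver.readManifest(jarFile)) returns the class that Flink would be expected to run. If no class can be determined, it fails before anything is sent to the cluster.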
2.2 SavepointRestoreSettings

This object holds the savepoint settings. You can create it directly with the static factory methods of the class:

// No savepoint
SavepointRestoreSettings.none()
// Restore from a savepoint path
SavepointRestoreSettings.forPath(path, allowNonRestoredState)
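Suppose your launcher receives the savepoint path the same way the flink run command line does (-s / --fromSavepoint <path>, and -n / --allowNonRestoredState). In that case, a small parser can separate those flags from the program arguments and choose between none() and forPath(...). This is a hypothetical plain-JDK helper, not part of Flink. The flag names follow the flink run CLI, and the leftover arguments are intended for setArguments(...).

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical launcher-side parser (not a Flink API) for flink-run-style savepoint flags. */
final class SavepointOptions {
    final String savepointPath;          // null means "start without a savepoint"
    final boolean allowNonRestoredState; // passed as the second argument of forPath(...)
    final List<String> programArgs;      // everything else, in the original order

    private SavepointOptions(String path, boolean allowNonRestored, List<String> rest) {
        this.savepointPath = path;
        this.allowNonRestoredState = allowNonRestored;
        this.programArgs = rest;
    }

    static SavepointOptions parse(String... args) {
        String path = null;
        boolean allowNonRestored = false;
        List<String> rest = new ArrayList<>();
        for (int i = 0; i < args.length; i++) {
            String a = args[i];
            if ((a.equals("-s") || a.equals("--fromSavepoint")) && i + 1 < args.length) {
                path = args[++i]; // consume the flag's value
            } else if (a.equals("-n") || a.equals("--allowNonRestoredState")) {
                allowNonRestored = true;
            } else {
                rest.add(a);
            }
        }
        return new SavepointOptions(path, allowNonRestored, rest);
    }

    // With Flink on the classpath, the caller would then choose:
    //   savepointPath == null
    //       ? SavepointRestoreSettings.none()
    //       : SavepointRestoreSettings.forPath(savepointPath, allowNonRestoredState)
}
```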
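3. Creating the JobGraph with PackagedProgramUtils
3.1 createJobGraph source code

PackagedProgramUtils has two static createJobGraph methods that build a JobGraph directly. They differ in how the JobID is set: one assigns a JobID you pass in, and the other leaves it unset so that a random JobID is generated.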

/**
 * Creates a {@link JobGraph} with a specified {@link JobID} from the given {@link
 * PackagedProgram}.
 *
 * @param packagedProgram to extract the JobGraph from
 * @param configuration to use for the optimizer and job graph generator
 * @param defaultParallelism for the JobGraph
 * @param jobID the pre-generated job id
 * @return JobGraph extracted from the PackagedProgram
 * @throws ProgramInvocationException if the JobGraph generation failed
 */
public static JobGraph createJobGraph(
        PackagedProgram packagedProgram,
        Configuration configuration,
        int defaultParallelism,
        @Nullable JobID jobID,
        boolean suppressOutput)
        throws ProgramInvocationException {
    final Pipeline pipeline =
            getPipelineFromProgram(
                    packagedProgram, configuration, defaultParallelism, suppressOutput);
    final JobGraph jobGraph =
            FlinkPipelineTranslationUtil.getJobGraphUnderUserClassLoader(
                    packagedProgram.getUserCodeClassLoader(),
                    pipeline,
                    configuration,
                    defaultParallelism);
    if (jobID != null) {
        jobGraph.setJobID(jobID);
    }
    jobGraph.addJars(packagedProgram.getJobJarAndDependencies());
    jobGraph.setClasspaths(packagedProgram.getClasspaths());
    jobGraph.setSavepointRestoreSettings(packagedProgram.getSavepointSettings());

    return jobGraph;
}
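If you pass your own JobID through the five-argument overload, you know the ID before submission, which is useful for logging or storing it. With Flink on the classpath you would create the ID with JobID.generate() or JobID.fromHexString(...). The plain-JDK sketch below only imitates the textual form of a JobID: 16 bytes printed as 32 hex characters. You might use it, for example, to validate an ID read from your own configuration before passing it to fromHexString. JobIdStrings is a hypothetical name.

```java
import java.security.SecureRandom;
import java.util.regex.Pattern;

/** Hypothetical helper mimicking the string form of Flink's JobID (not Flink code). */
final class JobIdStrings {
    // A JobID is 16 bytes; its toString() is 32 hex characters.
    private static final Pattern HEX_32 = Pattern.compile("[0-9a-fA-F]{32}");
    private static final SecureRandom RANDOM = new SecureRandom();

    /** True if the string has the 32-hex-character shape expected by JobID.fromHexString. */
    static boolean isValid(String id) {
        return id != null && HEX_32.matcher(id).matches();
    }

    /** Random ID in the same textual shape; prefer JobID.generate() when Flink is available. */
    static String randomHex() {
        byte[] bytes = new byte[16];
        RANDOM.nextBytes(bytes);
        StringBuilder sb = new StringBuilder(32);
        for (byte b : bytes) {
            sb.append(String.format("%02x", b)); // byte formatted as unsigned, two hex digits
        }
        return sb.toString();
    }
}
```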
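3.2 Creating the JobGraph

The arguments are:
packagedProgram: the PackagedProgram built in step 2
configuration: the Flink configuration
parallelism: the default parallelism of the job (the default value also works)
jobId: the JobID to assign, or null to have a random one generated
suppressOutput: boolean; if true, stdout/stderr output produced while the JobGraph is being created is suppressed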

val jobGraph: JobGraph = PackagedProgramUtils.createJobGraph(
    packagedProgram,
    flinkConfig,
    parallelism,
    null,
    false)
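The last argument in the call above is suppressOutput. As I understand it, when this is true Flink temporarily redirects System.out and System.err while it runs your program's main method to extract the pipeline, and restores the streams afterwards. Below is a plain-JDK sketch of that capture-and-restore pattern. It is not Flink's actual code, and OutputSuppressor is a hypothetical name.

```java
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;

/** Sketch of the idea behind suppressOutput=true (hypothetical helper, not Flink code). */
final class OutputSuppressor {

    /** Runs the action with System.out/System.err redirected and returns what it printed. */
    static String runSuppressed(Runnable action) {
        PrintStream originalOut = System.out;
        PrintStream originalErr = System.err;
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        PrintStream capture = new PrintStream(buffer, true); // autoflush on println
        try {
            System.setOut(capture);
            System.setErr(capture);
            action.run();
        } finally {
            // Always restore the real streams, even if the action throws.
            System.setOut(originalOut);
            System.setErr(originalErr);
            capture.flush();
        }
        return buffer.toString();
    }
}
```

Restoring the streams in finally matters here: if the user program throws, the launcher's own console output must not remain redirected.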

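The configuration can also be loaded from the local flink-conf.yaml. Pass in the Flink installation directory, and the file is read from its conf subdirectory.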

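import scala.util.Try
import org.apache.flink.configuration.{Configuration, GlobalConfiguration}

// Falls back to an empty Configuration if $flinkHome/conf cannot be loaded
private def getFlinkDefaultConfiguration(flinkHome: String): Configuration =
  Try(GlobalConfiguration.loadConfiguration(s"$flinkHome/conf")).getOrElse(new Configuration())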
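GlobalConfiguration reads conf/flink-conf.yaml. As far as I know, in Flink 1.13 this file is parsed as a flat list of "key: value" lines rather than as full YAML. The sketch below is a simplified stand-in written with the JDK only, not Flink's parser. It drops # comments, splits each line at the first ": ", and falls back to an empty map when the file cannot be read, mirroring the Try(...).getOrElse(new Configuration()) fallback above. For a remote cluster, the keys that usually matter are rest.address and rest.port. The class name FlinkConfSketch is hypothetical.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Simplified stand-in for loading flink-conf.yaml (hypothetical helper, not Flink's parser). */
final class FlinkConfSketch {

    /** Reads $flinkHome/conf/flink-conf.yaml; returns an empty map if it cannot be read. */
    static Map<String, String> load(String flinkHome) {
        Path file = Paths.get(flinkHome, "conf", "flink-conf.yaml");
        try {
            return parse(Files.readAllLines(file, StandardCharsets.UTF_8));
        } catch (IOException e) {
            return new LinkedHashMap<>(); // missing or unreadable file: fall back to defaults
        }
    }

    /** Parses flat "key: value" lines, ignoring comments, blank and malformed lines. */
    static Map<String, String> parse(List<String> lines) {
        Map<String, String> conf = new LinkedHashMap<>();
        for (String line : lines) {
            int hash = line.indexOf('#');
            String content = (hash >= 0 ? line.substring(0, hash) : line).trim();
            int sep = content.indexOf(": ");
            if (sep <= 0) {
                continue;
            }
            conf.put(content.substring(0, sep).trim(), content.substring(sep + 2).trim());
        }
        return conf;
    }
}
```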
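4. Submitting the job to the remote cluster

// client: a ClusterClient connected to the remote cluster (its creation is not shown in this article).
// For a standalone cluster, one option is new RestClusterClient<>(configuration, StandaloneClusterId.getInstance())
// with rest.address and rest.port in configuration pointing at the JobManager.
JobGraph jobGraph = PackagedProgramUtils.createJobGraph(program, configuration, parallelism, false);
CompletableFuture<JobID> result = client.submitJob(jobGraph);
JobID jobId = result.get();
System.out.println("Submission complete");
System.out.println("jobId: " + jobId);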
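Calling result.get() with no timeout blocks indefinitely if the cluster never answers. A safer approach is to wait with a timeout and unwrap the cause of any failure. The sketch uses only the JDK and models the result as a generic CompletableFuture<T>, so it compiles without Flink. SubmissionWait is a hypothetical name.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Waits for a submission future with a timeout (hypothetical helper, not Flink code). */
final class SubmissionWait {

    static <T> T await(CompletableFuture<T> submission, long timeoutSeconds) {
        try {
            return submission.get(timeoutSeconds, TimeUnit.SECONDS);
        } catch (TimeoutException e) {
            // The job may still reach the cluster; check before resubmitting.
            throw new IllegalStateException(
                    "No answer from the cluster within " + timeoutSeconds + " s", e);
        } catch (ExecutionException e) {
            // Surface the real reason the submission failed.
            throw new IllegalStateException("Job submission failed", e.getCause());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt flag for callers
            throw new IllegalStateException("Interrupted while waiting for submission", e);
        }
    }
}
```

With Flink on the classpath, the call would be written as JobID jobId = SubmissionWait.await(client.submitJob(jobGraph), 60);. A timeout does not prove the job was rejected, so check the cluster's job list before you submit again.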

Reference: https://blog.csdn.net/czladamling/article/details/125204087

Reference: https://blog.csdn.net/pingweicheng/article/details/118223041


Reprinted from: https://blog.csdn.net/chang2p/article/details/128099231
Copyright belongs to the original author, chang2p. In case of infringement, please contact us and it will be removed.
