0


flink任务提交,查询,停止工具

flink任务提交器

1. 简介

因项目,需要在spring boot后台项目中集成flink任务提交,查询之类的功能,所有有了这个项目

这个项目,可以通过java api的形式,帮助你提交,查询,暂停flink任务,也可以构建和关闭flink yarn session集群。

主要通过restful接口和构建jobGraph实现

2. 支持以下的Flink运行模式

  • Flink local
  • Flink remote
  • Flink yarn session
  • Flink yarn per job

3. 主要类

3.1 FlinkInfo

根据Flink运行环境路径地址生成,提交和停止任务时使用

  1. class FlinkInfo(val flinkHome:String)extends Serializable {private[this]lazyval FLINK_SCALA_VERSION_PATTERN: Pattern = Pattern.compile("^flink-dist_(.*)-(.*).jar$")lazyval fullVersion:String=s"${version}_$scalaVersion"/**
  2. * get scala version
  3. */lazyval scalaVersion:String={val matcher: Matcher = FLINK_SCALA_VERSION_PATTERN.matcher(flinkDistJar.getName)
  4. matcher.matches()
  5. matcher.group(1)}/**
  6. * get Flink version
  7. */lazyval version:String={val matcher: Matcher = FLINK_SCALA_VERSION_PATTERN.matcher(flinkDistJar.getName)
  8. matcher.matches()
  9. matcher.group(2)}/**
  10. * get all flink libs
  11. */lazyval flinkLib: File ={
  12. require(flinkHome !=null,"flinkHome can not be null")
  13. require(new File(flinkHome).exists(),"flinkHome not exist")val lib =new File(s"$flinkHome/lib")
  14. require(lib.exists()&& lib.isDirectory,s"$flinkHome/lib should be directory")
  15. lib
  16. }/**
  17. * yarn-ship content
  18. */lazyval flinkYarnShipFiles: List[String]={
  19. List(flinkLib.toString,s"$flinkHome/plugins",s"$flinkHome/conf/log4j.properties")}/**
  20. * get flink distJar
  21. */lazyval flinkDistJar: File ={val distJar: Array[File]= flinkLib.listFiles().filter((_: File).getName.matches("flink-dist_.*\\.jar"))
  22. distJar match{case x if x.isEmpty =>thrownew IllegalArgumentException(s"no flink-dist.jar in $flinkLib")case x if x.length >1=>thrownew IllegalArgumentException(s"there are multiple flink-dist.jar in $flinkLib ")case _ =>}
  23. distJar.head
  24. }}

3.2 ExecutionMode

执行模式枚举类

  1. publicenumExecutionModeimplementsSerializable{LOCAL(0,"local"),REMOTE(1,"remote"),FLINK_YARN_SESSION(2,"yarn-session"),FLINK_YARN_PER_JOB(3,"yarn-per-job");privatefinalInteger mode;privatefinalString name;ExecutionMode(Integer mode,String name){this.mode = mode;this.name = name;}}

3.3 ResolveOrder

类加载策略枚举类

  1. publicenumResolveOrder{/**
  2. * parent-first
  3. */PARENT_FIRST("parent-first",0),/**
  4. * child-first
  5. */CHILD_FIRST("child-first",1);privatefinalString name;privatefinalInteger value;ResolveOrder(String name,Integer value){this.name = name;this.value = value;}}

3.4 FlinkSubmitRequest

用于提交Flink任务

  • appName 任务名称
  • flinkJarPath jar包地址
  • savePoint 检查的路径
  • flinkParallelism 并行度
  • args 参数集合
  1. caseclass FlinkSubmitRequest(flinkVersion: FlinkInfo,
  2. executionMode: ExecutionMode,
  3. resolveOrder: ResolveOrder,
  4. appName:String,
  5. mainClass:String,
  6. flinkJarPath:String,
  7. savePoint:String,
  8. flinkParallelism:Int,
  9. args: JavaList[String],
  10. extraParameter: JavaMap[String,Any]){lazyval effectiveAppName:String=if(this.appName ==null)"flink-task"elsethis.appName
  11. lazyval supportTaskJarFile: File ={new File(flinkJarPath)}lazyval savepointRestoreSettings: SavepointRestoreSettings ={lazyval allowNonRestoredState:Boolean= Try(
  12. extraParameter.get(SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE.key).toString.toBoolean).getOrElse(false)
  13. savePoint match{case sp if Try(sp.isEmpty).getOrElse(true)=> SavepointRestoreSettings.none
  14. case sp => SavepointRestoreSettings.forPath(sp, allowNonRestoredState)}}}

3.5 FlinkSubmitter

使用入口

  1. object FlinkSubmitter {/**
  2. * submit flink task
  3. */def submit(submitRequest: FlinkSubmitRequest): FlinkSubmitResponse ={
  4. submitRequest.executionMode match{case ExecutionMode.LOCAL => LocalSubmit.submit(submitRequest)case ExecutionMode.REMOTE => RemoteSubmit.submit(submitRequest)case ExecutionMode.FLINK_YARN_SESSION => YarnSessionSubmit.submit(submitRequest)case ExecutionMode.FLINK_YARN_PER_JOB => YarnPerJobSubmit.submit(submitRequest)case _ =>thrownew UnsupportedOperationException(s"Unsupported ${submitRequest.executionMode} mode to submit flink task")}}/**
  5. * query flink task
  6. */def query(queryRequest: FlinkQueryRequest): FlinkQueryResponse ={
  7. queryRequest.executionMode match{case ExecutionMode.LOCAL => LocalSubmit.query(queryRequest)case ExecutionMode.REMOTE => RemoteSubmit.query(queryRequest)case ExecutionMode.FLINK_YARN_SESSION => YarnSessionSubmit.query(queryRequest)case ExecutionMode.FLINK_YARN_PER_JOB => YarnPerJobSubmit.query(queryRequest)case _ =>thrownew UnsupportedOperationException(s"Unsupported ${queryRequest.executionMode} mode to query flink task")}}/**
  8. * stop flink task
  9. */def stop(stopRequest: FlinkStopRequest): FlinkStopResponse ={
  10. stopRequest.executionMode match{case ExecutionMode.LOCAL => LocalSubmit.stop(stopRequest)case ExecutionMode.REMOTE => RemoteSubmit.stop(stopRequest)case ExecutionMode.FLINK_YARN_SESSION => YarnSessionSubmit.stop(stopRequest)case ExecutionMode.FLINK_YARN_PER_JOB => YarnPerJobSubmit.stop(stopRequest)case _ =>thrownew UnsupportedOperationException(s"Unsupported ${stopRequest.executionMode} node to stop flink task")}}/**
  11. * deploy flink-yarn-session cluster
  12. */def deploy(flinkDeployRequest: FlinkDeployRequest): FlinkDeployResponse ={
  13. YarnSessionSubmit.deployYarnSession(flinkDeployRequest)}/**
  14. * shutdown flink-yarn-session cluster
  15. */def shutDown(flinkShutdownRequest: FlinkShutdownRequest): FlinkShutdownResponse ={
  16. YarnSessionSubmit.shutDownYarnSession(flinkShutdownRequest)}}

4.测试用例(remote模式)

其余模式测试用例请参考工程里的代码

4.1 提交flink任务

  1. publicclassFlinkSubmitTest{FlinkInfo flinkInfo =newFlinkInfo("/opt/flink-test");ResolveOrder resolveOrder =ResolveOrder.of(1);String appName ="flink-submit-test";String batchMainClass ="com.czl.submitter.FlinkTaskBatch";String jarPath ="./jar/flink-task-api-test-1.0.jar";List<String> param =Arrays.asList("1","2","3");HashMap<String,Object> extraMap =newHashMap<>();@TestvoidremoteTest()throwsURISyntaxException{ExecutionMode remoteMode =ExecutionMode.of(1);URI activeAddress =newURI("http://ip:port/");
  2. extraMap.put(RestOptions.ADDRESS.key(), activeAddress.getHost());
  3. extraMap.put(RestOptions.PORT.key(), activeAddress.getPort());FlinkSubmitRequest remoteSubmit =newFlinkSubmitRequest(
  4. flinkInfo,
  5. remoteMode,
  6. resolveOrder,
  7. appName,
  8. batchMainClass,
  9. jarPath,null,1,
  10. param,
  11. extraMap);FlinkSubmitResponse submit =FlinkSubmitter.submit(remoteSubmit);System.out.println(submit.jobId());}}

4.2 查询flink任务状态

主要参数

  • 执行模式
  • flink运行地址
  • jobId
  1. publicclassFlinkQueryTest{@TestvoidremoteQuery(){String master ="http://ip:port";String jobId ="e0641a4f9f060961419531dd5233fe6c";FlinkQueryRequest request =newFlinkQueryRequest(ExecutionMode.REMOTE, master, jobId);FlinkQueryResponse query =FlinkSubmitter.query(request);System.out.println(query);}}

4.3 停止flink任务

主要参数

  • savepoint路径
  • jobId
  1. publicclassFlinkStopTest{FlinkInfo flinkInfo =newFlinkInfo("/opt/flink-test");String customSavePoint ="./sp/";Map<String,Object> extraParameter =newHashMap<>(0);@TestvoidremoteStop()throwsURISyntaxException{ExecutionMode remote =ExecutionMode.REMOTE;URI activeAddress =newURI("http://ip:port/");
  2. extraParameter.put(RestOptions.ADDRESS.key(), activeAddress.getHost());
  3. extraParameter.put(RestOptions.PORT.key(), activeAddress.getPort());FlinkStopRequest request =newFlinkStopRequest(flinkInfo,
  4. remote,null,"e0641a4f9f060961419531dd5233fe6c",true,
  5. customSavePoint,false, extraParameter);FlinkStopResponse stop =FlinkSubmitter.stop(request);System.out.println(stop.savePointPath());}}

4.4 部署和停止flink yarn session集群

主要参数

  • FlinkInfo
  • yarn name
  1. publicclassYarnSessionTest{@Testvoiddeploy(){FlinkInfo flinkInfo =newFlinkInfo("/opt/flink-test");FlinkDeployRequest deployRequest =newFlinkDeployRequest(flinkInfo,"czl-yarn-session-test");FlinkDeployResponse flinkDeployResponse =YarnSessionSubmit.deployYarnSession(deployRequest);System.out.println(flinkDeployResponse.address());System.out.println(flinkDeployResponse.clusterId());}@Testvoidshutdown(){FlinkInfo flinkInfo =newFlinkInfo("/opt/flink-test");FlinkShutdownRequest shutdownRequest =newFlinkShutdownRequest(flinkInfo,"application_1668191242058_0072");YarnSessionSubmit.shutDownYarnSession(shutdownRequest);}}

5. 仓库地址

(https://github.com/Chenzhiling/flink-submitter-api)

6. 参考

从StreamX项目获得了相当多的思路,感谢

标签: flink java scala

本文转载自: https://blog.csdn.net/czladamling/article/details/127967738
版权归原作者 花信風-Ling 所有, 如有侵权,请联系我们删除。

“flink任务提交,查询,停止工具”的评论:

还没有评论