0


【Flink】详解Flink任务提交流程

启动一个任务

通常我们会使用

bin/flink run -t yarn-per-job -c com.xxx.xxx.WordCount/WordCount.jar

方式启动任务;我们看一下 flink文件中到底做了什么,以下是其部分源码

# Convert relative path to absolute pathbin=`dirname"$target"`# get flink config."$bin"/config.sh

if["$FLINK_IDENT_STRING"=""];thenFLINK_IDENT_STRING="$USER"fiCC_CLASSPATH=`constructFlinkClassPath`log=$FLINK_LOG_DIR/flink-$FLINK_IDENT_STRING-client-$HOSTNAME.log
log_setting=(-Dlog.file="$log"-Dlog4j.configuration=file:"$FLINK_CONF_DIR"/log4j-cli.properties -Dlog4j.configurationFile=file:"$FLINK_CONF_DIR"/log4j-cli.properties -Dlogback.configurationFile=file:"$FLINK_CONF_DIR"/logback.xml)# Add Client-specific JVM optionsFLINK_ENV_JAVA_OPTS="${FLINK_ENV_JAVA_OPTS}${FLINK_ENV_JAVA_OPTS_CLI}"# Add HADOOP_CLASSPATH to allow the usage of Hadoop file systemsexec"${JAVA_RUN}"$JVM_ARGS$FLINK_ENV_JAVA_OPTS"${log_setting[@]}"-classpath"`manglePathList "$CC_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" org.apache.flink.client.cli.CliFrontend "$@"

可以看到,第一步将相对地址转换成绝对地址;第二步获取 Flink 配置信息,这个信息放在 bin 目录下的 config. sh中;第三步获取 JVM 配置信息;最后一步就是程序真正的入口 org.apache.flink.client.cli.CliFrontend,这里有几点需要注意的:

  1. Linux- exec 命令用于调用并执行指令;
  2. 在配置文件中 JAVA_RUN="$JAVA_HOME"/bin/java,因此这里运行的是 Java 程序;
  3. FLINK_ENV_JAVA_OPTS:是上文获取的关于 JVM 的相关配置;
  4. java -cp 和 -classpath 一样,是指定类运行所依赖其他类的路径;
  5. INTERNAL_HADOOP_CLASSPATHS="${HADOOP_CLASSPATH}:${HADOOP_CONF_DIR}:${YARN_CONF_DIR}"config. sh文件指定了 Hadoop 的相关路径;
  6. "$@" 表示所有参数以以 "$1" " $2" … "$ n" 的形式输出,例如 bin/flink run -t yarn-per-job -c com.xxx.xxx.WordCount/WordCount.jar -p 11 "$@" 会解析成 run -t yarn-per-job -c com.xxx.xxx.WordCount/WordCount.jar -p 11

小结一下,我们任务的提交流程是输入启动命令→读取配置信息→

java -cp

开启虚拟机→开启 CliFrontend→运行 CliFrontend. run

CliFrontend 详解

classCliFrontend{protectedvoidrun(String[] args)throwsException{LOG.info("Running 'run' command.");// TODO 获取默认的运行时参数finalOptions commandOptions =CliFrontendParser.getRunCommandOptions();// 解析参数返回CommandLine对象// 解析见-> DefaultParser详解,核心逻辑是使用CliFrontendParser进行解析->使用DefaultParser进行解析finalCommandLine commandLine =getCommandLine(commandOptions, args,true);// evaluate help flagif(commandLine.hasOption(HELP_OPTION.getOpt())){CliFrontendParser.printHelpForRun(customCommandLines);return;}// TODO 按照Generic、Yarn、Default顺序判断是否活跃// 详解见 -> sub1 finalCustomCommandLine activeCommandLine =validateAndGetActiveCommandLine(checkNotNull(commandLine));// TODO 根据输入参数封装了一个ProgramOptions// 详解见 -> ProgramOptionsfinalProgramOptions programOptions =ProgramOptions.create(commandLine);// TODO 获取任务Jar包和依赖项;// 详解见 -> sub2finalList<URL> jobJars =getJobJarAndDependencies(programOptions);// TODO 获取有效配置 -> 这里的逻辑最终是调用每一个CLI的toConfiguration方法finalConfiguration effectiveConfiguration =getEffectiveConfiguration(activeCommandLine, commandLine, programOptions, jobJars);// TODO 打印日志信息,LOG.debug("Effective executor configuration: {}", effectiveConfiguration);// TODO 将programOptions(程序)和effectiveConfiguration(有效配置信息)再封装成一个PackagedProgram对象try(PackagedProgram program =getPackagedProgram(programOptions, effectiveConfiguration)){// TODO 执行封装对象executeProgram(effectiveConfiguration, program);}}// ****************************** sub1 ******************************************** //publicCustomCommandLinevalidateAndGetActiveCommandLine(CommandLine commandLine){LOG.debug("Custom commandlines: {}", customCommandLines);for(CustomCommandLine cli : customCommandLines){LOG.debug("Checking custom commandline {}, isActive: {}", cli, cli.isActive(commandLine));// 按照Generic、Yarn、Default顺序判断是否活跃,具体分析见下文 if(cli.isActive(commandLine)){return cli;}}thrownewIllegalStateException("No valid command-line found.");}// ************************************************************************** //// ****************************** sub2 ******************************************** //privateList<URL>getJobJarAndDependencies(ProgramOptions programOptions)throwsCliArgsException{// TODO 获取入口类String entryPointClass = programOptions.getEntryPointClassName();// TODO 获取JAR包路径String jarFilePath = programOptions.getJarFilePath();try{File jarFile = jarFilePath !=null?getJarFile(jarFilePath):null;returnPackagedProgram.getJobJarAndDependencies(jarFile, entryPointClass);}catch(FileNotFoundException|ProgramInvocationException e){thrownewCliArgsException("Could not get job jar and dependencies from JAR file: "+ e.getMessage(), e);}}// ************************************************************************** //}

c

classDefaultParser{}/*
CustomCommandLine 是一个接口,GenericCLI、AbstractYarnCli、DefaultCLI
继承关系见下图
*/classGenericCLI{/* 每一个Option都是这样的解析逻辑,有一个短的opt参数进行判断,同时有一个longOpt参数进行辅助判断,
    以executorOption为例,就是命令行中有-e 或者 -executor 就会返回True
    */privatefinalOption executorOption =newOption("e","executor",true,"DEPRECATED: Please use the -t option instead which is also available with the \"Application Mode\".\n"+"The name of the executor to be used for executing the given job, which is equivalent "+"to the \""+DeploymentOptions.TARGET.key()+"\" config option. The "+"currently available executors are: "+getExecutorFactoryNames()+".");/*
    1. 获取验证配置项是否存在
    2. 获取验证执行器参数,即-e, executor
    3. 获取验证目标参数是否存在,即-t target
    */publicbooleanisActive(CommandLine commandLine){// configuration是配置文件return configuration.getOptional(DeploymentOptions.TARGET).isPresent()|| commandLine.hasOption(executorOption.getOpt())|| commandLine.hasOption(targetOption.getOpt());}}classFlinkYarnSessionCli{publicbooleanisActive(CommandLine commandLine){// 调用父类的的isActive方法if(!super.isActive(commandLine)){return(isYarnPropertiesFileMode(commandLine)&& yarnApplicationIdFromYarnProperties !=null);}returntrue;}publicConfigurationtoConfiguration(CommandLine commandLine)throwsFlinkException{// we ignore the addressOption because it can only contain "yarn-cluster"finalConfiguration effectiveConfiguration =newConfiguration();applyDescriptorOptionToConfig(commandLine, effectiveConfiguration);// TODO 获取APPID,查看是否启动了集群finalApplicationId applicationId =getApplicationId(commandLine);if(applicationId !=null){finalString zooKeeperNamespace;if(commandLine.hasOption(zookeeperNamespace.getOpt())){
                zooKeeperNamespace = commandLine.getOptionValue(zookeeperNamespace.getOpt());}else{
                zooKeeperNamespace =
                        effectiveConfiguration.getString(HA_CLUSTER_ID, applicationId.toString());}// TODO 设置高可用
            effectiveConfiguration.setString(HA_CLUSTER_ID, zooKeeperNamespace);// TODO 设置YARN的应用ID
            effectiveConfiguration.setString(YarnConfigOptions.APPLICATION_ID, applicationId.toString());// 执行器,已经启动了集群就是Session模式
            effectiveConfiguration.setString(DeploymentOptions.TARGET,YarnSessionClusterExecutor.NAME);}else{// 没启动集群就是per-JOB 模式
            effectiveConfiguration.setString(DeploymentOptions.TARGET,YarnJobClusterExecutor.NAME);}// TODO 如果有JM内存设置,则进行设置if(commandLine.hasOption(jmMemory.getOpt())){String jmMemoryVal = commandLine.getOptionValue(jmMemory.getOpt());if(!MemorySize.MemoryUnit.hasUnit(jmMemoryVal)){
                jmMemoryVal +="m";}// TODO 添加JM内存设置
            effectiveConfiguration.set(JobManagerOptions.TOTAL_PROCESS_MEMORY,MemorySize.parse(jmMemoryVal));}// TODO 如果有TM总内存设置,则进行设置if(commandLine.hasOption(tmMemory.getOpt())){String tmMemoryVal = commandLine.getOptionValue(tmMemory.getOpt());if(!MemorySize.MemoryUnit.hasUnit(tmMemoryVal)){
                tmMemoryVal +="m";}//  TODO 添加TM的总内存;
            effectiveConfiguration.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY,MemorySize.parse(tmMemoryVal));}// TODO 如果有SLOT数设置,则进行设置if(commandLine.hasOption(slots.getOpt())){// 每个TM的SLOT数
            effectiveConfiguration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS,Integer.parseInt(commandLine.getOptionValue(slots.getOpt())));}// TODO 设置动态属性
        dynamicPropertiesEncoded =encodeDynamicProperties(commandLine);if(!dynamicPropertiesEncoded.isEmpty()){Map<String,String> dynProperties =getDynamicProperties(dynamicPropertiesEncoded);for(Map.Entry<String,String> dynProperty : dynProperties.entrySet()){
                effectiveConfiguration.setString(dynProperty.getKey(), dynProperty.getValue());}}if(isYarnPropertiesFileMode(commandLine)){returnapplyYarnProperties(effectiveConfiguration);}else{return effectiveConfiguration;}}}// FlinkYarnSessionCli的父类classAbstractYarnCli{publicbooleanisActive(CommandLine commandLine){// TODO 获取JM地址finalString jobManagerOption = commandLine.getOptionValue(addressOption.getOpt(),null);// TODO ID = “yarn-cluster”, finalboolean yarnJobManager =ID.equals(jobManagerOption);// TODO 如果Yarn启动,则会有APPID,判断是否有APPID;1. 看输入命令中是否有APPID;2.从配置文件中读取APPID;finalboolean hasYarnAppId =
                    commandLine.hasOption(applicationId.getOpt())|| configuration.getOptional(YarnConfigOptions.APPLICATION_ID).isPresent();// TODO 使用“yarn-session”或者“yarn-per-job”进行比较finalboolean hasYarnExecutor =YarnSessionClusterExecutor.NAME.equalsIgnoreCase(
                                    configuration.get(DeploymentOptions.TARGET))||YarnJobClusterExecutor.NAME.equalsIgnoreCase(
                                    configuration.get(DeploymentOptions.TARGET));// -m yarn-cluster || yarn 有APPID、命令行制定了 || 执行器是YARN的return hasYarnExecutor || yarnJobManager || hasYarnAppId;}}// DefaultCLI用来部署Standalone模式classDefaultCLI{publicbooleanisActive(CommandLine commandLine){// 永远返回Truereturntrue;}}classProgramOptions{publicstaticProgramOptionscreate(CommandLine line)throwsCliArgsException{if(isPythonEntryPoint(line)||containsPythonDependencyOptions(line)){// Python相关的类都是调用这个方法returncreatePythonProgramOptions(line);}else{// 其他都是调用这个方法returnnewProgramOptions(line);}}protectedProgramOptions(CommandLine line)throwsCliArgsException{super(line);// TODO 获取入口类this.entryPointClass =
                line.hasOption(CLASS_OPTION.getOpt())? line.getOptionValue(CLASS_OPTION.getOpt()):null;// TODO 获取JAR包this.jarFilePath =
                line.hasOption(JAR_OPTION.getOpt())? line.getOptionValue(JAR_OPTION.getOpt()):null;this.programArgs =extractProgramArgs(line);// TODO 路径解析并添加到一个ArrayList中List<URL> classpaths =newArrayList<URL>();if(line.hasOption(CLASSPATH_OPTION.getOpt())){for(String path : line.getOptionValues(CLASSPATH_OPTION.getOpt())){try{
                    classpaths.add(newURL(path));}catch(MalformedURLException e){thrownewCliArgsException("Bad syntax for classpath: "+ path);}}}this.classpaths = classpaths;// TODO 先检查是否有并行度设置,如果有则解析并行度并进行值检查,如果没有则使用默认并行度if(line.hasOption(PARALLELISM_OPTION.getOpt())){String parString = line.getOptionValue(PARALLELISM_OPTION.getOpt());try{
                parallelism =Integer.parseInt(parString);if(parallelism <=0){thrownewNumberFormatException();}}catch(NumberFormatException e){thrownewCliArgsException("The parallelism must be a positive number: "+ parString);}}else{
            parallelism =ExecutionConfig.PARALLELISM_DEFAULT;}// TODO 分发模式,这个和上文介绍的executorOption判断逻辑是一致的。
        detachedMode =
                line.hasOption(DETACHED_OPTION.getOpt())|| line.hasOption(YARN_DETACHED_OPTION.getOpt());
        shutdownOnAttachedExit = line.hasOption(SHUTDOWN_IF_ATTACHED_OPTION.getOpt());this.savepointSettings =CliFrontendParser.createSavepointRestoreSettings(line);}}classJobManagerOptions{publicstaticfinalConfigOption<MemorySize>TOTAL_PROCESS_MEMORY=// 这类配置信息都是这种链式调用key("jobmanager.memory.process.size").memoryType().noDefaultValue().withDescription("Total Process Memory size for the JobManager. This includes all the memory that a "+"JobManager JVM process consumes, consisting of Total Flink Memory, JVM Metaspace, and JVM Overhead. "+"In containerized setups, this should be set to the container memory. See also "+"'jobmanager.memory.flink.size' for Total Flink Memory size configuration.");}

CustomCommandLine 继承关系:

![![[【Flink】详解Flink任务提交流程_image_1.png]]](https://img-blog.csdnimg.cn/965a34759c5f4308be30303cef2b4d8a.png)

参数解析

classCliFrontendParser{// 选项列表// 解析和以executorOption等的逻辑都是一样的staticfinalOptionSAVEPOINT_DISPOSE_OPTION=newOption("d","dispose",true,"Path of savepoint to dispose.");staticfinalOptionHELP_OPTION=newOption("h","help",false,"Show the help message for the CLI Frontend or the action.");...}classDefaultParser{publicCommandLineparse(Options options,String[] arguments,Properties properties,boolean stopAtNonOption)throwsParseException{...;
         if(arguments !=null){String[] var9 = arguments;int var10 = arguments.length;for(int var7 =0; var7 < var10;++var7){String argument = var9[var7];// 核心解析逻辑this.handleToken(argument);}}...}privatevoidhandleToken(String token)throwsParseException{this.currentToken = token;if(this.skipParsing){this.cmd.addArg(token);}elseif("--".equals(token)){this.skipParsing =true;}elseif(this.currentOption !=null&&this.currentOption.acceptsArg()&&this.isArgument(token)){// 添加参数值this.currentOption.addValueForProcessing(this.stripLeadingAndTrailingQuotesDefaultOn(token));}elseif(token.startsWith("--")){// TODO 解析 -- 形式参数;this.handleLongOption(token);}elseif(token.startsWith("-")&&!"-".equals(token)){// TODO 解析 - 形式参数this.handleShortAndLongOption(token);}else{// TODO 解析未知参数this.handleUnknownToken(token);}if(this.currentOption !=null&&!this.currentOption.acceptsArg()){this.currentOption =null;}}privatevoidhandleLongOption(String token)throwsParseException{if(token.indexOf(61)==-1){// TODO 解析的是不包含=号的,例如-L、--lthis.handleLongOptionWithoutEqual(token);}else{// TODO 解析包含=号的,例如--L=Vthis.handleLongOptionWithEqual(token);}}// 解析逻辑都是去除前缀,然后校验参数privatevoidhandleLongOptionWithoutEqual(String token)throwsParseException{List<String> matchingOpts =this.getMatchingLongOptions(token);if(matchingOpts.isEmpty()){this.handleUnknownToken(this.currentToken);}else{if(matchingOpts.size()>1&&!this.options.hasLongOption(token)){thrownewAmbiguousOptionException(token, matchingOpts);}String key =this.options.hasLongOption(token)? token :(String)matchingOpts.get(0);// 参数添加到执行命令this.handleOption(this.options.getOption(key));}}privateList<String>getMatchingLongOptions(String token){if(this.allowPartialMatching){// 部分匹配走这里returnthis.options.getMatchingOptions(token);}else{List<String> matches =newArrayList(1);if(this.options.hasLongOption(token)){// 获取参数Option option =this.options.getOption(token);// 添加参数
                matches.add(option.getLongOpt());}return matches;}}publicOptiongetOption(String opt){// 先去前缀
        opt =Util.stripLeadingHyphens(opt);// 去完前缀后看短字符有没有包含,有返回短参数否则返回长参数returnthis.shortOpts.containsKey(opt)?(Option)this.shortOpts.get(opt):(Option)this.longOpts.get(opt);}privatevoidhandleOption(Option option)throwsParseException{// 添加前先检查起哪一个参数this.checkRequiredArgs();
        option =(Option)option.clone();this.updateRequiredOptions(option);// 添加参数this.cmd.addOption(option);// 设置当前参数,跟checkRequiredArgs 这个方法相配合if(option.hasArg()){this.currentOption = option;}else{this.currentOption =null;}}}

小结

  1. 程序的入口命令:run -t yarn-per-job /opt/module...ratget.jar --port 9999
  2. 入口类是:org.apache.flink.client.cli.CliFrontend;环境信息:Config.sh
  3. Run方法中 1. validateAndGetActiveCommandLine:按照Generic、Yarn、Default顺序判断是否活跃,创建对应的客户端; 1. Generic的isActive方法中 1. 获取验证配置项是否存在2. 获取验证执行器参数是否存在,即 -e, executor3. 获取验证目标参数是否存在,即-t target2. FlinkYarnSessionCliisActive方法 1. -m yarn-cluster:判断是否是per-job模式;2. 根据 JM 地址→是否有 APPID,看是否启动了 Yarn(从命令行和配置信息中检查)3. 使用“yarn-session”或者“yarn-per-job”进行比较,看exector是否是这两个;4. 上述三种情况任意一种成立都会使用 Yarn 模式;3. Default的isActive方法永远返回True;2. 根据输入参数封装了一个ProgramOptions对象,该对象包括 Java 的主要获取入口类、JAR 包路径、程序参数、并行度等;3. 获取并设置有效配置(高可用、JM 和 TM 内存、TM-SLOT 数),简而言之就是将多个配置信息(activeCommandLine, commandLine,programOptions, jobJars)变成一个Configuration对象;4. 将程序+配置信息封装成一个PackagedProgram对象然后调用其 execute 方法执行任务;

往期回顾

  1. 【Flink】Flink时间语义详解
  2. 【Flink】详解JobGraph
  3. 【Flink】详解StreamGraph
  4. 【Flink】浅谈Flink架构和调度
  5. 【Flink】详解Flink的八种分区
  6. 【Flink】浅谈Flink背压问题(1)
  7. 【分布式】浅谈CAP、BASE理论(1)
标签: flink java 大数据

本文转载自: https://blog.csdn.net/pcx171/article/details/131786472
版权归原作者 小猪猪家的大猪猪 所有, 如有侵权,请联系我们删除。

“【Flink】详解Flink任务提交流程”的评论:

还没有评论