本地环境提交flink on yarn作业
在使用云厂商提供的flink job管理平台时,通过界面操作提交flink任务到yarn上十分方便,那么开发调试时能否在本地环境直接提交flink任务到yarn呢?
开源的flink管理平台 streampark 有提交flink on yarn作业的代码实现,可以参照 streampark 里对应模块的代码实现本地环境下的flink on yarn作业的提交。
其中 streampark-flink-client-core 作为提交flink job的核心模块,这里我们只关心flink on yarn作业的提交。
flink job提交流程分析:
1、flink启动脚本
以flink 1.14.4版本为例,flink安装目录的bin/目录下的 flink脚本 有详细的任务提交步骤,其中最后一行为:
sh
代码解读
复制代码
# Add HADOOP_CLASSPATH to allow the usage of Hadoop file systems exec "${JAVA_RUN}" $JVM_ARGS $FLINK_ENV_JAVA_OPTS "${log_setting[@]}" -classpath "`manglePathList "$CC_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" org.apache.flink.client.cli.CliFrontend "$@"
可知,flink任务仍以java命令方式提交,程序入口为:
org.apache.flink.client.cli.CliFrontend
2、启动程序main方法
根据参数提交作业,CliFrontend#main方法执行流程如下:
java
代码解读
复制代码
// 1. find the configuration directory - 获取配置文件目录 final String configurationDirectory = getConfigurationDirectoryFromEnv(); // 2. load the global configuration - 加载配置参数 final Configuration configuration = GlobalConfiguration.loadConfiguration(configurationDirectory); // 3. load the custom command lines - 加载命令行参数 final List<CustomCommandLine> customCommandLines = loadCustomCommandLines(configuration, configurationDirectory); // 4. 创建CliFrontend的对象并调用CliFrontend#parseAndRun方法 final CliFrontend cli = new CliFrontend(configuration, customCommandLines); SecurityUtils.install(new SecurityConfiguration(cli.configuration)); retCode = SecurityUtils.getInstalledContext().runSecured(() -> cli.parseAndRun(args));
3、CliFrontend#parseAndRun方法
CliFrontend#parseAndRun方法主要代码如下:
java
代码解读
复制代码
// check for action ... // get action String action = args[0]; // remove action from parameters final String[] params = Arrays.copyOfRange(args, 1, args.length); try { // do action switch (action) { case ACTION_RUN: // String ACTION_RUN = "run"; run(params); return 0; case ACTION_RUN_APPLICATION: // String ACTION_RUN_APPLICATION = "run-application"; runApplication(params); return 0; case ACTION_LIST: // String ACTION_INFO = "list"; list(params); return 0; case ACTION_INFO: // String ACTION_LIST = "info"; info(params); return 0; ... 省略后续步骤
可以看到CliFrontend#parseAndRun方法通过获取命令行的第一个参数,匹配并运行指定方法。
假设我们以
run-application
命令启动程序,则调用CliFrontend#runApplication方法,进入该方法。
4、CliFrontend#runApplication方法
直接展示关键代码,CliFrontend#runApplication方法通过传入
clusterClientServiceLoader
参数来创建一个ApplicationDeployer对象,然后调用该对象的ApplicationDeployer#run方法,执行完成结束调用。
java
代码解读
复制代码
final ApplicationDeployer deployer = new ApplicationClusterDeployer(clusterClientServiceLoader); ... deployer.run(effectiveConfiguration, applicationConfiguration);
那么application命令提交任务到yarn的具体实现就在这里了,点击run方法并进入具体实现方法ApplicationClusterDeployer#run。
java
代码解读
复制代码
public <ClusterID> void run( final Configuration configuration, final ApplicationConfiguration applicationConfiguration) throws Exception { checkNotNull(configuration); checkNotNull(applicationConfiguration); LOG.info("Submitting application in 'Application Mode'."); final ClusterClientFactory<ClusterID> clientFactory = clientServiceLoader.getClusterClientFactory(configuration); try (final ClusterDescriptor<ClusterID> clusterDescriptor = clientFactory.createClusterDescriptor(configuration)) { final ClusterSpecification clusterSpecification = clientFactory.getClusterSpecification(configuration); // 提交flink on yarn任务 clusterDescriptor.deployApplicationCluster( clusterSpecification, applicationConfiguration); } }
代码实现流程:
- 通过
clientServiceLoader
生成ClusterClientFactory客户端工厂; - 通过ClusterClientFactory对象创建 ClusterDescriptor 集群描述符 及 ClusterSpecification 集群规格对象;
- ClusterDescriptor#deployApplicationCluster为提交任务方法,跳转到具体实现方法YarnClusterDescriptor#deployApplicationCluster;
- 进入后我们看到deployApplicationCluster方法及下面的deployJobCluster方法,二者都调用了YarnClusterDescriptor#deployInternal方法,以完成flink on yran任务提交;
- 通过参数描述也可以看出deployApplicationCluster对应的是application提交模式,deployJobCluster对应的是per-job提交模式;
总结:通过对run方法的梳理,可以确定step2是我们提交任务所需要创建的对象,YarnClusterDescriptor#deployInternal方法是实现提交需要调用方法;
5、YarnClusterDescriptor#deployInternal方法
进入该方法,前面是一些参数校检及认证操作,然后通过 yarnClient 创建一个YarnClientApplication(这里的yarnClient在哪里生成?先不管,后面再看),后面进入startAppMaster方法,传入flinkConfiguration、yarnClient、yarnApplication等参数,这里应该会进行yarn任务的提交。
java
代码解读
复制代码
// Create application via yarnClient final YarnClientApplication yarnApplication = yarnClient.createApplication(); ... ApplicationReport report = startAppMaster( flinkConfiguration, applicationName, yarnClusterEntrypoint, jobGraph, yarnClient, yarnApplication, validClusterSpecification);
进入YarnClusterDescriptor#startAppMaster方法,方法实现较长,这里模糊搜索'submit'关键词,定位到 yarnClient.submitApplication 方法执行任务的提交,就此flink on yarn任务正式开始提交到集群中。
java
代码解读
复制代码
LOG.info("Submitting application master " + appId); yarnClient.submitApplication(appContext);
6、yarnClient对象在哪里生成?
梳理提交流程后,已知入口程序CliFrontend#main方法会加载flink的FLINK_CONF_DIR配置文件目录并装载配置参数,而我在FLINK_CONF_DIR目录的配置文件中并没有找到对应的yarn参数配置,那么flink如何和yarn建立起联系呢?
仍需要从ApplicationClusterDeployer#run方法中创建的对象入手:
- clientServiceLoader是由DefaultClusterClientServiceLoader生成,使用SPI机制动态加载其ClusterClientFactory客户端工厂;
- clientFactory.createClusterDescriptor(configuration) 生成ClusterDescriptor集群描述符对象,调用的方法为 YarnClusterClientFactory#createClusterDescriptor,方法代码如下:java代码解读复制代码
public YarnClusterDescriptor createClusterDescriptor(Configuration configuration) { checkNotNull(configuration); // 读取flink configuration的CONF_DIR获取配置文件目录 final String configurationDirectory = configuration.get(DeploymentOptionsInternal.CONF_DIR); // 设置日志相关参数,如未设置日志参数,默认配置文件目录下的"log4j.properties"或"logback.xml"文件路径为该参数值 YarnLogConfigUtil.setLogConfigFileInConfig(configuration, configurationDirectory); // 获取YarnClient return getClusterDescriptor(configuration); } private YarnClusterDescriptor getClusterDescriptor(Configuration configuration) { // 创建YarnClient对象 final YarnClient yarnClient = YarnClient.createYarnClient(); // 获取yarn集群配置 final YarnConfiguration yarnConfiguration = Utils.getYarnAndHadoopConfiguration(configuration); // 根据yarnConfiguration配置,初始化yarn客户端并启动建立连接 yarnClient.init(yarnConfiguration); yarnClient.start(); // 创建并返回ClusterDescriptor集群描述符对象 return new YarnClusterDescriptor( configuration, yarnConfiguration, yarnClient, YarnClientYarnClusterInformationRetriever.create(yarnClient), false); }
点击进入Utils#getYarnAndHadoopConfiguration方法获取yarn集群配置,代码逻辑如下:java代码解读复制代码public static YarnConfiguration getYarnAndHadoopConfiguration( org.apache.flink.configuration.Configuration flinkConfig) { // 从flink configuration配置中获取对应的yarn配置参数(如未设置相关yarn参数,则不添加) final YarnConfiguration yarnConfig = getYarnConfiguration(flinkConfig); // 获取系统环境变量"HADOOP_HOME"、"HADOOP_CONF_DIR",读取环境变量下的配置文件目录并添加到yarnConfig中 yarnConfig.addResource(HadoopUtils.getHadoopConfiguration(flinkConfig)); return yarnConfig; }
HadoopUtils#getHadoopConfiguration方法部分截图:
由此确定flink与yarn集群建立连接,需要在提交flink任务的系统上配置hadoop环境变量,这与安装flink时需要配置环境变量完成与YARN集群的对接操作描述一致。
java实现flink on yarn作业的提交
实现思路:
由上分析可知,提交flink job需要flink配置文件、hadoop环境变量,在本地环境下需要在项目中添加 flink-conf.yaml 配置文件,没有配置hadoop环境变量的话,可以自行添加 core-site.xml、hdfs-site.xml、yarn-site.xml 配置文件到项目指定路径中并创建YarnClient对象,或手动配置参数创建YarnClient对象。
剩下的就是将 streampark 的streampark-flink-client-core模块下的flink on yarn提交任务代码提取出来,通过阅读代码发现提交flink任务还需要
flink-dist_*.jar
文件,这是flink任务提交到yarn的前提条件之一。
实现流程:
创建自定义的任务提交客户端,通过HdfsUtils将任务jar包及依赖lib/路径上传至指定hdfs文件目录中,调用提交客户端的 doSubmit方法 提交任务到yarn集群,doCancel方法 取消正在运行的flink任务。
Windows系统提交任务失败的问题解决
由于windows系统和linux系统下是不同的路径分隔符,导致windows下的本地环境提交flink on yarn作业失败(windows下找不到主类:YarnJobClusterEntrypoint)。
需在项目下创建 org.apache.flink.yarn 包路径,复制并修改 Utils 和 YarnClusterDescriptor 类文件,在启动时覆盖源码类加载执行,可以解决windows下提交任务失败的问题。
版权归原作者 Java面试教程 所有, 如有侵权,请联系我们删除。