0


SparkLaunch提交Spark任务到Yarn集群

SparkLaunch提交任务


1.提交Spark任务的方式

  1. 通过Spark-submit 提交任务
  2. 通过Yarn REST Api提交Spark任务
  3. 通过Spark Client Api 的方式提交任务
  4. 通过SparkLaunch 自带API提交任务
  5. 基于Livy的方式提交任务,可参考我的另一篇文章 Apache Livy 安装部署使用示例

上面的几种方式提交任务各自有对应的优缺点,不再进行赘述,下面要介绍的是通过SparkLaunch 的方式提交到集群中,并异步获取任务的执行状态进行更新到运行记录表中,从而实现Saprk任务的提交和状态获取。


2.SparkLaunch 官方接口

通过官方文档可以了解到SaprkLaunch 对应的方法:SparkLaunch
SparkLaunch 主要有两个接口:

  1. SparkAppHandle 主要负责Spark任务提交到集群上面
  2. SparkAppHandle.Listerner 主要是用来监控Spark的运行状态

在这里插入图片描述

可以查看SparkLaunch 类对应的方法主要用到的方法如下所示:

//设置配置文件地址publicSparkLaunchersetPropertiesFile(String path);//设置App 名称publicSparkLaunchersetAppName(String appName);//设置 MasterpublicSparkLaunchersetMaster(String master);//设置 部署模式publicSparkLaunchersetDeployMode(String mode);//设置 Jar包运行主类publicSparkLaunchersetMainClass(String mainClass);//设置 Spark 相关参数,需要以spark. 开头publicSparkLauncheraddSparkArg(String name,String value);//设置 Main函数的参数publicSparkLauncheraddAppArgs(String... args);// 启动Saprk任务的提交publicSparkAppHandlestartApplication(SparkAppHandle.Listener... listeners) 
  。。。。。。

上面的一些方法主要用于设置Spark所需要的相关参数,比如资源相关参数、Jar包相关参数、部署模式等,调用startApplication 方法会镇长的取创建一个任务提交的实现,通过下面的两个方法就能够使得任务可以正常提交到Yarn 上面

 sparkLauncher.startApplication(newSparkAppHandle.Listener(){// 任务运行状态改变的时候触发的操作@OverridepublicvoidstateChanged(SparkAppHandle handle){}// 日志状态出现改变的时候触发的操作@OverridepublicvoidinfoChanged(SparkAppHandle handle){}});

3.任务提交流程及实战

总结上面的方法可以得到任务提交的总体流程:

  1. 创建SparkLaunch
  2. 初始化相关参数信息、资源参数、配置参数、Jar包参数
  3. 调用startApplication 启动任务的提交
  4. 调用stateChanged 捕获状态信息的改变并作出相应的操作

接下来给出完整的任务提交的相关伪代码:

/**
     * 发起任务提交
     *
     * @param sparkApplicationParam Spark 相关配置参数
     * @param otherConfigParams     其他配置参数
     * @param mainParams            main 方法配置参数
     */privatevoidlaunch(DmpRunInfo dmpRunInfo,SparkApplicationParam sparkApplicationParam,Map<String,String> otherConfigParams,String[] mainParams){// 初始化SparkLauncherSparkLauncher launcher =newSparkLauncher().setSparkHome(sparkApplicationParam.getSparkHome()).setAppResource(sparkApplicationParam.getMainJarPath()).setMainClass(sparkApplicationParam.getMainClass()).setMaster(sparkApplicationParam.getMaster()).setDeployMode(sparkApplicationParam.getDeployMode()).setAppName(sparkApplicationParam.getAppName()).setConf("spark.driver.memeory", sparkApplicationParam.getDriverMemory()).setConf("spark.executor.memory", sparkApplicationParam.getExecutorMemory()).setConf("spark.executor.cores", sparkApplicationParam.getExecutorCores())// spark.yarn.archive 配置的HDFS地址.setConf("spark.yarn.archive",SparkParamConstants.SPARK_YARN_ARCHIVE).setConf("spark.yarn.queue",SparkParamConstants.SPARK_PARAM_YARN_QUEUE).setVerbose(true);// 禁用输出到本地日志方式// .redirectError(new File(otherConfigParams.get("SPARK_ERROR_LOG_DIR")))// .redirectOutput(new File(otherConfigParams.get("SPARK_OUT_LOG_DIR")))/**
         * 设置其他的参数时候需要使用[spark.] 开头的key ,否则spark 解析不出来
         */if(otherConfigParams !=null&& otherConfigParams.size()>0){
            logger.info("开始设置spark job 运行参数");for(Map.Entry<String,String> conf : otherConfigParams.entrySet()){
                logger.info("{}:{}", conf.getKey(), conf.getValue());
                launcher.setConf(conf.getKey(), conf.getValue());}}if(mainParams.length !=0){
            logger.info("开始设置Spark Job Main 方法的参数 {}",Arrays.toString(mainParams));
            launcher.addAppArgs(mainParams);}
        logger.info("参数设置完成,开始提交Spark任务");// 线程池的方式运行任务
        executor.execute(()->{try{// 线程计数CountDownLatch countDownLatch =newCountDownLatch(1);SparkAppHandle sparkAppHandle = launcher.startApplication(newSparkAppHandle.Listener(){@OverridepublicvoidstateChanged(SparkAppHandle handle){// 修改运行状态
                        。。。。。。。。。

                        if(handle.getAppId()!=null){// 设置运行ID 到运行记录中
                            logger.info("{} stateChanged :{}", handle.getAppId(), handle.getState().toString());}else{
                            logger.info("stateChanged :{}", handle.getState().toString());}// 更新状态
                        。。。。。。。。
                        // 失败告警发送到群功能if(SparkAppHandle.State.FAILED.toString().equals(handle.getState().toString())){// 失败告警
                            。。。。。。。。。。。。
                        }// Job 状态完成之后退出线程if(handle.getState().isFinal()){
                            countDownLatch.countDown();}}@OverridepublicvoidinfoChanged(SparkAppHandle handle){// do something}});

                logger.info("The task is executing, current is get application id before,please wait ........");String applicationId =null;while(!SparkAppHandle.State.RUNNING.equals(sparkAppHandle.getState())){
                    applicationId = sparkAppHandle.getAppId();if(applicationId !=null){
                        logger.warn("handle current state is {}, appid is {}",
                                sparkAppHandle.getState().toString(), applicationId);break;}}
                logger.warn("handle current state is {}, appid is {}",
                        sparkAppHandle.getState().toString(), applicationId);
                countDownLatch.await();}catch(Exception e){
                logger.error(e.getMessage(), e);}});}
标签: spark 大数据 java

本文转载自: https://blog.csdn.net/qq_43081842/article/details/128522370
版权归原作者 Michealkz 所有, 如有侵权,请联系我们删除。

“SparkLaunch提交Spark任务到Yarn集群”的评论:

还没有评论