SparkLaunch提交任务
1.提交Spark任务的方式
- 通过Spark-submit 提交任务
- 通过Yarn REST Api提交Spark任务
- 通过Spark Client Api 的方式提交任务
- 通过SparkLaunch 自带API提交任务
- 基于Livy的方式提交任务,可参考我的另一篇文章 Apache Livy 安装部署使用示例
上面的几种方式提交任务各自有对应的优缺点,不再进行赘述,下面要介绍的是通过SparkLaunch 的方式提交到集群中,并异步获取任务的执行状态进行更新到运行记录表中,从而实现Saprk任务的提交和状态获取。
2.SparkLaunch 官方接口
通过官方文档可以了解到SaprkLaunch 对应的方法:SparkLaunch
SparkLaunch 主要有两个接口:
- SparkAppHandle 主要负责Spark任务提交到集群上面
- SparkAppHandle.Listerner 主要是用来监控Spark的运行状态
可以查看SparkLaunch 类对应的方法主要用到的方法如下所示:
//设置配置文件地址publicSparkLaunchersetPropertiesFile(String path);//设置App 名称publicSparkLaunchersetAppName(String appName);//设置 MasterpublicSparkLaunchersetMaster(String master);//设置 部署模式publicSparkLaunchersetDeployMode(String mode);//设置 Jar包运行主类publicSparkLaunchersetMainClass(String mainClass);//设置 Spark 相关参数,需要以spark. 开头publicSparkLauncheraddSparkArg(String name,String value);//设置 Main函数的参数publicSparkLauncheraddAppArgs(String... args);// 启动Saprk任务的提交publicSparkAppHandlestartApplication(SparkAppHandle.Listener... listeners)
。。。。。。
上面的一些方法主要用于设置Spark所需要的相关参数,比如资源相关参数、Jar包相关参数、部署模式等,调用startApplication 方法会镇长的取创建一个任务提交的实现,通过下面的两个方法就能够使得任务可以正常提交到Yarn 上面
sparkLauncher.startApplication(newSparkAppHandle.Listener(){// 任务运行状态改变的时候触发的操作@OverridepublicvoidstateChanged(SparkAppHandle handle){}// 日志状态出现改变的时候触发的操作@OverridepublicvoidinfoChanged(SparkAppHandle handle){}});
3.任务提交流程及实战
总结上面的方法可以得到任务提交的总体流程:
- 创建SparkLaunch
- 初始化相关参数信息、资源参数、配置参数、Jar包参数
- 调用startApplication 启动任务的提交
- 调用stateChanged 捕获状态信息的改变并作出相应的操作
接下来给出完整的任务提交的相关伪代码:
/**
* 发起任务提交
*
* @param sparkApplicationParam Spark 相关配置参数
* @param otherConfigParams 其他配置参数
* @param mainParams main 方法配置参数
*/privatevoidlaunch(DmpRunInfo dmpRunInfo,SparkApplicationParam sparkApplicationParam,Map<String,String> otherConfigParams,String[] mainParams){// 初始化SparkLauncherSparkLauncher launcher =newSparkLauncher().setSparkHome(sparkApplicationParam.getSparkHome()).setAppResource(sparkApplicationParam.getMainJarPath()).setMainClass(sparkApplicationParam.getMainClass()).setMaster(sparkApplicationParam.getMaster()).setDeployMode(sparkApplicationParam.getDeployMode()).setAppName(sparkApplicationParam.getAppName()).setConf("spark.driver.memeory", sparkApplicationParam.getDriverMemory()).setConf("spark.executor.memory", sparkApplicationParam.getExecutorMemory()).setConf("spark.executor.cores", sparkApplicationParam.getExecutorCores())// spark.yarn.archive 配置的HDFS地址.setConf("spark.yarn.archive",SparkParamConstants.SPARK_YARN_ARCHIVE).setConf("spark.yarn.queue",SparkParamConstants.SPARK_PARAM_YARN_QUEUE).setVerbose(true);// 禁用输出到本地日志方式// .redirectError(new File(otherConfigParams.get("SPARK_ERROR_LOG_DIR")))// .redirectOutput(new File(otherConfigParams.get("SPARK_OUT_LOG_DIR")))/**
* 设置其他的参数时候需要使用[spark.] 开头的key ,否则spark 解析不出来
*/if(otherConfigParams !=null&& otherConfigParams.size()>0){
logger.info("开始设置spark job 运行参数");for(Map.Entry<String,String> conf : otherConfigParams.entrySet()){
logger.info("{}:{}", conf.getKey(), conf.getValue());
launcher.setConf(conf.getKey(), conf.getValue());}}if(mainParams.length !=0){
logger.info("开始设置Spark Job Main 方法的参数 {}",Arrays.toString(mainParams));
launcher.addAppArgs(mainParams);}
logger.info("参数设置完成,开始提交Spark任务");// 线程池的方式运行任务
executor.execute(()->{try{// 线程计数CountDownLatch countDownLatch =newCountDownLatch(1);SparkAppHandle sparkAppHandle = launcher.startApplication(newSparkAppHandle.Listener(){@OverridepublicvoidstateChanged(SparkAppHandle handle){// 修改运行状态
。。。。。。。。。
if(handle.getAppId()!=null){// 设置运行ID 到运行记录中
logger.info("{} stateChanged :{}", handle.getAppId(), handle.getState().toString());}else{
logger.info("stateChanged :{}", handle.getState().toString());}// 更新状态
。。。。。。。。
// 失败告警发送到群功能if(SparkAppHandle.State.FAILED.toString().equals(handle.getState().toString())){// 失败告警
。。。。。。。。。。。。
}// Job 状态完成之后退出线程if(handle.getState().isFinal()){
countDownLatch.countDown();}}@OverridepublicvoidinfoChanged(SparkAppHandle handle){// do something}});
logger.info("The task is executing, current is get application id before,please wait ........");String applicationId =null;while(!SparkAppHandle.State.RUNNING.equals(sparkAppHandle.getState())){
applicationId = sparkAppHandle.getAppId();if(applicationId !=null){
logger.warn("handle current state is {}, appid is {}",
sparkAppHandle.getState().toString(), applicationId);break;}}
logger.warn("handle current state is {}, appid is {}",
sparkAppHandle.getState().toString(), applicationId);
countDownLatch.await();}catch(Exception e){
logger.error(e.getMessage(), e);}});}
版权归原作者 Michealkz 所有, 如有侵权,请联系我们删除。