0


SparkLauncher提交spark 正确的退出方式以及状态获取

知其然知其所以然

转载注明出处,且必须看到最后,留言证明

引发问题

spark任务状态获取不准确,任务是失败的,但结果返回成功,在注册的Listener中也可以看到状态先是FINISHED,过一会才会变成FAILED,因为FINISHED是isFinal(),导致任务退出,漏掉了后返回的失败状态

如果采用jobState.isFinal()判断任务结束,则会触发这个问题。

// spark程序退出的错误判断条件if(jobState.isFinal()){
 countDownLatch.countDown();}

原理解析

想要从根本上解决问题,还是需要知道为什么会先出现FINISHED后又变成FAILED,或者是怎么变成FAILED。
Spark 任务使用SparkLauncher.startApplication方式提交,核心代码如下,并通过代码逻辑总结为下图关系。
在这里插入图片描述

核心代码:

@OverridepublicSparkAppHandlestartApplication(SparkAppHandle.Listener... listeners)throwsIOException{// 初始化LauncherServer LauncherServer server =LauncherServer.getOrCreateServer();// 初始化AppHandleChildProcAppHandle handle =newChildProcAppHandle(server);// 注册listenerfor(SparkAppHandle.Listener l : listeners){
    handle.addListener(l);}// 注册handleString secret = server.registerHandle(handle);// 初始化进程ProcessBuilder pb =createBuilder();try{// 启动进程Process child = pb.start();// 注册进程
    handle.setChildProc(child, loggerName, logStream);}catch(IOException ioe){
    handle.kill();throw ioe;}return handle;}

首先看LauncherServer这个类,进入到类里,查看注释,大致逻辑为当User App(客户端)提交任务到Spark App(服务端),L. Backend会向 L. Server(LauncherServer)响应,之后L. Backend都是与App Handle交互,那么我们就需要重点研究App Handle。

=> LauncherServer类注释
LauncherServer类注释

=> App Handle类分析
从上图总结的关系看出,AppHandle里一共有两个角色,listener和process。listener是我们自定义的,实现stateChanged方法,主要是监听状态变化,那么process是干什么的呢。通过handle.setChildProc()进入到具体实现,通过继续追源码,最终都开启了monitorChild线程

voidsetChildProc(Process childProc,String loggerName,InputStream logStream){this.childProc = childProc;if(logStream !=null){this.redirector =newOutputRedirector(logStream, loggerName,SparkLauncher.REDIRECTOR_FACTORY,this);}else{// If there is no log redirection, spawn a thread that will wait for the child process// to finish.SparkLauncher.REDIRECTOR_FACTORY.newThread(this::monitorChild).start();}}

进入到monitorChild方法,其实就是做一件事,监控SparkLauncher.startApplication里启动的进程process,并且根据进程值做相应状态处理。其中有一句注释

Override state with failure if the current state is not final, or is success

翻译过来就是:

当进程结束时,如果状态是 未完成成功 ,则覆盖(更新)为失败

与问题现象一致,到这,问题根因已经找到,顺便就把问题彻底解决。
从整个逻辑上看,当进程不存在时(proc.isAlive()),并且处理完状态后,最终调用dispose()方法。更新disposed属性为true,在父类AbstractAppHandle中有个isDisposed()方法可以获取该值。

/**
 * Wait for the child process to exit and update the handle's state if necessary, according to
 * the exit code.
 */voidmonitorChild(){Process proc = childProc;// 进程活着则一直等待,进程结束则走下面的处理while(proc.isAlive()){
      proc.waitFor();}// 获取最终进程值int ec;try{
      ec = proc.exitValue();}catch(Exception e){
      ec =1;}// 根据进程处理状态if(ec !=0){State currState =getState();// Override state with failure if the current state is not final, or is success.if(!currState.isFinal()|| currState ==State.FINISHED){setState(State.FAILED,true);}}// 释放dispose();}}

AbstractAppHandle.isDisposed()方法

booleanisDisposed(){return disposed;}

至此,spark程序是否真正结束应该通过AbstractAppHandle.isDisposed()方法判断
q: 是否可以使用proc.isAlive()方法?
a: 不可以,proc.isAlive()判断后,才会更新状态,也就是到最终状态的变更还是会有一点时间差

正确的代码编写

同步提交

思路:采用CountDownLatch,在spark程序未完成之前一直wait

publicvoidsyncSubmit(String mainClass,String args,Map<String,String> confMap)throwsIOException{// 用于阻塞主线程,等待任务结束CountDownLatch countDownLatch =newCountDownLatch(1);// 自定义SparkLauncher, 继承SparkLauncher
    sparkLauncher =newCustomSparkSubmitLauncher();// todo 吧啦吧啦 自己设置参数
    sparkLauncher
            .setJavaHome(xxx).setSparkHome(xxx).setMaster(xxx).setDeployMode(xxx).setAppName(xxx).setVerbose(true);// 重点来了,启动spark程序,注册Listener,用于获取程序状态
    sparkAppHandle = sparkLauncher.startApplication(newSparkAppHandle.Listener(){@OverridepublicvoidstateChanged(SparkAppHandle sparkAppHandle){
            jobState = sparkAppHandle.getState();// print log when state changeif(sparkAppHandle.getAppId()!=null){
                logger.info("{} stateChanged: {}", sparkAppHandle.getAppId(), jobState.toString());}else{
                logger.info("stateChanged: {}", jobState.toString());}// spark程序退出的错误判断条件//  if (jobState.isFinal()) {//  countDownLatch.countDown();//  }// spark程序退出的正确判断条件if(sparkLauncher.isDisposed()){
                countDownLatch.countDown();}}@OverridepublicvoidinfoChanged(SparkAppHandle handle){}});// 阻塞主线程
    countDownLatch.await();}

异步提交

思路:提交后使用sparkLauncher.isDisposed()轮询监控spark程序是否退出

publicvoidasyncSubmit(String mainClass,String args,Map<String,String> confMap)throwsIOException{// 自定义SparkLauncher, 继承SparkLauncher
    sparkLauncher =newCustomSparkSubmitLauncher();// todo 吧啦吧啦 自己设置参数
    sparkLauncher
            .setJavaHome(xxx).setSparkHome(xxx).setMaster(xxx).setDeployMode(xxx).setAppName(xxx).setVerbose(true);// 重点来了,启动spark程序,注册Listener,用于获取程序状态
    sparkAppHandle = sparkLauncher.startApplication(newSparkAppHandle.Listener(){@OverridepublicvoidstateChanged(SparkAppHandle sparkAppHandle){
            jobState = sparkAppHandle.getState();// print log when state changeif(sparkAppHandle.getAppId()!=null){
                logger.info("{} stateChanged: {}", sparkAppHandle.getAppId(), jobState.toString());}else{
                logger.info("stateChanged: {}", jobState.toString());}}@OverridepublicvoidinfoChanged(SparkAppHandle handle){}});}publicvoidmain(){asyncSubmit(参数)while(!sparkLauncher.isDisposed()){Thread.sleep(5000)
        logger.info("job state: "+ jobState)}
    logger.info("job final")// todo something}

CustomSparkSubmitLauncher

注意:类名随意该类必须在org.apache.spark.launcher包下,所以在项目里新建一个相同路径即可
原因:AbstractAppHandle 不是public的,所以不能被其他包的类访问,为什么要用这个类,因为要用isDisposed()方法,那为什么要用这个方法呢,那就看下面的详细解释吧。

packageorg.apache.spark.launcher;// 包路径非常重要,必须是该路径importjava.io.IOException;publicclassCustomSparkSubmitLauncherextendsSparkLauncher{privateSparkAppHandle sparkAppHandle;@OverridepublicSparkAppHandlestartApplication(SparkAppHandle.Listener... listeners)throwsIOException{this.sparkAppHandle =super.startApplication(listeners);return sparkAppHandle;}publicbooleanisDisposed(){return((AbstractAppHandle) sparkAppHandle).isDisposed();}}
标签: spark 大数据

本文转载自: https://blog.csdn.net/qq_37706484/article/details/129219283
版权归原作者 人生有如两个橘子 所有, 如有侵权,请联系我们删除。

“SparkLauncher提交spark 正确的退出方式以及状态获取”的评论:

还没有评论