A Spark job is normally submitted by invoking the spark-submit shell script on a Spark client, which hands the job over to YARN for execution:
```bash
$ ./bin/spark-submit --class org.apache.spark.examples.SparkPi \
--master yarn \
--deploy-mode cluster \
--driver-memory 4g \
--executor-memory 2g \
--executor-cores 1 \
--queue thequeue \
examples/jars/spark-examples*.jar \
10
```
In some scenarios, however, the shell script cannot be invoked directly, or the execution result of the job needs to be monitored. In those cases the Spark job can be submitted, and its result retrieved, from Java code via SparkLauncher.

The example below targets the Spark on YARN mode and has the following prerequisites (a stripped-down sketch of the core launch flow follows the list):

- The machine running this class must have a Spark client installed.
- The jar of the Spark job to be submitted must be available on that machine (it can also be uploaded to HDFS in advance).
- The Hadoop and YARN configuration files must be present on that machine.
- The program must know the path to JAVA_HOME.
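Distilled from the full helper class shown later, the sketch below illustrates the essential SparkLauncher calls only: start the application with a listener, block on a CountDownLatch until the state is final, then read the application ID. The Spark home, jar path and main class here are placeholders, and the Hadoop/YARN configuration is assumed to be reachable through the environment.

```java
import java.util.concurrent.CountDownLatch;
import org.apache.spark.launcher.SparkAppHandle;
import org.apache.spark.launcher.SparkLauncher;

public class MinimalLaunchSketch {
    public static void main(String[] args) throws Exception {
        CountDownLatch done = new CountDownLatch(1);
        // All paths and the main class below are placeholders for this sketch.
        SparkAppHandle handle = new SparkLauncher()
                .setSparkHome("/data/hdp/current/spark2-client")
                .setAppResource("/path/to/your-spark-job.jar")
                .setMainClass("com.example.YourSparkJob")
                .setMaster("yarn")
                .setDeployMode("client")
                .startApplication(new SparkAppHandle.Listener() {
                    @Override
                    public void stateChanged(SparkAppHandle h) {
                        if (h.getState().isFinal()) {
                            done.countDown();      // job reached a final state
                        }
                    }

                    @Override
                    public void infoChanged(SparkAppHandle h) {
                        // not needed for this sketch
                    }
                });
        done.await();                              // block until the job finishes
        System.out.println("appId=" + handle.getAppId() + ", state=" + handle.getState());
    }
}
```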
```java
package com.donny.bigdata.surveillance.components.spark;

import com.donny.bigdata.surveillance.conf.Spark2Config;
import com.donny.bigdata.surveillance.conf.YarnConfig;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.spark.launcher.SparkAppHandle;
import org.apache.spark.launcher.SparkLauncher;

import java.io.IOException;
import java.util.HashMap;
import java.util.concurrent.*;

/**
 * @author [email protected]
 * @date 2023/3/30 18:00
 */
public class SparkOnYarnHelper {

    private static final Logger LOGGER = LogManager.getLogger(SparkOnYarnHelper.class);

    /** Root directory of the Hadoop configuration files on the client. */
    private static String hadoopConfDir;

    /** Root directory of the YARN configuration files on the client. */
    private static String yarnConfDir;

    /** JAVA_HOME on the client. */
    private static String javaHome;

    /** Root directory of the Spark client installation. */
    private static String sparkHome;

    /** Path of the Spark job jar submitted to YARN. */
    private static String appReSourcePath = System.getProperty("user.dir");

    /** Entry class of the Spark job jar. */
    private static String appMainClass;

    /** User that runs the Spark job. */
    public static String hadoopUserName;

    /** Number of retries when YARN reports finalStatus=UNDEFINED. */
    private static Integer retryMaxCount;

    /**
     * Initializes the environment.
     */
    public static void init() {
        hadoopConfDir = Spark2Config.getString("spark2.hadoop_conf_dir", "/usr/hdp/2.6.5.0-292/hadoop/conf");
        yarnConfDir = Spark2Config.getString("spark2.yarn_conf_dir", "/usr/hdp/2.6.5.0-292/hadoop/conf");
        javaHome = Spark2Config.getString("spark2.java_home", "/data/java");
        sparkHome = Spark2Config.getString("spark2.spark_home", "/data/hdp/current/spark2-client");
        appReSourcePath = appReSourcePath + Spark2Config.getString("spark2.app_resource_path", "/plugins/spark2-monitor-1.0.jar");
        appMainClass = Spark2Config.getString("spark2.app_main_class", "com.donny.base.monitor.Spark2Monitor");
        hadoopUserName = Spark2Config.getString("hadoop_user_name", "spark");
        retryMaxCount = Spark2Config.getInt("spark2.yarn_report_retry", 3);
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("hadoopConfDir=[{}].", hadoopConfDir);
            LOGGER.debug("yarnConfDir=[{}].", yarnConfDir);
            LOGGER.debug("javaHome=[{}].", javaHome);
            LOGGER.debug("sparkHome=[{}].", sparkHome);
            LOGGER.debug("appReSourcePath=[{}].", appReSourcePath);
            LOGGER.debug("appMainClass=[{}].", appMainClass);
            LOGGER.debug("hadoopUserName=[{}].", hadoopUserName);
            LOGGER.debug("retryMaxCount=[{}].", retryMaxCount);
        }
    }

    /**
     * Submits the job to YARN.
     *
     * @param hiveTableName name of the Hive table used by the Spark job
     * @return execution result string
     */
    public static String submitJobToYarn(String hiveTableName) {
        String result;
        LOGGER.info(" Spark2 job is starting... ");
        HashMap<String, String> env = new HashMap<>(4);
        env.put("HADOOP_CONF_DIR", hadoopConfDir);
        env.put("JAVA_HOME", javaHome);
        env.put("YARN_CONF_DIR", yarnConfDir);
        env.put("HADOOP_USER_NAME", hadoopUserName);
        // Released by the SparkAppHandle listener once the application reaches a final state.
        CountDownLatch countDownLatch = new CountDownLatch(1);
        SparkAppHandle handle = null;
        try {
            handle = new SparkLauncher(env)
                    .setSparkHome(sparkHome)
                    .setAppResource(appReSourcePath)
                    .setMainClass(appMainClass)
                    .setMaster("yarn")
                    .setDeployMode("client")
                    .setConf(SparkLauncher.DRIVER_MEMORY, "512M")
                    .setConf(SparkLauncher.EXECUTOR_MEMORY, "512M")
                    .setConf(SparkLauncher.EXECUTOR_CORES, "1")
                    .setConf("spark.default.parallelism", "10")
                    .addAppArgs(hiveTableName)
                    .setVerbose(true)
                    .startApplication(new SparkAppHandle.Listener() {
                        @Override
                        public void stateChanged(SparkAppHandle handle) {
                            if (handle.getState().isFinal()) {
                                countDownLatch.countDown();
                            }
                            LOGGER.info("SparkApp state: {}.", handle.getState().toString());
                        }

                        @Override
                        public void infoChanged(SparkAppHandle handle) {
                            LOGGER.info("SparkApp infoChanged: {}.", handle.getState().toString());
                        }
                    });
        } catch (IOException e) {
            LOGGER.error("SparkLauncher IOException.", e);
        }
        LOGGER.info(" Spark2 job is running... ");
        try {
            // Block until the Spark job finishes.
            countDownLatch.await();
        } catch (InterruptedException e) {
            LOGGER.error("countDownLatch InterruptedException.", e);
        }
        LOGGER.info(" Spark2 job is over. ");
        if (null != handle) {
            result = getJobResult(handle.getAppId());
        } else {
            result = " Spark2 Execution Exception.";
        }
        return result;
    }

    /**
     * Fetches the final result of the job from YARN.
     *
     * @param appId application ID on YARN, e.g. application_1678883677607_0210
     * @return Spark2 job's execution result
     */
    private static String getJobResult(String appId) {
        LOGGER.info(" spark appId is {}.", appId);
        if (null == appId || "".equals(appId)) {
            return " Spark2 Execution Exception, ApplicationId is null.";
        }
        String result = " Spark2 Execution result Obtaining... ";
        String[] as = appId.split("_");
        ApplicationId applicationId = ApplicationId.newInstance(Long.parseLong(as[1]), Integer.parseInt(as[2]));
        YarnClient client = YarnClient.createYarnClient();
        Configuration conf = new Configuration();
        for (String key : YarnConfig.PROPS.stringPropertyNames()) {
            conf.set(key, YarnConfig.PROPS.getProperty(key));
        }
        client.init(conf);
        client.start();
        LOGGER.info(" YarnClient is started.");
        ApplicationReport applicationReport = null;
        ScheduledExecutorService yarnReportService = new ScheduledThreadPoolExecutor(1,
                new BasicThreadFactory.Builder().namingPattern("yarn-report-%d").daemon(true).build());
        int retryCount = 0;
        // Retries 3 times by default, each attempt delayed by 1 second.
        while (retryCount < retryMaxCount) {
            ScheduledFuture<ApplicationReport> scheduledFuture =
                    yarnReportService.schedule(new YarnReport(client, applicationId), 1, TimeUnit.SECONDS);
            boolean f = true;
            // Busy-wait until the scheduled report call has completed.
            while (f) {
                if (scheduledFuture.isDone()) {
                    try {
                        applicationReport = scheduledFuture.get();
                        if (LOGGER.isDebugEnabled()) {
                            LOGGER.debug("YarnReport[FinalApplicationStatus]={}.", applicationReport.getFinalApplicationStatus());
                        }
                    } catch (InterruptedException | ExecutionException e) {
                        yarnReportService.shutdown();
                        LOGGER.error("YarnReport Exception.", e);
                    } finally {
                        f = false;
                    }
                }
            }
            if (null != applicationReport) {
                String finalStatus = applicationReport.getFinalApplicationStatus().toString();
                if (FinalApplicationStatus.UNDEFINED.toString().equals(finalStatus)) {
                    retryCount++;
                    result = "Spark2 job's finalStatus is UNDEFINED. ";
                    LOGGER.info("Spark2 job's finalStatus=UNDEFINED, retryCount=[{}].", retryCount);
                } else {
                    result = "Spark2 job's finalStatus is " + finalStatus + ".";
                    break;
                }
            } else {
                retryCount++;
                result = "Spark2 job's execution result is null. ";
                LOGGER.info("Spark2 job's finalStatus=null, retryCount=[{}].", retryCount);
            }
        }
        if (!yarnReportService.isShutdown()) {
            yarnReportService.shutdown();
        }
        try {
            client.close();
        } catch (IOException e) {
            LOGGER.error("YarnClient close IOException.", e);
        }
        return result;
    }

    static class YarnReport implements Callable<ApplicationReport> {

        private static final Logger LOGGER = LogManager.getLogger(YarnReport.class);

        final YarnClient client;
        final ApplicationId applicationId;

        YarnReport(YarnClient client, ApplicationId applicationId) {
            this.applicationId = applicationId;
            this.client = client;
        }

        @Override
        public ApplicationReport call() throws Exception {
            ApplicationReport report = client.getApplicationReport(applicationId);
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("report={}", report.toString());
            }
            return report;
        }
    }
}
```