

Submitting Spark Jobs from Java Code

A Spark job is normally submitted by invoking the spark-submit shell script on a Spark client, which hands the job to YARN for execution:

```shell
$ ./bin/spark-submit --class org.apache.spark.examples.SparkPi \
    --master yarn \
    --deploy-mode cluster \
    --driver-memory 4g \
    --executor-memory 2g \
    --executor-cores 1 \
    --queue thequeue \
    examples/jars/spark-examples*.jar \
    10
```

The trailing `10` is the application argument passed to SparkPi.

In some scenarios, however, it is not possible to invoke a shell script directly, or the job's execution result needs to be monitored. In such cases, the job can be submitted, and its result retrieved, from Java code via SparkLauncher.
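At its core this relies on `SparkLauncher` from Spark's `spark-launcher` module. Before walking through the full helper class, here is a minimal sketch of that API; the Spark home, jar path, and main class below are placeholder assumptions:

```java
import org.apache.spark.launcher.SparkAppHandle;
import org.apache.spark.launcher.SparkLauncher;

import java.util.concurrent.CountDownLatch;

public class MinimalSparkLauncherDemo {
    public static void main(String[] args) throws Exception {
        CountDownLatch done = new CountDownLatch(1);
        SparkAppHandle handle = new SparkLauncher()
                .setSparkHome("/opt/spark")                 // assumption: local Spark client install
                .setAppResource("/path/to/your-app.jar")    // assumption: placeholder jar path
                .setMainClass("com.example.YourMainClass")  // assumption: placeholder entry point
                .setMaster("yarn")
                .setDeployMode("client")
                .startApplication(new SparkAppHandle.Listener() {
                    @Override
                    public void stateChanged(SparkAppHandle h) {
                        // Final states are FINISHED, FAILED, KILLED, LOST
                        if (h.getState().isFinal()) {
                            done.countDown();
                        }
                    }

                    @Override
                    public void infoChanged(SparkAppHandle h) {
                        // Fired when application info (such as the YARN appId) changes
                    }
                });
        done.await();
        System.out.println("appId=" + handle.getAppId() + ", state=" + handle.getState());
    }
}
```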

The full example below is designed for the Spark on YARN mode and has the following prerequisites:

  1. The machine running this class must have a Spark client installed.
  2. The jar of the Spark job to be submitted must be available on that machine (it can also be uploaded to HDFS in advance).
  3. The machine must have the Hadoop and YARN configuration files.
  4. The program must know the path of JAVA_HOME.
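The helper class below also depends on two small configuration wrappers, `Spark2Config` and `YarnConfig`, that the original post does not show. A minimal hypothetical reconstruction, assuming a `spark2.properties` file on the classpath (the file name and loading strategy are assumptions), could look like this:

```java
package com.donny.bigdata.surveillance.conf;

import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

/**
 * Hypothetical reconstruction: key/value lookup backed by a
 * spark2.properties file on the classpath.
 */
public class Spark2Config {
    private static final Properties PROPS = new Properties();

    static {
        try (InputStream in = Spark2Config.class.getResourceAsStream("/spark2.properties")) {
            if (in != null) {
                PROPS.load(in);
            }
        } catch (IOException e) {
            throw new ExceptionInInitializerError(e);
        }
    }

    public static String getString(String key, String defaultValue) {
        return PROPS.getProperty(key, defaultValue);
    }

    public static Integer getInt(String key, Integer defaultValue) {
        String value = PROPS.getProperty(key);
        return value == null ? defaultValue : Integer.valueOf(value);
    }
}
```

`YarnConfig` is assumed to be analogous, exposing a `public static final Properties PROPS` whose entries are copied into the Hadoop `Configuration` in `getJobResult` below. With those wrappers in place, the full helper class follows.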
```java
package com.donny.bigdata.surveillance.components.spark;

import com.donny.bigdata.surveillance.conf.Spark2Config;
import com.donny.bigdata.surveillance.conf.YarnConfig;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.spark.launcher.SparkAppHandle;
import org.apache.spark.launcher.SparkLauncher;

import java.io.IOException;
import java.util.HashMap;
import java.util.concurrent.*;

/**
 * @author 1792998761@qq.com
 * @date 2023/3/30 18:00
 */
public class SparkOnYarnHelper {

    private static final Logger LOGGER = LogManager.getLogger(SparkOnYarnHelper.class);

    /** Root directory of the Hadoop configuration files on the client */
    private static String hadoopConfDir;

    /** Root directory of the YARN configuration files on the client */
    private static String yarnConfDir;

    /** JAVA_HOME on the client */
    private static String javaHome;

    /** Spark home on the client */
    private static String sparkHome;

    /** Path of the Spark job jar submitted to YARN */
    private static String appReSourcePath = System.getProperty("user.dir");

    /** Entry-point class of the Spark job jar */
    private static String appMainClass;

    /** User that runs the Spark job */
    public static String hadoopUserName;

    /** Retry count used when YARN reports finalStatus=UNDEFINED */
    private static Integer retryMaxCount;

    /**
     * Initialize the environment.
     */
    public static void init() {
        hadoopConfDir = Spark2Config.getString("spark2.hadoop_conf_dir", "/usr/hdp/2.6.5.0-292/hadoop/conf");
        yarnConfDir = Spark2Config.getString("spark2.yarn_conf_dir", "/usr/hdp/2.6.5.0-292/hadoop/conf");
        javaHome = Spark2Config.getString("spark2.java_home", "/data/java");
        sparkHome = Spark2Config.getString("spark2.spark_home", "/data/hdp/current/spark2-client");
        appReSourcePath = appReSourcePath + Spark2Config.getString("spark2.app_resource_path", "/plugins/spark2-monitor-1.0.jar");
        appMainClass = Spark2Config.getString("spark2.app_main_class", "com.donny.base.monitor.Spark2Monitor");
        hadoopUserName = Spark2Config.getString("hadoop_user_name", "spark");
        retryMaxCount = Spark2Config.getInt("spark2.yarn_report_retry", 3);
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("hadoopConfDir=[{}].", hadoopConfDir);
            LOGGER.debug("yarnConfDir=[{}].", yarnConfDir);
            LOGGER.debug("javaHome=[{}].", javaHome);
            LOGGER.debug("sparkHome=[{}].", sparkHome);
            LOGGER.debug("appReSourcePath=[{}].", appReSourcePath);
            LOGGER.debug("appMainClass=[{}].", appMainClass);
            LOGGER.debug("hadoopUserName=[{}].", hadoopUserName);
            LOGGER.debug("retryMaxCount=[{}].", retryMaxCount);
        }
    }

    /**
     * Submit the job to YARN.
     *
     * @param hiveTableName name of the Hive table used by the Spark job
     * @return execution result string
     */
    public static String submitJobToYarn(String hiveTableName) {
        String result;
        LOGGER.info(" Spark2 job is starting... ");
        HashMap<String, String> env = new HashMap<>(4);
        env.put("HADOOP_CONF_DIR", hadoopConfDir);
        env.put("JAVA_HOME", javaHome);
        env.put("YARN_CONF_DIR", yarnConfDir);
        env.put("HADOOP_USER_NAME", hadoopUserName);

        // Latch released by the SparkAppHandle listener once the state is final
        CountDownLatch countDownLatch = new CountDownLatch(1);
        SparkAppHandle handle = null;
        try {
            handle = new SparkLauncher(env)
                    .setSparkHome(sparkHome)
                    .setAppResource(appReSourcePath)
                    .setMainClass(appMainClass)
                    .setMaster("yarn")
                    .setDeployMode("client")
                    .setConf(SparkLauncher.DRIVER_MEMORY, "512M")
                    .setConf(SparkLauncher.EXECUTOR_MEMORY, "512M")
                    .setConf(SparkLauncher.EXECUTOR_CORES, "1")
                    .setConf("spark.default.parallelism", "10")
                    .addAppArgs(hiveTableName)
                    .setVerbose(true)
                    .startApplication(new SparkAppHandle.Listener() {
                        @Override
                        public void stateChanged(SparkAppHandle handle) {
                            if (handle.getState().isFinal()) {
                                countDownLatch.countDown();
                            }
                            LOGGER.info("SparkApp state: {}.", handle.getState().toString());
                        }

                        @Override
                        public void infoChanged(SparkAppHandle handle) {
                            LOGGER.info("SparkApp infoChanged: {}.", handle.getState().toString());
                        }
                    });
        } catch (IOException e) {
            LOGGER.error("SparkLauncher IOException.", e);
            // Release the latch so a failed launch does not block forever
            countDownLatch.countDown();
        }
        LOGGER.info(" Spark2 job is running... ");
        try {
            // Block until the Spark job finishes
            countDownLatch.await();
        } catch (InterruptedException e) {
            LOGGER.error("countDownLatch InterruptedException.", e);
        }
        LOGGER.info(" Spark2 job is over. ");
        if (null != handle) {
            result = getJobResult(handle.getAppId());
        } else {
            result = " Spark2 Execution Exception.";
        }
        return result;
    }

    /**
     * Fetch the job's final result from YARN.
     *
     * @param appId application ID on YARN, e.g. application_1678883677607_0210
     * @return Spark2 job's execution result
     */
    private static String getJobResult(String appId) {
        LOGGER.info(" spark appId is {}.", appId);
        if (null == appId || "".equals(appId)) {
            return " Spark2 Execution Exception, ApplicationId is null.";
        }
        String result = " Spark2 Execution result Obtaining... ";
        String[] as = appId.split("_");
        ApplicationId applicationId = ApplicationId.newInstance(Long.parseLong(as[1]), Integer.parseInt(as[2]));
        YarnClient client = YarnClient.createYarnClient();
        Configuration conf = new Configuration();
        for (String key : YarnConfig.PROPS.stringPropertyNames()) {
            conf.set(key, YarnConfig.PROPS.getProperty(key));
        }
        client.init(conf);
        client.start();
        LOGGER.info(" YarnClient is started.");
        ApplicationReport applicationReport = null;
        ScheduledExecutorService yarnReportService = new ScheduledThreadPoolExecutor(1,
                new BasicThreadFactory.Builder().namingPattern("yarn-report-%d").daemon(true).build());
        int retryCount = 0;
        // Retry up to retryMaxCount times (default 3); each report is fetched after a 1s delay
        while (retryCount < retryMaxCount) {
            ScheduledFuture<ApplicationReport> scheduledFuture =
                    yarnReportService.schedule(new YarnReport(client, applicationId), 1, TimeUnit.SECONDS);
            boolean f = true;
            // Busy-wait until the scheduled report has been fetched
            while (f) {
                if (scheduledFuture.isDone()) {
                    try {
                        applicationReport = scheduledFuture.get();
                        if (LOGGER.isDebugEnabled()) {
                            LOGGER.debug("YarnReport[FinalApplicationStatus]={}.",
                                    applicationReport.getFinalApplicationStatus());
                        }
                    } catch (InterruptedException | ExecutionException e) {
                        yarnReportService.shutdown();
                        LOGGER.error("YarnReport Exception.", e);
                    } finally {
                        f = false;
                    }
                }
            }
            if (null != applicationReport) {
                String finalStatus = applicationReport.getFinalApplicationStatus().toString();
                if (FinalApplicationStatus.UNDEFINED.toString().equals(finalStatus)) {
                    retryCount++;
                    result = "Spark2 job's finalStatus is UNDEFINED. ";
                    LOGGER.info("Spark2 job's finalStatus=UNDEFINED, retryCount=[{}].", retryCount);
                } else {
                    result = "Spark2 job's finalStatus is " + finalStatus + ".";
                    break;
                }
            } else {
                retryCount++;
                result = "Spark2 job's execution result is null. ";
                LOGGER.info("Spark2 job's finalStatus=null, retryCount=[{}].", retryCount);
            }
        }
        if (!yarnReportService.isShutdown()) {
            yarnReportService.shutdown();
        }
        try {
            client.close();
        } catch (IOException e) {
            LOGGER.error("YarnClient close IOException.", e);
        }
        return result;
    }

    /**
     * Callable that fetches the job's ApplicationReport from YARN.
     */
    static class YarnReport implements Callable<ApplicationReport> {

        private static final Logger LOGGER = LogManager.getLogger(YarnReport.class);

        final YarnClient client;
        final ApplicationId applicationId;

        YarnReport(YarnClient client, ApplicationId applicationId) {
            this.applicationId = applicationId;
            this.client = client;
        }

        @Override
        public ApplicationReport call() throws Exception {
            ApplicationReport report = client.getApplicationReport(applicationId);
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("report={}", report.toString());
            }
            return report;
        }
    }
}
```
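Putting it together, a minimal usage sketch (the Hive table name below is a placeholder):

```java
public class SubmitDemo {
    public static void main(String[] args) {
        // Resolve client paths and job settings from configuration first
        SparkOnYarnHelper.init();
        // Blocks until the job reaches a final state on YARN
        String result = SparkOnYarnHelper.submitJobToYarn("dwd.example_table"); // placeholder table name
        System.out.println(result);
    }
}
```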
Tags: spark, java, big data

Reposted from: https://blog.csdn.net/weixin_43820556/article/details/129865587
Copyright belongs to the original author 顧棟. In case of infringement, please contact us for removal.
