Submitting Spark Jobs with Java Code

A Spark job is normally submitted by invoking the spark-submit shell script on a Spark client machine, which hands the job to YARN for execution:

$ ./bin/spark-submit --class org.apache.spark.examples.SparkPi \
    --master yarn \
    --deploy-mode cluster \
    --driver-memory 4g \
    --executor-memory 2g \
    --executor-cores 1 \
    --queue thequeue \
    examples/jars/spark-examples*.jar \
    10

In some scenarios, however, you cannot call a shell script directly, or you need to monitor the job's execution result programmatically. In those cases, you can submit the Spark job and retrieve its result from Java code via SparkLauncher.

The following example is designed for the Spark on YARN mode. Prerequisites (a minimal sketch of the core launch pattern follows this list):

  1. The machine running this class must have a Spark client installed.
  2. The jar of the Spark job to be submitted must be available locally (it can also be uploaded to HDFS in advance).
  3. The machine must have the Hadoop and YARN configuration files.
  4. The program must know the path of JAVA_HOME.
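
Before the full helper class, here is the core launch pattern in isolation: configure a SparkLauncher for YARN, register a listener that counts down a latch once the application reaches a final state, and block on the latch. This is only a minimal sketch; the Spark home, jar path and main class below are placeholders, not values from this project.

    import org.apache.spark.launcher.SparkAppHandle;
    import org.apache.spark.launcher.SparkLauncher;
    import java.util.concurrent.CountDownLatch;

    public class MinimalLaunch {
        public static void main(String[] args) throws Exception {
            CountDownLatch latch = new CountDownLatch(1);
            SparkAppHandle handle = new SparkLauncher()
                    .setSparkHome("/opt/spark")                    // placeholder Spark client directory
                    .setAppResource("/path/to/your-spark-job.jar") // placeholder application jar
                    .setMainClass("com.example.YourJob")           // placeholder entry point
                    .setMaster("yarn")
                    .setDeployMode("client")
                    .startApplication(new SparkAppHandle.Listener() {
                        @Override
                        public void stateChanged(SparkAppHandle h) {
                            // FINISHED, FAILED, KILLED and LOST are the final states.
                            if (h.getState().isFinal()) {
                                latch.countDown();
                            }
                        }

                        @Override
                        public void infoChanged(SparkAppHandle h) {
                            // The YARN application id becomes available via h.getAppId() as the app starts.
                        }
                    });
            latch.await(); // block until the application reaches a final state
            System.out.println("state=" + handle.getState() + ", appId=" + handle.getAppId());
        }
    }

The full helper class below wraps this pattern and adds configuration loading, result retrieval from YARN, and retries.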
package com.donny.bigdata.surveillance.components.spark;

import com.donny.bigdata.surveillance.conf.Spark2Config;
import com.donny.bigdata.surveillance.conf.YarnConfig;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.spark.launcher.SparkAppHandle;
import org.apache.spark.launcher.SparkLauncher;

import java.io.IOException;
import java.util.HashMap;
import java.util.concurrent.*;

/**
 * @author [email protected]
 * @date 2023/3/30 18:00
 */
public class SparkOnYarnHelper {

    private static final Logger LOGGER = LogManager.getLogger(SparkOnYarnHelper.class);

    /** Root directory of the Hadoop configuration files on the client. */
    private static String hadoopConfDir;
    /** Root directory of the YARN configuration files on the client. */
    private static String yarnConfDir;
    /** JAVA_HOME on the client. */
    private static String javaHome;
    /** Root directory of the Spark installation on the client. */
    private static String sparkHome;
    /** Path of the Spark application jar submitted to YARN. */
    private static String appReSourcePath = System.getProperty("user.dir");
    /** Entry-point class of the Spark application jar. */
    private static String appMainClass;
    /** User that runs the Spark job. */
    public static String hadoopUserName;
    /** Number of retries when YARN reports finalStatus=UNDEFINED. */
    private static Integer retryMaxCount;

    /** Initializes the environment from configuration. */
    public static void init() {
        hadoopConfDir = Spark2Config.getString("spark2.hadoop_conf_dir", "/usr/hdp/2.6.5.0-292/hadoop/conf");
        yarnConfDir = Spark2Config.getString("spark2.yarn_conf_dir", "/usr/hdp/2.6.5.0-292/hadoop/conf");
        javaHome = Spark2Config.getString("spark2.java_home", "/data/java");
        sparkHome = Spark2Config.getString("spark2.spark_home", "/data/hdp/current/spark2-client");
        appReSourcePath = appReSourcePath + Spark2Config.getString("spark2.app_resource_path", "/plugins/spark2-monitor-1.0.jar");
        appMainClass = Spark2Config.getString("spark2.app_main_class", "com.donny.base.monitor.Spark2Monitor");
        hadoopUserName = Spark2Config.getString("hadoop_user_name", "spark");
        retryMaxCount = Spark2Config.getInt("spark2.yarn_report_retry", 3);
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("hadoopConfDir=[{}].", hadoopConfDir);
            LOGGER.debug("yarnConfDir=[{}].", yarnConfDir);
            LOGGER.debug("javaHome=[{}].", javaHome);
            LOGGER.debug("sparkHome=[{}].", sparkHome);
            LOGGER.debug("appReSourcePath=[{}].", appReSourcePath);
            LOGGER.debug("appMainClass=[{}].", appMainClass);
            LOGGER.debug("hadoopUserName=[{}].", hadoopUserName);
            LOGGER.debug("retryMaxCount=[{}].", retryMaxCount);
        }
    }

    /**
     * Submits the job to YARN.
     *
     * @param hiveTableName name of the Hive table used by the Spark job
     * @return execution result string
     */
    public static String submitJobToYarn(String hiveTableName) {
        String result;
        LOGGER.info(" Spark2 job is starting... ");
        HashMap<String, String> env = new HashMap<>(4);
        env.put("HADOOP_CONF_DIR", hadoopConfDir);
        env.put("JAVA_HOME", javaHome);
        env.put("YARN_CONF_DIR", yarnConfDir);
        env.put("HADOOP_USER_NAME", hadoopUserName);
        // Used to block until the SparkAppHandle listener reports a final state.
        CountDownLatch countDownLatch = new CountDownLatch(1);
        SparkAppHandle handle = null;
        try {
            handle = new SparkLauncher(env)
                    .setSparkHome(sparkHome)
                    .setAppResource(appReSourcePath)
                    .setMainClass(appMainClass)
                    .setMaster("yarn")
                    .setDeployMode("client")
                    .setConf(SparkLauncher.DRIVER_MEMORY, "512M")
                    .setConf(SparkLauncher.EXECUTOR_MEMORY, "512M")
                    .setConf(SparkLauncher.EXECUTOR_CORES, "1")
                    .setConf("spark.default.parallelism", "10")
                    .addAppArgs(hiveTableName)
                    .setVerbose(true)
                    .startApplication(new SparkAppHandle.Listener() {
                        @Override
                        public void stateChanged(SparkAppHandle handle) {
                            if (handle.getState().isFinal()) {
                                countDownLatch.countDown();
                            }
                            LOGGER.info("SparkApp state: {}.", handle.getState().toString());
                        }

                        @Override
                        public void infoChanged(SparkAppHandle handle) {
                            LOGGER.info("SparkApp infoChanged: {}.", handle.getState().toString());
                        }
                    });
        } catch (IOException e) {
            LOGGER.error("SparkLauncher IOException.", e);
        }
        LOGGER.info(" Spark2 job is running... ");
        try {
            // Block until the Spark job reaches a final state.
            countDownLatch.await();
        } catch (InterruptedException e) {
            LOGGER.error("countDownLatch InterruptedException.", e);
        }
        LOGGER.info(" Spark2 job is over. ");
        if (null != handle) {
            result = getJobResult(handle.getAppId());
        } else {
            result = " Spark2 Execution Exception.";
        }
        return result;
    }

    /**
     * Fetches the job's final result from YARN.
     *
     * @param appId application ID on YARN, e.g. application_1678883677607_0210
     * @return Spark2 job's execution result
     */
    private static String getJobResult(String appId) {
        LOGGER.info(" spark appId is {}.", appId);
        if (null == appId || "".equals(appId)) {
            return " Spark2 Execution Exception, ApplicationId is null.";
        }
        String result = " Spark2 Execution result Obtaining... ";
        // An appId like application_1678883677607_0210 splits into cluster timestamp and sequence number.
        String[] as = appId.split("_");
        ApplicationId applicationId = ApplicationId.newInstance(Long.parseLong(as[1]), Integer.parseInt(as[2]));
        YarnClient client = YarnClient.createYarnClient();
        Configuration conf = new Configuration();
        for (String key : YarnConfig.PROPS.stringPropertyNames()) {
            conf.set(key, YarnConfig.PROPS.getProperty(key));
        }
        client.init(conf);
        client.start();
        LOGGER.info(" YarnClient is started.");
        ApplicationReport applicationReport = null;
        ScheduledExecutorService yarnReportService = new ScheduledThreadPoolExecutor(1,
                new BasicThreadFactory.Builder().namingPattern("yarn-report-%d").daemon(true).build());
        int retryCount = 0;
        // Retry up to retryMaxCount times (3 by default); each attempt is scheduled with a 1s delay.
        while (retryCount < retryMaxCount) {
            ScheduledFuture<ApplicationReport> scheduledFuture =
                    yarnReportService.schedule(new YarnReport(client, applicationId), 1, TimeUnit.SECONDS);
            boolean f = true;
            while (f) {
                if (scheduledFuture.isDone()) {
                    try {
                        applicationReport = scheduledFuture.get();
                        if (LOGGER.isDebugEnabled()) {
                            LOGGER.debug("YarnReport[FinalApplicationStatus]={}.", applicationReport.getFinalApplicationStatus());
                        }
                    } catch (InterruptedException | ExecutionException e) {
                        yarnReportService.shutdown();
                        LOGGER.error("YarnReport Exception.", e);
                    } finally {
                        f = false;
                    }
                }
            }
            if (null != applicationReport) {
                String finalStatus = applicationReport.getFinalApplicationStatus().toString();
                if (FinalApplicationStatus.UNDEFINED.toString().equals(finalStatus)) {
                    retryCount++;
                    result = "Spark2 job's finalStatus is UNDEFINED. ";
                    LOGGER.info("Spark2 job's finalStatus=UNDEFINED, retryCount=[{}].", retryCount);
                } else {
                    result = "Spark2 job's finalStatus is " + finalStatus + ".";
                    break;
                }
            } else {
                retryCount++;
                result = "Spark2 job's execution result is null. ";
                LOGGER.info("Spark2 job's finalStatus=null, retryCount=[{}].", retryCount);
            }
        }
        if (!yarnReportService.isShutdown()) {
            yarnReportService.shutdown();
        }
        try {
            client.close();
        } catch (IOException e) {
            LOGGER.error("YarnClient close IOException.", e);
        }
        return result;
    }

    /** Callable that asks YARN for the application report of the given application. */
    static class YarnReport implements Callable<ApplicationReport> {

        private static final Logger LOGGER = LogManager.getLogger(YarnReport.class);

        final YarnClient client;
        final ApplicationId applicationId;

        YarnReport(YarnClient client, ApplicationId applicationId) {
            this.applicationId = applicationId;
            this.client = client;
        }

        @Override
        public ApplicationReport call() throws Exception {
            ApplicationReport report = client.getApplicationReport(applicationId);
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("report={}", report.toString());
            }
            return report;
        }
    }
}
Tags: spark, java, big data

Reposted from: https://blog.csdn.net/weixin_43820556/article/details/129865587
Copyright belongs to the original author, 顧棟. In case of infringement, please contact us for removal.
