0


【大数据】DolphinScheduler将上游Task执行结果传递给下游Task

背景

公司的数据开发平台需要用到DolphinScheduler做任务调度,其中一个场景是:上游任务执行结束后,需要将任务执行结果传递给下游任务。

DolphinScheduler肯定是能实现任务之间的传参的,具体的可以看:DolphinScheduler | 文档中心 (apache.org)。

但是官方案例中介绍的任务之间传参是提前在管理台上配置好的,OK,那么问题来了,如何实现任务之间的动态传参呢?比如说我们自定义Task,然后在Task执行结束后将执行结果封装,传递给DAG中的下一个Task。

分析

如果DolphinScheduler官方的案例没有演示如何动态传,我们开发者应该如何去处理这种需求?

我是这么做的:***分析DolphinScheduler内置的Task,总有一个Task是需要传递参数给下游的。我这里盲猜两个,一个是

SqlTask

,一个是

HttpTask

。我的观点是:总不能做完SQL查询,或者做完HTTP请求后就不管结果吧?***

分析HttpTask源码

分析HttpTask源码,直接找到HttpTask的handle方法,DolphinScheduler中,任何Task的具体执行逻辑都在这个handle方法中。

handle

方法分析

@Overridepublicvoidhandle(TaskCallBack taskCallBack)throwsTaskException{long startTime =System.currentTimeMillis();String formatTimeStamp =DateUtils.formatTimeStamp(startTime);String statusCode =null;String body =null;try(CloseableHttpClient client =createHttpClient();CloseableHttpResponse response =sendRequest(client)){
        statusCode =String.valueOf(getStatusCode(response));
        body =getResponseBody(response);
        exitStatusCode =validResponse(body, statusCode);// 看名字应该就能猜到是处理请求结果的addDefaultOutput(body);long costTime =System.currentTimeMillis()- startTime;
        log.info("startTime: {}, httpUrl: {}, httpMethod: {}, costTime : {} milliseconds, statusCode : {}, body : {}, log : {}",
                formatTimeStamp, httpParameters.getUrl(),
                httpParameters.getHttpMethod(), costTime, statusCode, body, output);}catch(Exception e){appendMessage(e.toString());
        exitStatusCode =-1;
        log.error("httpUrl["+ httpParameters.getUrl()+"] connection failed:"+ output, e);thrownewTaskException("Execute http task failed", e);}}

继续看

addDefaultOutput

方法

publicvoidaddDefaultOutput(String response){// put response in output// 创建Property对象Property outputProperty =newProperty();// 设置Prop,也就是设置Key
    outputProperty.setProp(String.format("%s.%s", taskExecutionContext.getTaskName(),"response"));// 设置是入参还是出参,这里是出参,因为是将结果给下游任务
    outputProperty.setDirect(Direct.OUT);// 设置参数类型,VARCHAR表示就是字符串
    outputProperty.setType(DataType.VARCHAR);// 设置Value,就是http请求结果
    outputProperty.setValue(response);// 重点:将Property添加到varPool中
    httpParameters.addPropertyToValPool(outputProperty);}

分析SqlTask源码

handler

方法分析

@Overridepublicvoidhandle(TaskCallBack taskCallBack)throwsTaskException{
    log.info("Full sql parameters: {}", sqlParameters);
    log.info("sql type : {}, datasource : {}, sql : {} , localParams : {},udfs : {},showType : {},connParams : {},varPool : {} ,query max result limit  {}",
            sqlParameters.getType(),
            sqlParameters.getDatasource(),
            sqlParameters.getSql(),
            sqlParameters.getLocalParams(),
            sqlParameters.getUdfs(),
            sqlParameters.getShowType(),
            sqlParameters.getConnParams(),
            sqlParameters.getVarPool(),
            sqlParameters.getLimit());try{// get datasource
        baseConnectionParam =(BaseConnectionParam)DataSourceUtils.buildConnectionParams(dbType,
                sqlTaskExecutionContext.getConnectionParams());List<String> subSqls =DataSourceProcessorProvider.getDataSourceProcessor(dbType).splitAndRemoveComment(sqlParameters.getSql());// ready to execute SQL and parameter entity MapList<SqlBinds> mainStatementSqlBinds = subSqls
                .stream().map(this::getSqlAndSqlParamsMap).collect(Collectors.toList());List<SqlBinds> preStatementSqlBinds =Optional.ofNullable(sqlParameters.getPreStatements()).orElse(newArrayList<>()).stream().map(this::getSqlAndSqlParamsMap).collect(Collectors.toList());List<SqlBinds> postStatementSqlBinds =Optional.ofNullable(sqlParameters.getPostStatements()).orElse(newArrayList<>()).stream().map(this::getSqlAndSqlParamsMap).collect(Collectors.toList());List<String> createFuncs =createFuncs(sqlTaskExecutionContext.getUdfFuncParametersList());// execute sql task// 这个方法就是处理sql结果的executeFuncAndSql(mainStatementSqlBinds, preStatementSqlBinds, postStatementSqlBinds, createFuncs);setExitStatusCode(TaskConstants.EXIT_CODE_SUCCESS);}catch(Exception e){setExitStatusCode(TaskConstants.EXIT_CODE_FAILURE);
        log.error("sql task error", e);thrownewTaskException("Execute sql task failed", e);}}

所以我们在看下

executeFuncAndSql

方法内部实现

publicvoidexecuteFuncAndSql(List<SqlBinds> mainStatementsBinds,List<SqlBinds> preStatementsBinds,List<SqlBinds> postStatementsBinds,List<String> createFuncs)throwsException{try(Connection connection =DataSourceClientProvider.getAdHocConnection(DbType.valueOf(sqlParameters.getType()),
                            baseConnectionParam)){// create temp functionif(CollectionUtils.isNotEmpty(createFuncs)){createTempFunction(connection, createFuncs);}// pre executeexecuteUpdate(connection, preStatementsBinds,"pre");// main executeString result =null;// decide whether to executeQuery or executeUpdate based on sqlTypeif(sqlParameters.getSqlType()==SqlType.QUERY.ordinal()){// query statements need to be convert to JsonArray and inserted into Alert to send
            result =executeQuery(connection, mainStatementsBinds.get(0),"main");}elseif(sqlParameters.getSqlType()==SqlType.NON_QUERY.ordinal()){// non query statementString updateResult =executeUpdate(connection, mainStatementsBinds,"main");
            result =setNonQuerySqlReturn(updateResult, sqlParameters.getLocalParams());}// deal out params// 这个方法就是来处理结果的
        sqlParameters.dealOutParam(result);// post executeexecuteUpdate(connection, postStatementsBinds,"post");}catch(Exception e){
        log.error("execute sql error: {}", e.getMessage());throw e;}}

通过

dealOutParam

看具体处理细节

publicvoiddealOutParam(String result){if(CollectionUtils.isEmpty(localParams)){return;}List<Property> outProperty =getOutProperty(localParams);if(CollectionUtils.isEmpty(outProperty)){return;}if(StringUtils.isEmpty(result)){
        varPool =VarPoolUtils.mergeVarPool(Lists.newArrayList(varPool, outProperty));return;}List<Map<String,String>> sqlResult =getListMapByString(result);if(CollectionUtils.isEmpty(sqlResult)){return;}// if sql return more than one lineif(sqlResult.size()>1){Map<String,List<String>> sqlResultFormat =newHashMap<>();// init sqlResultFormatSet<String> keySet = sqlResult.get(0).keySet();for(String key : keySet){
            sqlResultFormat.put(key,newArrayList<>());}for(Map<String,String> info : sqlResult){for(String key : info.keySet()){
                sqlResultFormat.get(key).add(String.valueOf(info.get(key)));}}for(Property info : outProperty){if(info.getType()==DataType.LIST){
                info.setValue(JSONUtils.toJsonString(sqlResultFormat.get(info.getProp())));}}}else{// result only one lineMap<String,String> firstRow = sqlResult.get(0);for(Property info : outProperty){
            info.setValue(String.valueOf(firstRow.get(info.getProp())));}}// 本质还是将sql结果处理后保存在varPool中,varPool才是关键所在
    varPool =VarPoolUtils.mergeVarPool(Lists.newArrayList(varPool, outProperty));}

所以,源代码分析到这,我们就知道了:***如果想实现动态传参,那么我们需要将传递的数据封装成

org.apache.dolphinscheduler.plugin.task.api.model.Property

,然后添加到内置集合变量

org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters#varPool

中***

具体实现

这里我们不去讨论自定义Task的具体实现步骤,这不是本文的重点。

当我们实现自定义Task后,可以这样编码实现动态传参:

Property outputProperty =newProperty();// 添加我们要传递的数据Key
outputProperty.setProp("xxxxKey"));// OUT
outputProperty.setDirect(Direct.OUT);// 这里传递的数据是什么类型就写什么类型,建议通过json字符串处理数据
outputProperty.setType(DataType.VARCHAR);// 添加我们要传递的数据Key
outputProperty.setValue("xxxxValue");// 这里的xxxxParameters是我们自己自定义的,一般情况下,一个Task对应一个Parameters
xxxxParameters.addPropertyToValPool(outputProperty);

DolphinScheduler内部有将

List<Property> varPool

转换成

Map<String, Property> varParams

的逻辑,然后会将

varParams

与其他的参数合并,最后通过

taskExecutionContext.setPrepareParamsMap(propertyMap) 

将数据设置给

Map<String, Property> prepareParamsMap

总结

关于DolphinScheduler(海豚调度器)是什么,能做什么,怎么使用等等,这里我就不再赘述,大家感兴趣的可以去看看官方文档:DolphinScheduler | 文档中心 (apache.org)

希望通过本篇文章能让各位读者掌握Task之间的动态传参,然后应用在实际工作中。如果本篇文章能给屏幕前的你们或多或少的一些帮助,也是我喜闻乐见的。

如果能帮我点个免费的关注,那就是对我个人的最大的肯定。如果觉得写的还行,分享一下也是我生活的小确幸~

欢迎关注我的公众号

在这里插入图片描述

Peace Guys,我们下篇文章再见。

标签: 大数据 云计算

本文转载自: https://blog.csdn.net/Jeffray1991/article/details/143144060
版权归原作者 FeiMinds 所有, 如有侵权,请联系我们删除。

“【大数据】DolphinScheduler将上游Task执行结果传递给下游Task”的评论:

还没有评论