

Creating MySQL-to-Hive SeaTunnel Tasks on DolphinScheduler

The steps for creating a MySQL-to-Hive SeaTunnel task on DolphinScheduler are as follows:

1. First, obtain as many taskCodes as you need. Here a single code is requested:

Long taskCode = getClient().opsForProcess().generateTaskCode(projectCode, 1).get(0);

2. Create a MySqlSource object. result_table_name, url, user, password, and query are required.

public class MySqlSource extends Source {
    private String url;
    private String driver = "com.mysql.cj.jdbc.Driver";
    private String user;
    private String password;
    private Integer connection_check_timeout_sec = 100;
    private String query;
    private String partition_column;
    private Integer partition_num;

    /** @param result_table_name name of the temporary result table */
    public MySqlSource(String result_table_name) {
        super(result_table_name);
    }
}

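3. Create a TransformParam if needed. A TransformParam can hold multiple Transforms. Each Transform processes a source table and produces a result table for later transforms or the sink to use: its source_table_name is the result table of the previous stage, and its result_table_name is the table the transform itself produces.

  CopyTransform: copies specified columns of the source to produce additional columns

  FieldMapperTransform: field mapping

  FilterRowKindTransform: filters rows by row kind, such as INSERT or UPDATE_BEFORE

  FilterTransform: field filtering

  ReplaceTransform: replaces matched values in a given field of the source

  SplitTransform: splits a given field of the source by a separator into new columns

  SQLTransform: runs SQL against the source table; complex SQL such as joins, aggregations, and LIKE is not supported

  Supported functions: see "SQL Functions" in the Apache SeaTunnel documentation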
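The chaining rule in step 3 can be sketched as a small consistency check: every stage must read the table that the previous stage produced. The `Stage` class, the `firstBrokenStage` helper, and the table names below are hypothetical and are not part of the SDK; they only illustrate how source_table_name and result_table_name are expected to line up.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, not part of the SDK: checks that each stage reads
// the table produced by the stage before it.
final class TableChainSketch {

    // One pipeline stage: the table it reads and the table it produces.
    static final class Stage {
        final String name;
        final String sourceTableName; // null for the source stage
        final String resultTableName; // null for the sink stage

        Stage(String name, String sourceTableName, String resultTableName) {
            this.name = name;
            this.sourceTableName = sourceTableName;
            this.resultTableName = resultTableName;
        }
    }

    // Returns the name of the first stage whose input does not match the
    // previous stage's output, or null if the whole chain is consistent.
    static String firstBrokenStage(List<Stage> stages) {
        for (int i = 1; i < stages.size(); i++) {
            String produced = stages.get(i - 1).resultTableName;
            String consumed = stages.get(i).sourceTableName;
            if (produced == null || !produced.equals(consumed)) {
                return stages.get(i).name;
            }
        }
        return null;
    }

    public static void main(String[] args) {
        List<Stage> pipeline = Arrays.asList(
                new Stage("MySqlSource", null, "mysql_rows"),
                new Stage("FilterTransform", "mysql_rows", "filtered_rows"),
                new Stage("HiveSink", "filtered_rows", null));
        String broken = firstBrokenStage(pipeline);
        System.out.println(broken == null ? "chain is consistent" : "broken at " + broken);
    }
}
```

When no transform is used, the sink's source_table_name simply equals the source's result_table_name.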

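4. Create a HiveSink object. All four parameters are required: dbName is the database that contains the target table, tableName is the name of the target table, metastoreUri is the address of the Hive metastore server, and source_table_name is the name of the final result table produced by the preceding stages.

public class HiveSink extends Sink {
    private String tableName;
    private String dbName;
    /** For example thrift://192.168.79.51:9083 */
    private String metastoreUri;

    /** @param source_table_name name of the source table */
    public HiveSink(String source_table_name) {
        super(source_table_name);
    }
}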

5. Create a SeaTunnelTaskEnvParam object if needed. jobMode defaults to batch mode and must be set to streaming mode for CDC; parallelism is the number of parallel tasks; checkpointInterval is the checkpoint interval in milliseconds.

public class SeaTunnelTaskEnvParam {
    /** Job mode, either BATCH or STREAMING {@link SeaTunnelJobModConst} */
    private String jobMode = SeaTunnelJobModConst.BATCH;
    private Integer parallelism = 2;
    /** Checkpoint interval in ms */
    private Integer checkpointInterval = 5000;
}

6. Call SeaTunnelScriptGenerator.generateMysql2Hive(MySqlSource mySqlSource, HiveSink hiveSink, SeaTunnelTaskEnvParam envParam, TransformParam transformParam) to generate the rawScript. If envParam or transformParam is null, a default object is created in its place.
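For orientation, the rawScript is a SeaTunnel job configuration made of env, source, and sink blocks. The sketch below shows roughly what such a script might look like, using a hypothetical `render` method; it is not the SDK's SeaTunnelScriptGenerator, and the real output may differ. The option names (`job.mode`, `parallelism`, `checkpoint.interval`, `table_name`, `metastore_uri`) are assumptions based on SeaTunnel's Jdbc source and Hive sink connectors and should be checked against the SeaTunnel version in use.

```java
// Hypothetical sketch, not the SDK's SeaTunnelScriptGenerator: renders a
// minimal MySQL-to-Hive SeaTunnel job configuration as a string.
final class Mysql2HiveScriptSketch {

    // Appends `key = "value"` at the given indent. Assumes the value contains
    // no double quotes or backslashes, because no escaping is done here.
    private static void option(StringBuilder sb, String indent, String key, String value) {
        sb.append(indent).append(key).append(" = \"").append(value).append("\"\n");
    }

    static String render(String url, String user, String password, String query,
                         String tableAlias, String hiveDb, String hiveTable,
                         String metastoreUri) {
        StringBuilder sb = new StringBuilder();
        // env block: values mirror the SeaTunnelTaskEnvParam defaults.
        sb.append("env {\n");
        option(sb, "  ", "job.mode", "BATCH");
        sb.append("  parallelism = 2\n");
        sb.append("  checkpoint.interval = 5000\n");
        sb.append("}\n");
        // source block: the query result is registered under tableAlias.
        sb.append("source {\n  Jdbc {\n");
        option(sb, "    ", "url", url);
        option(sb, "    ", "driver", "com.mysql.cj.jdbc.Driver");
        option(sb, "    ", "user", user);
        option(sb, "    ", "password", password);
        option(sb, "    ", "query", query);
        option(sb, "    ", "result_table_name", tableAlias);
        sb.append("  }\n}\n");
        // sink block: reads the same table alias and writes to db.table.
        sb.append("sink {\n  Hive {\n");
        option(sb, "    ", "table_name", hiveDb + "." + hiveTable);
        option(sb, "    ", "metastore_uri", metastoreUri);
        option(sb, "    ", "source_table_name", tableAlias);
        sb.append("  }\n}\n");
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.print(render(
                "jdbc:mysql://192.168.79.100:3306/test", "root", "root",
                "select * from test.test_table", "fake",
                "test", "test158", "thrift://192.168.79.51:9083"));
    }
}
```

With no transform configured, the same table alias appears as result_table_name in the source and as source_table_name in the sink.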

7. Call SeaTunnelTaskGenerator.generateSeaTunnelTask(String rawScript, String startupScript, String deployMode, String jobId) to generate a SeaTunnelTask object.

* @param rawScript the script configuration to execute
* @param startupScript the startup script (see SeaTunnelStartupScriptConst)
* @param deployMode local or cluster
* @param jobId unique ID of the job, used to save checkpoints and to resume the job after an interruption

@Data
@Accessors(chain = true)
public class SeaTunnelTask extends AbstractTask {
    private String rawScript;

    /**
     * Startup script
     * {@link SeaTunnelStartupScriptConst}
     */
    private String startupScript = SeaTunnelStartupScriptConst.SEATUNNEL;

    private boolean useCustom = true;

    /**
     * SeaTunnel deploy mode
     * {@link SeaTunnelDeployModeConst}
     */
    private String deployMode = SeaTunnelDeployModeConst.CLUSTER;

    private final List<String> localParams = new LinkedList<>();

    private String others;

    @Override
    public String getTaskType() {
        return "SEATUNNEL";
    }
}

8. Create a default TaskDefinition object, or build a custom one:

TaskDefinitionUtils.createDefaultTaskDefinition(taskCode, seaTunnelTask)

9. Create the workflow. On success, the workflow information is returned as a ProcessDefineResp; on failure, a DolphinException is thrown.

Workflow creation example:

// tenantCode and projectCode are assumed to be fields of the enclosing class
public void submit(Long taskCode, TaskDefinition taskDefinition, String processName, String description) {
    ProcessDefineParam pcr = new ProcessDefineParam();
    pcr.setName(processName)
        .setLocations(TaskLocationUtils.verticalLocation(taskCode))
        .setDescription(description)
        .setTenantCode(tenantCode)
        .setTimeout("0")
        .setExecutionType(ProcessDefineParam.EXECUTION_TYPE_PARALLEL)
        .setTaskDefinitionJson(Collections.singletonList(taskDefinition))
        .setTaskRelationJson(TaskRelationUtils.oneLineRelation(taskCode))
        .setGlobalParams(null);
    ProcessDefineResp resp = getClient().opsForProcess().create(projectCode, pcr);
}
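In DolphinScheduler, the task relations of a workflow are a list of edges, each with a preTaskCode and a postTaskCode, and an entry task uses preTaskCode 0. A helper such as TaskRelationUtils.oneLineRelation presumably builds these edges for tasks that run one after another. The sketch below is a hypothetical re-implementation of that idea with its own `Relation` class; it is not the SDK's code.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch, not the SDK's TaskRelationUtils: builds the edges of
// a workflow in which the given tasks run strictly one after another.
final class OneLineRelationSketch {

    // One edge of the workflow graph.
    static final class Relation {
        final long preTaskCode;  // 0 means "no upstream task"
        final long postTaskCode;

        Relation(long preTaskCode, long postTaskCode) {
            this.preTaskCode = preTaskCode;
            this.postTaskCode = postTaskCode;
        }

        @Override
        public String toString() {
            return preTaskCode + " -> " + postTaskCode;
        }
    }

    // Chains the task codes in order; the first task gets preTaskCode 0.
    static List<Relation> oneLine(long... taskCodes) {
        List<Relation> relations = new ArrayList<>();
        long previous = 0L;
        for (long code : taskCodes) {
            relations.add(new Relation(previous, code));
            previous = code;
        }
        return relations;
    }

    public static void main(String[] args) {
        // A single task, as in the example above, yields one edge from 0.
        System.out.println(oneLine(1001L));
        // Three tasks yield a chain of three edges.
        System.out.println(oneLine(1001L, 1002L, 1003L));
    }
}
```

For the single-task workflow in this article, the relation list therefore contains just one edge from 0 to the task's code.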

Task creation example:

Long taskCode = getClient().opsForProcess().generateTaskCode(projectCode, 1).get(0);

// Source: the query result is registered as the table "fake"
MySqlSource mySqlSource = new MySqlSource("fake");
mySqlSource.setUrl("jdbc:mysql://192.168.79.100:3306/test?serverTimezone=GMT%2b8")
    .setUser("root")
    .setPassword("root")
    .setQuery("select * from test.test_table")
    .setConnection_check_timeout_sec(100);

// Sink: reads the table "fake" and writes it to test.test158 in Hive
HiveSink hiveSink = new HiveSink("fake");
hiveSink.setTableName("test158")
    .setDbName("test")
    .setMetastoreUri("thrift://192.168.79.51:9083");

// envParam and transformParam are null, so defaults are used
String mysql2Hive = SeaTunnelScriptGenerator.generateMysql2Hive(mySqlSource, hiveSink, null, null);

SeaTunnelTask seaTunnelTask = SeaTunnelTaskGenerator.generateSeaTunnelTask(
    mysql2Hive,
    SeaTunnelStartupScriptConst.SEATUNNEL,
    SeaTunnelDeployModeConst.CLUSTER,
    "1145141919840911");

TaskDefinition taskDefinition = TaskDefinitionUtils.createDefaultTaskDefinition(taskCode, seaTunnelTask);

submit(taskCode, taskDefinition, "test-seatunnel-mysql2hive-task-dag1", "test-seatunnel-mysql2hive-task1");

Schedule example:

// WORKFLOW_CODE is the code of the workflow to schedule
public void testCreate() {
    ScheduleDefineParam scheduleDefineParam = new ScheduleDefineParam();
    scheduleDefineParam
        .setProcessDefinitionCode(WORKFLOW_CODE)
        .setSchedule(
            new ScheduleDefineParam.Schedule()
                .setStartTime("2022-09-18 00:00:00")
                .setEndTime("2023-09-20 00:00:00")
                .setCrontab("0 0 * * * ? *"));
    ScheduleInfoResp scheduleInfoResp = getClient().opsForSchedule().create(projectCode, scheduleDefineParam);
}
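The crontab value in the schedule example uses Quartz-style fields: seconds, minutes, hours, day of month, month, day of week, and an optional year. Read that way, "0 0 * * * ? *" fires at second 0 of minute 0 of every hour. The sketch below labels each field of such an expression; the `describe` helper and the field labels are hypothetical, and it does not validate the field values themselves.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, not part of the SDK: labels the fields of a
// Quartz-style cron expression without validating their values.
final class CronFieldsSketch {

    private static final String[] FIELD_NAMES = {
        "seconds", "minutes", "hours", "dayOfMonth", "month", "dayOfWeek", "year"
    };

    // Splits the expression on whitespace and maps each field to its label.
    // Quartz-style expressions have 6 fields plus an optional year.
    static Map<String, String> describe(String crontab) {
        String[] parts = crontab.trim().split("\\s+");
        if (parts.length < 6 || parts.length > 7) {
            throw new IllegalArgumentException("expected 6 or 7 fields, got " + parts.length);
        }
        Map<String, String> fields = new LinkedHashMap<>();
        for (int i = 0; i < parts.length; i++) {
            fields.put(FIELD_NAMES[i], parts[i]);
        }
        return fields;
    }

    public static void main(String[] args) {
        // The expression from the schedule example: every hour, on the hour.
        System.out.println(describe("0 0 * * * ? *"));
    }
}
```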

Reposted from: https://blog.csdn.net/Reiben_Fly/article/details/134927777
Copyright belongs to the original author, 至尊灬宝er.
