开源地址:https://gitee.com/LIEN321/mydata-blade
详细介绍:MyData 基于 Web API 的数据集成平台 v0.7.0
部署文档:用 Docker 部署 MyData v0.7.1
使用手册:MyData 使用手册v0.7.1
交流Q群:430089673
MyData后端结构
MyData的后端由3个子服务组成,分别是
管理服务
、
任务服务
、
业务数据服务
;
- 管理服务:通过项目、数据标准、应用API、环境的管理 配置出同步业务数据的任务;
- 任务服务:根据配置的任务 定时调用应用API和数据服务 实现业务数据的传输和存储;
- 数据服务:封装业务数据的隔离机制和读写操作;
依赖的组件:
- MySQL:存储管理数据;
- Redis:缓存管理数据和任务;
- MongoDB;存储业务数据;
下图从数据流角度 展示3个子服务的关联:
注:开源版本采用单体SpringBoot;
任务服务
配置任务
任务主要包括:项目环境、数据标准、应用API、任务类型、字段映射、任务周期;
- 项目环境:确定应用API的统一前缀地址;
- 数据标准:明确集成的业务数据的数据结构;
- 应用API: 业务数据的传输通道;
- 任务类型:明确数据的传输方向,
提供数据
表示从应用API读取业务员数据、消费数据
表示向应用API发送业务数据; - 字段映射:配置接口响应结构中 与标准数据字段的映射关系;
- 任务周期:定期执行任务的时间间隔,格式为cron表达式;
任务流程
数据集成的任务执行流程如下图:
- 任务服务启动时(即MyData系统启动),查询所有运行状态的任务;
publicclassJobExecutorimplementsApplicationRunner{...@Overridepublicvoidrun(ApplicationArguments args){// 移除已有缓存
jobCache.removeAll();// 查询已启动的任务List<Task> tasks = taskService.listRunningTasks();
log.info("tasks.size() = "+ tasks.size());if(CollUtil.isNotEmpty(tasks)){
tasks.forEach(this::startTask);}}...}
- 根据任务的cron表达式,计算任务的下次执行时间;
/**
* 根据 任务的上次执行时间 和 设定间隔规则,计算任务的 下次执行时间
*
* @param taskInfo 定时任务
*/privatevoidcalculateNextRunTime(TaskInfo taskInfo){Assert.notNull(taskInfo);Assert.notEmpty(taskInfo.getTaskPeriod());Date date = taskInfo.getStartTime();if(taskInfo.getFailCount()>0){
date = taskInfo.getNextRunTime();}CronExpression cronExpression =newCronExpression(taskInfo.getTaskPeriod());Date nextRunTime = cronExpression.getNextValidTimeAfter(date);
taskInfo.setNextRunTime(nextRunTime);}
- 计算任务的下次执行时间 与 当前时间的时间差,以时间差作为缓存失效期 将任务存入redis缓存;
/**
* 缓存任务
*
* @param taskInfo 任务对象
* @throws IllegalArgumentException 缓存时长无效
*/publicvoidcacheJob(TaskInfo taskInfo)throwsIllegalArgumentException{// 计算任务缓存有效时长long expire =DateUtil.between(taskInfo.getStartTime(), taskInfo.getNextRunTime(),DateUnit.SECOND);if(expire <=0){thrownewIllegalArgumentException(StrUtil.format("expire <= 0, startTime = {}, nextRunTime = {}",DateUtil.format(taskInfo.getStartTime(),DatePattern.NORM_DATETIME_MS_PATTERN),DateUtil.format(taskInfo.getNextRunTime(),DatePattern.NORM_DATETIME_MS_PATTERN)));}
redisUtil.set(CACHE_TASK+ taskInfo.getId(), taskInfo);
redisUtil.set(CACHE_JOB+ taskInfo.getId(), taskInfo.getId(), expire);
taskInfo.appendLog("任务存入redis,缓存时长 {} 秒", expire);}
- 通过监听redis的key失效事件,获得待执行的任务;
publicclassRedisKeyExpiredListenerimplementsMessageListener{privatefinalJobExecutor jobExecutor;@OverridepublicvoidonMessage(Message message,byte[] pattern){String expiredKey = message.toString();if(StrUtil.startWith(expiredKey,JobCache.CACHE_JOB)){String taskId =StrUtil.subSuf(expiredKey,JobCache.CACHE_JOB.length());
jobExecutor.notify(taskId);}}}
- 将任务加入待执行的线程池,随后即可执行
/**
* 任务存入执行队列
*
* @param taskInfo 任务
*/privatevoidexecuteJob(TaskInfo taskInfo){
taskInfo.appendLog("任务存入执行队列");Runnable runnable =newJobThread(taskInfo);getThreadPoolExecutor().execute(runnable);}
- 根据任务类型分别执行
提供数据
和消费数据
流程; 1. 提供数据 1. 调用应用API,获取json格式数据;2. 根据任务中字段映射 解析json为业务数据Map集合;3. 调用数据服务 将业务数据存入MongoDB;
caseMdConstant.DATA_PRODUCER:// 调用api 获取jsonString json =ApiUtil.read(taskInfo);// 将json按字段映射 解析为业务数据
jobDataService.parseData(taskInfo, json);// 根据条件过滤数据
jobDataFilterService.doFilter(taskInfo);// 保存业务数据
jobDataService.saveTaskData(taskInfo);// 更新环境变量
jobVarService.saveVarValue(taskInfo, json);break;
- 消费数据 1. 根据任务所选数据标准,查询业务数据;2. 再根据字段映射,将业务数据 转为指定的json对象集合;3. 调用应用API,传输json数据;
caseMdConstant.DATA_CONSUMER:List<BizDataFilter> filters = taskInfo.getDataFilters();if(CollUtil.isNotEmpty(filters)){// 解析过滤条件值中的 自定义字符串parseFilterValue(filters);// 排除值为null的条件
filters = filters.stream().filter(filter -> filter.getValue()!=null).collect(Collectors.toList());}// 根据过滤条件 查询数据String dataCode = taskInfo.getDataCode();if(StrUtil.isNotEmpty(dataCode)){List<Map> dataList = bizDataDAO.list(MdUtil.getBizDbCode(taskInfo.getTenantId(), taskInfo.getProjectId(), taskInfo.getEnvId()), dataCode, filters);
taskInfo.setConsumeDataList(dataList);// 根据字段映射转换为api参数
jobDataService.convertData(taskInfo);}// 调用api传输数据ApiUtil.write(taskInfo);break;
- 保存任务执行日志;
版权归原作者 LIEN321 所有, 如有侵权,请联系我们删除。