XXL-JOB
常见任务调度
单机:Timer、ExectorService、spring@scheduled
分布式:xxl-job、quartz、elastic-job
原生定时任务的先天缺陷
XXL-JOB简介
由调度中心和执行器组成,调度中心提供一个web管理配置任务和执行器,调度中心通过rpc触发执行器
1、简单:支持通过Web页面对任务进行CRUD操作,操作简单,一分钟上手;
2、动态:支持动态修改任务状态、启动/停止任务,以及终止运行中任务,即时生效;
3、路由策略:执行器集群部署时提供丰富的路由策略,包括:第一个、最后一个、轮询、随机、一致性HASH、最不经常使用、最近最久未使用、故障转移、忙碌转移等;
4、故障转移:任务路由策略选择”故障转移”情况下,如果执行器集群中某一台机器故障,将会自动Failover切换到一台正常的执行器发送调度请求。
5、任务超时控制:支持自定义任务超时时间,任务运行超时将会主动中断任务
6、一致性:“调度中心”通过DB锁保证集群分布式调度的一致性, 一次任务调度只会触发一次执行
7、邮件报警:任务失败时支持邮件报警,支持配置多邮件地址群发报警邮件;
8、任务进度监控:支持实时监控任务进度;
架构图
集成XXL-JOB配置
xxl-job调度中心配置内容说明:
### 调度中心JDBC链接:链接地址
spring.datasource.url=jdbc:mysql://127.0.0.1:3306/xxl_job?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&serverTimezone=Asia/Shanghai
spring.datasource.username=root
spring.datasource.password=root_pwd
spring.datasource.driver-class-name=com.mysql.jdbc.Driver
### 报警邮箱
spring.mail.host=smtp.qq.com
spring.mail.port=25
[email protected]
spring.mail.password=xxx
spring.mail.properties.mail.smtp.auth=true
spring.mail.properties.mail.smtp.starttls.enable=true
spring.mail.properties.mail.smtp.starttls.required=true
spring.mail.properties.mail.smtp.socketFactory.class=javax.net.ssl.SSLSocketFactory
### 调度中心通讯TOKEN [选填]:非空时启用;
xxl.job.accessToken=
### 调度中心国际化配置 [必填]: 默认为 "zh_CN"/中文简体, 可选范围为 "zh_CN"/中文简体, "zh_TC"/中文繁体 and "en"/英文;
xxl.job.i18n=zh_CN
## 调度线程池最大线程配置【必填】
xxl.job.triggerpool.fast.max=200
xxl.job.triggerpool.slow.max=100
### 调度中心日志表数据保存天数 [必填]:过期日志自动清理;限制大于等于7时生效,否则, 如-1,关闭自动清理功能;
xxl.job.logretentiondays=30
执行器配置:
### 调度中心部署根地址 [选填]:如调度中心集群部署存在多个地址则用逗号分隔。执行器将会使用该地址进行"执行器心跳注册"和"任务结果回调";为空则关闭自动注册;
xxl.job.admin.addresses=http://127.0.0.1:8080/xxl-job-admin
### 执行器通讯TOKEN [选填]:非空时启用;
xxl.job.accessToken=
### 执行器AppName [选填]:执行器心跳注册分组依据;为空则关闭自动注册
xxl.job.executor.appname=xxl-job-executor-sample
### 执行器注册 [选填]:优先使用该配置作为注册地址,为空时使用内嵌服务 ”IP:PORT“ 作为注册地址。从而更灵活的支持容器类型执行器动态IP和动态映射端口问题。
xxl.job.executor.address=
### 执行器IP [选填]:默认为空表示自动获取IP,多网卡时可手动设置指定IP,该IP不会绑定Host仅作为通讯实用;地址信息用于 "执行器注册" 和 "调度中心请求并触发任务";
xxl.job.executor.ip=
### 执行器端口号 [选填]:小于等于0则自动获取;默认端口为9999,单机部署多个执行器时,注意要配置不同执行器端口;
xxl.job.executor.port=9999
### 执行器运行日志文件存储磁盘路径 [选填] :需要对该路径拥有读写权限;为空则使用默认路径;
xxl.job.executor.logpath=/data/applogs/xxl-job/jobhandler
### 执行器日志文件保存天数 [选填] : 过期日志自动清理, 限制值大于等于3时生效; 否则, 如-1, 关闭自动清理功能;
xxl.job.executor.logretentiondays=30
组件:
@Bean
public XxlJobSpringExecutor xxlJobExecutor() {
logger.info(">>>>>>>>>>> xxl-job config init.");
XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
xxlJobSpringExecutor.setAdminAddresses(adminAddresses);
xxlJobSpringExecutor.setAppname(appname);
xxlJobSpringExecutor.setAddress(address);
xxlJobSpringExecutor.setIp(ip);
xxlJobSpringExecutor.setPort(port);
xxlJobSpringExecutor.setAccessToken(accessToken);
xxlJobSpringExecutor.setLogPath(logPath);
xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);
return xxlJobSpringExecutor;
}
阻塞处理策略
1.单机串行(默认)
调度请求进入单机执行器后,调度请求进入队列并以串行方式运行(上一次还没执行完毕,后面的排队等待)
2.丢弃后续调度(推荐)
调度请求进入单机执行器后,发现执行器存在运行的调度任务,本次请求将会被丢弃并标记为失败
3.覆盖之前的调度(不推荐)
调度请求进入单机执行器后,发现执行器存在运行的调度任务,将会终止运行中的调度任务并清空队列,然后运行本地调度任务
超时时间
如果任务执行的时间超过了设置的超时时间,那么任务会被打断,停止执行
路由策略
路由策略,核心就是xxljob 到底去哪台服务器找任务执行
第一个
当选择该策略时,会选择执行器注册地址的第一台机器执行,如果第一台机器出现故障,则调度任务失败。
使用注册地址列表中的第一个address
最后一个
当选择该策略时,会选择执行器注册地址的最后一台机器执行,如果机器出现故障,则调度任务失败。
轮询
当选择该策略时,会按照执行器注册地址轮询分配任务,如果其中一台机器出现故障,调度任务失败,任务不会转移
一致性HASH
分组下机器地址相同,不同JOB均匀散列在不同机器上,保证分组下机器分配JOB平均;且每个JOB固定调度其中一台机器;
当选择该策略时,每个任务按照Hash算法固定选择某一台机器。如果那台机器出现故障,调度任务失败,任务不会转移。
最不经常使用
对每一个使用的地址进行保存次数,之后按照次数进行从小到大的排序,取出次数小的进行使用,再给当前次数加一,一直这样操作
最近最久未使用
将好久不使用的排到最前面
故障转移
当选择该策略时,按照顺序依次进行心跳检测,如果其中一台机器出现故障,则会转移到下一个执行器,若心跳检测成功,会选定为目标执行器并发起调度
忙碌转移
当要执行的机器正忙(无可执行线程/需要排队),则使用下一个address执行。
当选择该策略时,按照顺序依次进行空闲检测,如果其中一台机器出现故障,则会转移到下一个执行器,若空闲检测成功,会选定为目标执行器并发起调度。
分片广播
这个模式下会对所有的执行器广播这个任务,所有的节点都会接收到调用请求,每个节点可以根据总分片数和当前任务的索引进行相关业务处理
@XxlJob("testDemo2")
public ReturnT testDemo2() {
//index:当前分片序号(从0开始),执行器集群列表中当前执行器的序号
int index = XxlJobHelper.getShardIndex();
//total:总分片数,执行器集群的总机器数量
int total = XxlJobHelper.getShardTotal();
List<User> dates = query();
for (User user : dates) {
if (user.getId().hashCode() % total == index) {
log.info("当前分片机器序号:{},处理的数据:{}", index, JSONObject.toJSONString(user));
}
}
//返回参数 可用于查看结果、执行成功数等
XxlJobHelper.handleSuccess("成功");
return ReturnT.SUCCESS;
}
调度过期策略
间隔多长时间查询一次数据库合适?
当有查询出了可调度的定时任务时,如果执行了调度且调度时间小于1s时,就会等待1s再执行下一次循环,如果没有执行调度,则等待5s
一个任务下一次执行的时间已经在数据库了,代码拿出来这个值,加5秒,然后和当前时间的秒数进行比较,如果当前时间的秒数 比 下一次执行的时间加5秒还多,那么就是当前任务过期了,既然过期了,那么就要对这个任务做出判断,到底是立即执行,还是忽略
父子任务
父子任务,是指父任务执行后会自动调用子任务,完成一种类似于链式的调用,在使用时只需要配置子任务的ID即可
效果图:
参数传递
@XxlJob("paramTask")
public ReturnT paramTask() {
XxlJobHelper.log(" paramTask的定时任务开始");
//单个参数
String param = XxlJobHelper.getJobParam();
if (StringUtils.isNotBlank(param)) {
log.info("单个参数:{}", param);
//多个参数
String[] params = param.split(",");
log.info("第一个参数:{}", params[0]);
log.info("第二个参数:{}", params[1]);
}
//返回参数 可用于查看结果、执行成功数等
XxlJobHelper.handleSuccess("成功");
return ReturnT.SUCCESS;
}
日志回调
指执行器在执行任务时可以将执行日志传递给调度中心,即使任务没有执行完成,调度中心也可以看到回调的调度日志内容。更细化的分析任务执行情况
@XxlJob("logbackTask")
public ReturnT logbackTask() throws InterruptedException {
XxlJobHelper.log("当前程序执行到了:{}行", 156);
Thread.sleep(5000);
XxlJobHelper.log("当前程序执行到了:{}行", 200);
Thread.sleep(5000);
XxlJobHelper.log("当前程序执行完成");
XxlJobHelper.handleSuccess("成功");
return ReturnT.SUCCESS;
}
任务的生命周期
xxl-job支持在调用任务时,第一次调用时先执行指定的方法,然后在执行具体的任务,当执行器停止时会执行指定的方法
@XxlJob(value="lifeCycleTask",init = "startInit",destroy = "destroy")
public ReturnT lifeCycleTask() throws InterruptedException {
log.info("==============生命周期演示============");
XxlJobHelper.handleSuccess("成功");
return ReturnT.SUCCESS;
}
public void startInit(){
log.info("第一次调用当前任务时,才会执行,预处理");
}
public void destroy(){
log.info("当执行器停止时才会执行");
}
XXL-JOB缺点
“调度中心”通过DB锁的形式来保证集群分布式调度的一致性,如果耗时较小的短调度任务很多,随着调度中心集群数量增加,会增大DB的压力,数据库的锁竞争会比较厉害,造成数据库压力,性能下降。
版权归原作者 闫凯文+1 所有, 如有侵权,请联系我们删除。