0


XXL-JOB深入浅出

1.背景

由于我们部门分布式任务调动框架使用的是xxl-job,在平时的开发过程中使用的频次也比较多,但是目前使用的模式比较单一,有些小伙伴可能还不清楚其架构及执行原理(希望同事看到别打我),这节将带大家一起认识一下xxl-job。

2.什么是分布式任务调度?

2.1.常见定时任务方案

在介绍分布式任务调度之前,我们先看看实现简单的定时任务有那些方式:

  • Timer和TimerTask: JDK自带的定时任务,可以实现简单的间隔执行任务(在指定时间点执行某一任务,也能定时的周期性执行),无法实现按日历去调度执行任务。
  • ScheduledExecutorService: Java并发包下,基于线程池实现的定时任务类,可以实现循环和延迟任务。
  • QuartZ: 基于Java实现异步任务调度框架,功能丰富,支持cron表达式,支持持久化。
  • Spring Task: Spring 3.0后提供Spring Task实现任务调度,支持cron表达式,相比Quartz功能稍简单,支持注解编程方式。
  • SpringBoot中的Schedule: 通过@EnableScheduling+@Scheduled来实现定时任务,底层使用的是Spring Task

但是这些在单体项目中使用的集中式的定时任务,在分布式场景中会面临一些问题:

  • 业务量大,单机性能瓶颈需要扩展,如何批处理任务,数据如何分治
  • 多台机器部署如何保证定时任务不重复执行
  • 定时任务时间需要可调整,可以暂停
  • 机器发生故障down机,定时任务依然可用,如何实现故障转移
  • 定时任务,执行日志是否可监控

这个时候,就需要分布式任务调度来展现出它可靠的能力了。

2.2.分布式任务调度定义

分布式任务调度是一种具有

多个节点的分布式系统

,它可以使用单个节点或多个节点来执行任务,并且可以在多个机器集群上实现。它的核心目标是

将大任务分解为小任务

,并将这些小任务分发到

不同的机器上去执行

,以实现高效的任务调度和执行。
同时支持以下特性:

  • 并行任务调度: 支持分布式集群部署,多分片执行定时任务
  • 高可用: 某一台宕机,不影响其他实例执行任务
  • 可扩展: 可通过增加集群实例来提高任务的执行效率
  • 可监控: 对所有定时任务进行统一的管理和检测

2.3.常用的分布式任务调度框架有哪些?

  • Xxl-job
  • Elastic-job
  • Saturn

3.本次主题xxl-job

这次的主角是xxl-job,那就先写xxl-job吧,在编写这篇文档的时候,产品经理那邪恶的眼神看着我,仿佛在说,“我有个需求可能要改一下”,于是我颤抖的要将时间留给我的“Hello World”兄弟。所以本次主题我将重点介绍xxl-job的来龙去脉,其他几个框架会进行横向优势对比。如果感兴趣,下节主题我再逐一详解其他几个框架吧。

3.1.什么是xxl-job

XXL-JOB 是一个轻量级分布式任务调度平台,其核心设计目标是开发迅速、学习简单、轻量级、易扩展。现已开放源代码并接入600多家公司线上产品线,开箱即用。xxl是xxl-job的开发者大众点评的许雪里名称的拼音开头(我没吹,官网就是这么介绍的)。

组成模块:**xxl-job框架主要用于处理分布式的定时任务,其主要由

调度中心

执行器

组成**。

  • 调度模块(调度中心) (1)负责管理调度信息,按照调度配置发出调度请求,自身不承担业务代码。调度系统与任务解耦,提高了系统可用性和稳定性,同时调度系统性能不再受限于任务模块。 (2)支持可视化、简单且动态的管理调度信息,包括任务新建,更新,删除,GLUE开发和任务报警等,所有上述操作都会实时生效,同时支持监控调度结果以及执行日志,支持执行器Failover。

总结:统一管理任务调度平台上的调度任务,负责触发调度执行,并且提供任务管理平台。

  • 执行模块 (执行器) (1)负责接收调度请求并执行任务逻辑。任务模块专注于任务的执行等操作,开发和维护更加简单和高效; (2)接收“调度中心”的执行请求、终止请求和日志请求等。

总结:接收调度中心的调度并且执行,可以直接执行也可以集成到项目中。

3.2.架构设计

3.2.1设计思想

(1)将调度行为抽象形成“调度中心”公共平台,而平台自身并不承担业务逻辑,“调度中心”负责发起调度请求。
(2)将任务抽象成分散的JobHandler,交由“执行器”统一管理,“执行器”负责接收调度请求并执行对应的JobHandler中业务逻辑。

因此,“调度”和“任务”两部分可以相互解耦,分开部署,相互分离,两者之间通过RPC进行通信,提高系统整体稳定性和扩展性。

3.2.2.架构图

3.3.执行器注册与发现是怎么玩的?

3.3.1.执行器的注册

在讲解执行器的注册与发现之前,先来看一看一个定时任务是如何被调用的,如下图。

从上图我们可以看出,xxl-job的调用关系有三层,最上层为调度中心,最下层是定时任务方法,调度中心可以调度不同的执行器,执行器再调度归属于自己的定时任务。
而调度中心调度执行器是需要知道执行器的ip和端口的,所以这个时候就需要执行器将自己的服务信息注册到调度中心。
下图是调度中心处理注册请求的过程。

3.3.2.执行器的注销

实际生产环境中我们可能需要将某些任务服务下线,或者有些任务服务因为过载异常导致执行器宕机。这个时候就需要将注册到调度中心的执行器注册信息进行更新或删除,xxl-job提供了2种方式:

  • 主动注销

执行器主动向调度中心发起注销请求,调度中心接收到注销请求将执行器注册信息删除。

  • 被动注销

执行器意外宕机,无法正常向调度中心发起注销请求,则需要调度中心的探活线程来主动发现某个执行器下线,来主动删除执行器注册信息。

3.3.3.调度中心的探活

前面讲到被动注销需要调度中心主动探活来删除下线的执行器注册信息,那么调度中心又是怎么探活的呢?如图:

3.3.4.执行器的注册与发现完整流程

  1. 调度中心在启动后暴露了注册与注销的接口,提供给执行器调用。
  2. 执行器在暴露了调度接口后,将自己的ip和端口信息通过调度中心的注册接口注册到调度中心,同时每30s进行一次心跳,用于更新自己的注册信息。
  3. 当然,在执行器关闭的时候也会调用调度中心暴露的注销接口,将注册信息删除。
  4. 在调度中心接收到执行器的注册和注销请求后,新增或更新注册信息。
  5. 同时调度中心也会启动探活线程检测注册信息,将超过90s未更新的注册信息删除。

3.4.调度中心怎么调用执行器的呢?

我知道,有个小伙伴举手说道:

  1. 我们只需要在调度中心启动一个线程,不断的去扫描任务配置表,然后判断任务是否达到触发时间。
  2. 如果达到了触发时间,调度中心就会使用这个任务对应的执行器配置,获取到执行器的ip和端口,直接发起Http请求。
  3. 等待执行器执行完毕后,返回一个执行结果。

“小伙伴请坐,回答的很棒~”。但是想要在生产环境中运行,还是需要解决很多问题:

  1. 线程中循环扫描任务配置表是否过于频繁
  2. 调度中心集群同时执行一个任务,造成重复调动该如何处理
  3. 任务是否到达了触发时间该如何判断
  4. 当前线程既要做扫描又要做调度,同步请求的阻塞过程影响到了其他任务调度该如何解决
  5. 同步调用执行器,执行器执行任务耗时过长阻塞了调度中心的线程该如何处理

那么XXL-JOB又是怎么解决这些问题的呢?我们带着问题一起来认识一下xxl-job执行定时任务的流程。

3.4.1.调度中心流程

一、 任务扫描
调度中心第一件事就是启动scheduleThread线程来扫描任务配置表,并判断当前任务是否应该触发。

由于调度中心是集群部署的,为了防止调度重复,xxl-job利用mysql的悲观锁,使其他事务无法进入。

select * from xxl_job_lock where lock_name = 'schedule_lock' for update

虽然通过ScheduleThread线程扫描任务配置表,但是频繁的访问数据显然是不合理的。所以xxl-job在扫描任务配置表查询任务时,如果任务列表有值,则表示程序正在正常的处理定时任务,此时就让线程Sleep 0到1秒;如果定时任务列表没有值,很有可能下一秒查询也没有值,则让线程Sleep 4到5秒,以此来减少查询次数。

可能有小伙伴好奇,为什么是4~5秒呢?
因为xxl-job在扫描任务配置的时候是使用

当前时间

来计算的,但是由于程序耗时,io耗时等影响,等实际查询的时候,任务时间点可能就过了。于是xxl-job将时间点扩大,在当前

系统时间+5s

,查询出触发时间<=(当前系统时间+5s)的数据。得到一个最迟触发时间为5s后的任务列表。

二、 任务触发时机
ScheduleThread线程扫描线程获取到任务列表之后,将这些数据分为了三部分:

  1. 已超过5s
  2. 已超过但不足5s
  3. 还未达到触发时间或恰好到达触发时间

三、任务触发
(1)我们知道,大部分情况下,多数任务都是放进了

时间轮


(2)而任务是通过

ringThread

线程来使用时间轮触发的。但是任务的执行是比较耗时的操作,所以xxl-job选择了异步处理,ringThread线程从时间轮获取到任务之后,把请求扔进了线程池triggerPool中处理。
(3)xxl-job中的triggerPool有快慢两种线程池

fastTriggerPool

slowTriggerPool

,主要是做一个线程池的隔离,将执行偏慢的任务放到slowTriggerPool中,避免执行较慢的任务占用过多的资源导致正常的任务也不能快速的调度。
(4)一个任务如果远程调度的时间超过500ms(不是任务执行时间)就可以标记一次慢任务,在10分钟内同一个任务表标记慢10次就会进入到slowTriggerPool中运行了。

3.4.2.执行器流程

(1)执行器对象在获取到调度器传入的参数之后,会根据任务处理器名称获取到对应的

jobHandler


(2)同时xxl-job还提供了三种阻塞策略:

  • 单机串行:前一个任务还没有执行完毕,就等前一个任务执行完再执行当次的任务
  • 丢弃后续调度:前一个任务没有执行完毕,就终止当次任务,并清空任务队列
  • 覆盖之前调度:前一个任务没有执行完毕,就把它停掉,并执行当次任务

(3)通过了阻塞策略判断之后,调度参数会被push到

triggerQueue

中,

jobThread

会从这个队列中获取数据。然后通过 handler.execute(),通过反射来调用实际的定时任务方法。
(4)最终任务在执行完毕之后,被被push到回调队列

callBackQueue

中,

回调线程

会从队列中获取到回调信息,通过callback方法回调调度中心。


3.5.XXL-JOB特性

隔离小伙伴瞥了我一眼说,有耐能你别整三岁的,你整四岁的!

通过上面的调度中心调度流程,以及执行器流程,我们看到使用了大量的异步处理。
所以这些特征体现了xxl-job的另一特性:

  • 全异步化 & 轻量级

(1)全异步化设计:xxl-job系统中业务逻辑在远程执行器执行,触发流程

全异步化设计

。相比直接在调度中心内部执行业务逻辑,极大的降低了调度线程占用时间;
(2)异步调度:调度中心每次任务触发时仅发送一次调度请求,该调度请求首先推送

“异步调度队列”

,然后异步推送给远程执行器
(2)异步执行:执行器会将请求存入

“异步执行队列”

并且立即响应调度中心,异步运行。
(3)轻量级设计:xxl-job调度中心中每个JOB逻辑非常 “轻”,在全异步化的基础上,单个JOB一次运行平均耗时基本在

 “10ms”

之内(基本为一次请求的网络开销);因此,可以保证使用有限的线程支撑大量的JOB并发运行;
(4)得益于上述两点优化,理论上默认配置下的调度中心,

单机能够支撑 5000 

任务并发运行稳定运行;

当然还有其他的特性:

  • 跨语言

XXL-JOB是一个跨语言的任务调度平台,主要体现在如下几个方面:
(1)RESTful API:调度中心与执行器提供语言无关的 RESTful API 服务,第三方任意语言可据此对接调度中心或者实现执行器。
(2)多任务模式:提供Java、Python、PHP……等十来种任务模式,理论上可扩展任意语言任务模式;
(3)基于HTTP的任务Handler: 提供基于HTTP的任务Handler(Bean任务,JobHandler=”httpJobHandler”);业务方只需要提供HTTP链接等相关信息即可,不限制语言、平台;

  • 并行调度

(1)xxl-job调度模块默认采用并行机制,在多线程调度的情况下,调度模块被阻塞的几率很低,大大提高了调度系统的承载量。
(2)xxl-job的不同任务之间并行调度、并行执行。
(3)xxl-job的单个任务,针对多个执行器是并行运行的,针对单个执行器是串行执行的。同时支持任务终止。

  • 分片广播 & 动态分片

(1)“分片广播” 以执行器为维度进行分片,支持动态扩容执行器集群从而动态增加分片数量,协同进行业务处理;在进行大数据量业务操作时可显著提升任务处理能力和速度。
(2)“分片广播” 和普通任务开发流程一致,不同之处在于可以获取分片参数,执行器获取分片参数进行分片业务处理。

  • 任务HA (故障转移)

(1)执行器如若集群部署,调度中心将会感知到在线的所有执行器,如“127.0.0.1:9997, 127.0.0.1:9998, 127.0.0.1:9999”。
(2)当任务”路由策略”选择”故障转移(FAILOVER)”时,当调度中心每次发起调度请求时,会按照顺序对执行器发出心跳检测请求,第一个检测为存活状态的执行器将会被选定并发送调度请求。

  • 任务依赖

(1)xxl-job 中每个任务都对应有一个任务ID,同时,每个任务支持设置属性“子任务ID”,因此,通过“任务ID”可以匹配任务依赖关系。
(2)当父任务执行结束并且执行成功时,将会根据“子任务ID”匹配子任务依赖,如果匹配到子任务,将会主动触发一次子任务的执行。

  • 均衡调度

调度中心在集群部署时会自动进行任务平均分配,触发组件每次获取与线程池数量(调度中心支持自定义调度线程池大小)相关数量的任务,避免大量任务集中在单个调度中心集群节点;

  • 失败重试

(1)“故障转移”发生在调度阶段,在执行器集群部署时,如果某一台执行器发生故障,该策略支持自动进行Failover切换到一台正常的执行器机器并且完成调度请求流程。
(2)“失败重试”发生在”调度 + 执行”两个阶段,支持通过自定义任务失败重试次数,当任务失败时将会按照预设的失败重试次数主动进行重试;

  • 任务失败告警

默认提供邮件失败告警,可扩展短信、钉钉等方式。如果需要新增一种告警方式,只需要新增一个实现 “com.xxl.job.admin.core.alarm.JobAlarm” 接口的告警实现即可。

  • 等等…(这里就不列举了)

3.6.XXL-JOB优劣

你说你,说这么多,他就没缺点吗?

一、xxl-job缺点:

(1)中心化设计,传输效率低,中心节点安全风险。
(2) 支持灰度发布但只能手动录入ip,服务重启会导致ip发生变更,ip维护困难。
(3) Bean模式(类形式)每个任务占用一个java类,会造成类浪费;Bean模式(方法形式)必须基于spring容器。
(4) 通过数据库分布式锁,来控制任务不能重复执行。在任务非常多的情况下,有一些性能问题。

二、xxl-job与其他分布式任务调度框架的对比
下面是quartz、elastic-job、xxl-job的对比:
xxl-jobelastic-jobsaturn项目背景大众点评开源当当网开源项目唯品会开源项目社区力量star23.9k/fork 9.9k(最活跃 )star1.6k/fork585star2.2k/fork699任务分片支持支持支持任务监控实时监控窗口监控比较完善管理页面有有有难以程度简单较复杂较复杂缺点通过获取数据库锁的方式,保证集群中执行任务的唯一性,性能不好需要引入zookeeper , mesos, 增加系统复杂度, 学习成本较高不能动态的添加任务,仅能在控制台对任务进行触发,暂停,删除等操作


3.7.XXL-JOB使用篇

别整这些没用的,我就想知道怎么用?

3.7.1.下载源码

我们将源码下载:http://gitee.com/xuxueli0323/xxl-job 用idea打开

源码结构:

  • doc: 文档信息,包括DB的ddl语句
  • xxl-job-admin: 调度中心项目源码
  • xxl-job-core: 公共Jar依赖
  • xxl-job-executor-samples: 执行器,Sample示例项目

3.7.2.部署调度中心

下载源码,启动调度中心,浏览器访问http://127.0.0.1:8080/xxl-job-admin/jobinfo,登录之后运行界面如下:

3.7.3.部署执行器

创建一个springboot项目,在我们的maven项目中引入 xxl-job 相关依赖。

<dependency>
    <groupId>com.xuxueli</groupId>
    <artifactId>xxl-job-core</artifactId>
    <version>2.4.1-SNAPSHOT</version>
</dependency>

修改application.properties文件,配置调度中心地址,多个地址用逗号隔开。

xxl.job.admin.addresses=http://127.0.0.1:8080/xxl-job-admin

一、创建任务
xxl-job提供了2种任务运行模式:
(1)Bean模式
(2)GLUE模式

1. Bean模式
Bean模式有类形式和方法形式实现方式

  • Bean模式(类形式) 1.继承IJobHandler抽象类,实现execute方法
@Slf4j
public class BilibiliTestJobHandler extends IJobHandler {
    
    @Override
    public void execute() throws Exception {
        log.info("哔哩哔哩干杯Bean模式(类形式)~~~~");
    }
}
  1. 在XxlJobConfig中xxlJobExecutor方法中手动注册
@Bean
    public XxlJobSpringExecutor xxlJobExecutor() {
        logger.info(">>>>>>>>>>> xxl-job config init.");
        XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
        xxlJobSpringExecutor.setAdminAddresses(adminAddresses);
        xxlJobSpringExecutor.setAppname(appname);
        xxlJobSpringExecutor.setAddress(address);
        xxlJobSpringExecutor.setIp(ip);
        xxlJobSpringExecutor.setPort(port);
        xxlJobSpringExecutor.setAccessToken(accessToken);
        xxlJobSpringExecutor.setLogPath(logPath);
        xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);

        // 注册 BEAN模式(类形式)下的 任务类
        XxlJobExecutor.registJobHandler("bilibiliTestJobHandler", new BilibiliTestJobHandler());
        return xxlJobSpringExecutor;
    }
  • Bean模式(方法形式) 使用@XxlJob注解,value自定义jobHandler名称
@Slf4j
@Component
public class BilibiliTestXxlJob {

    @XxlJob("bilibliJobHandler")
    public ReturnT<String> bilibiliJobHandler() {
        String jobParam = XxlJobHelper.getJobParam();
        log.info("{}, 哔哩哔哩干杯Bean模式(方法形式)~~~~", jobParam);
        return ReturnT.SUCCESS;
    }
}

3.7.4.调度中心管理页面操作

第一步:新增执行器

执行器的名称与项目application.properties配置文件appName保持一致:

xxl.job.executor.appname=xxl-job-bilibili

第二步:新建任务
这里创建两个任务,上面创建的Bean模式任务的类形式和方法形式。

  1. 下图的配置为Bean模式类形式的定时任务,配置为每5秒执行一次,jobHandler必须项目中定义的保持一致

看一看执行器控制台的定时任务打印的信息

2.下面为的配置为

Bean模式方法形式

的定时任务,配置为每5秒执行一次,jobHandler必须项目中定义的保持一致

看一看定时任务打印的信息

  • GLUE模式 Glue支持Java、Python、PHP…等十来种任务模式。 1.下面示例添加Glue模式(java)

2.添加好glue任务后,我们在列表点击操作,打开Gule Idea编辑代码

3.我们可以在线编辑代码:

启动任务后,下面是执行的日志信息

xxl-job也提供了日志监控报表,以及调度日志记录:

当然还有其他的配置,比如:

  • 路由策略
  • 过期策略
  • 阻塞处理策略
  • 超时时间
  • 失败次数

这些策略的执行原理又是什么样的,这些下次再分享~

标签: java 分布式

本文转载自: https://blog.csdn.net/qq_38697245/article/details/129843772
版权归原作者 草莓君_ 所有, 如有侵权,请联系我们删除。

“XXL-JOB深入浅出”的评论:

还没有评论