0


Quartz集群并发执行导致重复调度问题

Quartz集群并发执行导致重复调度问题排查

  1. 问题描述
    定时平台在某一时刻触发定时job时,集群中的两个调度器节点同时执行了job,导致该定时任务同一时刻下发给业务应用两次,引发并发重复调用的问题。
    在两台调度机器上查看调度日志,发现第一个机器节点A在12:30:00:074执行了job,第二个机器节点B在12:30:00:079也执行了job,两个节点的调度日志如下:
    2021-10-29 12:30:00,074 INFO [dspQuartzScheduler_Worker-42] com.xxx.dsp.scheduler.jobbean.RemoteHttpJobBean - dsp scheduler start to trigger jobName:financial-core_proxyBankCashRequireHandler
    2021-10-29 12:30:00,079 INFO [dspQuartzScheduler_Worker-36] com.xxx.dsp.scheduler.jobbean.RemoteHttpJobBean - dsp scheduler start to trigger jobName:financial-core_proxyBankCashRequireHandler
    注:项目使用的Quartz版本为2.2.3,且使用JDBC模式存储Job。

  2. Quartz概述
    调度器scheduler,任务调度的控制器,负责定时任务的调度,并且提供任务和触发器的增删改查等api方法
    任务job,是实际被调度的任务,每个任务必须指定具体执行任务的实现类,实现类需要继承QuartzJobBean或者实现org.quartz.Job接口,具体的业务逻辑写在execute方法里面
    触发器trigger,trigger用来定义调度时间的概念,即按什么样时间规则去触发任务。主要几种类型:
    SimpleTrigger:简单触发器,从某个时间开始,每隔多少时间触发,重复多少次。
    CronTrigger:使用cron表达式定义触发的时间规则,如"0 0 0,2,4 1/1 * ? *" 表示每天的0,2,4点触发。
    DailyTimeIntervalTrigger:每天中的一个时间段,每N个时间单元触发,时间单元可以是毫秒,秒,分,小时
    CalendarIntervalTrigger:每N个时间单元触发,时间单元可以是毫秒,秒,分,小时,日,月,年。
    Quartz把触发job,叫做fire。TRIGGER_STATE是当前trigger的状态,PREV_FIRE_TIME是上一次触发时间,NEXT_FIRE_TIME是下一次触发时间,misfire是指这个job在某一时刻要触发,却因为某些原因没有触发的情况。
    Quartz在运行时,会起两类线程(不止两类),一类用于调度job的调度线程(单线程),一类是用于执行job具体业务的工作池。
    Quartz自带的表里面,本文主要涉及以下3张表:
    表名 描述
    QRTZ_TRIGGERS 存储所有的Trigger的详细信息:job名称,trigger类型、TRIGGER_STATE(状态)、NEXT_FIRE_TIME(下次触发时间)
    QRTZ_FIRED_TRIGGERS 记录即将和正在触发执行的triggers信息
    QRTZ_LOCKS 存储行锁的表

  3. 开始排查
    trigger的状态储存在数据库,Quartz支持分布式,所以如果起了多个quartz服务,会有多个调度线程来抢夺触发同一个trigger。mysql在默认情况下执行select 语句,是不上锁的,那么如果同时有1个以上的调度线程抢到同一个trigger,是否会导致这个trigger重复调度呢?我们来看看,Quartz是如何解决这个问题的。
    首先,我们先来看下JobStoreSupport类的executeInNonManagedTXLock()方法:

    /**

    • 执行给定的回调并可选地按需地获取给定的锁
    • 使用非spring事务托管
    • Execute the given callback having optionally acquired the given lock.
    • This uses the non-managed transaction connection.
    • @param lockName The name of the lock to acquire, for example
    • "TRIGGER_ACCESS". If null, then no lock is acquired, but the
    • lockCallback is still executed in a non-managed transaction.

    */
    protected <T> T executeInNonManagedTXLock(
    String lockName,
    TransactionCallback<T> txCallback, final TransactionValidator<T> txValidator) throws JobPersistenceException {
    boolean transOwner = false;
    Connection conn = null;
    try {
    if (lockName != null) {
    // If we aren't using db locks, then delay getting DB connection
    // until after acquiring the lock since it isn't needed.
    if (getLockHandler().requiresConnection()) {
    // 获取数据库连接,关闭auto commit,开启事务
    conn = getNonManagedTXConnection();
    }
    // 获取lock
    transOwner = getLockHandler().obtainLock(conn, lockName);
    }

         if (conn == null) {
             // 获取数据库连接,关闭auto commit,开启事务
             conn = getNonManagedTXConnection();
         }
         // 执行txCallback方法,也就是要排查的acquireNextTriggers方法
         final T result = txCallback.execute(conn);
         try {
             // 提交事务
             commitConnection(conn);
         } catch (JobPersistenceException e) {
             // 异常回滚
             rollbackConnection(conn);
             //省略.....
         }
         
         return result;
     } catch (JobPersistenceException e) {
         rollbackConnection(conn);
         throw e;
     } catch (RuntimeException e) {
         rollbackConnection(conn);
         throw new JobPersistenceException("Unexpected runtime exception: "
                 + e.getMessage(), e);
     } finally {
         try {
             // 释放锁
             releaseLock(lockName, transOwner);
         } finally {
             // 关闭连接
             cleanupConnection(conn);
         }
     }
    

    }

也就是说,传入的callback方法在执行的过程中是携带了指定的锁,并开启了事务,注释也提到,lockName就是指定的锁的名字,如果lockName是空的,那么callback方法的执行不在锁的保护下,但依然在事务中。

这意味着,我们使用这个方法,不仅可以保证事务,还可以选择保证callback方法的线程安全。
接下来,我们来看一下executeInNonManagedTXLock(…)中的obtainLock(conn,lockName)方法,即抢锁的过程。这个方法是在Semaphore接口中定义的,Semaphore接口通过锁住线程或者资源,来保护资源不被其他线程修改,由于我们的调度信息是存在数据库的,所以现在查看DBSemaphore.java中obtainLock方法的具体实现:

/**
  * Grants a lock on the identified resource to the calling thread (blocking
  * until it is available).
  * 
  * @return true if the lock was obtained.
  */
 public boolean obtainLock(Connection conn, String lockName)
     throws LockException {

    // 检查threadLocal中,是否存在lock
     if (!isLockOwner(lockName)) {
         
         // 执行sql语句,如果expandedSQL执行失败,则执行expandedInsertSQL
         // SELECT * FROM dsp_qrtz_locks WHERE SCHED_NAME = 'dspQuartzScheduler' AND LOCK_NAME = 'TRIGGER_ACCESS'  FOR UPDATE
         executeSQL(conn, lockName, expandedSQL, expandedInsertSQL);
         
         // 将lockName写入threadLocal
         getThreadLocks().add(lockName);

     } 
     // 省略...
     return true;
 }

可以看出,obtainLock方法通过locks表的一个行锁(lockName确定)来保证callback方法的事务和线程安全。拿到锁后,obtainLock方法将lockName写入threadlocal。当然在releaseLock的时候,会将lockName从threadlocal中删除。

总而言之,executeInNonManagedTXLock()方法,保证了在分布式的情况,同一时刻,只有一个线程可以执行这个方法。

3.1 Quartz的调度过程-带锁的

QuartzSchedulerThread是调度线程的具体实现,上图 是这个线程run()方法的主要内容,图中只提到了正常的情况下,也就是流程中没有出现异常的情况下的处理过程。由图可以看出,调度流程主要分为以下三步:

1)拉取待触发trigger并插入点火表:
调度线程会循环阻塞随机时间(一般在26s左右)后一次性拉取距离现在,一定时间窗口内的,一定数量内的,即将触发的trigger信息。那么,时间窗口和数量信息如何确定呢,我们先来看一下,以下几个参数:

idleWaitTime: 默认30s,可通过配置属性org.quartz.scheduler.idleWaitTime设置。
availThreadCount:获取可用(空闲)的工作线程数量,总会大于1,因为该方法会一直阻塞,直到有工作线程空闲下来。
maxBatchSize:一次拉取trigger的最大数量,默认是1,可通过org.quartz.scheduler.batchTriggerAcquisitionMaxCount改写
batchTimeWindow:时间窗口调节参数,默认是0,可通过org.quartz.scheduler.batchTriggerAcquisitionFireAheadTimeWindow改写
misfireThreshold: 超过这个时间还未触发的trigger,被认为发生了misfire,默认60s,可通过org.quartz.jobStore.misfireThreshold设置。调度线程一次会拉取NEXT_FIRE_TIME小于(now + idleWaitTime +batchTimeWindow),大于(now - misfireThreshold)的,min(availThreadCount,maxBatchSize)个triggers,默认情况下,会拉取未来30s,过去60s之间还未fire的1个trigger。随后将这些triggers的状态由WAITING改为ACQUIRED,并插入fired_triggers表。

2)点火trigger:
首先,我们会检查每个trigger的状态是不是ACQUIRED,如果是,则将点火表的状态改为EXECUTING,然后更新trigger的NEXT_FIRE_TIME,如果这个trigger的NEXT_FIRE_TIME为空,也就是未来不再触发,就将其状态改为COMPLETE。如果trigger不允许并发执行(即Job的实现类标注了@DisallowConcurrentExecution),则将状态变为BLOCKED,否则就将状态改为WAITING

3)发射trigger,丢给工作线程池:
遍历triggers,如果其中某个trigger在第二步出错,即返回值里面有exception或者为null,则会对triggers表和fired_triggers点火表的内容修正,并跳过这个trigger,继续检查下一个。否则,则根据trigger信息实例化JobRunShell(实现了Thread接口),同时依据JOB_CLASS_NAME实例化Job,随后我们将JobRunShell实例丢入工作线程池。

JobRunShell将trigger信息,job信息和执行指令传给triggeredJobComplete()方法来完成最后的数据表更新操作。例如如果job执行过程有异常抛出,就将这个trigger状态变为ERROR,如果是BLOCKED状态,就将其变为WAITING等等,最后从fired_triggers表中删除这个已经执行完成的trigger。注意,这些是在工作线程池异步完成。

3.2 问题排查
说明:financial-core_proxyBankCashRequireHandler这个job执行时间为每天10:30:00、12:30:00、14:30:00,问题发生在12:30:00重复执行了两次

取下案发时间的mysql binlog进行分析,加=====部分的是自己加上的

================================== A-1.获取下次执行时间在30s内的WAITING状态的所有trigger,按触发时间正序排序 ===============================
SELECT TRIGGER_NAME, TRIGGER_GROUP,NEXT_FIRE_TIME,PRIORITY
FROM dsp_qrtz_triggers
WHERE SCHED_NAME = 'dspQuartzScheduler' AND TRIGGER_STATE = 'WAITING' AND NEXT_FIRE_TIME <= '当前时间+30s'
AND (MISFIRE_INSTR = -1 OR (MISFIRE_INSTR != -1 AND NEXT_FIRE_TIME > '当前时间-60s')) ORDER BY NEXT_FIRE_TIME ASC, PRIORITY DESC

================================== A-2.更新trigger状态为ACQUIRED(updateTriggerStateFromOtherState) ===============================

at 783532

#211029 12:30:00 server id 1 end_log_pos 783626 CRC32 0x133d6636 Table_map: dsp.dsp_qrtz_triggers mapped to number 122

at 783626

#211029 12:30:00 server id 1 end_log_pos 784027 CRC32 0xa42d0f38 Update_rows: table id 122 flags: STMT_END_F

UPDATE dsp.dsp_qrtz_triggers

WHERE

@1='dspQuartzScheduler'

@2='financial-core_proxyBankCashRequireHandler'

@3='DSP_JOB'

@4='financial-core_proxyBankCashRequireHandler'

@5='DSP_JOB'

@6=NULL

@7=1635481800000 =============下次执行时间2021-10-29 12:30:00

@8=1635474600000 =============上次执行时间2021-10-29 10:30:00

@9=5

@10='WAITING'

@11='CRON'

@12=1628755276000

@13=0

@14=NULL

@15=2

@16=''

SET

@1='dspQuartzScheduler'

@2='financial-core_proxyBankCashRequireHandler'

@3='DSP_JOB'

@4='financial-core_proxyBankCashRequireHandler'

@5='DSP_JOB'

@6=NULL

@7=1635481800000 =============下次执行时间2021-10-29 12:30:00

@8=1635474600000 =============上次执行时间2021-10-29 10:30:00

@9=5

@10='ACQUIRED'

@11='CRON'

@12=1628755276000

@13=0

@14=NULL

@15=2

@16=''

================================= A-3.插入点火表,设置初始状态为ACQUIRED(insertFiredTrigger) ============================================

at 784027

#211029 12:30:00 server id 1 end_log_pos 784125 CRC32 0x54c5c2e1 Table_map: dsp.dsp_qrtz_fired_triggers mapped to number 123

at 784125

#211029 12:30:00 server id 1 end_log_pos 784328 CRC32 0x4ce1392b Write_rows: table id 123 flags: STMT_END_F

INSERT INTO dsp.dsp_qrtz_fired_triggers

SET

@1='dspQuartzScheduler'

@2='xmkspt9016073972302471607398585878'

@3='financial-core_proxyBankCashRequireHandler'

@4='DSP_JOB'

@5='xmkspt901607397230247'

@6=1635481800071 =============点火时间2021-10-29 12:30:00

@7=1635481800000 =============计划时间2021-10-29 12:30:00

@8=5

@9='ACQUIRED'

@10=NULL

@11=NULL

@12='0'

@13='0'

#211029 12:30:00 server id 1 end_log_pos 784359 CRC32 0x5afaada7 Xid = 988572818
COMMIT/!/;

================================= A-5.更新trigger点火表状态为EXECUTING(updateFiredTrigger) ============================================

at 784912

#211029 12:30:00 server id 1 end_log_pos 785010 CRC32 0x24147435 Table_map: dsp.dsp_qrtz_fired_triggers mapped to number 123

at 785010

#211029 12:30:00 server id 1 end_log_pos 785436 CRC32 0x9af58178 Update_rows: table id 123 flags: STMT_END_F

UPDATE dsp.dsp_qrtz_fired_triggers

WHERE

@1='dspQuartzScheduler'

@2='xmkspt9016073972302471607398585878'

@3='financial-core_proxyBankCashRequireHandler'

@4='DSP_JOB'

@5='xmkspt901607397230247'

@6=1635481800071

@7=1635481800000

@8=5

@9='ACQUIRED'

@10=NULL

@11=NULL

@12='0'

@13='0'

SET

@1='dspQuartzScheduler'

@2='xmkspt9016073972302471607398585878'

@3='financial-core_proxyBankCashRequireHandler'

@4='DSP_JOB'

@5='xmkspt901607397230247'

@6=1635481800072 =============点火时间2021-10-29 12:30:00

@7=1635481800000 =============计划时间2021-10-29 12:30:00

@8=5

@9='EXECUTING'

@10='financial-core_proxyBankCashRequireHandler'

@11='DSP_JOB'

@12='0'

@13='1'

================================= A-7.更新trigger表中的下次执行时间到16:30:00,状态为WAITING(storeTrigger) =================================

at 785436

#211029 12:30:00 server id 1 end_log_pos 785530 CRC32 0xd82639ed Table_map: dsp.dsp_qrtz_triggers mapped to number 122

at 785530

#211029 12:30:00 server id 1 end_log_pos 785931 CRC32 0xefaa26e8 Update_rows: table id 122 flags: STMT_END_F

UPDATE dsp.dsp_qrtz_triggers

WHERE

@1='dspQuartzScheduler'

@2='financial-core_proxyBankCashRequireHandler'

@3='DSP_JOB'

@4='financial-core_proxyBankCashRequireHandler'

@5='DSP_JOB'

@6=NULL

@7=1635481800000 =============下次执行时间2021-10-29 12:30:00

@8=1635474600000 =============上次执行时间2021-10-29 10:30:00

@9=5

@10='ACQUIRED'

@11='CRON'

@12=1628755276000

@13=0

@14=NULL

@15=2

@16=''

SET

@1='dspQuartzScheduler'

@2='financial-core_proxyBankCashRequireHandler'

@3='DSP_JOB'

@4='financial-core_proxyBankCashRequireHandler'

@5='DSP_JOB'

@6=NULL

@7=1635496200000 =============下次执行时间2021-10-29 16:30:00

@8=1635481800000 =============上次执行时间2021-10-29 12:30:00

@9=5

@10='WAITING'

@11='CRON'

@12=1628755276000

@13=0

@14=NULL

@15=2

@16=''

at 785931

#211029 12:30:00 server id 1 end_log_pos 785962 CRC32 0xe0bfc163 Xid = 988572823
COMMIT/!/;

================================= 【应该是另外一台机器又获取到了,假设是B节点】B-2.更新trigger状态为ACQUIRED(updateTriggerStateFromOtherState) ========

at 789039

#211029 12:30:00 server id 1 end_log_pos 789133 CRC32 0xca03275d Table_map: dsp.dsp_qrtz_triggers mapped to number 122

at 789133

#211029 12:30:00 server id 1 end_log_pos 789534 CRC32 0xcefe83e9 Update_rows: table id 122 flags: STMT_END_F

UPDATE dsp.dsp_qrtz_triggers

WHERE

@1='dspQuartzScheduler'

@2='financial-core_proxyBankCashRequireHandler'

@3='DSP_JOB'

@4='financial-core_proxyBankCashRequireHandler'

@5='DSP_JOB'

@6=NULL

@7=1635496200000 =============下次执行时间2021-10-29 16:30:00

@8=1635481800000 =============上次执行时间2021-10-29 12:30:00

@9=5

@10='WAITING'

@11='CRON'

@12=1628755276000

@13=0

@14=NULL

@15=2

@16=''

SET

@1='dspQuartzScheduler'

@2='financial-core_proxyBankCashRequireHandler'

@3='DSP_JOB'

@4='financial-core_proxyBankCashRequireHandler'

@5='DSP_JOB'

@6=NULL

@7=1635496200000 =============下次执行时间2021-10-29 16:30:00

@8=1635481800000 =============上次执行时间2021-10-29 12:30:00

@9=5

@10='ACQUIRED'

@11='CRON'

@12=1628755276000

@13=0

@14=NULL

@15=2

@16=''

================================= B-3.插入点火表,设置初始状态为ACQUIRED(insertFiredTrigger) ================================================

========================================================================================================================================
此处已经产生问题了,根据插入点火表的点火时间来看,如果是有锁的话应该是拿到上一个节点更新trigger表下次执行时间为16:30:00的,但是这里竟然拿到的是12:30:00,
说明集群中第二个节点获取的trigger信息绝对是第一个节点未更新trigger表之前就查询到的数据,
但是集群中第一个调度节点从查询可调度的trigger列表一直到更新trigger表的状态为ACQUIRED(只有WAITING状态的才能被查询出来)都是在锁范围内的,
第二个节点只能等待锁释放,释放后状态和时间都已经更新,就不可能再按照查询sql查到该trigger
所以要么是没加悲观锁,要么是悲观锁失效
========================================================================================================================================

=================================================================================

at 789534

#211029 12:30:00 server id 1 end_log_pos 789632 CRC32 0x3523dd84 Table_map: dsp.dsp_qrtz_fired_triggers mapped to number 123

at 789632

#211029 12:30:00 server id 1 end_log_pos 789835 CRC32 0x1a77fbca Write_rows: table id 123 flags: STMT_END_F

INSERT INTO dsp.dsp_qrtz_fired_triggers

SET

@1='dspQuartzScheduler'

@2='xmkspt9116073974232611607398795998'

@3='financial-core_proxyBankCashRequireHandler'

@4='DSP_JOB'

@5='xmkspt911607397423261'

@6=1635481800074 =============点火时间2021-10-29 12:30:00

@7=1635481800000 =============计划时间2021-10-29 12:30:00

@8=5

@9='ACQUIRED'

@10=NULL

@11=NULL

@12='0'

@13='0'

at 789835

#211029 12:30:00 server id 1 end_log_pos 789866 CRC32 0xfdc2b024 Xid = 988572832
COMMIT/!/;

================================= A-8.第一个节点插入调度日志表,此时已经调度到应用去了,后续应用返回执行状态后回调更新表状态为调度成功 ================================================

at 791017

#211029 12:30:00 server id 1 end_log_pos 791128 CRC32 0xa8c4c3fe Table_map: dsp.dsp_qrtz_trigger_log mapped to number 124

at 791128

#211029 12:30:00 server id 1 end_log_pos 791287 CRC32 0xf68d7f4d Write_rows: table id 124 flags: STMT_END_F

INSERT INTO dsp.dsp_qrtz_trigger_log

SET

@1=7446503

@2=111

@3='financial-core_proxyBankCashRequireHandler'

@4=NULL

@5='96e206fc-e1ef-4a5c-9212-4c28e67d9a5b'

@6=0

@7=NULL

@8=NULL

@9=NULL

@10=NULL

@11=NULL

@12=NULL

@13=NULL

@14=NULL

@15=NULL

@16=NULL

@17='financial-core'

@18=NULL

@19='2021-10-29 12:30:00'

@20=0

@21=0

at 791287

#211029 12:30:00 server id 1 end_log_pos 791318 CRC32 0x3b593642 Xid = 988572837
COMMIT/!/;
================================= B-5.更新trigger点火表状态为EXECUTING(updateFiredTrigger) ================================================
/!/;

at 803060

#211029 12:30:00 server id 1 end_log_pos 803158 CRC32 0x93a6b488 Table_map: dsp.dsp_qrtz_fired_triggers mapped to number 123

at 803158

#211029 12:30:00 server id 1 end_log_pos 803584 CRC32 0xc96be07b Update_rows: table id 123 flags: STMT_END_F

UPDATE dsp.dsp_qrtz_fired_triggers

WHERE

@1='dspQuartzScheduler'

@2='xmkspt9116073974232611607398795998'

@3='financial-core_proxyBankCashRequireHandler'

@4='DSP_JOB'

@5='xmkspt911607397423261'

@6=1635481800074

@7=1635481800000

@8=5

@9='ACQUIRED'

@10=NULL

@11=NULL

@12='0'

@13='0'

SET

@1='dspQuartzScheduler'

@2='xmkspt9116073974232611607398795998'

@3='financial-core_proxyBankCashRequireHandler'

@4='DSP_JOB'

@5='xmkspt911607397423261'

@6=1635481800077 =============点火时间2021-10-29 12:30:00

@7=1635481800000 =============计划时间2021-10-29 12:30:00

@8=5

@9='EXECUTING'

@10='financial-core_proxyBankCashRequireHandler'

@11='DSP_JOB'

@12='0'

@13='1'

================================= B-7.更新trigger表中的下次执行时间,状态为WAITING(storeTrigger) ================================================

at 803584

#211029 12:30:00 server id 1 end_log_pos 803678 CRC32 0x1c1ea396 Table_map: dsp.dsp_qrtz_triggers mapped to number 122

at 803678

#211029 12:30:00 server id 1 end_log_pos 804079 CRC32 0x001ee96a Update_rows: table id 122 flags: STMT_END_F

UPDATE dsp.dsp_qrtz_triggers

WHERE

@1='dspQuartzScheduler'

@2='financial-core_proxyBankCashRequireHandler'

@3='DSP_JOB'

@4='financial-core_proxyBankCashRequireHandler'

@5='DSP_JOB'

@6=NULL

@7=1635496200000

@8=1635481800000

@9=5

@10='ACQUIRED'

@11='CRON'

@12=1628755276000

@13=0

@14=NULL

@15=2

@16=''

SET

@1='dspQuartzScheduler'

@2='financial-core_proxyBankCashRequireHandler'

@3='DSP_JOB'

@4='financial-core_proxyBankCashRequireHandler'

@5='DSP_JOB'

@6=NULL

@7=1635496200000 =============下次执行时间2021-10-29 16:30:00

@8=1635481800000 =============上次执行时间2021-10-29 12:30:00

@9=5

@10='WAITING'

@11='CRON'

@12=1628755276000

@13=0

@14=NULL

@15=2

@16=''

at 804079

#211029 12:30:00 server id 1 end_log_pos 804110 CRC32 0xaeecdfad Xid = 988572859
COMMIT/!/;

================================= B-8.第二个节点插入调度日志表,此时已经调度到应用去了,后续应用返回执行状态后回调更新表状态为调度成功 ======================================

at 809375

#211029 12:30:00 server id 1 end_log_pos 809486 CRC32 0x2dad35c7 Table_map: dsp.dsp_qrtz_trigger_log mapped to number 124

at 809486

#211029 12:30:00 server id 1 end_log_pos 809645 CRC32 0x03de5a4a Write_rows: table id 124 flags: STMT_END_F### INSERT INTO dsp.dsp_qrtz_trigger_log

SET

@1=7446504

@2=111

@3='financial-core_proxyBankCashRequireHandler'

@4=NULL

@5='f8321a44-31d8-45e6-ac34-13a367ad78d7'

@6=0

@7=NULL

@8=NULL

@9=NULL

@10=NULL

@11=NULL

@12=NULL

@13=NULL

@14=NULL

@15=NULL

@16=NULL

@17='financial-core'

@18=NULL

@19='2021-10-29 12:30:00'

@20=0

@21=0

at 809645

#211029 12:30:00 server id 1 end_log_pos 809676 CRC32 0xe5c0f423 Xid = 988572871
COMMIT/!/;

从binlog上看,基本与上面流程图中的步骤是一模一样的。
注意在B-3(A代表集群节点A,B代表集群节点B,3代表上面流程图的执行步骤)阶段已经产生问题了,根据插入点火表的点火时间来看,如果是有锁的话应该是拿到A节点更新trigger表下次执行时间为16:30:00的,但是这里竟然拿到的是12:30:00,
说明集群中B节点获取的trigger信息绝对是A节点未更新trigger表之前就查询到的数据,
但是集群中A调度节点从查询可调度的trigger列表一直到更新trigger表的状态为ACQUIRED(只有WAITING状态的才能被查询出来)都是在锁范围内的,
B节点只能等待锁释放,但是A释放后状态和时间都已经更新,就不可能再按照查询sql查到该trigger。
所以要么是没加悲观锁,要么是悲观锁失效

根据binlog的分析结果,我们再来看代码。
在前文,我们可以看到,Quartz的调度过程中有3次上锁行为,但是acquireNextTriggers方法是可选是否上锁的,为什么称为可选?因为这三个步骤虽然在executeInNonManagedTXLock方法的保护下,但executeInNonManagedTXLock方法可以通过设置传入参数lockName为空,取消上锁。在翻阅代码时,我们看到第一步拉取待触发的trigger时:

/**
  * <p>
  * Get a handle to the next N triggers to be fired, and mark them as 'reserved'
  * by the calling scheduler.
  * </p>
  * 
  * @see #releaseAcquiredTrigger(OperableTrigger)
  */
 @SuppressWarnings("unchecked")
 public List<OperableTrigger> acquireNextTriggers(final long noLaterThan, final int maxCount, final long timeWindow)
     throws JobPersistenceException {
     
     String lockName;
     // 判断是否需要上锁
     // maxCount是通过Math.min(availThreadCount, qsRsrcs.getMaxBatchSize())取最小计算出来的,qsRsrcs.getMaxBatchSize()不显式配置的话默认值是1,所以maxCount值是1
     if(isAcquireTriggersWithinLock() || maxCount > 1) { 
         lockName = LOCK_TRIGGER_ACCESS;
     } else {
         lockName = null;
     }
     return executeInNonManagedTXLock(lockName, 
             new TransactionCallback<List<OperableTrigger>>() {
                 public List<OperableTrigger> execute(Connection conn) throws JobPersistenceException {
                     return acquireNextTrigger(conn, noLaterThan, maxCount, timeWindow);
                 }
             },
             new TransactionValidator<List<OperableTrigger>>() {
                 
                 // 省略...
             });
 }

acquireNextTriggers在加锁之前对lockName做了一次判断,而非像其他加锁方法一样,例如triggersFired方法默认传入的就是LOCK_TRIGGER_ACCESS:

/**
  * <p>
  * Inform the <code>JobStore</code> that the scheduler is now firing the
  * given <code>Trigger</code> (executing its associated <code>Job</code>),
  * that it had previously acquired (reserved).
  * </p>
  * 
  * @return null if the trigger or its job or calendar no longer exist, or
  *         if the trigger was not successfully put into the 'executing'
  *         state.
  */
 @SuppressWarnings("unchecked")
 public List<TriggerFiredResult> triggersFired(final List<OperableTrigger> triggers) throws JobPersistenceException {
     return executeInNonManagedTXLock(LOCK_TRIGGER_ACCESS,
             new TransactionCallback<List<TriggerFiredResult>>() {
                 public List<TriggerFiredResult> execute(Connection conn) throws JobPersistenceException {
                     // 省略...
                 }
             });

通过调试发现isAcquireTriggersWithinLock()的值是false,因而导致传入的lockName是null,debug调试时,可以清楚的看到这个值的取值:

从上图可以清楚看到,在拉取待触发的trigger时,默认是不上锁。

我再把org.quartz.jobStore.acquireTriggersWithinLock=true设置为true,再调试
发现传入的lockName就是TRIGGER_ACCESS了:

而且在isAcquireTriggersWithinLock方法注释中,官方也说明了,翻译如下:
是否查询和更新以获取触发的触发器,应在获得显式DB锁后执行(以避免
触发器的db行上可能存在的争用条件)这是在1.6.3之前的行为,但对于大多数
数据库来说是没必要的,由于执行的SQL更新的性质,因此,这是一个多余的性能打击

/**
  * Whether or not the query and update to acquire a Trigger for firing
  * should be performed after obtaining an explicit DB lock (to avoid 
  * possible race conditions on the trigger's db row).  This is the
  * behavior prior to Quartz 1.6.3, but is considered unnecessary for most
  * databases (due to the nature of the SQL update that is performed), 
  * and therefore a superfluous performance hit.     
  */
 public boolean isAcquireTriggersWithinLock() {
     return acquireTriggersWithinLock;
 }

说明如果不是显示配置acquireTriggersWithinLock默认是不加锁的。

根据binlog梳理出两个调度节点的执行流程图:

如果这种默认配置有问题,岂不是会频繁发生重复调度的问题?我看网上有的文章说并不是啊,【原因在于Quartz默认采取乐观锁,也就是允许多个线程同时拉取同一个trigger。Quartz在调度流程的第二步fire trigger,如果发现当前trigger的状态不是ACQUIRED,也就是说,这个trigger被其他线程fire了,就会返回null。在调度流程的第三步,如果发现某个trigger第二步的返回值是null,就会跳过第三步,取消fire 】他们说的这都是不对滴,真正原因这样的,只要在第一步不加锁,就会产生多个节点查询到相同数据的可能(这个没毛病吧),假如两个节点都拿到了相同的待触发的数据,紧接着就会更新trigger表状态并插入点火表(这个也没毛病吧),看下更新trigger表状态和插入点火表的代码:

// We now have a acquired trigger, let's add to return list.
// If our trigger was no longer in the expected state, try a new one.
int rowsUpdated = getDelegate().updateTriggerStateFromOtherState(conn, triggerKey, STATE_ACQUIRED, STATE_WAITING);
if (rowsUpdated <= 0) {
continue; // next trigger
}
nextTrigger.setFireInstanceId(getFiredTriggerRecordId());
getDelegate().insertFiredTrigger(conn, nextTrigger, STATE_ACQUIRED, null);

如果更新失败则continue,因为是更新同一条数据,且又都是从WAITING状态改为ACQUIRED状态,不管是一前一后执行的还是并发触发数据库行锁执行的,都必然只有一个能更新成功,一个更新失败,所以必然有一个不会继续插入点火表(没毛病吧),这就是为啥没有频繁产生重复调度的原因,那么产生重复调度的就只有一种情况,就是我上面根据binlog日志画出的流程图中的情况,只有在A节点执行完第一步和第二步,数据提交以后,把trigger表的状态改为WAITING之后,B节点再执行第一步中的更新和插入点火表操作才会产生重复调度问题。这才是产生重复调度的真正原因。这种情况就比较少见,但是对于我们这种调度任务非常多且执行频率很快的公司,发生的概率还是很大的。
眼见为实,直接去生产的数据库去查询历史的调度日志,不查不要紧,一查吓一跳,发现重复调度的还不少:

之前重复调度没出现问题,大部分的是应用做了冥等处理或者是一些无状态的业务处理。总之,如果不希望并发重复调度问题的产生请及时按照下面的解决方案进行处理。

  1. 解决方案
    在quartz.properties配置文件加上:org.quartz.jobStore.acquireTriggersWithinLock=true
    这样,在调度流程的第一步,也就是拉取待即将触发的triggers时,是上锁的状态,即不会同时存在多个线程拉取到相同的trigger的情况,也就避免的重复调度的危险。但是会产生性能问题及调度延时问题(其实问题也不大)

在QuartzJobBean的自实现类executeInternal方法中的添加redis锁,保证同一个job在规定的秒内(在各个job参数上配置并获取)不允许重复调度。

各业务应用自己做冥等和规定时间内的唯一校验。类似于方案2,只是处理的地方不一样。

  1. 其他
    我们使用的quartz.properties配置

org.quartz.scheduler.instanceName=dspQuartzScheduler
org.quartz.scheduler.instanceId=AUTO
org.quartz.threadPool.class=org.quartz.simpl.SimpleThreadPool
org.quartz.threadPool.threadCount=60
org.quartz.threadPool.threadPriority=5
org.quartz.threadPool.threadsInheritContextClassLoaderOfInitializingThread=true

org.quartz.jobStore.misfireThreshold=60000
org.quartz.jobStore.maxMisfiresToHandleAtATime=1

#org.quartz.jobStore.class: org.quartz.simpl.RAMJobStore

for cluster

org.quartz.jobStore.tablePrefix=DSP_QRTZ_
org.quartz.jobStore.class=org.quartz.impl.jdbcjobstore.JobStoreTX
org.quartz.jobStore.isClustered=true
org.quartz.jobStore.clusterCheckinInterval=5000

是否开启悲观锁控制集群中trigger并发

org.quartz.jobStore.acquireTriggersWithinLock=true

再说一遍,Quartz默认是不开启集群中trigger并发锁的,不开启的话就是会产生重复调度问题。

标签: 大数据

本文转载自: https://blog.csdn.net/qq_14928305/article/details/125768228
版权归原作者 *麒 所有, 如有侵权,请联系我们删除。

“Quartz集群并发执行导致重复调度问题”的评论:

还没有评论