** ❃博主首页 :**
「码到三十五」
,同名公众号 :「码到三十五」,wx号 : 「liwu0213」
☠博主专栏 :
<源码解读>
<面试攻关>
♝博主的话 :搬的每块砖,皆为峰峦之基;公众号搜索「码到三十五」关注这个爱发技术干货的coder,一起筑基
场景
一个应用需要支持大量数据的批处理任务,要求:
- 并行处理能力:应用需能够同时处理多个数据块,即实现并行处理。
- 灵活的并发控制:可以灵活调整并行处理的任务数量,以确保资源利用最大化且不过载。
- 均衡负载分配:应将任务均匀分配到不同的服务器节点上,以平衡各节点的负载,避免单点压力过大。
解决思路
因为需要并行处理同一张数据表里的数据,所以比较自然地想到了分片查询数据,可以利用对 id 取模的方法进行分片,避免同一条数据被重复处理。那XXL-JOB 的路由策略「分片广播 & 动态分片」很贴合这种场景」来调度定时任务;
实现DEMO
SpringBoot环境下,我们集成xxl-job来实现上述方案。
SpringBoot如何集成xxl-job查看官网即可,这里不再叙述,下面看下分片调度的代码:
1.xxl-job调度管理页面配置分片调度任务
路由策略选择: 分片广播
2. 编写task代码:
要获取分片总数和当前分片序号,作为参数传给sql语句:
@ResourceprivateOrderDataMapper orderDataMapper;@XxlJob("orderDataStatusTask")publicvoidorderDataStatusTask(){// 计时器Stopwatch timer =Stopwatch.createStarted();// 获取xxl-job的localThread中的总的分片数和当前分片OrderDataParam param =newOrderDataParam();
param.setShardIndex(XxlJobHelper.getShardIndex());
param.setShardTotal(XxlJobHelper.getShardTotal());// 其他参数设置,略了....// 根据分片数拉取当前分片的数据List<OrderData> orderDataList = orderDataMapper.getInitStatusOrder(param);XxlJobHelper.log("获取待处理订单数据:分片号={},数据量={},总分片数={}",XxlJobHelper.getShardIndex(), orderDataList.size(),XxlJobHelper.getShardTotal());if(CollUtil.isEmpty(orderDataList)){return;}// 处理逻辑,略了....XxlJobHelper.log("当前分片({})处理完成,耗时={}秒",XxlJobHelper.getShardIndex(), timer.stop().elapsed(TimeUnit.SECONDS));}
这里服务启动了4个实例,总分片数ShardTotal就是4,每个实例的ShardIndex分别是0,1,2,3
3. mybatis中编写sql语句
根据分片总数和当前分片数据对Id哈希取模, 这里做了两次hash,主要作用是用id最后一位hash方便直接看出数据被哪个分片调度了。
// 获取未处理的订单数据// 根据id末位数取hash后分片拉取<select id="getInitStatusOrder" parameterType="com.xxx.OrderDataParam"
resultType="com.xxx.OrderData">
select id,order_no,customer_code,
from tt_order_data t
where t.status =0
and t.fail_count <![CDATA[<]]> #{retryCount}
and t.update_time <![CDATA[>=]]> #{lastUpdateTime}
and mod(mod(t.id,10), #{shardTotal})= #{shardIndex}
limit 0,200</select>
4.最后看下调度日志
同一次调度任务,4个实例个调度一次,并且拉取到各自部分的数据进行处理:
第3个实例的调度日志:
2024-09-2508:31:40[com.xxl.job.core.thread.JobThread#run]-[130]-[Thread-144]----------- xxl-job job execute start ----------------------Param:{"lastHoursAgoModify":4,"rows":3000,"lastMonthAgoCreate":6,"retryCount":1}2024-09-2508:31:40[com.xxx.xxxx#orderDataStatusTask]-[47]-[Thread-144] 获取待处理订单数据:分片号=3,数据量=100,总分片数=42024-09-2508:31:41[com.xxx.xxxx#orderDataStatusTask]-[53]-[Thread-144] 当前分片(3)处理完成,耗时=1秒
2024-09-2508:31:41[com.xxl.job.core.thread.JobThread#run]-[176]-[Thread-144]----------- xxl-job job execute end(finish)----------------------Result: handleCode=200, handleMsg =null2024-09-2508:31:41[com.xxl.job.core.thread.TriggerCallbackThread#callbackLog]-[197]-[xxl-job, executor TriggerCallbackThread]----------- xxl-job job callback finish.
第4个实例的调度日志:
2024-09-2508:31:40[com.xxl.job.core.thread.JobThread#run]-[130]-[Thread-144]----------- xxl-job job execute start ----------------------Param:{"lastHoursAgoModify":4,"rows":3000,"lastMonthAgoCreate":6,"retryCount":1}2024-09-2508:31:40[com.xxx.xxxx#orderDataStatusTask]-[47]-[Thread-144] 获取待处理订单数据:分片号=4,数据量=80,总分片数=42024-09-2508:31:41[com.xxx.xxxx#orderDataStatusTask]-[53]-[Thread-144] 当前分片(4)处理完成,耗时=1秒
2024-09-2508:31:41[com.xxl.job.core.thread.JobThread#run]-[176]-[Thread-144]----------- xxl-job job execute end(finish)----------------------Result: handleCode=200, handleMsg =null2024-09-2508:31:41[com.xxl.job.core.thread.TriggerCallbackThread#callbackLog]-[197]-[xxl-job, executor TriggerCallbackThread]----------- xxl-job job callback finish.
关注公众号[码到三十五]获取更多技术干货 !
版权归原作者 码到三十五 所有, 如有侵权,请联系我们删除。