写在文章开头
近期某些原因,需要对一个上古老项目进行重构,在笔者梳理项目细节时发现,该项目在并发场景下使用
ConcurrentHashMap
出现唯一键重复插入到
ConcurrentHashMap
的情况,所以笔者就以这篇文章总结下这个比较细节的错误。
你好,我叫sharkchili,目前还是在一线奋斗的Java开发,经历过很多有意思的项目,也写过很多有意思的文章,是CSDN Java领域的博客专家,也是Java Guide的维护者之一,非常欢迎你关注我的公众号:写代码的SharkChili,这里面会有笔者精心挑选的并发、JVM、MySQL数据库专栏,也有笔者日常分享的硬核技术小文。
功能简介
这个功能主要是基于
quartz
的任务调度,主线程开始执行时,会从
task
表中读取状态是
未开始(0)
和
执行中(1)
的任务,读取到任务之后,先将任务状态存入
ConcurrentHashMap
中,用
key
记录
Task
信息,
value
为true(因为仅仅用到
ConcurrentHashMap
的
key
存
Task
保证任务去重作用,所以
value
可随意设置),再封装成一个定时执行的
job
提交到
quartz
中。
此后,定时执行的
Job
就会根据任务的定时间隔不断根据这些任务的
id
到
send_data
表查询这些任务要发送的数据。
当发送的数据达到
task
的总量时,这个任务的信息就会先从
ConcurrentHashMap
中删除,同时也会根据任务的id将定时任务的
job
从
quartz
中
drop
掉。
这里我们列举出为
task
表封装的类,可以看到顶层设计仅仅涉及
任务id
和任务名称,而任务具体调度详情记录到其成员变量
TaskInfo
中:
@DatapublicclassTask{/**
* 任务号
*/privateInteger id;/**
* 任务名称
*/privateString taskName;/**
* 任务详情
*/privateTaskInfo taskInfo;}
TaskInfo
的封装如下,它记录着任务总量和任务状态:
@DatapublicclassTaskInfo{/**
* 发送数据总量
*/privateInteger totalNum;/**
* 任务状态:0 未开始 1 处理中 2 已完成
*/privateint status;}
事故现象
笔者在调试
quartz
调度时发现,这个项目从数据库中查的相同的
task
居然能够存入
ConcurrentHashMap
中,正是这个原因,导致项目经常因为相同任务重复多份占用内存空间,导致时不时的
OOM
,这一点笔者也调研过之前的做法,即通过日期限制调度的任务数量解决
(很明显这是治标不治本)
。
排查结果
笔者基于上述梳理的流程对线程调度细节进行调试,终于发现
ConcurrentHashMap
重复
key
的原因,还记得我们上述提到的
job
提交步骤吗?每次主线程都会从
task
表读取未开始(状态为0)和运行中(状态为1)的任务,如果这个任务未提交到
quartz
,则说明这个
task
还未执行,则先将其存入
map
并提交到
quartz
中,反之则更新这个
map
的中
Task
对象的
Taskinfo
信息。
问题就出在两次任务调度查询时,任务第一次被查询时任务状态是0即未开始状态,以未开始的状态的任务信息算好
hashcode
直接存入
ConcurrentHashMap
中,再将数据库中对应任务更新为运行中(status为1),随后任务提交就被提交至
quartz
中。
此后,主线程仍在轮询查询未开始和运行中的任务信息,问题就出在这里,因为第一次存入
ConcurrentHashMap
时存入的
task
状态为0。而第二次查询到这个
task
信息时,因为它已经被
quartz
调度,所以我们无需重复提交,只需将
task
再次存入
map
中即可,正是第一次提交
task
时更新
status
这个操作使得
hashcode
计算结果不同,造成同一个
Task
存入
ConcurrentHashMap
两次。
我们这里就给出一段代码示例模拟这种情况,笔者首先创建一个
task
对象,并设置其内部成员
taskinfo
状态为
0(未开始)
,然后将其存入
map
中。
随后我们再用另外两个线程模拟后续主线程读取到这个任务的情况,此时这个任务已经在第一次提交之后被改为运行中了。我们再次将其存入
map
中:
publicstaticvoidmain(String[] args)throwsInterruptedException{ConcurrentHashMap<Task,Boolean> map =newConcurrentHashMap<>();CountDownLatch countDownLatch =newCountDownLatch(2);//创建一个任务对象Task task =newTask();
task.setId(1);
task.setTaskName("任务1");//设置任务状态为0,模拟第一次从数据库查询出来的状态,为未开始TaskInfo taskInfo =newTaskInfo();
taskInfo.setTotalNum(1000);
taskInfo.setStatus(0);
task.setTaskInfo(taskInfo);
map.put(task,true);//模拟读取到未开始的任务时,将任务状态设置为1(运行中) 存入数据库中
log.info("将task 1 更新为运行中存入数据库.....");//模拟主线程后续的调度中读取到task 1 任务状态为运行中for(int i =0; i <2; i++){newThread(()->{TaskInfo info =newTaskInfo();
info.setTotalNum(1000);
info.setStatus(1);
task.setTaskInfo(info);//将task 1再次存入map中
map.put(task,true);
countDownLatch.countDown();},"t"+ i).start();}
countDownLatch.await();
log.info("map size:{}", map.size());}
毫无意外,同一个
task
居然都能够存入
ConcurrentHashMap
中:
对应的输出结果也是2:
22:59:02.241 [main] INFO com.sharkChili.webTemplate.test.Test - map size:2
解决方案
很明显出现这个问题的原因就是因为调度更新任务信息时修改了对象成员变量从而影响了hashcode计算结果,进而导致并发操作变得无效,所以我们必须保证在符合业务需求的情况下确保相同任务的
hashcode
计算结果一致,经过笔者再三确认明确所有
task
的id是唯一的,所以直接重写
hashCode
和
equals
方法,让
Task
对象的计算和比对都通过taskId进行,从原头避免的任务重复插入的问题:
@DatapublicclassTask{//......略@Overridepublicbooleanequals(Object o){if(this== o)returntrue;if(o ==null||getClass()!= o.getClass())returnfalse;Task task =(Task) o;returnObjects.equals(id, task.id);}@OverridepublicinthashCode(){returnObjects.hash(id);}}
小结
文章写的比较连贯,但是排查过程是比较艰辛的,针对这个问题,笔者花费大半天经历了以下几个步骤:
- 流程梳理
- 线程整理
- 数据流规整
- 结合源码猜测定位
- 尝试问题复现
- 研讨业务规则
- 明确修复方案
总的来说,对于这类涉及并发操作的重构,建议梳理清晰的数据流向并结合源码工作流程加以推断分析,最终明确问题风险点直接进行逻辑修复并及时提测。
我是sharkchili,CSDN Java 领域博客专家,开源项目—JavaGuide contributor,我想写一些有意思的东西,希望对你有帮助,如果你想实时收到我写的硬核的文章也欢迎你关注我的公众号:
写代码的SharkChili,同时我的公众号也有我精心整理的并发编程、JVM、MySQL数据库个人专栏导航。
版权归原作者 shark-chili 所有, 如有侵权,请联系我们删除。