前言
我之前写了一篇快速上手ZK的文章:https://blog.csdn.net/qq_38974073/article/details/135293106
本篇最要是进一步加深学习ZK,算是一次简单的实践,巩固学习成果。
设计一个分布式锁
对锁的基本要求
- 可重入:允许同一个应用内的同一个线程重复调用同一个方法;
- 阻塞:没有拿到锁的线程将进入阻塞。
- 公平的:先来先得。
实现原理
使用zk作为发号器,每个线程申请锁时会创建一个临时有序节点:
- 节点编号最小的获得锁,完成业务操作之后删除临时节点;
- 如果不是最小编号的节点,就监听前一个节点的删除事件,并进入阻塞状态,当触发回调的事件时,唤醒阻塞线程,并重新进行获取锁操作。
锁要求实现的描述:
- 可重入:对同一个线程,不用重复获取锁,重入计数+1即可;
- 阻塞:利用
CountDownLatch
实现,当触发回调时唤醒线程; - 公平的:利用zk临时有序节点的特点进行排队,先到先申请锁。
问:申请到锁之后,网络中断怎么办?
- 临时节点随客户端关闭而被删除
问:如何避免羊群效应?
- 每个线程只监听前一个节点
关键流程
关键代码实现
锁的关键方法:
- 加锁:lock
- 解锁:unLock
- 尝试加锁:tryLock
publicbooleanlock(){if(Thread.currentThread().equals(thread)){
lockCount.incrementAndGet();returntrue;}while(true){if(tryLock()){
thread =Thread.currentThread();
lockCount.incrementAndGet();returntrue;}try{await();}catch(Exception e){thrownewRuntimeException(e);}}}publicsynchronizedbooleanunlock(){if(!thread.equals(Thread.currentThread())){returnfalse;}int newLockCount = lockCount.decrementAndGet();if(newLockCount <0){thrownewIllegalMonitorStateException("重入锁计数不可为负数");}// 是否剩余重入次数if(newLockCount !=0){returntrue;}// 到这一步,意味着lockCount已经为0,可以删除临时节点了try{if(client.isNodeExist(properties.getZkPath())){
client.deleteNode(lockedPathMap.get(thread));}}catch(Exception e){returnfalse;}finally{
lockedPathMap.remove(thread);
priorPathMap.remove(thread);}returntrue;}protectedbooleantryLock(){String lockedPath = lockedPathMap.get(Thread.currentThread());if(null== lockedPath ||!client.isNodeExist(lockedPath)){
lockedPathMap.put(Thread.currentThread(), lockedPath = client.createEphemeralSeqNode(getLockPrefix()));}// 取得加锁的排队编号String lockedShortPath =getShorPath(lockedPath);List<String> waiters =getWaiters();// 如果自己是所有等待锁中的第一个,则获得锁if(checkLocked(waiters, lockedShortPath)){returntrue;}// 当前线程节点是否在排队int index =Collections.binarySearch(waiters, lockedShortPath);if(index <0){thrownewNullPointerException("可能网络抖动,连接断开,临时节点失效");}// waiters最后面的节点写入map,用来监听
priorPathMap.put(Thread.currentThread(),getLockPrefix()+ waiters.get(index -1));returnfalse;}privatebooleanawait()throwsException{String priorPath = priorPathMap.get(Thread.currentThread());if(null== priorPath){thrownewNullPointerException("prior_path error");}finalCountDownLatch latch =newCountDownLatch(1);// 删除事件Watcher w = watchedEvent ->{// 监测到前一个节点发生变化,接下来就可以唤起等待线程,重新尝试获取锁
latch.countDown();};try{// 监听前一个节点的删除时间
client.watcher(w, priorPath);}catch(KeeperException.NoNodeException e){
e.printStackTrace();returnfalse;}return latch.await(properties.getTimeout(),TimeUnit.MILLISECONDS);}
好了,如果你对这个感兴趣,不妨拉一下完整源码: https://gitee.com/liangshij/zk-lock-demo
源码简要说明
模块说明
- lsj-zk-lock:核心实现。
- lsj-zk-lock-spring-boot-starter:整合springboot
- lsj-zk-lock-test:使用demo
安装
经典三步走:导包、配置、使用
- 拉取代码,将lsj-zk-lock、lsj-zk-lock-spring-boot-starter通过 mvn install 命令安装到本地仓库。
- 引入依赖:
<dependency><groupId>cn.lsj</groupId><artifactId>lsj-zk-lock-spring-boot-starter</artifactId><version>2.4.2</version></dependency>
配置
- 配置locks和dataSource:
spring:zk:dataSource:url:"localhost"port:2181locks:-zkPath:"/test/lock"lockName:"countLock"# 获取锁失败时,进入等待的时间,等待结束将重新尝试获取锁timeout:5000-zkPath:"/test2/lock"lockName:"lock"timeout:5000
使用
- 使用方式1:通过@GlobalLock注解,指定要使用那个lock
@GetMapping("test2")@GlobalLock("countLock")publicStringtest2(){// 业务代码return"";}
- 使用方式2:通过@Qualifier注解,指定要使用那个lock
@RestControllerpublicclassTestController{int count =0;@Resource@Qualifier("lock")privateReentrantLock lock;@Resource@Qualifier("countLock")privateReentrantLock countLock;@GetMapping("test")publicStringtest(){
countLock.lock();try{for(int i =0; i <10000; i++){
count++;}}finally{
countLock.unlock();}returnString.valueOf(count);}}
版权归原作者 微风至夏 所有, 如有侵权,请联系我们删除。