0


Zookeeper之手写一个分布式锁

前言

我之前写了一篇快速上手ZK的文章:https://blog.csdn.net/qq_38974073/article/details/135293106

本篇最要是进一步加深学习ZK,算是一次简单的实践,巩固学习成果。

设计一个分布式锁

对锁的基本要求

  • 可重入:允许同一个应用内的同一个线程重复调用同一个方法;
  • 阻塞:没有拿到锁的线程将进入阻塞。
  • 公平的:先来先得。

实现原理

使用zk作为发号器,每个线程申请锁时会创建一个临时有序节点:

  • 节点编号最小的获得锁,完成业务操作之后删除临时节点;
  • 如果不是最小编号的节点,就监听前一个节点的删除事件,并进入阻塞状态,当触发回调的事件时,唤醒阻塞线程,并重新进行获取锁操作。

锁要求实现的描述:

  • 可重入:对同一个线程,不用重复获取锁,重入计数+1即可;
  • 阻塞:利用CountDownLatch实现,当触发回调时唤醒线程;
  • 公平的:利用zk临时有序节点的特点进行排队,先到先申请锁。

问:申请到锁之后,网络中断怎么办?

  • 临时节点随客户端关闭而被删除

问:如何避免羊群效应?

  • 每个线程只监听前一个节点

关键流程

在这里插入图片描述

关键代码实现

锁的关键方法:

  • 加锁:lock
  • 解锁:unLock
  • 尝试加锁:tryLock
publicbooleanlock(){if(Thread.currentThread().equals(thread)){
        lockCount.incrementAndGet();returntrue;}while(true){if(tryLock()){
            thread =Thread.currentThread();
            lockCount.incrementAndGet();returntrue;}try{await();}catch(Exception e){thrownewRuntimeException(e);}}}publicsynchronizedbooleanunlock(){if(!thread.equals(Thread.currentThread())){returnfalse;}int newLockCount = lockCount.decrementAndGet();if(newLockCount <0){thrownewIllegalMonitorStateException("重入锁计数不可为负数");}// 是否剩余重入次数if(newLockCount !=0){returntrue;}// 到这一步,意味着lockCount已经为0,可以删除临时节点了try{if(client.isNodeExist(properties.getZkPath())){
            client.deleteNode(lockedPathMap.get(thread));}}catch(Exception e){returnfalse;}finally{
        lockedPathMap.remove(thread);
        priorPathMap.remove(thread);}returntrue;}protectedbooleantryLock(){String lockedPath = lockedPathMap.get(Thread.currentThread());if(null== lockedPath ||!client.isNodeExist(lockedPath)){
        lockedPathMap.put(Thread.currentThread(), lockedPath = client.createEphemeralSeqNode(getLockPrefix()));}// 取得加锁的排队编号String lockedShortPath =getShorPath(lockedPath);List<String> waiters =getWaiters();// 如果自己是所有等待锁中的第一个,则获得锁if(checkLocked(waiters, lockedShortPath)){returntrue;}// 当前线程节点是否在排队int index =Collections.binarySearch(waiters, lockedShortPath);if(index <0){thrownewNullPointerException("可能网络抖动,连接断开,临时节点失效");}// waiters最后面的节点写入map,用来监听
    priorPathMap.put(Thread.currentThread(),getLockPrefix()+ waiters.get(index -1));returnfalse;}privatebooleanawait()throwsException{String priorPath = priorPathMap.get(Thread.currentThread());if(null== priorPath){thrownewNullPointerException("prior_path error");}finalCountDownLatch latch =newCountDownLatch(1);// 删除事件Watcher w = watchedEvent ->{// 监测到前一个节点发生变化,接下来就可以唤起等待线程,重新尝试获取锁
        latch.countDown();};try{// 监听前一个节点的删除时间
        client.watcher(w, priorPath);}catch(KeeperException.NoNodeException e){
        e.printStackTrace();returnfalse;}return latch.await(properties.getTimeout(),TimeUnit.MILLISECONDS);}

好了,如果你对这个感兴趣,不妨拉一下完整源码: https://gitee.com/liangshij/zk-lock-demo

源码简要说明

模块说明

  • lsj-zk-lock:核心实现。
  • lsj-zk-lock-spring-boot-starter:整合springboot
  • lsj-zk-lock-test:使用demo

安装

经典三步走:导包、配置、使用

  1. 拉取代码,将lsj-zk-lock、lsj-zk-lock-spring-boot-starter通过 mvn install 命令安装到本地仓库。
  2. 引入依赖:
<dependency><groupId>cn.lsj</groupId><artifactId>lsj-zk-lock-spring-boot-starter</artifactId><version>2.4.2</version></dependency>

配置

  • 配置locks和dataSource:
spring:zk:dataSource:url:"localhost"port:2181locks:-zkPath:"/test/lock"lockName:"countLock"# 获取锁失败时,进入等待的时间,等待结束将重新尝试获取锁timeout:5000-zkPath:"/test2/lock"lockName:"lock"timeout:5000

使用

  • 使用方式1:通过@GlobalLock注解,指定要使用那个lock
@GetMapping("test2")@GlobalLock("countLock")publicStringtest2(){// 业务代码return"";}
  • 使用方式2:通过@Qualifier注解,指定要使用那个lock
@RestControllerpublicclassTestController{int count =0;@Resource@Qualifier("lock")privateReentrantLock lock;@Resource@Qualifier("countLock")privateReentrantLock countLock;@GetMapping("test")publicStringtest(){
        countLock.lock();try{for(int i =0; i <10000; i++){
                count++;}}finally{
            countLock.unlock();}returnString.valueOf(count);}}

本文转载自: https://blog.csdn.net/qq_38974073/article/details/135293504
版权归原作者 微风至夏 所有, 如有侵权,请联系我们删除。

“Zookeeper之手写一个分布式锁”的评论:

还没有评论