0


基于Zookeeper实现分布式锁

1:Zookeeper实现分布式锁

Zookeeper是一个分布式协调服务,分布式协调主要是来解决分布式系统中多个应用之间的数据一致性,Zookeeper内部的数据存储方式类似于文件目录形式的存储结构,它的内存结果如下图所示:

2:Zookeeper加锁原理

在Zookeeper中的指定路径下创建创建节点,然后客户端根据当前路径下的节点状态来判断是否加锁成功,如下图一种情况为例,线程1创建节点成功后,线程2再去创建节点就会创建失败

3:Zookeeper节点类型

持久节点:在Zookeeper中创建后会进行持久储存,直到客户端主动删除

临时节点:以客户端会话Session维度创建节点,一旦客户端会话断开,节点就会自动删除

临时/持久顺序节点:在同一个路径下创建的节点会对每个节点按创建先后顺序编号

4:Zookeeper节点监听机制

Zookeeper节点监听机制是指客户端可以在Zookeeper的指定节点路径节点下注册一个监听者,当这个被监听的节点的状态发生变化时会主动通知监听者,这些变化事件可以是节点删除、创建、节点内容变化等,如下图所示:

zookeeper.exists("/watchpath",new Watcher() {
    @Override
    public void process(WatchedEvent event) {
    System.out.println("进入监听器");
    System.out.println("监听路径Path:"+event.getPath());
    System.out.println("监听事件类型EventType:"+event.getType());                
    }            
});    

5:利用临时顺序节点和监听机制来实现分布式锁

实现分布式锁的方式有多种,我们可以使用临时节点和顺序节点这种方案来实现分布式锁:

1:使用临时节点可以在客户端程序崩溃时自动释放锁,避免死锁问题

2:使用顺序节点的好处是,可以利用锁释放的事件监听机制,来实现阻塞监听式的分布式锁

下面将基于这两个特性来实现分布式锁

6:加锁原理

1:首先在Zookeeper上创建临时顺序节点Node01、Node02等

2:第二步客户端拿到加锁路径下所有创建的节点

3:判断自己的序号是否最小,如果最小的话,代表加锁成功,如果不是最小的话,就对前一个节点创建监听器

4:如果前一个节点删除,监听器就会通知客户端来准备重新获取锁

加锁原理和代码入下图所示:

//加锁路径
String lockPath;
//用来阻塞线程
CountDownLatch cc = new CountDownLatch(1);
//创建锁节点的路径
Sting LOCK_ROOT_PATH = "/locks"

//先创建锁
public void createLock(){
    //lockPath = /locks/lock_01 
    lockPath = zkClient.create(LOCK_ROOT_PATH+"/lock_", CreateMode.EPHEMERAL_SEQUENTIAL);
}

//获取锁
public boolean acquireLock(){
    //获取当前加锁路径下所有的节点
    allLocks = zkClient.getChildren("/locks");
    //按节点顺序大小排序
    Collections.sort(allLocks);
    //判断自己是否是第一个节点
    int index = allLocks.indexOf(lockPath.substring(LOCK_ROOT_PATH.length() + 1));
    //如果是第一个节点,则加锁成功
    if (index == 0) {
        System.out.println(Thread.currentThread().getName() + "获得锁成功, lockPath: " + lockPath);
        return true;
    } else {
        //不是序号最小的节点,则监听前一个节点
        String preLock = allLocks.get(index - 1);
        //创建监听器
        Stat status = zkClient.exists(LOCK_ROOT_PATH + "/" + preLockPath, watcher);
        // 前一个节点不存在了,则重新获取锁
        if (stat == null) {
            return acquireLock();
        } else { 
            //阻塞当前进程,直到前一个节点释放锁
            System.out.println(" 等待前一个节点锁释放,prelocakPath:"+preLockPath);
            //唤醒当前线程,继续尝试获取锁
            cc.await();
            return acquireLock();
        }
    }
}

private Watcher watcher = new Watcher() {
    @Override
    public void process(WatchedEvent event) {
         //监听到前一个节点释放锁,唤醒当前线程
         cc.countDown();
    }
}

7:可重入锁实现

Zookeeper实现可重入分布式锁的机制是在本地维护一个Map记录,因为如果在Zookeeper节点维护数据的话,Zookeeper的写操作是很慢,集群内部需要进行投票同步数据,所以在本地维护一个Map记录来记录当前加锁的次数和加锁状态,在释放锁的时候减少加锁的次数,原理如下图所示:

//利用Map记录线程持有的锁
ConcurrentMap<Thread, LockData> lockMap = Maps.newConcurrentMap();
public Boolean lock(){
    Thread currentThread = Thread.currentThread();
    LockData lockData = lockMap.get(currentThread);
    //LockData不为空则说明已经有锁
    if (lockData != null)    
    {
       //加锁次数加一
       lockData.lockCount.increment();
       return true;
    }
    //没有锁则尝试获取锁
    Boolean lockResult = acquireLock();
    //获取到锁
    if (lockResult)
    {
        LockData newLockData = new LockData(currentThread,1);
        lockMap.put(currentThread, newLockData);
        return true;
    }
    //获取锁失败
    return false;
}

8:解锁原理

解锁的步骤如下:

(1)判断锁是不是自己的

(2)如果是则减少加锁次数

(3)如果加锁次数等于0,则释放锁,删除掉创建的临时节点,下一个监听这个节点的客户端会感知到节点删除事件,从而重新去获取锁

public Boolean releaseLock(){
    LockData lockData = lockMap.get(currentThread);
    //没有锁
    if(lockData == null){
       return false; 
    }
    //有锁则加锁次数减一
    lockCount = lockData.lockCount.decrement();
    if(lockCount > 0){
        return true;
    } 
    //加锁次数为0
    try{
        //删除节点
        zkClient.delete(lockPath);
        //断开连接
        zkClient.close();
    finally{
        //删除加锁记录
        lockMap.remove(currentThread);
    }
    return true;
}

9:开源分布式锁框架

在实际中我们可以使用功能更为成熟的开源分布式锁框架来快速的搭建我们的业务场景,例如开源的Zookeeper框架Curator开源的Redis分布式锁框架Redisson,下面是它们的简单使用案例

//Redisson定义锁
RLock lock = redissonClient.getLock(LOCK_KEY);
lock.lock();
try {
     //执行业务逻辑
}finally {
     lock.unlock();
}

//Curator定义锁
lock = new InterProcessMutex(getCuratorFramework(), lockPath);
//加锁
lock.acquire();
try {
     //执行业务逻辑
}finally {
     //解锁
    lock.release();
}

本文转载自: https://blog.csdn.net/weixin_45125989/article/details/130493038
版权归原作者 Coder_Cooker 所有, 如有侵权,请联系我们删除。

“基于Zookeeper实现分布式锁”的评论:

还没有评论