0


分布式锁实现方案-基于zookeeper的分布式锁实现(原理与代码)

一、基于zookeeper的分布式锁

1.1 基于Zookeeper实现分布式锁的原理

1.1.1 分布式锁特性说明

1.1.1.1 特点分析
  • 每次只能一个占用锁;
  • 可以重复进入锁;
  • 只有占用者才可以解锁;
  • 获取锁和释放锁都需要原子
  • 不能产生死锁
  • 尽量满足性能
1.1.1.2 本质

同步互斥,使得处理任务能够一个一个逐步的过临界资源。

1.1.2 Zookeeper 分布式锁实现原理

1.1.2.1 Zookeeper临时顺序节点特性

zookeeper中有一种临时顺序节点,它具有以下特征:

  • 时效性,当会话结束,节点将自动被删除
  • 顺序性,当多个应用向其注册顺序节点时,每个顺序号将只能被一个应用获取
1.1.2.2 Zookeeper满足分布式锁基本要求
  1. 因为顺序性,可以让最小顺序号的应用获取到锁,从而满足分布式锁的 每次只能一个占用锁,因为只有它一个获取到,所以可以实现 重复进入 ,只要设置标识即可。锁的释放,即删除应用在zookeeper上注册的节点,因为每个节点只被自己注册拥有,所以只有自己才能删除,这样就满足只有占用者才可以解锁
  2. zookeeper的序号分配是原子的,分配后即不会再改变,让最小序号者获取锁,所以获取锁是原子的
  3. 因为注册的是临时节点,在会话期间内有效,所以不会产生死锁
  4. zookeeper注册节点的性能能满足几千,而且支持集群,能够满足大部分情况下的性能
1.1.2.3 Watcher机制

Zookeeper 允许客户端向服务端的某个 Znode 注册一个 Watcher 监听,当服务端的一些指定事
件触发了这个 Watcher,服务端会向指定客户端发送一个事件通知来实现分布式的通知功能,然
后客户端根据 Watcher 通知状态和事件类型做出业务上的改变。

在实现分布式锁的时候,主要利用这个机制,实现释放锁的时候,通知等待锁的线程竞争锁。

1.1.2.3 总结

综上可知,Zookeeper其实是基于临时顺序节点特性实现的分布式锁。当然,还结合了他的Watcher机制,实现释放锁的时候,通知等待锁的线程去竞争锁。

1.2 分布式锁流程说明

1.2.1 分布式锁流程图

1.2.2 流程说明

  1. client判断/lock目录是否存在,如果不存在则向其注册/lock的持久节点
  2. client向/lock/目录下注册/lock/Node-前缀的临时顺序节点,并得到顺序号
  3. client获取/lock/目录下的所有临时顺序子节点
  4. client判断临时子节点序号中是否存在比自身的序号小的节点。如果不存在,则获取到锁;如果存在,则对象该临时节点做watch监控
  5. 获得锁的线程,执行业务逻辑,执行完之后,删除临时节点,完成锁的释放。
  6. 等待锁的线程如果收到监控的临时节点被删除的通知,则再重复4、5、6步骤,进入下一个获得锁、释放锁的循环。

1.3 分布式锁代码实现

1.3.1 自己手写,实现Lock接口

1.3.1.1 分布式锁ZookeeperDistributedLock
package com.ningzhaosheng.distributelock.zookeeper;

import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.serialize.SerializableSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

/**
 * @author ningzhaosheng
 * @date 2024/4/17 18:13:38
 * @description 基于zookeeper实现的分布式锁
 */
public class ZookeeperDistributedLock implements Lock {

    private static Logger logger = LoggerFactory.getLogger(ZookeeperDistributedLock.class);

    // zookeeper 地址
    private String ZOOKEEPER_IP_PORT = "192.168.152.130:2181";
    // zookeeper 锁目录
    private String LOCK_PATH = "/LOCK";

    // 创建 zookeeper客户端zkClient
    private ZkClient client = null;

    private CountDownLatch cdl;

    // 当前请求的节点前一个节点
    private String beforePath;
    // 当前请求的节点
    private String currentPath;

    /**
     * 初始化客户端和创建LOCK目录
     *
     * @param ZOOKEEPER_IP_PORT
     * @param LOCK_PATH
     */
    public ZookeeperDistributedLock(String ZOOKEEPER_IP_PORT, String LOCK_PATH) {
        this.ZOOKEEPER_IP_PORT = ZOOKEEPER_IP_PORT;
        this.LOCK_PATH = LOCK_PATH;
        client = new ZkClient(ZOOKEEPER_IP_PORT, 4000, 4000, new SerializableSerializer());
        // 判断有没有LOCK目录,没有则创建
        if (!this.client.exists(LOCK_PATH)) {
            this.client.createPersistent(LOCK_PATH);
        }
    }

    @Override
    public void lock() {
        if (!tryLock()) {
            //对次小节点进行监听
            waitForLock();
            lock();
        } else {
            logger.info(Thread.currentThread().getName() + " 获得分布式锁!");
        }
    }

    @Override
    public void lockInterruptibly() throws InterruptedException {

    }

    @Override
    public boolean tryLock() {
        // 如果currentPath为空则为第一次尝试加锁,第一次加锁赋值currentPath
        if (currentPath == null || currentPath.length() <= 0) {
            // 创建一个临时顺序节点
            currentPath = this.client.createEphemeralSequential(LOCK_PATH + '/', "lock");
            System.out.println("---------------------------->" + currentPath);
        }

        // 获取所有临时节点并排序,临时节点名称为自增长的字符串如:0000000400
        List<String> childrens = this.client.getChildren(LOCK_PATH);
        //由小到大排序所有子节点
        Collections.sort(childrens);
        //判断创建的子节点/LOCK/Node-n是否最小,即currentPath,如果当前节点等于childrens中的最小的一个就占用锁
        if (currentPath.equals(LOCK_PATH + '/' + childrens.get(0))) {
            return true;
        }
        //找出比创建的临时顺序节子节点/LOCK/Node-n次小的节点,并赋值给beforePath
        else {
            int wz = Collections.binarySearch(childrens, currentPath.substring(6));
            beforePath = LOCK_PATH + '/' + childrens.get(wz - 1);
        }

        return false;
    }

    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return false;
    }

    //等待锁,对次小节点进行监听
    private void waitForLock() {
        IZkDataListener listener = new IZkDataListener() {
            public void handleDataDeleted(String dataPath) throws Exception {
                logger.info(Thread.currentThread().getName() + ":捕获到DataDelete事件!---------------------------");
                if (cdl != null) {
                    cdl.countDown();
                }
            }

            public void handleDataChange(String dataPath, Object data) throws Exception {

            }
        };

        // 对次小节点进行监听,即beforePath-给排在前面的的节点增加数据删除的watcher
        this.client.subscribeDataChanges(beforePath, listener);
        if (this.client.exists(beforePath)) {
            cdl = new CountDownLatch(1);
            try {
                cdl.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        this.client.unsubscribeDataChanges(beforePath, listener);
    }

    @Override
    public void unlock() {
        // 删除当前临时节点
        client.delete(currentPath);
    }

    @Override
    public Condition newCondition() {
        return null;
    }

}
1.3.1.2 模拟下单处理OrderServiceHandle
package com.ningzhaosheng.distributelock.zookeeper;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.Lock;

/**
 * @author ningzhaosheng
 * @date 2024/4/17 21:45:46
 * @description 模拟订单处理
 */
public class OrderServiceHandle implements Runnable {
    private static OrderCodeGenerator ong = new OrderCodeGenerator();

    private Logger logger = LoggerFactory.getLogger(OrderServiceHandle.class);

    // 按照线程数初始化倒计数器,倒计数器
    private CountDownLatch cdl = null;

    private Lock lock = null;

    public OrderServiceHandle(CountDownLatch cdl, Lock lock) {
        this.cdl = cdl;
        this.lock = lock;
    }

    // 创建订单
    public void createOrder() {
        String orderCode = null;

        //准备获取锁
        lock.lock();
        try {
            // 获取订单编号
            orderCode = ong.getOrderCode();
        } catch (Exception e) {
            // TODO: handle exception
        } finally {
            //完成业务逻辑以后释放锁
            lock.unlock();
        }

        // ……业务代码

        logger.info("insert into DB使用id:=======================>" + orderCode);
    }

    @Override
    public void run() {
        try {
            // 等待其他线程初始化
            cdl.await();
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        // 创建订单
        createOrder();
    }
}
1.3.1.3 订单号生成类OrderCodeGenerator
package com.ningzhaosheng.distributelock.zookeeper;

import java.text.SimpleDateFormat;
import java.util.Date;

/**
 * @author ningzhaosheng
 * @date 2024/4/17 21:44:06
 * @description 生成订单号
 */
public class OrderCodeGenerator {
    // 自增长序列
    private static int i = 0;

    // 按照“年-月-日-小时-分钟-秒-自增长序列”的规则生成订单编号
    public String getOrderCode() {
        Date now = new Date();
        SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHHmmss");
        return sdf.format(now) + ++i;
    }
}
1.3.1.4 分布式锁测试类TestZookeeperDistributedLock
package com.ningzhaosheng.distributelock.zookeeper;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.Lock;

/**
 * @author ningzhaosheng
 * @date 2024/4/17 21:48:28
 * @description zookeeper分布式锁测试类
 */
public class TestZookeeperDistributedLock {
    public static void main(String[] args) {
        // zookeeper 地址
        String ZOOKEEPER_IP_PORT = "192.168.31.9:2181";
        // zookeeper 锁目录
        String LOCK_PATH = "/LOCK";
        // 线程并发数
        int NUM = 10;

        CountDownLatch cdl = new CountDownLatch(NUM);
        for (int i = 1; i <= NUM; i++) {
            // 按照线程数迭代实例化线程
            Lock lock = new ZookeeperDistributedLock(ZOOKEEPER_IP_PORT, LOCK_PATH);
            new Thread(new OrderServiceHandle(cdl, lock)).start();
            // 创建一个线程,倒计数器减1
            cdl.countDown();
        }
    }
}
1.3.1.5 测试效果

从上图执行结果中可以看出,在多线程情况下,分布式锁获取和释放正常。

1.3.2 基于Apache Curator 框架调用

1.3.2.1 maven依赖
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-framework</artifactId>
            <version>5.2.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-recipes</artifactId>
            <version>5.2.0</version>
        </dependency>
1.3.2.2 代码实现

这里模拟业务使用分布式锁,还是使用的OrderServiceHandle类,这里只给出分布式锁实现类和测试类,不再给出OrderServiceHandle代码,可以参考上一小节的OrderServiceHandle类。

1.3.2.2.1 分布式锁类CuratorDistributeLock
package com.ningzhaosheng.distributelock.zookeeper.curator;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

/**
 * @author ningzhaosheng
 * @date 2024/4/17 22:03:45
 * @description 实现Lock接口(其实可以不用,可以直接使用InterProcessMutex,这里是为了和jvm的Lock锁保持一致,所以做了一层封装)
 */
public class CuratorDistributeLock implements Lock {

    private CuratorFramework client;
    private InterProcessMutex mutex;

    public CuratorDistributeLock(String connString, String lockPath) {
        this(connString, lockPath, new ExponentialBackoffRetry(3000,5));
    }

    public CuratorDistributeLock(String connString, String lockPath, ExponentialBackoffRetry retryPolicy) {
        try {
            client = CuratorFrameworkFactory.builder()
                    .connectString(connString)
                    .retryPolicy(retryPolicy)
                    .build();
            client.start();

            mutex = new InterProcessMutex(client, lockPath);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public void lock() {
        try {
            // 获取锁
            mutex.acquire();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public void lockInterruptibly() throws InterruptedException {

    }

    @Override
    public boolean tryLock() {
        return false;
    }

    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return false;
    }

    @Override
    public void unlock() {
        try {
            // 释放锁
            mutex.release();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public Condition newCondition() {
        return null;
    }
}
1.3.2.2.2 测试类TestCuratorDistributedLock
package com.ningzhaosheng.distributelock.zookeeper.curator;

import com.ningzhaosheng.distributelock.zookeeper.OrderServiceHandle;

import java.util.concurrent.CountDownLatch;

/**
 * @author ningzhaosheng
 * @date 2024/4/17 21:54:33
 * @description 基于 apache curator分布式锁测试类
 */
public class TestCuratorDistributedLock {
    private static final String ZK_ADDRESS = "192.168.31.9:2181";
    private static final String LOCK_PATH = "/distributed_lock";

    public static void main(String[] args) {

        int NUM = 10;
        CountDownLatch cdl = new CountDownLatch(NUM);
        for (int i = 1; i <= NUM; i++) {
            // 按照线程数迭代实例化线程
            /** 创建CuratorDistributeLock
             * 其实可以不用,可以直接使用InterProcessMutex,这里是为了和jvm的Lock锁保持一致,所以做了一层封装
             */
            CuratorDistributeLock curatorDistributeLock = new CuratorDistributeLock(ZK_ADDRESS,LOCK_PATH);
            new Thread(new OrderServiceHandle(cdl, curatorDistributeLock)).start();
            // 创建一个线程,倒计数器减1
            cdl.countDown();
        }
    }
}
1.3.2.3 执行结果

从执行结果可以看出,基于apche curator框架实现zookeeper锁,它也是按照临时顺序节点的顺序获取锁的,每次获得锁的节点都是最小顺序节点,然后等待锁的线程,会基于watcher机制,每次给最小临时顺序节点加回调,监听节点的变更(即释放锁的线程会删除节点),然后再重新判断最小临时顺序节点,最小的获得锁执行,依次循环完成。

好了,本次内容就分享到这,欢迎关注本博主。如果有帮助到大家,欢迎大家点赞+关注+收藏,有疑问也欢迎大家评论留言!


本文转载自: https://blog.csdn.net/qq_25409421/article/details/138009914
版权归原作者 夜夜流光相皎洁_小宁 所有, 如有侵权,请联系我们删除。

“分布式锁实现方案-基于zookeeper的分布式锁实现(原理与代码)”的评论:

还没有评论