一、基于zookeeper的分布式锁
1.1 基于Zookeeper实现分布式锁的原理
1.1.1 分布式锁特性说明
1.1.1.1 特点分析
- 每次只能一个占用锁;
- 可以重复进入锁;
- 只有占用者才可以解锁;
- 获取锁和释放锁都需要原子
- 不能产生死锁
- 尽量满足性能
1.1.1.2 本质
同步互斥,使得处理任务能够一个一个逐步的过临界资源。
1.1.2 Zookeeper 分布式锁实现原理
1.1.2.1 Zookeeper临时顺序节点特性
zookeeper中有一种临时顺序节点,它具有以下特征:
- 时效性,当会话结束,节点将自动被删除
- 顺序性,当多个应用向其注册顺序节点时,每个顺序号将只能被一个应用获取
1.1.2.2 Zookeeper满足分布式锁基本要求
- 因为顺序性,可以让最小顺序号的应用获取到锁,从而满足分布式锁的 每次只能一个占用锁,因为只有它一个获取到,所以可以实现 重复进入 ,只要设置标识即可。锁的释放,即删除应用在zookeeper上注册的节点,因为每个节点只被自己注册拥有,所以只有自己才能删除,这样就满足只有占用者才可以解锁
- zookeeper的序号分配是原子的,分配后即不会再改变,让最小序号者获取锁,所以获取锁是原子的
- 因为注册的是临时节点,在会话期间内有效,所以不会产生死锁
- zookeeper注册节点的性能能满足几千,而且支持集群,能够满足大部分情况下的性能
1.1.2.3 Watcher机制
Zookeeper 允许客户端向服务端的某个 Znode 注册一个 Watcher 监听,当服务端的一些指定事
件触发了这个 Watcher,服务端会向指定客户端发送一个事件通知来实现分布式的通知功能,然
后客户端根据 Watcher 通知状态和事件类型做出业务上的改变。
在实现分布式锁的时候,主要利用这个机制,实现释放锁的时候,通知等待锁的线程竞争锁。
1.1.2.3 总结
综上可知,Zookeeper其实是基于临时顺序节点特性实现的分布式锁。当然,还结合了他的Watcher机制,实现释放锁的时候,通知等待锁的线程去竞争锁。
1.2 分布式锁流程说明
1.2.1 分布式锁流程图
1.2.2 流程说明
- client判断/lock目录是否存在,如果不存在则向其注册/lock的持久节点
- client向/lock/目录下注册/lock/Node-前缀的临时顺序节点,并得到顺序号
- client获取/lock/目录下的所有临时顺序子节点
- client判断临时子节点序号中是否存在比自身的序号小的节点。如果不存在,则获取到锁;如果存在,则对象该临时节点做watch监控
- 获得锁的线程,执行业务逻辑,执行完之后,删除临时节点,完成锁的释放。
- 等待锁的线程如果收到监控的临时节点被删除的通知,则再重复4、5、6步骤,进入下一个获得锁、释放锁的循环。
1.3 分布式锁代码实现
1.3.1 自己手写,实现Lock接口
1.3.1.1 分布式锁ZookeeperDistributedLock
package com.ningzhaosheng.distributelock.zookeeper;
import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.serialize.SerializableSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
/**
* @author ningzhaosheng
* @date 2024/4/17 18:13:38
* @description 基于zookeeper实现的分布式锁
*/
public class ZookeeperDistributedLock implements Lock {
private static Logger logger = LoggerFactory.getLogger(ZookeeperDistributedLock.class);
// zookeeper 地址
private String ZOOKEEPER_IP_PORT = "192.168.152.130:2181";
// zookeeper 锁目录
private String LOCK_PATH = "/LOCK";
// 创建 zookeeper客户端zkClient
private ZkClient client = null;
private CountDownLatch cdl;
// 当前请求的节点前一个节点
private String beforePath;
// 当前请求的节点
private String currentPath;
/**
* 初始化客户端和创建LOCK目录
*
* @param ZOOKEEPER_IP_PORT
* @param LOCK_PATH
*/
public ZookeeperDistributedLock(String ZOOKEEPER_IP_PORT, String LOCK_PATH) {
this.ZOOKEEPER_IP_PORT = ZOOKEEPER_IP_PORT;
this.LOCK_PATH = LOCK_PATH;
client = new ZkClient(ZOOKEEPER_IP_PORT, 4000, 4000, new SerializableSerializer());
// 判断有没有LOCK目录,没有则创建
if (!this.client.exists(LOCK_PATH)) {
this.client.createPersistent(LOCK_PATH);
}
}
@Override
public void lock() {
if (!tryLock()) {
//对次小节点进行监听
waitForLock();
lock();
} else {
logger.info(Thread.currentThread().getName() + " 获得分布式锁!");
}
}
@Override
public void lockInterruptibly() throws InterruptedException {
}
@Override
public boolean tryLock() {
// 如果currentPath为空则为第一次尝试加锁,第一次加锁赋值currentPath
if (currentPath == null || currentPath.length() <= 0) {
// 创建一个临时顺序节点
currentPath = this.client.createEphemeralSequential(LOCK_PATH + '/', "lock");
System.out.println("---------------------------->" + currentPath);
}
// 获取所有临时节点并排序,临时节点名称为自增长的字符串如:0000000400
List<String> childrens = this.client.getChildren(LOCK_PATH);
//由小到大排序所有子节点
Collections.sort(childrens);
//判断创建的子节点/LOCK/Node-n是否最小,即currentPath,如果当前节点等于childrens中的最小的一个就占用锁
if (currentPath.equals(LOCK_PATH + '/' + childrens.get(0))) {
return true;
}
//找出比创建的临时顺序节子节点/LOCK/Node-n次小的节点,并赋值给beforePath
else {
int wz = Collections.binarySearch(childrens, currentPath.substring(6));
beforePath = LOCK_PATH + '/' + childrens.get(wz - 1);
}
return false;
}
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return false;
}
//等待锁,对次小节点进行监听
private void waitForLock() {
IZkDataListener listener = new IZkDataListener() {
public void handleDataDeleted(String dataPath) throws Exception {
logger.info(Thread.currentThread().getName() + ":捕获到DataDelete事件!---------------------------");
if (cdl != null) {
cdl.countDown();
}
}
public void handleDataChange(String dataPath, Object data) throws Exception {
}
};
// 对次小节点进行监听,即beforePath-给排在前面的的节点增加数据删除的watcher
this.client.subscribeDataChanges(beforePath, listener);
if (this.client.exists(beforePath)) {
cdl = new CountDownLatch(1);
try {
cdl.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
this.client.unsubscribeDataChanges(beforePath, listener);
}
@Override
public void unlock() {
// 删除当前临时节点
client.delete(currentPath);
}
@Override
public Condition newCondition() {
return null;
}
}
1.3.1.2 模拟下单处理OrderServiceHandle
package com.ningzhaosheng.distributelock.zookeeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.Lock;
/**
* @author ningzhaosheng
* @date 2024/4/17 21:45:46
* @description 模拟订单处理
*/
public class OrderServiceHandle implements Runnable {
private static OrderCodeGenerator ong = new OrderCodeGenerator();
private Logger logger = LoggerFactory.getLogger(OrderServiceHandle.class);
// 按照线程数初始化倒计数器,倒计数器
private CountDownLatch cdl = null;
private Lock lock = null;
public OrderServiceHandle(CountDownLatch cdl, Lock lock) {
this.cdl = cdl;
this.lock = lock;
}
// 创建订单
public void createOrder() {
String orderCode = null;
//准备获取锁
lock.lock();
try {
// 获取订单编号
orderCode = ong.getOrderCode();
} catch (Exception e) {
// TODO: handle exception
} finally {
//完成业务逻辑以后释放锁
lock.unlock();
}
// ……业务代码
logger.info("insert into DB使用id:=======================>" + orderCode);
}
@Override
public void run() {
try {
// 等待其他线程初始化
cdl.await();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
// 创建订单
createOrder();
}
}
1.3.1.3 订单号生成类OrderCodeGenerator
package com.ningzhaosheng.distributelock.zookeeper;
import java.text.SimpleDateFormat;
import java.util.Date;
/**
* @author ningzhaosheng
* @date 2024/4/17 21:44:06
* @description 生成订单号
*/
public class OrderCodeGenerator {
// 自增长序列
private static int i = 0;
// 按照“年-月-日-小时-分钟-秒-自增长序列”的规则生成订单编号
public String getOrderCode() {
Date now = new Date();
SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHHmmss");
return sdf.format(now) + ++i;
}
}
1.3.1.4 分布式锁测试类TestZookeeperDistributedLock
package com.ningzhaosheng.distributelock.zookeeper;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.Lock;
/**
* @author ningzhaosheng
* @date 2024/4/17 21:48:28
* @description zookeeper分布式锁测试类
*/
public class TestZookeeperDistributedLock {
public static void main(String[] args) {
// zookeeper 地址
String ZOOKEEPER_IP_PORT = "192.168.31.9:2181";
// zookeeper 锁目录
String LOCK_PATH = "/LOCK";
// 线程并发数
int NUM = 10;
CountDownLatch cdl = new CountDownLatch(NUM);
for (int i = 1; i <= NUM; i++) {
// 按照线程数迭代实例化线程
Lock lock = new ZookeeperDistributedLock(ZOOKEEPER_IP_PORT, LOCK_PATH);
new Thread(new OrderServiceHandle(cdl, lock)).start();
// 创建一个线程,倒计数器减1
cdl.countDown();
}
}
}
1.3.1.5 测试效果
从上图执行结果中可以看出,在多线程情况下,分布式锁获取和释放正常。
1.3.2 基于Apache Curator 框架调用
1.3.2.1 maven依赖
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>5.2.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>5.2.0</version>
</dependency>
1.3.2.2 代码实现
这里模拟业务使用分布式锁,还是使用的OrderServiceHandle类,这里只给出分布式锁实现类和测试类,不再给出OrderServiceHandle代码,可以参考上一小节的OrderServiceHandle类。
1.3.2.2.1 分布式锁类CuratorDistributeLock
package com.ningzhaosheng.distributelock.zookeeper.curator;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
/**
* @author ningzhaosheng
* @date 2024/4/17 22:03:45
* @description 实现Lock接口(其实可以不用,可以直接使用InterProcessMutex,这里是为了和jvm的Lock锁保持一致,所以做了一层封装)
*/
public class CuratorDistributeLock implements Lock {
private CuratorFramework client;
private InterProcessMutex mutex;
public CuratorDistributeLock(String connString, String lockPath) {
this(connString, lockPath, new ExponentialBackoffRetry(3000,5));
}
public CuratorDistributeLock(String connString, String lockPath, ExponentialBackoffRetry retryPolicy) {
try {
client = CuratorFrameworkFactory.builder()
.connectString(connString)
.retryPolicy(retryPolicy)
.build();
client.start();
mutex = new InterProcessMutex(client, lockPath);
} catch (Exception e) {
e.printStackTrace();
}
}
@Override
public void lock() {
try {
// 获取锁
mutex.acquire();
} catch (Exception e) {
e.printStackTrace();
}
}
@Override
public void lockInterruptibly() throws InterruptedException {
}
@Override
public boolean tryLock() {
return false;
}
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return false;
}
@Override
public void unlock() {
try {
// 释放锁
mutex.release();
} catch (Exception e) {
e.printStackTrace();
}
}
@Override
public Condition newCondition() {
return null;
}
}
1.3.2.2.2 测试类TestCuratorDistributedLock
package com.ningzhaosheng.distributelock.zookeeper.curator;
import com.ningzhaosheng.distributelock.zookeeper.OrderServiceHandle;
import java.util.concurrent.CountDownLatch;
/**
* @author ningzhaosheng
* @date 2024/4/17 21:54:33
* @description 基于 apache curator分布式锁测试类
*/
public class TestCuratorDistributedLock {
private static final String ZK_ADDRESS = "192.168.31.9:2181";
private static final String LOCK_PATH = "/distributed_lock";
public static void main(String[] args) {
int NUM = 10;
CountDownLatch cdl = new CountDownLatch(NUM);
for (int i = 1; i <= NUM; i++) {
// 按照线程数迭代实例化线程
/** 创建CuratorDistributeLock
* 其实可以不用,可以直接使用InterProcessMutex,这里是为了和jvm的Lock锁保持一致,所以做了一层封装
*/
CuratorDistributeLock curatorDistributeLock = new CuratorDistributeLock(ZK_ADDRESS,LOCK_PATH);
new Thread(new OrderServiceHandle(cdl, curatorDistributeLock)).start();
// 创建一个线程,倒计数器减1
cdl.countDown();
}
}
}
1.3.2.3 执行结果
从执行结果可以看出,基于apche curator框架实现zookeeper锁,它也是按照临时顺序节点的顺序获取锁的,每次获得锁的节点都是最小顺序节点,然后等待锁的线程,会基于watcher机制,每次给最小临时顺序节点加回调,监听节点的变更(即释放锁的线程会删除节点),然后再重新判断最小临时顺序节点,最小的获得锁执行,依次循环完成。
好了,本次内容就分享到这,欢迎关注本博主。如果有帮助到大家,欢迎大家点赞+关注+收藏,有疑问也欢迎大家评论留言!
版权归原作者 夜夜流光相皎洁_小宁 所有, 如有侵权,请联系我们删除。