实现要点
redis分布式锁的问题
redis通常也用来实现分布式锁,但是有一些问题
- 进程需要主动请求redis判断锁是否被释放,会造成服务端的压力以及客户端循环的开销
- 获得锁的进程需要设置过期时间来容错,有产生延迟的风险
- redis主从切换有可能导致锁失效
zookeeper的优势
使用zookeeper实现分布式锁的优势
- 临时节点,如果客户端失活节点被删除,可以通过设置session过期时间来控制删除时间
- 有序节点,严格的单调递增顺序,可以控制并发时的顺序,类似排队
- watch机制,客户端可以监听某个节点的任何事件
- zookeeper集群是几乎高可用的,快速的选主,官方号称200ms内
- zookeeper能够做到对外的统一视图,可线性化,leader节点失效会停止服务
实现思路
客户端创建临时有序节点 /testLock/lock ,然后拿到父目录 /testLock 下的所有节点并排序,判断自己是否是第一个节点,如果是获取锁成功,如果不是,就监听它的前一个节点的删除事件,当监听节点被删除时,获取锁成功。
代码实现
zookeeper集群的搭建和项目搭建细节请参考上一篇文章
ZooKeeper应用---分布式配置更新_Deepdoo的博客-CSDN博客zookeeper API的使用,基于回调和监听的响应式编程https://blog.csdn.net/weixin_44489428/article/details/123637468
ZkUtils文件
package org.example.lock;
import org.apache.zookeeper.ZooKeeper;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
public class ZkUtils {
public static String root = "/testLock";
public static String path = "/lock";
private static ZooKeeper zk;
private static final String address = "zk01:2181,zk02:2181,zk03:2181,zk04:2181" + root;
private static final CountDownLatch latch = new CountDownLatch(1);
public static ZooKeeper getZk() {
try {
zk = new ZooKeeper(address, 3000, new DefaultWatcher(latch));
latch.await();
} catch (Exception e) {
e.printStackTrace();
}
return zk;
}
}
ZkUtils 作为一个工具类,主要是通过阻塞的方式获取 zookeeper 连接的实例。
address是zookeeper集群的连接地址,指定了本实验的根目录 /testLock,锁节点名称前缀 /lock。
DefaultWatcher是默认的监听类,当连接成功时把 CountDownLatch 减1,返回zk对象。
DefaultWatcher类
package org.example.lock;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import java.util.concurrent.CountDownLatch;
public class DefaultWatcher implements Watcher {
private CountDownLatch latch;
public DefaultWatcher(CountDownLatch latch) {
this.latch = latch;
}
@Override
public void process(WatchedEvent event) {
switch (event.getState()) {
case Unknown:
break;
case Disconnected:
break;
case NoSyncConnected:
break;
case SyncConnected:
System.out.println("sync connected!!!");
latch.countDown();
break;
case AuthFailed:
break;
case ConnectedReadOnly:
break;
case SaslAuthenticated:
break;
case Expired:
break;
case Closed:
break;
}
}
}
连接成功的时候打印 “sync connnected!!!"
TestLock类
package org.example.lock;
import org.apache.zookeeper.ZooKeeper;
/**
* @date 2022/3/22 11:00
*/
public class TestLock {
private static ZooKeeper zk;
public static void main(String[] args) {
zk = ZkUtils.getZk();
System.out.println(zk.toString());
// 用子线程模拟分布式中的多个节点
int childrenSize = 10;
Child[] children = new Child[childrenSize];
for (int i = 0; i < childrenSize; i++) {
children[i] = new Child();
children[i].start();
}
for (int i = 0; i < childrenSize; i++) {
try {
children[i].join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("I am father!");
}
static class Child extends Thread {
@Override
public void run() {
String name = Thread.currentThread().getName();
WatcherCallback watcherCallback = new WatcherCallback(zk, name);
// 加锁
watcherCallback.tryLock();
// 业务逻辑
System.out.println(name + " is working...");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 解锁
watcherCallback.unLock();
}
}
}
main方法是测试入口,首先通过 ZkUtils 拿到 zookeeper 连接,这里会阻塞一下,连接成功后会打印zk的连接信息。
创建了10个线程来模拟10个分布式环境中的节点,主线程等待所有子线程结束后打印自己结束前的信息。
子线程通过 WatcherCallback 拿到锁,执行业务逻辑,然后再释放锁。
WatcherCallback类
package org.example.lock;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CountDownLatch;
public class WatcherCallback implements Watcher, AsyncCallback.StringCallback, AsyncCallback.Children2Callback, AsyncCallback.StatCallback {
private ZooKeeper zk;
private String threadName;
private String nodeName;
// 加锁时必须要等待锁释放
private CountDownLatch latch = new CountDownLatch(1);
public WatcherCallback(ZooKeeper zk, String threadName) {
this.zk = zk;
this.threadName = threadName;
}
@Override
public void process(WatchedEvent event) {
switch (event.getType()) {
case None:
break;
case NodeCreated:
break;
case NodeDeleted:
// 前一个节点被删除了 当前节点直接获得锁
latch.countDown();
System.out.println(threadName + " found prev node deleted: " + event.getPath());
break;
case NodeDataChanged:
break;
case NodeChildrenChanged:
break;
case DataWatchRemoved:
break;
case ChildWatchRemoved:
break;
case PersistentWatchRemoved:
break;
}
}
public void tryLock() {
// 创建临时有序节点
zk.create(
ZkUtils.path,
threadName.getBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL_SEQUENTIAL,
this, "ABC");
// 等待 当前节点成功拿到锁 或者监听节点被删除
try {
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public void unLock() {
// 解锁 - 直接删除当前节点
try {
zk.delete("/" + nodeName, -1);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (KeeperException e) {
e.printStackTrace();
}
}
/**
* create 异步方法的回调
* @param rc
* @param path
* @param ctx
* @param name
*/
@Override
public void processResult(int rc, String path, Object ctx, String name) {
// create 回调了 如果创建成功 去拿父节点下的所有有序节点信息
if(name != null) {
// 去掉 /
nodeName = name.substring(1);
System.out.println(threadName + " nodeName: " + nodeName);
// 这里一定要注意不要对父节点设置 watcher 因为监听父节点每个节点的删除都会通知其他节点 增加不必要的通信负担
// 只需要监听当前节点前面那个节点
zk.getChildren("/", false, this, "ABC");
}
}
/**
* getChildren()异步方法回调
* @param rc
* @param path
* @param ctx
* @param children
* @param stat
*/
@Override
public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) {
// 拿到了父节点下所有的节点信息 先排个序
if(stat != null) {
children.sort(String::compareTo);
// 判断当前节点是否是第一个
if(nodeName.compareTo(children.get(0)) == 0) {
// 当前节点拿到锁成功
latch.countDown();
} else {
// 需要对前一个节点设置监听
String prevNodeName = "/" + children.get(children.indexOf(nodeName) - 1);
// 通过exists设置监听并且处理回调
zk.exists(prevNodeName, this, this, "ABC");
}
}
}
@Override
public void processResult(int rc, String path, Object ctx, Stat stat) {
if(stat == null) {
System.out.println(threadName + " found prev node missing: " + path);
}
}
}
先看 tryLock 方法:
public void tryLock() {
// 创建临时有序节点
zk.create(
ZkUtils.path,
threadName.getBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL_SEQUENTIAL,
this, "ABC");
// 等待 当前节点成功拿到锁 或者监听节点被删除
try {
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
当要加锁时,请求 zookeeper 创建临时有序节点并且设置了回调对象为 this,然后阻塞住当前线程等待获取锁成功。create() 方法使用了异步方式,当方法请求结束时会调用回调对象的 processResult()方法,如下:
/**
* create 异步方法的回调
* @param rc
* @param path
* @param ctx
* @param name
*/
@Override
public void processResult(int rc, String path, Object ctx, String name) {
// create 回调了 如果创建成功 去拿父节点下的所有有序节点信息
if(name != null) {
// 去掉 /
nodeName = name.substring(1);
System.out.println(threadName + " nodeName: " + nodeName);
// 这里一定要注意不要对父节点设置 watcher 因为监听父节点每个节点的删除都会通知其他节点 增加不必要的通信负担
// 只需要监听当前节点前面那个节点
zk.getChildren("/", false, this, "ABC");
}
}
创建节点成功后 name 不为空,然后通过 getChildren() 去拿父节点下面的所有节点,同样也是异步方式,这里要注意第二个参数要设置false,否则就对父节点设置监听了,不但没用而且会增加通信负担,第三个参数是回调对象,方法执行完成后会调用下面这个方法:
/**
* getChildren()异步方法回调
* @param rc
* @param path
* @param ctx
* @param children
* @param stat
*/
@Override
public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) {
// 拿到了父节点下所有的节点信息 先排个序
if(stat != null) {
children.sort(String::compareTo);
// 判断当前节点是否是第一个
if(nodeName.compareTo(children.get(0)) == 0) {
// 当前节点拿到锁成功
latch.countDown();
} else {
// 需要对前一个节点设置监听
String prevNodeName = "/" + children.get(children.indexOf(nodeName) - 1);
// 通过exists设置监听并且处理回调
zk.exists(prevNodeName, this, this, "ABC");
}
}
}
这里就开始对父节点下面所有的节点进行排序(从小到大),然后就有两种情况
情况一:当前节点是第一个,那么当前节点就顺理成章的获取到锁了,直接结束阻塞并返回即可
情况二:当前节点前面还有节点,那么就通过 exists() 方法对前一个节点设置监听,等它被删除的时候结束阻塞
exists() 方法的第二参数是监听对象,当监听的节点状态发生变化会调用这个对象的 process()方法
@Override
public void process(WatchedEvent event) {
switch (event.getType()) {
case None:
break;
case NodeCreated:
break;
case NodeDeleted:
// 前一个节点被删除了 当前节点直接获得锁
latch.countDown();
System.out.println(threadName + " found prev node deleted: " + event.getPath());
break;
case NodeDataChanged:
break;
case NodeChildrenChanged:
break;
case DataWatchRemoved:
break;
case ChildWatchRemoved:
break;
case PersistentWatchRemoved:
break;
}
}
当监听的节点被删除,会获得 NodeDeleted 的事件类型,直接对latch执行 countDown(),打印一条信息。
exists() 第三个参数是回调对象,当exists()异步执行完成会调用回调对象的下面这个方法:
@Override
public void processResult(int rc, String path, Object ctx, Stat stat) {
if(stat == null) {
System.out.println(threadName + " found prev node missing: " + path);
}
}
这里没有什么操作,实验过程中也没走到这个if,不过如果当前节点要监听前一个节点然后突然发现前面这个节点不存在,可能是前面一个节点正好把锁释放了,所以如果要处理的话,这里可以直接让当前节点获取锁。
到这里,tryLock() 方法就执行完成了,反正就是节点要么获取到锁要么等待前序节点的删除然后获取到锁,因此还需要一个 unlock() 方法来删除节点:
public void unLock() {
// 解锁 - 直接删除当前节点
try {
zk.delete("/" + nodeName, -1);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (KeeperException e) {
e.printStackTrace();
}
}
unlock 很简单,就直接删除当前节点即可。
当然,如果当前线程因为某些情况挂掉了,那么zookeeper会在session的超时事件结束后替它把临时节点删除,那么后序节点就可以获得锁了。
验证实验
10个线程分别拿到自己的节点然后按顺序执行。
总结
zookeeper本身提供的临时有序节点规避了其他分布式锁的问题,结合监听和回调机制可以方便的实现分布式锁。实验的重点其实就是 WatcherCallback 类,它实现了所有需要用的 Watcher 和 Callback 接口,进而把所有内部实现封装在一个类里,所有运行过程都通过回调方法以及监听方法串联起来,暴露给外部的 tryLock/unlock 方法即可以对使用方屏蔽内部细节。这种类似响应式编程的方法写起来虽然很烧脑但是很有趣。
版权归原作者 Deepdoo 所有, 如有侵权,请联系我们删除。