1.客户端API
1.1导入依赖
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.13.2</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<version>2.14.1</version>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.5.7</version>
</dependency>
</dependencies>
1.2代码实现
public class zkClient {
//一定不要有空格
private String connectString = "192.168.20.129:2181,192.168.20.130:2181,192.168.20.131:2181";
private int sessionTimeOut = 2000;
private ZooKeeper zkClient;
/**
* 初始话zookeeper
* 参数1:连接地址
* 参数2:超时时间
* 参数3:监听器
*/
@Before
public void init() throws IOException {
zkClient = new ZooKeeper(connectString, sessionTimeOut, new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
List<String> children = null;
try {
children = zkClient.getChildren("/", true);
} catch (Exception e) {
e.printStackTrace();
}
System.out.println("========================");
for (String child : children) {
System.out.println(child);
}
System.out.println("========================");
}
});
}
/**
* 创建子节点
* 参数1:创建节点的路径
* 参数2:节点的数据(转化为字节)
* 参数3:节点的权限
* 参数4:节点的类型(临时/永久)
**/
@Test
public void create() throws InterruptedException, KeeperException {
String nodeCreate = zkClient.create("/class", "s1".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
/**
* 监控子节点变化
* 参数1:要监控的节点目录
* 参数2:监听器(true:使用初始化是的监听器)
*/
@Test
public void getChildren() throws InterruptedException, KeeperException {
List<String> children = zkClient.getChildren("/", true);
for (String child : children) {
System.out.println(child);
}
//延时
Thread.sleep(Long.MAX_VALUE);
}
/**
* 判断节点是否存在
* 参数1:判断的节点路径
* 参数:是否使用监听器
*/
@Test
public void isExist() throws InterruptedException, KeeperException {
Stat stat = zkClient.exists("/class", false);
System.out.println(stat == null ? "不存在" : "存在");
}
}
1.3写数据原理
1.写入请求直接发送给Leader
- 1.客户端发请求给Leader
- 2.leader执行请求并应答,然后把请求分发给下一个follower
- 3.follower会执行请求并应答。
- 4.当应答数超过半数,Leader就会回复客户端,完成了写请求
- 5.leader会继续发送写请求给剩下的follower
2.写入请求发送给Follower
- 1.客户端发请求给follower,follower没有写权限,立即把写请求发给leader
- 2.leader执行写请求并应答,然后把写请求分发给follower
- 3.follower会执行请求并应答。
- 4.当应答数超过半数,Leader回复follower,由follower回复客户端,完成了写请求
- 5.leader会继续发送写请求给剩下的follower
2.服务器动态上下线
2.1客户端
- 1.获取zookeeper连接
- 2.监听节点的变化
- 3.业务逻辑(睡眠)
public class DisClient {
private String connectString = "192.168.20.129:2181,192.168.20.130:2181,192.168.20.131:2181";
private int sessionTimeOut = 2000;
private ZooKeeper zooKeeper;
public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
DisClient client = new DisClient();
//1.获取zk连接
client.getConnect();
//2.监听/servers下面的节点变化
client.getServerList();
//3.业务逻辑
client.business();
}
private void business() throws InterruptedException {
Thread.sleep(Long.MAX_VALUE);
}
/**
* 监听服务端(获取节点信息)
* @throws InterruptedException
* @throws KeeperException
*/
private void getServerList() throws InterruptedException, KeeperException {
List<String> children = zooKeeper.getChildren("/servers", true);
//服务器地址存放到集合中
ArrayList<String> list = new ArrayList<>();
for (String child : children) {
byte[] data = zooKeeper.getData("/servers/" + child, false, null);
list.add(new String(data));
}
System.out.println(list);
}
/**
* 初始话zookeeper
* @throws IOException
*/
private void getConnect() throws IOException {
zooKeeper = new ZooKeeper(connectString, sessionTimeOut, new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
try {
getServerList();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (KeeperException e) {
e.printStackTrace();
}
}
});
}
}
2.2服务端
- 1.获取zookeeper连接
- 2.创建节点(服务端注册到zookeeper)
- 3.业务逻辑(睡眠)
/**
* 服务端注册zookeeper
*/
public class DisServer {
private String connectString = "192.168.20.129:2181,192.168.20.130:2181,192.168.20.131:2181";
private int sessionTimeOut = 2000;
private ZooKeeper zooKeeper;
public static void main(String[] args) throws Exception {
DisServer dIsServer = new DisServer();
//1.获取zk连接
dIsServer.getConnect();
//2.注册服务器到zk节点(创建节点)
dIsServer.register(args[0]);
//3.启动业务逻辑
dIsServer.business();
}
private void business() throws InterruptedException {
Thread.sleep(Long.MAX_VALUE);
}
/**
* 注册服务器(创建节点)
* @param hostname
* @throws InterruptedException
* @throws KeeperException
*/
private void register(String hostname) throws InterruptedException, KeeperException {
String create = zooKeeper.create("/servers/"+hostname, hostname.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
System.out.println(hostname + "已经上线");
}
/**
* 初始化zookeeper
* @throws IOException
*/
private void getConnect() throws IOException {
zooKeeper = new ZooKeeper(connectString, sessionTimeOut, new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
}
});
}
}
3.分布式锁
Zookeeper的分布式锁实现基于其znode(zk节点)功能。每个znode都可以有数据和子节点,并且每个znode都有一个版本号。Zookeeper的分布式锁利用了znode的版本号特性,同时使用watcher机制实现分布式锁的互斥
3.1执行流程
- 当一个客户端需要获取锁时,它会在Zookeeper上创建一个临时且有序的znode节点。
- 客户端通过获取Zookeeper上的znode列表,并判断自己创建的节点是否是所有节点中最小的那个,如果是,则表示客户端获得了锁。
- 如果客户端没有获得锁,则监听它前面(比它序号小的)的节点,等待锁的释放。
- 当客户端释放锁时,它会删除自己创建的znode节点,此时,Zookeeper会通知正在等待前面的节点上的watcher机制,让等待锁的客户端尝试重新获取锁
3.2代码实现
/**
* 分布式锁
*/
public class ZkLock {
private final String connectString = "192.168.20.129:2181,192.168.20.130:2181,192.168.20.131:2181";
private final int sessionTimeOut = 2000;
private final ZooKeeper zooKeeper;
private String path;
private CountDownLatch countDownLatch = new CountDownLatch(1);
private CountDownLatch countPathLatch = new CountDownLatch(1);
private String currentNode;
//构造器初始化
public ZkLock() throws IOException, InterruptedException, KeeperException {
//1.获取链接
zooKeeper = new ZooKeeper(connectString, sessionTimeOut, new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
//countDownLatch 连接上zookeeper,释放
if (watchedEvent.getState() == Event.KeeperState.SyncConnected) {
countDownLatch.countDown();
}
//countPathLatch 释放
if (watchedEvent.getType() == Event.EventType.NodeDeleted && watchedEvent.getPath().equals(path)) {
countPathLatch.countDown();
}
}
});
//等待zookeeper正常连接后,往下执行程序
countDownLatch.await();
//2.判断根节点locks是否存在
Stat stat = zooKeeper.exists("/locks", false);
if (stat == null) {
//说明不存在,创建根节点
zooKeeper.create("/locks", "locks".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
}
//3.加锁
public void zkLock() {
//创建对应的临时带序号的节点
try {
currentNode = zooKeeper.create("/locks/" + "seq-", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
//判断创建的节点是否是最小的序号节点,如果是,获取到锁,如果不是,监听他前一个结点
List<String> children = zooKeeper.getChildren("/locks", false);
//如果只有一个值,直接获取锁,如果不是,则判断
if (children.size() == 1) {
//直接枷锁
return;
} else {
//对节点进行排序
Collections.sort(children);
//获取节点名称
String thisNode = currentNode.substring("/locks/".length());
//通过界节点名称,获取在集合中的下标
int index = children.indexOf(thisNode);
//判断下标
if (index == -1) {
System.out.println("数据异常");
} else if (index == 0) {//第一个数据
//直接枷锁
return;
} else {//说明多个节点,进行监听前一个节点
path = "/locks/" + children.get(index - 1);
zooKeeper.getData(path, true, new Stat());
//等待监听
countPathLatch.await();
return;
}
}
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//4.解锁
public void unZkLock() {
//删除节点
try {
zooKeeper.delete(currentNode, -1);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (KeeperException e) {
e.printStackTrace();
}
}
}
3.3线程测试
public class ZkLockTest {
public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
ZkLock zkLock1 = new ZkLock();
ZkLock zkLock2 = new ZkLock();
ZkLock zkLock3 = new ZkLock();
new Thread(new Runnable() {
@Override
public void run() {
try {
zkLock1.zkLock();
System.out.println("线程1,获取到锁");
Thread.sleep(3 * 1000);
zkLock1.unZkLock();
System.out.println("线程1,释放锁");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
new Thread(new Runnable() {
@Override
public void run() {
try {
zkLock2.zkLock();
System.out.println("线程2,获取到锁");
Thread.sleep(3 * 1000);
zkLock2.unZkLock();
System.out.println("线程2,释放锁");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
new Thread(new Runnable() {
@Override
public void run() {
try {
zkLock3.zkLock();
System.out.println("线程3,获取到锁");
Thread.sleep(3 * 1000);
zkLock3.unZkLock();
System.out.println("线程3,释放锁");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
}
}
4.Curator框架
Curator是Apache ZooKeeper的一个高级客户端库,旨在使开发人员更容易编写可靠的分布式系统。它为ZooKeeper提供了许多有用的功能,包括连接管理,分布式锁和选举,缓存和观察。Curator还提供了一组易于使用的API,可以轻松管理ZooKeeper的节点和数据。
4.1添加依赖
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>5.5.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-client</artifactId>
<version>5.5.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>5.5.0</version>
</dependency>
4.2代码实现
/**
* 客户端连接
* @return
*/
private static CuratorFramework getCuratorFramework() {
//创建zookeeper的客户端:重试策略,初始化每次重试之间需要等待的时间,基准等待时间为3秒
ExponentialBackoffRetry policy = new ExponentialBackoffRetry(3000, 3);
CuratorFramework client = CuratorFrameworkFactory.builder().connectString("192.168.20.129:2181,192.168.20.131:2181,192.168.20.130:2181")
.connectionTimeoutMs(2000)
.sessionTimeoutMs(2000)
.retryPolicy(policy).build();
client.start();
System.out.println("zookeeper启动~");
return client;
}
4.3创建线程测试
InterProcessMutex lock1 = new InterProcessMutex(getCuratorFramework(), "/locks");
InterProcessMutex lock2 = new InterProcessMutex(getCuratorFramework(), "/locks");
new Thread(new Runnable() {
@Override
public void run() {
try {
lock1.acquire();
System.out.println("线程一获取到锁");
Thread.sleep(3000);
lock1.release();
System.out.println("线程一释放锁");
} catch (Exception e) {
e.printStackTrace();
}
}
}).start();
new Thread(new Runnable() {
@Override
public void run() {
try {
lock2.acquire();
System.out.println("线程二获取到锁");
Thread.sleep(3000);
lock2.release();
System.out.println("线程二释放锁");
} catch (Exception e) {
e.printStackTrace();
}
}
}).start();
}
版权归原作者 会敲代码的小张 所有, 如有侵权,请联系我们删除。