0


Zookeeper——分布式锁的概念理解 & 应用举例

1.前言

什么叫做分布式锁呢?

比如说"进程 1"在使用该资源的时候,会先去获得锁,"进程 1"获得锁以后会对该资源保持独占,这样其他进程就无法访问该资源,"进程 1"用完该资源以后就将锁释放掉,让其他进程来获得锁,那么通过这个锁机制,我们就能保证了分布式系统中多个进程能够有序的访问该临界资源。那么我们把这个分布式环境下的这个锁叫作分布式锁。


2.原生Zookeeper实现分布式锁

代码中的注释我已经写的很详细了。

这其中用到了JUC中的CountDownLatch,可以参考:https://blog.csdn.net/weixin_43823808/article/details/120799251

  1. package com.szh.case2;
  2. import org.apache.zookeeper.*;
  3. import org.apache.zookeeper.data.Stat;
  4. import java.io.IOException;
  5. import java.util.Collections;
  6. import java.util.List;
  7. import java.util.Objects;
  8. import java.util.concurrent.CountDownLatch;
  9. /**
  10. *
  11. */
  12. public class DistributedZkLock {
  13. private final String connectString = "192.168.40.130:2181";
  14. private final int sessionTimeout = 30000;
  15. private final ZooKeeper zk;
  16. private CountDownLatch connectLatch = new CountDownLatch(1);
  17. private CountDownLatch waitLatch = new CountDownLatch(1);
  18. private String currentNode;
  19. private String waitPath;
  20. public DistributedZkLock() throws IOException, InterruptedException, KeeperException {
  21. //获取zk连接
  22. zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
  23. @Override
  24. public void process(WatchedEvent watchedEvent) {
  25. //如果连接上zk,则connectLatch可以释放
  26. if (watchedEvent.getState() == Event.KeeperState.SyncConnected) {
  27. connectLatch.countDown();
  28. }
  29. //如果监听的节点已被删除,同时当前监听的节点路径与即将被监听节点的前一个节点路径相同,则waitLatch可以释放
  30. if (watchedEvent.getType() == Event.EventType.NodeDeleted && watchedEvent.getPath().equals(waitPath)) {
  31. waitLatch.countDown();
  32. }
  33. }
  34. });
  35. //等待zk连接成功之后,程序则继续往下走,其他线程进入等待连接的状态
  36. connectLatch.await();
  37. //判断根节点 /locks 是否存在
  38. Stat stat = zk.exists("/locks", false);
  39. //如果根节点 /locks 不存在,则立马创建
  40. if (Objects.isNull(stat)) {
  41. //此 /locks 节点默认所有人均可访问,而且是永久性节点
  42. zk.create("/locks", "locks".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
  43. }
  44. }
  45. //加锁
  46. public void zkLock() {
  47. try {
  48. //所谓加锁,就是在根节点/locks下创建对应的临时、带序号的节点
  49. currentNode = zk.create("/locks/" + "seq-", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
  50. //睡一会,让结果更清晰
  51. Thread.sleep(100);
  52. //判断创建的节点是否是序号最小的节点,如果是,则获取到锁;如果不是,则监听它序号的前一个节点
  53. List<String> children = zk.getChildren("/locks", false);
  54. if (children.size() == 1) { //只有一个节点,则直接获取锁
  55. return;
  56. } else { //如果有多个节点,则需要判断谁的序号最小
  57. //先对获取的节点的list集合排序,确保从小到大的顺序
  58. Collections.sort(children);
  59. //获取节点名称 seq-00000000
  60. String thisNode = currentNode.substring("/locks/".length());
  61. //通过节点名称获取到它在list集合中的位置
  62. int index = children.indexOf(thisNode);
  63. if (index == -1) { //没数据,无意义
  64. System.out.println("数据异常....");
  65. } else if (index == 0) { //说明此节点处于第一个位置,可以获取锁
  66. return;
  67. } else { //非第一个位置,需要监听前一个节点的变化
  68. //获取该节点序号的前一个节点
  69. waitPath = "/locks/" + children.get(index - 1);
  70. //监听,回调Watch的process方法
  71. zk.getData(waitPath, true, new Stat());
  72. //其他线程进入等待锁的状态
  73. waitLatch.await();
  74. return;
  75. }
  76. }
  77. } catch (KeeperException e) {
  78. e.printStackTrace();
  79. } catch (InterruptedException e) {
  80. e.printStackTrace();
  81. }
  82. }
  83. //解锁
  84. public void zkUnLock() {
  85. //删除节点即解锁
  86. try {
  87. zk.delete(this.currentNode, -1);
  88. } catch (InterruptedException e) {
  89. e.printStackTrace();
  90. } catch (KeeperException e) {
  91. e.printStackTrace();
  92. }
  93. }
  94. }

下面是针对上面的一些方法的测试。

  1. package com.szh.case2;
  2. import org.apache.zookeeper.KeeperException;
  3. import java.io.IOException;
  4. import java.util.concurrent.TimeUnit;
  5. /**
  6. *
  7. */
  8. public class DistributedZkLockTest {
  9. public static void main(String[] args) throws InterruptedException, IOException, KeeperException {
  10. //创建分布式锁1
  11. final DistributedZkLock lock1 = new DistributedZkLock();
  12. //创建分布式锁2
  13. final DistributedZkLock lock2 = new DistributedZkLock();
  14. //如下创建两个线程,模拟获取分布式锁的过程
  15. new Thread(new Runnable() {
  16. @Override
  17. public void run() {
  18. try {
  19. lock1.zkLock();
  20. System.out.println(Thread.currentThread().getName() + " 已启动,获取到锁....");
  21. TimeUnit.MILLISECONDS.sleep(3000);
  22. lock1.zkUnLock();
  23. System.out.println(Thread.currentThread().getName() + "已释放锁....");
  24. } catch (InterruptedException e) {
  25. e.printStackTrace();
  26. }
  27. }
  28. }).start();
  29. new Thread(new Runnable() {
  30. @Override
  31. public void run() {
  32. try {
  33. lock2.zkLock();
  34. System.out.println(Thread.currentThread().getName() + " 已启动,获取到锁....");
  35. TimeUnit.MILLISECONDS.sleep(3000);
  36. lock2.zkUnLock();
  37. System.out.println(Thread.currentThread().getName() + "已释放锁....");
  38. } catch (InterruptedException e) {
  39. e.printStackTrace();
  40. }
  41. }
  42. }).start();
  43. }
  44. }

那么上面是我们自己手写的加锁、解锁的一些方法,其中也存在着很多问题。

**(1)会话连接是异步的,需要自己去处理。比如使用 CountDownLatch **

**(2)Watch 需要重复注册,不然就不能生效 **

**(3)开发的复杂性还是比较高的 **

(4)不支持多节点删除和创建。需要自己去递归

所以就引出了下面的案例:👇👇👇


3.Curator框架实现分布式锁案例

**Curator ****是一个专门解决分布式锁的框架,解决了原生 ****JavaAPI ****开发分布式遇到的问题。 **

详情请查看官方文档:https://curator.apache.org/index.html

要使用它,就需要在pom文件中添加相关依赖。

  1. <dependency>
  2. <groupId>junit</groupId>
  3. <artifactId>junit</artifactId>
  4. <version>RELEASE</version>
  5. </dependency>
  6. <dependency>
  7. <groupId>org.apache.logging.log4j</groupId>
  8. <artifactId>log4j-core</artifactId>
  9. <version>2.8.2</version>
  10. </dependency>
  11. <dependency>
  12. <groupId>org.apache.zookeeper</groupId>
  13. <artifactId>zookeeper</artifactId>
  14. <version>3.5.7</version>
  15. </dependency>
  16. <dependency>
  17. <groupId>org.apache.curator</groupId>
  18. <artifactId>curator-framework</artifactId>
  19. <version>4.3.0</version>
  20. </dependency>
  21. <dependency>
  22. <groupId>org.apache.curator</groupId>
  23. <artifactId>curator-recipes</artifactId>
  24. <version>4.3.0</version>
  25. </dependency>
  26. <dependency>
  27. <groupId>org.apache.curator</groupId>
  28. <artifactId>curator-client</artifactId>
  29. <version>4.3.0</version>
  30. </dependency>
  1. package com.szh.case3;
  2. import org.apache.curator.framework.CuratorFramework;
  3. import org.apache.curator.framework.CuratorFrameworkFactory;
  4. import org.apache.curator.framework.recipes.locks.InterProcessMutex;
  5. import org.apache.curator.retry.ExponentialBackoffRetry;
  6. import java.util.concurrent.TimeUnit;
  7. /**
  8. *
  9. */
  10. public class CuratorLockTest {
  11. public static void main(String[] args) {
  12. //创建分布式锁1
  13. InterProcessMutex lock1 = new InterProcessMutex(getZkClient(), "/locks");
  14. //创建分布式锁2
  15. InterProcessMutex lock2= new InterProcessMutex(getZkClient(), "/locks");
  16. //下面创建两个线程
  17. new Thread(() -> {
  18. try {
  19. lock1.acquire(); //获取锁
  20. System.out.println(Thread.currentThread().getName() + " 首次获取到锁....");
  21. lock1.acquire();
  22. System.out.println(Thread.currentThread().getName() + " 再次获取到锁....");
  23. TimeUnit.MILLISECONDS.sleep(5000);
  24. lock1.release(); //释放锁
  25. System.out.println(Thread.currentThread().getName() + " 首次释放锁....");
  26. lock1.release();
  27. System.out.println(Thread.currentThread().getName() + " 再次释放锁....");
  28. } catch (Exception e) {
  29. e.printStackTrace();
  30. }
  31. }).start();
  32. new Thread(() -> {
  33. try {
  34. lock2.acquire();
  35. System.out.println(Thread.currentThread().getName() + " 首次获取到锁....");
  36. lock2.acquire();
  37. System.out.println(Thread.currentThread().getName() + " 再次获取到锁....");
  38. TimeUnit.MILLISECONDS.sleep(5000);
  39. lock2.release();
  40. System.out.println(Thread.currentThread().getName() + " 首次释放锁....");
  41. lock2.release();
  42. System.out.println(Thread.currentThread().getName() + " 再次释放锁....");
  43. } catch (Exception e) {
  44. e.printStackTrace();
  45. }
  46. }).start();
  47. }
  48. private static CuratorFramework getZkClient() {
  49. //客户端和服务器连接失败之后,多少秒之后再进行重试,以及重试的次数,5000ms之后重试,重试3次
  50. ExponentialBackoffRetry policy = new ExponentialBackoffRetry(5000, 3);
  51. CuratorFramework client = CuratorFrameworkFactory.builder()
  52. .connectString("192.168.40.130:2181")
  53. .connectionTimeoutMs(20000)
  54. .sessionTimeoutMs(30000)
  55. .retryPolicy(policy).build();
  56. //启动zk客户端
  57. client.start();
  58. System.out.println("zookeeper 启动成功....");
  59. return client;
  60. }
  61. }


4.Zookeeper常见面试题

  • **选举机制 **

** 半数机制,超过半数的投票通过,即通过。 **

** (1)第一次启动选举规则: 投票过半数时,服务器 id 大的胜出 **

** (2)第二次启动选举规则: ①EPOCH 大的直接胜出 **

** ②EPOCH 相同,事务 id 大的胜出 **

** ③事务 id 相同,服务器 id 大的胜出 **

  • **生产集群安装多少 zk 合适? **

** 安装奇数台。 **

** 生产经验: 10 台服务器:3 台 zk**

** 20 台服务器:5 台 zk**

** 100 台服务器:11 台 zk**

** 200 台服务器:11 台 zk **

** 服务器台数多:好处,提高可靠性;坏处:提高通信延时 **

  • 常用命令 :ls、get、create、delete

本文转载自: https://blog.csdn.net/weixin_43823808/article/details/124667997
版权归原作者 张起灵-小哥 所有, 如有侵权,请联系我们删除。

“Zookeeper&mdash;&mdash;分布式锁的概念理解 &amp; 应用举例”的评论:

还没有评论