0


这可能是最详细的分布式锁设计方案了

本文涉及内容:

  • 分布式锁介绍;
  • 用数据表做分布式锁原理介绍 & 数据表设计;
  • 用redis做分布式锁原理介绍 & 代码实操;
  • 用redisson做分布式锁原理介绍 & 代码实操;
  • 用zookeeper做分布式锁原理介绍;
  • 用curator做分布式锁代码实操;
  • 实现分布式锁的各方案比较;
  • 完整项目的GitHub地址

一、是什么?

1、锁的应用场景:
在单体应用中,我们会使用ReentrantLock或Synchronized来应对并发场景。
比如最常见的卖票场景,假如总共有100张票,线程A和线程B同时操作,如下图:

JMM内存模型

这时有一个共享变量100,线程A和B将100拷贝到自己的工作内存中,当线程A抢到执行权的时候,此时A工作内存中的值是100,然后售票,进行自减操作,将自己工作内存中的值变成了99。当A还没来得及将99刷回到主内存的时候,线程B进来了,此时B拿到的主内存的值还是100,然后售票,进行自减,也是99。这就出现了同一张票出售了两次的情况。所以我们会加锁加volatile保证原子性保证可见性。

2、分布式锁是什么?
上面的场景中,我们可以通过ReentrantLock或者Synchronized搞定,因为你的项目只运行在一台服务器上,只有一个JVM,所有的共享变量都加载到同一个主内存中。而分布式应用中,一个项目部署在多台服务器上,最基本的架构如下图:

最简单的分布式架构

比如现在server1、server2和server3读取到数据库的票数都是100,在每一个server中,我们可以用JDK的锁来保证多个用户同时访问我这台server时不会出问题。但问题是,如果client1访问到的是server1,票数是100,然后购票,还没来得及将数据库票数改为99,client2也开始访问系统购票了,client2如果访问的是server1,自然不会出问题,如果访问的是server2,这时server2读取到数据库的票数还是100,那么就出问题了,又出现了同一张票卖了两次的情况。在分布式应用中,JDK的锁机制就无法满足需求了,所以就出现了分布式锁。

3、分布式锁应该满足的条件:

  • 四个一:同一个方法在同一时刻只能被一台机器的一个线程执行
  • 三个具备:具备可重入特性;具备锁失效机制,防止死锁;具备非阻塞锁特性,即没获取到锁返回获取锁失败,而不是一直等待
  • 两个高:高性能地获取与释放锁;高可用的获取与释放锁

4、分布式锁的实现方式:

  • 基于数据库:用数据库的排他锁实现
  • 基于redis:利用redis的set key value NX EX 30000;也可以用redis的第三方库比如Redisson
  • 基于zookeeper:利用zookeeper的临时顺序节点实现;也可以用zookeeper的第三方库比如Curator

二、基于数据库实现

1、建表:

  1. CREATE TABLE `tb_distributed_lock` (
  2. `dl_id` INT NOT NULL auto_increment COMMENT '主键,自增',
  3. `dl_method_name` VARCHAR (64) NOT NULL DEFAULT '' COMMENT '方法名',
  4. `dl_device_info` VARCHAR (100) NOT NULL DEFAULT '' COMMENT 'ip+线程id',
  5. `dl_operate_time` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '数据被操作的时间',
  6. PRIMARY KEY (`dl_id`),
  7. UNIQUE KEY `uq_method_name` (`dl_method_name`) USING BTREE
  8. ) ENGINE = INNODB DEFAULT charset = utf8 COMMENT = '分布式锁表';

2、思路:
当执行一个方法的时候,我们首先尝试往表中插入一条数据。如果插入成功,则占锁成功,继续往下执行,执行完删除该记录。如果插入失败,我们再以

  1. 当前方法名、当前机器ip+线程id、数据被操作时间为5分钟内(5分钟表示锁失效的时间)

为条件去查询,如果有记录,表示该机器的该线程在5分钟内占有过锁了,直接往下执行最后删除记录;如果没有记录,占有锁失败。
一个用户就是一个线程,所以我们可以把机器ip和用户id组合一起当成

  1. dl_device_info

3、占有锁和释放锁:

  • 占有锁:
  1. INSERT INTO tb_distributed_lock (
  2. dl_method_name,
  3. dl_device_info
  4. )
  5. VALUES
  6. ('方法名', 'ip&用户id');

如果insert失败,则:

  1. SELECT
  2. count(*)
  3. FROM
  4. tb_distributed_lock
  5. WHERE
  6. dl_method_name = '方法名'
  7. AND dl_device_info = 'ip&用户id'
  8. AND dl_operate_time < SYSDATE() - 5;
  • 释放锁:
  1. DELETE
  2. FROM
  3. tb_distributed_lock
  4. WHERE
  5. dl_method_name = '方法名'
  6. AND dl_device_info = 'ip&用户id';

4、小总结:
以上表结构可能并不是很好,只是提供了这么一个思路。下面说它的优缺点:

  • 优点:成本低,不需要引入其他的技术
  • 缺点:对数据库依赖性强,如果数据库挂了,那就凉凉了,所以数据库最好也是高可用的

三、基于redis实现

1、原理:
基于redis的

  1. set key value nx ex 30

,这条语句的意思就是如果key不存在就设置,并且过期时间为30s,如果key已经存在就会返回false。如果要以毫秒为单位,把

  1. ex

换成

  1. px

就好了。我们执行方法前,先将方法名当成key,执行这条语句,如果执行成功就是获取锁成功,执行失败就是获取锁失败。

2、代码实现:

  • RedisUtil的部分代码:
  1. /**
  2. * key不存在时就设置,返回true,key已存在就返回false
  3. * @param key
  4. * @param value
  5. * @param timeout
  6. * @return
  7. */
  8. public static boolean setIfAbsent(String key, String value, Long timeout) {
  9. return redisTemplate.opsForValue().setIfAbsent(key, value, timeout, TimeUnit.SECONDS);
  10. }
  11. /**
  12. * 获取key-value
  13. * @param key
  14. * @return
  15. */
  16. public static String getString(String key) {
  17. return (String) redisTemplate.opsForValue().get(key);
  18. }
  19. /**
  20. * 删除key
  21. * @param key
  22. * @return
  23. */
  24. public static boolean delKey(String key) {
  25. return redisTemplate.delete(key);
  26. }
  • 业务方法中使用:
  1. public String hello() {
  2. // 方法名当作key
  3. String key = "hello";
  4. String value = "hellolock";
  5. if (RedisUtil.setIfAbsent(key, value, 60 * 2L)) {
  6. System.out.println("成功获取到锁,开始执行业务逻辑……");
  7. // 假如执行业务逻辑需要1分钟
  8. try {TimeUnit.MINUTES.sleep(1L); } catch (Exception e) { e.printStackTrace();};
  9. // 释放锁先校验value,避免释放错
  10. if (value.equals(RedisUtil.getString(key))) {
  11. RedisUtil.delKey(key);
  12. System.out.println("执行完业务逻辑,释放锁成功");
  13. }
  14. return "success";
  15. } else {
  16. System.out.println("锁被别的线程占有,获取锁失败");
  17. return "acquire lock failed";
  18. }
  19. }

3、小总结:

  • 优点:简单易用,一条redis命令就搞定。可以设置过期时间,避免释放锁失败造成其他线程长时间无法获取锁的问题。
  • 缺点:这种做法只适合redis是单机的时候,如果redis有集群,这样做就会出问题。假如一个线程在master上获取锁成功了,在master还没来得及将数据同步到slave上的时候,master挂了,slave升级为master。第二个线程进来尝试获取锁,因为新的master上并没有这个key,所以,也能成功获取到锁。
  • 解决办法:针对上面的缺点,我们可以采用redis的RedLock算法。假如集群中有n个redis,我们先从这n个redis中尝试获取锁(锁的过期时间为x),并记录获取锁的消耗的总时间t,获取锁成功数量为s,当且仅当t < x 并且 s >= (n/2 + 1)时,认为获取锁成功。

四、基于Redisson实现

1、是什么?
官网地址:https://github.com/redisson/redisson/wiki/Table-of-Content
Redisson是一个功能十分强大的redis客户端,封装了很多分布式操作,比如分布式对象、分布式集合、分布式锁等。它的分布式锁也很多,什么公平锁、可重入锁、redlock等一应俱全,下面来看看如何在springboot项目中使用它。

2、使用redisson做分布式锁:

  • 添加依赖:
  1. <!-- redisson-springboot-starter -->
  2. <dependency>
  3. <groupId>org.redisson</groupId>
  4. <artifactId>redisson-spring-boot-starter</artifactId>
  5. <version>3.12.3</version>
  6. </dependency>
  7. <!-- io.netty/netty-all -->
  8. <dependency>
  9. <groupId>io.netty</groupId>
  10. <artifactId>netty-all</artifactId>
  11. </dependency>
  • application.yml:
  1. spring:
  2. application:
  3. name: distributed-lock
  4. redis:
  5. # redis单机版的写法
  6. host: 192.168.2.43
  7. port: 6379
  8. # 集群的写法
  9. #cluster:
  10. #nodes:
  11. #- 192.168.0.106,192.168.0.107
  12. #哨兵的写法
  13. #sentinel:
  14. #master: 192.168.0.106
  15. #nodes:
  16. #- 192.168.0.107,192.168.0.108
  • 用法:直接注入RedissonClient,然后用它获取锁,得到锁之后就可以进行占锁和释放锁了。有阻塞式锁,也有非阻塞式锁,具体用法如下:
  1. @Autowired
  2. private RedissonClient redisson;
  3. /**
  4. * 未设置过期时间,没获取到就会一直阻塞着
  5. * @return
  6. */
  7. @GetMapping("/testLock")
  8. public String testLock() {
  9. log.info("进入testLock方法,开始获取锁");
  10. String key = "testLock";
  11. RLock lock = redisson.getLock(key);
  12. lock.lock();
  13. log.info("获取锁成功,开始执行业务逻辑……");
  14. try {TimeUnit.SECONDS.sleep(10L); } catch (Exception e) { e.printStackTrace();};
  15. log.info("执行完业务逻辑,释放锁");
  16. lock.unlock();
  17. return "success";
  18. }
  19. /**
  20. * 尝试获取锁,没获取到就直接失败,不会阻塞
  21. * @return
  22. */
  23. @GetMapping("/testTryLock")
  24. public String testTryLock() {
  25. log.info("进入testTryLock方法,开始获取锁");
  26. String key = "testTryLock";
  27. RLock lock = redisson.getLock(key);
  28. boolean res = lock.tryLock();
  29. if (!res) {
  30. log.error("尝试获取锁失败");
  31. return "fail";
  32. } else {
  33. log.info("获取锁成功,开始执行业务逻辑……");
  34. try {TimeUnit.SECONDS.sleep(30L); } catch (Exception e) { e.printStackTrace();};
  35. log.info("执行完业务逻辑,释放锁");
  36. lock.unlock();
  37. return "success";
  38. }
  39. }
  40. /**
  41. * 锁设置了过期时间,即使最后面的unlock失败,20秒后也会自动释放锁
  42. * @return
  43. */
  44. @GetMapping("/testLockTimeout")
  45. public String testLockTimeout() {
  46. log.info("进入testLockTimeout方法,开始获取锁");
  47. String key = "testLockTimeout";
  48. RLock lock = redisson.getLock(key);
  49. // 20秒后自动释放锁
  50. lock.lock(20, TimeUnit.SECONDS);
  51. log.info("获取锁成功,开始执行业务逻辑……");
  52. try {TimeUnit.SECONDS.sleep(10L); } catch (Exception e) { e.printStackTrace();};
  53. lock.unlock();
  54. return "success";
  55. }
  56. /**
  57. * 尝试获取锁,15秒还没获取到就获取锁失败;获取到了会持有20秒,20秒后自动释放锁
  58. * @return
  59. */
  60. @GetMapping("/testTryLockTimeout")
  61. public String testTryLockTimeout() {
  62. log.info("进入testTryLockTimeout方法,开始获取锁");
  63. String key = "testTryLockTimeout";
  64. RLock lock = redisson.getLock(key);
  65. boolean res = false;
  66. try {
  67. res = lock.tryLock(15, 20, TimeUnit.SECONDS);
  68. } catch (InterruptedException e1) {
  69. e1.printStackTrace();
  70. }
  71. if (!res) {
  72. log.error("尝试获取锁失败");
  73. return "fail";
  74. } else {
  75. log.info("获取锁成功,开始执行业务逻辑……");
  76. try {TimeUnit.SECONDS.sleep(10L); } catch (Exception e) { e.printStackTrace();};
  77. log.info("执行完业务逻辑,释放锁");
  78. lock.unlock();
  79. return "success";
  80. }
  81. }

3、小总结:
以上就是使用redisson做分布式锁的简单demo,用起来十分的方便。上面是与springboot项目集成,直接用它提供的springboot的starter就好了。用它来做分布式锁的更多用法请移步至官网:redisson分布式锁。

五、基于zookeeper实现

1、zookeeper知识点回顾:

zookeeper有四种类型的节点:

  • 持久节点:默认的节点类型,客户端与zookeeper断开连接后,节点依然存在
  • 持久顺序节点:首先是持久节点,顺序的意思是,zookeeper会根据节点创建的顺序编号
  • 临时节点:客户端与zookeeper断开连接后节点不复存在
  • 临时顺序节点:客户端与zookeeper断开连接后节点不复存在,zookeeper会根据节点创建的顺序编号

2、基于zookeeper实现分布式锁的原理:
我们正是利用了zookeeper的临时顺序节点来实现分布式锁。首先我们创建一个名为

  1. lock

(节点名称随意)的持久节点。线程1获取锁时,就在

  1. lock

下面创建一个名为

  1. lock1

的临时顺序节点,然后查找

  1. lock

下所有的节点,判断自己的

  1. lock1

是不是第一个,如果是,获取锁成功,继续执行业务逻辑,执行完后删除

  1. lock1

节点;如果不是第一个,获取锁失败,就watch排在自己前面一位的节点,当排在自己前一位的节点被干掉时,再检查自己是不是排第一了,如果是,获取锁成功。图解过程如下:

zookeeper分布式锁原理

线程1创建了一个lock1,发现lock1的第一个节点,占锁成功;在线程1还没释放锁的时候,线程2来了,创建了一个lock2,发现lock2不是第一个,便监控lock1,线程3此时进行就监控lock2。直到自己是第一个节点时才占锁成功。假如某个线程释放锁的时候zookeeper崩了也没关系,因为是临时节点,断开连接节点就没了,其他线程还是可以正常获取锁,这就是要用临时节点的原因。

说清楚了原理,用代码实现也就不难了,可以引入zookeeper的客户端

  1. zkClient

,自己写代码实现(偷个懒,自己就不写了,有兴趣的可以参考我zookeeper的文章,肯定可以自己写出来的)。不过有非常优秀的开源解决方案比如curator,下面就看看curator怎么用。

六、基于curator实现

1、springboot整合curator:

  • pom.xml:
  1. <!-- curator start-->
  2. <dependency>
  3. <groupId>org.apache.zookeeper</groupId>
  4. <artifactId>zookeeper</artifactId>
  5. <version>3.4.14</version>
  6. </dependency>
  7. <dependency>
  8. <groupId>org.apache.curator</groupId>
  9. <artifactId>curator-framework</artifactId>
  10. <version>4.2.0</version>
  11. </dependency>
  12. <dependency>
  13. <groupId>org.apache.curator</groupId>
  14. <artifactId>curator-recipes</artifactId>
  15. <version>4.2.0</version>
  16. </dependency>
  17. <dependency>
  18. <groupId>org.seleniumhq.selenium</groupId>
  19. <artifactId>selenium-java</artifactId>
  20. </dependency>
  21. <!-- curator end-->
  • application.yml:注意,curator下面这些属性spring是没有集成的,也就是说写的时候不会有提示
  1. curator:
  2. retryCount: 5 # 连接失败的重试次数
  3. retryTimeInterval: 5000 # 每隔5秒重试一次
  4. url: 192.168.2.43:2181 # zookeeper连接地址
  5. sessionTimeout: 60000 # session超时时间1分钟
  6. connectionTimeout: 5000 # 连接超时时间5秒钟
  • 配置类:读取application.yml中的属性,创建CuratorFramework实例
  1. @Configuration
  2. public class CutatorConfig {
  3. @Value("${curator.retryCount}")
  4. private Integer retryCount;
  5. @Value("${curator.retryTimeInterval}")
  6. private Integer retryTimeInterval;
  7. @Value("${curator.url}")
  8. private String url;
  9. @Value("${curator.sessionTimeout}")
  10. private Integer sessionTimeout;
  11. @Value("${curator.connectionTimeout}")
  12. private Integer connectionTimeout;
  13. @Bean
  14. public CuratorFramework curatorFramework() {
  15. return CuratorFrameworkFactory.newClient(url, sessionTimeout, connectionTimeout,
  16. new RetryNTimes(retryCount, retryTimeInterval));
  17. }
  18. }
  • 测试类:测试整合curator框架是否成功
  1. @SpringBootTest(classes = {DistributedLockApplication.class})
  2. @RunWith(SpringRunner.class)
  3. public class DistributedLockApplicationTests {
  4. @Autowired
  5. private CuratorFramework curatorFramework;
  6. @Test
  7. public void contextLoads() {
  8. curatorFramework.start();
  9. try {
  10. curatorFramework.create().creatingParentContainersIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/zhusl", "test".getBytes());
  11. } catch (Exception e) {
  12. e.printStackTrace();
  13. }
  14. }
  15. }

在确保zookeeper成功启动了的情况下,执行这个单元测试,最后回到linux中,用zkCli.sh连接,查看是否成功创建节点。

2、使用Curator做分布式锁:
Curator封装了很多锁,比如可重入共享锁、不可重入共享锁、可重入读写锁、联锁等。具体可以参考官网:curator分布式锁的用法。

  • ZookeeperUtil.java:工具类,封装获取锁,释放锁等方法。这里主要简单地封装了上面说的四种锁,仅供参考。
  1. @Component
  2. @Slf4j
  3. public class ZookeeperUtil {
  4. private static CuratorFramework curatorFramework;
  5. private static InterProcessLock lock;
  6. /** 持久节点 */
  7. private final static String ROOT_PATH = "/lock/";
  8. /** 可重入共享锁 */
  9. private static InterProcessMutex interProcessMutex;
  10. /** 不可重入共享锁 */
  11. private static InterProcessSemaphoreMutex interProcessSemaphoreMutex;
  12. /** 可重入读写锁 */
  13. private static InterProcessReadWriteLock interProcessReadWriteLock;
  14. /** 多共享锁(将多把锁当成一把来用) */
  15. private static InterProcessMultiLock interProcessMultiLock;
  16. @Autowired
  17. private void setCuratorFramework(CuratorFramework curatorFramework) {
  18. ZookeeperUtil.curatorFramework = curatorFramework;
  19. ZookeeperUtil.curatorFramework.start();
  20. }
  21. /**
  22. * 获取可重入排他锁
  23. *
  24. * @param lockName
  25. * @return
  26. */
  27. public static boolean interProcessMutex(String lockName) {
  28. interProcessMutex = new InterProcessMutex(curatorFramework, ROOT_PATH + lockName);
  29. lock = interProcessMutex;
  30. return acquireLock(lockName, lock);
  31. }
  32. /**
  33. * 获取不可重入排他锁
  34. *
  35. * @param lockName
  36. * @return
  37. */
  38. public static boolean interProcessSemaphoreMutex(String lockName) {
  39. interProcessSemaphoreMutex = new InterProcessSemaphoreMutex(curatorFramework, ROOT_PATH + lockName);
  40. lock = interProcessSemaphoreMutex;
  41. return acquireLock(lockName, lock);
  42. }
  43. /**
  44. * 获取可重入读锁
  45. *
  46. * @param lockName
  47. * @return
  48. */
  49. public static boolean interProcessReadLock(String lockName) {
  50. interProcessReadWriteLock = new InterProcessReadWriteLock(curatorFramework, ROOT_PATH + lockName);
  51. lock = interProcessReadWriteLock.readLock();
  52. return acquireLock(lockName, lock);
  53. }
  54. /**
  55. * 获取可重入写锁
  56. *
  57. * @param lockName
  58. * @return
  59. */
  60. public static boolean interProcessWriteLock(String lockName) {
  61. interProcessReadWriteLock = new InterProcessReadWriteLock(curatorFramework, ROOT_PATH + lockName);
  62. lock = interProcessReadWriteLock.writeLock();
  63. return acquireLock(lockName, lock);
  64. }
  65. /**
  66. * 获取联锁(多把锁当成一把来用)
  67. * @param lockNames
  68. * @return
  69. */
  70. public static boolean interProcessMultiLock(List<String> lockNames) {
  71. if (lockNames == null || lockNames.isEmpty()) {
  72. log.error("no lockNames found");
  73. return false;
  74. }
  75. interProcessMultiLock = new InterProcessMultiLock(curatorFramework, lockNames);
  76. try {
  77. if (!interProcessMultiLock.acquire(10, TimeUnit.SECONDS)) {
  78. log.info("Thread:" + Thread.currentThread().getId() + " acquire distributed lock fail");
  79. return false;
  80. } else {
  81. log.info("Thread:" + Thread.currentThread().getId() + " acquire distributed lock success");
  82. return true;
  83. }
  84. } catch (Exception e) {
  85. log.info("Thread:" + Thread.currentThread().getId() + " release lock occured an exception = " + e);
  86. return false;
  87. }
  88. }
  89. /**
  90. * 释放锁
  91. *
  92. * @param lockName
  93. */
  94. public static void releaseLock(String lockName) {
  95. try {
  96. if (lock != null && lock.isAcquiredInThisProcess()) {
  97. lock.release();
  98. curatorFramework.delete().inBackground().forPath(ROOT_PATH + lockName);
  99. log.info("Thread:" + Thread.currentThread().getId() + " release lock success");
  100. }
  101. } catch (Exception e) {
  102. log.info("Thread:" + Thread.currentThread().getId() + " release lock occured an exception = " + e);
  103. }
  104. }
  105. /**
  106. * 释放联锁
  107. */
  108. public static void releaseMultiLock(List<String> lockNames) {
  109. try {
  110. if (lockNames == null || lockNames.isEmpty()) {
  111. log.error("no no lockNames found to release");
  112. return;
  113. }
  114. if (interProcessMultiLock != null && interProcessMultiLock.isAcquiredInThisProcess()) {
  115. interProcessMultiLock.release();
  116. for (String lockName : lockNames) {
  117. curatorFramework.delete().inBackground().forPath(ROOT_PATH + lockName);
  118. }
  119. log.info("Thread:" + Thread.currentThread().getId() + " release lock success");
  120. }
  121. } catch (Exception e) {
  122. log.info("Thread:" + Thread.currentThread().getId() + " release lock occured an exception = " + e);
  123. }
  124. }
  125. /**
  126. * 获取锁
  127. *
  128. * @param lockName
  129. * @param interProcessLock
  130. * @return
  131. */
  132. private static boolean acquireLock(String lockName, InterProcessLock interProcessLock) {
  133. int flag = 0;
  134. try {
  135. while (!interProcessLock.acquire(2, TimeUnit.SECONDS)) {
  136. flag++;
  137. if (flag > 1) {
  138. break;
  139. }
  140. }
  141. } catch (Exception e) {
  142. log.error("acquire lock occured an exception = " + e);
  143. return false;
  144. }
  145. if (flag > 1) {
  146. log.info("Thread:" + Thread.currentThread().getId() + " acquire distributed lock fail");
  147. return false;
  148. } else {
  149. log.info("Thread:" + Thread.currentThread().getId() + " acquire distributed lock success");
  150. return true;
  151. }
  152. }
  153. }
  • ZookeeperLockController.java:写一个接口,用Curator加锁,然后用浏览器进行访问
  1. @RestController
  2. @RequestMapping("/zookeeper-lock")
  3. public class ZookeeperLockController {
  4. @GetMapping("/testLock")
  5. public String testLock() {
  6. // 获取锁
  7. boolean lockResult = ZookeeperUtil.interProcessMutex("testLock");
  8. if (lockResult) {
  9. try {
  10. // 模拟执行业务逻辑
  11. TimeUnit.MINUTES.sleep(1L);
  12. } catch (InterruptedException e) {
  13. e.printStackTrace();
  14. }
  15. // 释放锁
  16. ZookeeperUtil.releaseLock("testLock");
  17. return "success";
  18. } else {
  19. return "fail";
  20. }
  21. }
  22. }

打开一个浏览器窗口访问,后台打印出获取锁成功的日志,在1分钟之内,开启另一个窗口再次访问,打印出获取锁失败的日志,说明分布式锁生效了。

七、实现分布式锁的各方案比较

  • 基于数据库实现最简单,不需要引入第三方应用。但是因为每次加锁和解锁都要进行IO操作,性能不是很好。
  • 基于redis实现比较均衡,性能很好,也不是很难,比较可靠。
  • 基于zookeeper实现难度较大,因为需要维护一个zookeeper集群,如果项目原本没有用到zookeeper,还是用redis比较好。

本文项目地址:分布式锁


本文转载自: https://blog.csdn.net/mi_man_chi_you/article/details/122216831
版权归原作者 贪挽澜月 所有, 如有侵权,请联系我们删除。

“这可能是最详细的分布式锁设计方案了”的评论:

还没有评论