0


如何用 Redis 实现一个分布式锁

场景模拟

一般电子商务网站都会遇到如团购、秒杀、特价之类的活动,而这样的活动有一个共同的特点就是访问量激增、上千甚至上万人抢购一个商品。然而,作为活动商品,库存肯定是很有限的,如何控制库存不让出现超买,以防止造成不必要的损失是众多电子商务网站程序员头疼的问题,这同时也是最基本的问题。

在秒杀系统设计中,超卖是一个经典、常见的问题,任何商品都会有数量上限,如何避免成功下订单买到商品的人数不超过商品数量的上限,这是每个抢购活动都要面临的难点。

针对大量的并发请求,我们可以通过 Redis 来抗,也就是说对于库存直接请求 Redis 缓存,不直接请求数据库,如在 Redis 中有 50 个库存,如下:

image-20220309110319351

但不管是缓存还是数据库,在不做任何处理的情况下,都会出现超买的问题,常见的处理方式就是在代码中通过JVM 加锁的方式,如下:

server1

  1. @RestControllerpublicclassSkillController{@Autowiredprivate RedisTemplate redisTemplate;// 秒杀接口@RequestMapping("/deduct_stock")public String deductStock(){// 加锁synchronized(this){int stock = Integer.parseInt(redisTemplate.opsForValue().get("stock").toString());if(stock >0){// 库存 -1int realStock = stock -1;// 扣减库存
  2. redisTemplate.opsForValue().set("stock", realStock +"");
  3. System.out.println("扣减成功,剩余库存:"+ realStock);}else{
  4. System.out.println("扣减失败,库存不足");}}return"8080";}}

当然,在单机情况下确实没有任何问题,但现在绝大多数系统都是分布式系统,就算是 ERP 系统也会部署 2 台机器防止单点故障,所以一般情况下一个请求如下:

image-20220309110832380

server2

server2 和 server1 代码基本相同,只是开启了 2 个 JVM 实例。

  1. @RestControllerpublicclassSkillController{@Autowiredprivate RedisTemplate redisTemplate;// 秒杀接口@RequestMapping("/deduct_stock")public String deductStock(){// 加锁synchronized(this){int stock = Integer.parseInt(redisTemplate.opsForValue().get("stock").toString());if(stock >0){// 库存 -1int realStock = stock -1;// 扣减库存
  2. redisTemplate.opsForValue().set("stock", realStock +"");
  3. System.out.println("扣减成功,剩余库存:"+ realStock);}else{
  4. System.out.println("扣减失败,库存不足");}}return"8090";}}

Nginx

一般来说,前端通过 nginx 请求转发并通过 upstream 实现负载均衡,其关键配置如下:

image-20220309111849631

Jmeter

我们这里通过 Jmeter 来进行并发压测,不会用的参考 Jmeter 使用,然后这里提供下载链接:Jmeter 下载 (提取码:2hyo)。

并发请求:1 s 内 200 个请求(模拟高并发),循环 5 次,一共 1000 个总请求。

image-20220309113854795

请求地址:秒杀的减库存接口。

image-20220309113953340

JVM 锁

了解了上面的配置,然后启动 2 个实例,端口分别为 8080,8090,如下:

image-20220309112256004

如过不知道如何启动 2 个实例的看下面:

image-20220309112521469

注意:要修改启动端口。

使用 JVM 锁也就是同步代码块的方式存在问题,如上面测试的结果如下:

image-20220309135924299

不仅 2 个服务同时存在相同的库存,甚至同一个服务也存在相同的值,很明显在高并发分布式场景下,JVM 层面的锁是不可行的。

Redis SETNX

SETNX

格式:setnx key value

将 key 的值设为 value ,当且仅当 key 不存在。

若给定的 key 已经存在,则 SETNX 不做任何动作。

SETNX 是『SET if Not eXists』(如果不存在,则 SET)的简写。

1、实现一个最简单的分布式锁

  1. @RestControllerpublicclassSkillController{@Autowiredprivate RedisTemplate redisTemplate;@RequestMapping("/deduct_stock")public String deductStock(){// 商品 ID,具体应用中应该是请求传入的
  2. String lockKey ="lock:product_01";// SETNX 加锁
  3. Boolean result = redisTemplate.opsForValue().setIfAbsent(lockKey,"product");// 如果为 false 说明这把锁存在,直接返回if(!result){// 模拟返回业务return"系统繁忙";}int stock = Integer.parseInt(redisTemplate.opsForValue().get("stock").toString());if(stock >0){// 库存 -1int realStock = stock -1;// 扣减库存 模拟其他更多业务操作
  4. redisTemplate.opsForValue().set("stock", realStock +"");
  5. System.out.println("扣减成功,剩余库存:"+ realStock);}else{
  6. System.out.println("扣减失败,库存不足");}// 加锁后需要释放锁
  7. redisTemplate.delete(lockKey);return"8080";}}

2、存在的问题

① 业务代码异常—死锁

在实际的场景中,一次秒杀过程涉及到很多的业务操作,如果在释放锁之前的某个业务操作抛异常,使得锁没有被释放,那么此时就会存在死锁问题。此时这个 key 永远存在于 redis 中,其它线程执行 SETNX 永远失败。

也就是说我们要保证释放锁得到执行,所以要把上面的业务代码放在

  1. try catch

或者

  1. try finally

中:

image-20220309151042583

② Redis 宕机

但其实上面的代码并不一定能完全解决问题,如果 Redis 宕机或者被重启,同样会导致 finally 中的代码执行失败,结果就同上了。

所以一般我们需要给这个 key 设置过期时间,即:

  1. Boolean result = redisTemplate.opsForValue().setIfAbsent(lockKey,"product");
  2. redisTemplate.expire(lockKey,10, TimeUnit.SECONDS);

但上面的写法存在原子性问题,所以我们不能分开来写,得合成一条命令:

  1. Boolean result = redisTemplate.opsForValue().setIfAbsent(lockKey,"product",10, TimeUnit.SECONDS);

③ 高并发可能存在的问题

一般来说,并发量不大的情况下,上面的写法已经满足要求,但对于几千上万的并发,可能会导致接口的响应变慢,比如:

某个请求 A 执行完整个操作需要 15s ,而我们上面设置的超时时间为 10s ,所以此时请求 A 并没有执行完,但由于设置了过期时间把 key 给删掉了,然后这时再来一个请求 B,是可以加锁成功的,且在 8s 内执行完成,而 B 在执行过程中 A 同样也在执行,如果此时 A 可能先于 B 去执行 finally 中的代码把锁给删除了,但 A 删除的锁并不是它的,而是 B 加的锁,同理,当请求 C 加锁后又被请求 B 给释放了,也就是说,这把分布式锁直接无效了(尽管可能性很小),同样会出现超买问题。

这个问题的根本在于:自己加的锁被被别人释放了

因此我们可以确定 value 值唯一性,如 UUID,如下:

image-20220309155219774

但这种方式同样存在原子问题,也就是上图中 ② 处的代码,结果也会导致自己加的锁被被别人释放

Redisson

针对上面的问题,我们可以通过 Redisson 来解决,其使用非常简单,和 JDK 中的 Lock 使用类似,如下:

  1. @RestControllerpublicclassRedissonController{@Autowiredprivate Redisson redisson;@Autowiredprivate RedisTemplate redisTemplate;@RequestMapping("/deduct_stock1")public String deductStock(){// 商品 ID,具体应用中应该是请求传入的
  2. String lockKey ="lock:product_01";// 获取锁
  3. RLock lock = redisson.getLock(lockKey);// 加锁
  4. lock.lock();try{int stock = Integer.parseInt(redisTemplate.opsForValue().get("stock").toString());if(stock >0){// 库存 -1int realStock = stock -1;// 扣减库存
  5. redisTemplate.opsForValue().set("stock", realStock +"");
  6. System.out.println("扣减成功,剩余库存:"+ realStock);}else{
  7. System.out.println("扣减失败,库存不足");}}finally{// 释放锁
  8. lock.unlock();}return"8080";}}

再去测试就是正常的了:

image-20220310091303423

Redisson 其原理如下:

image-20220309165722140

Redisson 锁的加锁机制如上图所示,线程去获取锁,获取成功则执行保存数据到 redis 数据库。如果获取失败,则一直通过 while 循环尝试获取锁(可自定义等待时间,超时后返回失败),获取成功后,保存数据到 redis 数据库。

Redisson 提供的分布式锁是支持锁自动续期的(锁续命),也就是说,如果线程仍旧没有执行完,那么 Redisson 会自动给 redis 中的目标 key 延长超时时间,这在 Redisson 中称之为 Watch Dog(看门狗)机制。

那么 redisson 是怎么实现原子性的

当然是

  1. lua

。不管是加锁操作,还是看门狗机制都是通过

  1. lua

来保证其原子性。

其加锁调用链路如下:

  1. RedissonLock.lock()--->lockInterruptibly()--->tryAcquire()--->tryLockInnerAsync()

关键代码就在

  1. tryLockInnerAsync()

中:

  1. <T> RFuture<T>tryLockInnerAsync(long leaseTime, TimeUnit unit,long threadId, RedisStrictCommand<T> command){
  2. internalLockLeaseTime = unit.toMillis(leaseTime);return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,// 如果锁不存在,则通过hset设置它的值,并设置过期时间"if (redis.call('exists', KEYS[1]) == 0) then "+"redis.call('hset', KEYS[1], ARGV[2], 1); "+"redis.call('pexpire', KEYS[1], ARGV[1]); "+"return nil; "+"end; "+// 如果锁已存在,其是当前线程,则通过hincrby给数值递增1,即锁的重入"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then "+"redis.call('hincrby', KEYS[1], ARGV[2], 1); "+"redis.call('pexpire', KEYS[1], ARGV[1]); "+"return nil; "+"end; "+// 如果锁已存在,不是当前线程,则返回过期时间 ttl"return redis.call('pttl', KEYS[1]);",
  3. Collections.<Object>singletonList(getName()), internalLockLeaseTime,getLockName(threadId));}

那么对于锁自动续期如下:

  1. RedissonLock.lock()--->lockInterruptibly()--->tryAcquire()--->scheduleExpirationRenewal()
  1. scheduleExpirationRenewal()

方法会开启一个子线程去执行自动延期的操作,当然也是执行

  1. lua

代码,如下,截取关键部分:

  1. // getName()就是当前锁的名字
  2. RFuture<Boolean> future = commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,// 判断这个锁 getName() 是否在redis中存在,如果存在就进行 pexpire 延期 默认30s"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then "+"redis.call('pexpire', KEYS[1], ARGV[1]); "+"return 1; "+"end; "+"return 0;",
  3. Collections.<Object>singletonList(getName()), internalLockLeaseTime,getLockName(threadId));

他会判断这个锁

  1. getName()

是否在 redis 中存在,如果存在就进行

  1. pexpire

延期,默认

  1. lockWatchdogTimeout=30s

,且是每间隔

  1. lockWatchdogTimeout/3=10s

时间,去执行延时操作。

源码:https://gitee.com/javatv/redis.git

参考:redisson 中的看门狗机制

标签: redis 分布式 java

本文转载自: https://blog.csdn.net/weixin_43477531/article/details/123394134
版权归原作者 Ayue、 所有, 如有侵权,请联系我们删除。

“如何用 Redis 实现一个分布式锁”的评论:

还没有评论