0


如何用 Redis 实现一个分布式锁

场景模拟

一般电子商务网站都会遇到如团购、秒杀、特价之类的活动,而这样的活动有一个共同的特点就是访问量激增、上千甚至上万人抢购一个商品。然而,作为活动商品,库存肯定是很有限的,如何控制库存不让出现超买,以防止造成不必要的损失是众多电子商务网站程序员头疼的问题,这同时也是最基本的问题。

在秒杀系统设计中,超卖是一个经典、常见的问题,任何商品都会有数量上限,如何避免成功下订单买到商品的人数不超过商品数量的上限,这是每个抢购活动都要面临的难点。

针对大量的并发请求,我们可以通过 Redis 来抗,也就是说对于库存直接请求 Redis 缓存,不直接请求数据库,如在 Redis 中有 50 个库存,如下:

image-20220309110319351

但不管是缓存还是数据库,在不做任何处理的情况下,都会出现超买的问题,常见的处理方式就是在代码中通过JVM 加锁的方式,如下:

server1

@RestControllerpublicclassSkillController{@Autowiredprivate RedisTemplate redisTemplate;// 秒杀接口@RequestMapping("/deduct_stock")public String deductStock(){// 加锁synchronized(this){int stock = Integer.parseInt(redisTemplate.opsForValue().get("stock").toString());if(stock >0){// 库存 -1int realStock = stock -1;// 扣减库存
                redisTemplate.opsForValue().set("stock", realStock +"");
                System.out.println("扣减成功,剩余库存:"+ realStock);}else{
                System.out.println("扣减失败,库存不足");}}return"8080";}}

当然,在单机情况下确实没有任何问题,但现在绝大多数系统都是分布式系统,就算是 ERP 系统也会部署 2 台机器防止单点故障,所以一般情况下一个请求如下:

image-20220309110832380

server2

server2 和 server1 代码基本相同,只是开启了 2 个 JVM 实例。

@RestControllerpublicclassSkillController{@Autowiredprivate RedisTemplate redisTemplate;// 秒杀接口@RequestMapping("/deduct_stock")public String deductStock(){// 加锁synchronized(this){int stock = Integer.parseInt(redisTemplate.opsForValue().get("stock").toString());if(stock >0){// 库存 -1int realStock = stock -1;// 扣减库存
                redisTemplate.opsForValue().set("stock", realStock +"");
                System.out.println("扣减成功,剩余库存:"+ realStock);}else{
                System.out.println("扣减失败,库存不足");}}return"8090";}}

Nginx

一般来说,前端通过 nginx 请求转发并通过 upstream 实现负载均衡,其关键配置如下:

image-20220309111849631

Jmeter

我们这里通过 Jmeter 来进行并发压测,不会用的参考 Jmeter 使用,然后这里提供下载链接:Jmeter 下载 (提取码:2hyo)。

并发请求:1 s 内 200 个请求(模拟高并发),循环 5 次,一共 1000 个总请求。

image-20220309113854795

请求地址:秒杀的减库存接口。

image-20220309113953340

JVM 锁

了解了上面的配置,然后启动 2 个实例,端口分别为 8080,8090,如下:

image-20220309112256004

如过不知道如何启动 2 个实例的看下面:

image-20220309112521469

注意:要修改启动端口。

使用 JVM 锁也就是同步代码块的方式存在问题,如上面测试的结果如下:

image-20220309135924299

不仅 2 个服务同时存在相同的库存,甚至同一个服务也存在相同的值,很明显在高并发分布式场景下,JVM 层面的锁是不可行的。

Redis SETNX

SETNX

格式:setnx key value

将 key 的值设为 value ,当且仅当 key 不存在。

若给定的 key 已经存在,则 SETNX 不做任何动作。

SETNX 是『SET if Not eXists』(如果不存在,则 SET)的简写。

1、实现一个最简单的分布式锁

@RestControllerpublicclassSkillController{@Autowiredprivate RedisTemplate redisTemplate;@RequestMapping("/deduct_stock")public String deductStock(){// 商品 ID,具体应用中应该是请求传入的
        String lockKey ="lock:product_01";// SETNX 加锁
        Boolean result = redisTemplate.opsForValue().setIfAbsent(lockKey,"product");// 如果为 false 说明这把锁存在,直接返回if(!result){// 模拟返回业务return"系统繁忙";}int stock = Integer.parseInt(redisTemplate.opsForValue().get("stock").toString());if(stock >0){// 库存 -1int realStock = stock -1;// 扣减库存 模拟其他更多业务操作
            redisTemplate.opsForValue().set("stock", realStock +"");
            System.out.println("扣减成功,剩余库存:"+ realStock);}else{
            System.out.println("扣减失败,库存不足");}// 加锁后需要释放锁
        redisTemplate.delete(lockKey);return"8080";}}

2、存在的问题

① 业务代码异常—死锁

在实际的场景中,一次秒杀过程涉及到很多的业务操作,如果在释放锁之前的某个业务操作抛异常,使得锁没有被释放,那么此时就会存在死锁问题。此时这个 key 永远存在于 redis 中,其它线程执行 SETNX 永远失败。

也就是说我们要保证释放锁得到执行,所以要把上面的业务代码放在

try catch

或者

try finally

中:

image-20220309151042583

② Redis 宕机

但其实上面的代码并不一定能完全解决问题,如果 Redis 宕机或者被重启,同样会导致 finally 中的代码执行失败,结果就同上了。

所以一般我们需要给这个 key 设置过期时间,即:

Boolean result = redisTemplate.opsForValue().setIfAbsent(lockKey,"product");

redisTemplate.expire(lockKey,10, TimeUnit.SECONDS);

但上面的写法存在原子性问题,所以我们不能分开来写,得合成一条命令:

Boolean result = redisTemplate.opsForValue().setIfAbsent(lockKey,"product",10, TimeUnit.SECONDS);

③ 高并发可能存在的问题

一般来说,并发量不大的情况下,上面的写法已经满足要求,但对于几千上万的并发,可能会导致接口的响应变慢,比如:

某个请求 A 执行完整个操作需要 15s ,而我们上面设置的超时时间为 10s ,所以此时请求 A 并没有执行完,但由于设置了过期时间把 key 给删掉了,然后这时再来一个请求 B,是可以加锁成功的,且在 8s 内执行完成,而 B 在执行过程中 A 同样也在执行,如果此时 A 可能先于 B 去执行 finally 中的代码把锁给删除了,但 A 删除的锁并不是它的,而是 B 加的锁,同理,当请求 C 加锁后又被请求 B 给释放了,也就是说,这把分布式锁直接无效了(尽管可能性很小),同样会出现超买问题。

这个问题的根本在于:自己加的锁被被别人释放了

因此我们可以确定 value 值唯一性,如 UUID,如下:

image-20220309155219774

但这种方式同样存在原子问题,也就是上图中 ② 处的代码,结果也会导致自己加的锁被被别人释放

Redisson

针对上面的问题,我们可以通过 Redisson 来解决,其使用非常简单,和 JDK 中的 Lock 使用类似,如下:

@RestControllerpublicclassRedissonController{@Autowiredprivate Redisson redisson;@Autowiredprivate RedisTemplate redisTemplate;@RequestMapping("/deduct_stock1")public String deductStock(){// 商品 ID,具体应用中应该是请求传入的
        String lockKey ="lock:product_01";// 获取锁
        RLock lock = redisson.getLock(lockKey);// 加锁
        lock.lock();try{int stock = Integer.parseInt(redisTemplate.opsForValue().get("stock").toString());if(stock >0){// 库存 -1int realStock = stock -1;// 扣减库存
                redisTemplate.opsForValue().set("stock", realStock +"");
                System.out.println("扣减成功,剩余库存:"+ realStock);}else{
                System.out.println("扣减失败,库存不足");}}finally{// 释放锁
            lock.unlock();}return"8080";}}

再去测试就是正常的了:

image-20220310091303423

Redisson 其原理如下:

image-20220309165722140

Redisson 锁的加锁机制如上图所示,线程去获取锁,获取成功则执行保存数据到 redis 数据库。如果获取失败,则一直通过 while 循环尝试获取锁(可自定义等待时间,超时后返回失败),获取成功后,保存数据到 redis 数据库。

Redisson 提供的分布式锁是支持锁自动续期的(锁续命),也就是说,如果线程仍旧没有执行完,那么 Redisson 会自动给 redis 中的目标 key 延长超时时间,这在 Redisson 中称之为 Watch Dog(看门狗)机制。

那么 redisson 是怎么实现原子性的

当然是

lua

。不管是加锁操作,还是看门狗机制都是通过

lua

来保证其原子性。

其加锁调用链路如下:

RedissonLock.lock()--->lockInterruptibly()--->tryAcquire()--->tryLockInnerAsync()

关键代码就在

tryLockInnerAsync()

中:

<T> RFuture<T>tryLockInnerAsync(long leaseTime, TimeUnit unit,long threadId, RedisStrictCommand<T> command){
    internalLockLeaseTime = unit.toMillis(leaseTime);return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,// 如果锁不存在,则通过hset设置它的值,并设置过期时间"if (redis.call('exists', KEYS[1]) == 0) then "+"redis.call('hset', KEYS[1], ARGV[2], 1); "+"redis.call('pexpire', KEYS[1], ARGV[1]); "+"return nil; "+"end; "+// 如果锁已存在,其是当前线程,则通过hincrby给数值递增1,即锁的重入"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then "+"redis.call('hincrby', KEYS[1], ARGV[2], 1); "+"redis.call('pexpire', KEYS[1], ARGV[1]); "+"return nil; "+"end; "+// 如果锁已存在,不是当前线程,则返回过期时间 ttl"return redis.call('pttl', KEYS[1]);",
                Collections.<Object>singletonList(getName()), internalLockLeaseTime,getLockName(threadId));}

那么对于锁自动续期如下:

RedissonLock.lock()--->lockInterruptibly()--->tryAcquire()--->scheduleExpirationRenewal()
scheduleExpirationRenewal()

方法会开启一个子线程去执行自动延期的操作,当然也是执行

lua

代码,如下,截取关键部分:

// getName()就是当前锁的名字 
RFuture<Boolean> future = commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,// 判断这个锁 getName() 是否在redis中存在,如果存在就进行 pexpire 延期 默认30s"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then "+"redis.call('pexpire', KEYS[1], ARGV[1]); "+"return 1; "+"end; "+"return 0;",
          Collections.<Object>singletonList(getName()), internalLockLeaseTime,getLockName(threadId));

他会判断这个锁

getName()

是否在 redis 中存在,如果存在就进行

pexpire

延期,默认

lockWatchdogTimeout=30s

,且是每间隔

lockWatchdogTimeout/3=10s

时间,去执行延时操作。

源码:https://gitee.com/javatv/redis.git

参考:redisson 中的看门狗机制

标签: redis 分布式 java

本文转载自: https://blog.csdn.net/weixin_43477531/article/details/123394134
版权归原作者 Ayue、 所有, 如有侵权,请联系我们删除。

“如何用 Redis 实现一个分布式锁”的评论:

还没有评论