0


redis分布式锁及秒杀系统实战

本文分为两部分:一、介绍redis分布式锁的原理和使用方法;二、使用redis分布式锁实现一个简单的秒杀系统。

注意:本文使用java语言,最后的例子为springboot项目。但是原理是一样的。

redis分布式锁

对并发有要求的系统常常面临一个问题,如何在实现并发的基础上保持数据的一致性。redis分布式锁能给出一个解决方案。

redis相信大家都非常熟悉了,作为一个数据库缓存技术,简便好用。redis当然也支持并发,核心就是使用redis分布式锁。

原理

其实redis分布式锁的原理非常简单:在运行实际的业务代码之前,首先到redis中去获得唯一的redis锁,如果获取到,则继续执行业务代码,并在业务代码结束后主动释放锁;否则等待,之后再重新获取锁,直到获取到为止。

核心代码如下:

  1. //通过向redis服务器插入一组键值对来获取锁
  2. Boolean lock = redisTemplate.opsForValue().setIfAbsent(“lock”, "123");
  3. if (lock) {
  4. //业务代码在此
  5. //......
  6. //通过删除redis中的lock键值对释放锁
  7. redisTemplate.delete("lock");
  8. }

通过以上代码可以看出,所谓的“获取锁”,“释放锁”操作,本质上就是向redis服务器插入和删除键值对。是不是也没那么高大上?

理解了原理,我们下面就动手做一做。以一个springboot项目为例,看看如何使用redis分布式锁。

使用IDE创建一个基本的springboot项目,我这里使用的IDEA2021.1社区版。

需要引入的依赖如下。

  1. <dependency>
  2. <groupId>org.springframework.boot</groupId>
  3. <artifactId>spring-boot-starter-web</artifactId>
  4. </dependency>
  5. <dependency>
  6. <groupId>org.springframework.boot</groupId>
  7. <artifactId>spring-boot-starter-data-redis</artifactId>
  8. </dependency>
  9. <dependency>
  10. <groupId>org.springframework.data</groupId>
  11. <artifactId>spring-data-redis</artifactId>
  12. <version>2.5.7</version>
  13. </dependency>
  14. <dependency>
  15. <groupId>org.projectlombok</groupId>
  16. <artifactId>lombok</artifactId>
  17. </dependency>

在配置文件中对redis服务器进行配置。我这里使用了本地的redis服务器,各位可以根据情况修改。

创建一个controller包并在包下创建一个RedisLockController.class,并创建一个testLock方法:

  1. import lombok.extern.slf4j.Slf4j;
  2. import org.springframework.beans.factory.annotation.Autowired;
  3. import org.springframework.data.redis.core.StringRedisTemplate;
  4. import org.springframework.util.StringUtils;
  5. import org.springframework.web.bind.annotation.GetMapping;
  6. import org.springframework.web.bind.annotation.RestController;
  7. @RestController
  8. @Slf4j
  9. public class RedisLockController {
  10. @Autowired
  11. StringRedisTemplate redisTemplate;
  12. @GetMapping("/testLock")
  13. public void testLock() {
  14. //尝试获取锁
  15. Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock", "123");
  16. if (lock) {
  17. log.info("成功获取锁");
  18. //以下是业务代码:到redis服务器中查找num,如果没找到则初始化,若找到则num+1
  19. String numStr = redisTemplate.opsForValue().get("num");
  20. if (StringUtils.isEmpty(numStr)) {
  21. redisTemplate.opsForValue().set("num", "0");
  22. } else {
  23. int number = Integer.valueOf(numStr);
  24. redisTemplate.opsForValue().set("num", ++number + "");
  25. }
  26. //释放锁
  27. redisTemplate.delete("lock");
  28. } {
  29. log.info("未获取到锁");
  30. }
  31. }
  32. }

运行项目,并发起请求:

之后查看redis服务器上的数据,发现num已初始化:

在以下位置打断点,并再次调用接口。

代码暂停至断点处,此时我们再次查看redis服务器,发现又多了一个键值对:

这就是获得的锁。让代码恢复运行,再次查看redis服务器,发现锁已经释放(其实就是把lock键值对删除了),并且业务代码正常运行,num+1:

了解了redis分布式锁的基本用法后,我们要进一步思考下以上代码是否有问题或者是否存在可以优化的地方?

进阶

锁过期时间

试想以下场景:代码正常获取到锁并开始运行业务代码,但是业务代码有bug,抛错了,会发生什么?我们加上以下代码模拟业务代码出错:

再次运行代码并调用接口,发现确实抛错:

查看redis服务器,发现lock并没有消失:

这是因为业务代码抛错,导致之后的释放锁代码没有执行,进而出现死锁的情况。之后我们再调用接口,会发现num不会再自增。

那么如何解决以上问题?给锁设置一个自动过期时间

  1. Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock", "123", 30, TimeUnit.SECONDS);

在时间到期后,无论有没有主动释放锁,锁都会自动过期。

我们将redis中的lock主动删除,再次运行代码并调用接口:

lock仍然正常产生了,但是等待30秒后,发现lock已经自动删除。

到这儿是不是就可以了呢?我们继续往下看

lock防误删

试想下面一种场景

1、有业务1和业务2,两个业务都会抢用redis锁(比如两个人同时抢购同一商品)

2、业务1的运行时间非常长b1,甚至长过了redis锁的自动失效时间timeout,即b1>timeout

3、那么在业务1运行timeout时间后,锁自动释放

4、在业务1结束之前,业务2也发起了,并成功得到了redis锁(b1已释放)

5、在业务2运行期间,业务1结束了,并主动释放锁,此时释放的会是业务2的锁!

那么这种情况怎么办?在尝试获取锁的时候,使用uuid生成一个唯一的字符串作为锁的value,不同的业务释放锁的时候都去检查释放的是否是自己的锁,代码如下。uuid保证了业务之间不可能生成value相同的字符串,业务可以据此判断释放的是否是自己的锁。

到这儿是不是就完美了呢?还没有

确保删锁原子性

我们仔细查看上面的代码,可能会出现这样一种情况:

1、当业务1已经使用uuid判断了锁是自己的,进入if语句块中

2、此时恰好业务1的锁因为到期自动释放了

3、业务2恰好获取到了锁

4、那么业务1之后释放的还是业务2的锁!

有人会说这样的概率太小了吧,单看是这样的,但如果考虑到像双11或618那样的海量请求,即便是小概率事件在基数极大时也有可能发生。

如何处理以上问题?引入LUA脚本,将if语句和删锁操作写在一个脚本语句中,进而确保原子性。将if语句块修改为以下形式:

  1. //使用LUA脚本执行原子操作,避免锁误删
  2. String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
  3. DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>();
  4. redisScript.setScriptText(script);
  5. // 设置一下返回值类型 为Long
  6. // 因为删除判断的时候,返回的0,给其封装为数据类型。如果不封装那么默认返回String 类型,
  7. // 那么返回字符串与0 会有发生错误。
  8. redisScript.setResultType(Long.class);
  9. // 第一个要是script 脚本 ,第二个需要判断的key,第三个就是key所对应的值。
  10. redisTemplate.execute(redisScript, Arrays.asList("lock"), uuid);

好了,到此终于得到了一个比较完善的redis锁解决方案了,下面我们就将其应用到实战!

秒杀系统示例

一些基本参数如下。我们就设想10万人抢10个商品。redis锁自动释放时间为30秒。其中的userPatient指的是,如果用户没抢到,会再次发起抢购(抢过东西的都不陌生),之后再次抢购(啊,手不受控制的动了起来),这一过程循环往复持续的时间。

  1. private int totalNum = 10;//总商品数量
  2. private int lockTimeOut = 3;//锁过期时间30秒
  3. private int userPatient = 30000;//用户抢购的模拟时间(毫秒)
  4. private int userNum = 100000;//抢购人数

然后我们模拟创建一定数量的用户:

  1. List<String> initUsers() {
  2. List<String> result = new ArrayList<>();
  3. //这里简单的用数字代指用户
  4. for (int i = 1; i <= userNum; i++) {
  5. result.add(String.valueOf(i));
  6. }
  7. return result;
  8. }

将上一节介绍的都teskLock方法修改下:

  1. public String rob(String b) {
  2. //用户开抢时间
  3. long startTime = System.currentTimeMillis();
  4. //模拟用户持续抢
  5. while ((startTime + userPatient) >= System.currentTimeMillis()) {
  6. //首先查看总库存,如果为0,则返回null
  7. if (totalNum < 1) {
  8. return null;
  9. }
  10. String uuid = UUID.randomUUID().toString();
  11. Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock", uuid, lockTimeOut, TimeUnit.SECONDS);//设置锁过期时间,防止死锁
  12. //获取锁成功
  13. if (lock) {
  14. //首先查看总库存,如果为0,则返回null
  15. if (totalNum < 1) {
  16. return null;
  17. }
  18. //模拟用户生成订单时间
  19. try {
  20. TimeUnit.SECONDS.sleep(1);
  21. } catch (InterruptedException e) {
  22. e.printStackTrace();
  23. }
  24. log.info("用户 {} 抢购成功", b);
  25. totalNum--;
  26. //使用LUA脚本执行原子操作,避免锁误删
  27. String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
  28. DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>();
  29. redisScript.setScriptText(script);
  30. // 设置一下返回值类型 为Long
  31. // 因为删除判断的时候,返回的0,给其封装为数据类型。如果不封装那么默认返回String 类型,
  32. // 那么返回字符串与0 会有发生错误。
  33. redisScript.setResultType(Long.class);
  34. // 第一个要是script 脚本 ,第二个需要判断的key,第三个就是key所对应的值。
  35. redisTemplate.execute(redisScript, Arrays.asList("lock"), uuid);
  36. return b;
  37. } else {
  38. log.error("获取锁失败!(没抢到)");
  39. }
  40. }
  41. return null;
  42. }

这里需要注意:

最后是模拟抢购的接口:注意这里使用了parallelStream方法模拟并发。

  1. @GetMapping("/monitor")
  2. public List<String> monitor() {
  3. //初始化抢购用户
  4. List<String> users = initUsers();
  5. //抢购成功用户结果表
  6. List<String> winners = new ArrayList<>();
  7. users.parallelStream().forEach(b -> {
  8. //用户尝试抢购
  9. String currentUser = rob(b);
  10. //如果抢购成功,则将用户放入结果表
  11. if (!StringUtils.isEmpty(currentUser)) {
  12. winners.add(currentUser);
  13. }
  14. });
  15. return winners;
  16. }

运行程序,并调用monitor接口,稍等片刻:

返回的10个数字就是抢购到商品的用户。

好的,到此有关redis分布式锁的介绍就到这里。源代码在此lisz112/redisLock

标签: redis 分布式

本文转载自: https://blog.csdn.net/a20100997/article/details/122730568
版权归原作者 李世吉吉 所有, 如有侵权,请联系我们删除。

“redis分布式锁及秒杀系统实战”的评论:

还没有评论