一、为什么使用分布式锁?
>本地锁的局限性(synchronized):
本地锁只能锁住当前服务,只能保证自己的服务,只有一个线程可以访问,但是在服务众多的分布式环境下,其实是有多个线程同时访问的同一个数据,这显然是不符合要求的。
·>分布式锁的概念:
分布式锁指的是,所有服务中的所有线程都去获得同一把锁,但只有一个线程可以成功的获得锁,其他没有获得锁的线程必须全部等待,等到获得锁的线程释放掉锁之后获得了锁才能进行操作。Redis官网中,set key value有个带有NX参数的命令,这是一个原子性加锁的命令,指的是此key没有被lock是,当前线程才能加锁,如果已经被占用,就不能加锁。
redis实现分布式锁的原理?
1.抢占分布式锁:
Java代码中的实现:
Boolean lock = redisTemplate.opsForValue().setIfAbsent( "lock","111");
·如果加锁成功(lock = true)**,就先执行相应的业务,
然后释放掉锁:redisTemplate .delete(key: "lock" );
·如果加锁失败(lock = false)**,就通过自旋的方式进行重试(比如递归调用当前方法)。
注意:
为了防止在执行删锁操作之前,程序因为出现异常导致在还没有执行到删锁命令之前,程序就直接抛出异常退出,导致锁没有释放造成最终死锁的问题。(可能会有人想到,把删锁操作放在finally里以保证删锁操作一定被执行到,但是万一在执行删锁操作的过程中,电脑死机了呢!结果锁还是没有被成功的释放掉,依然会出现死锁现象。)于是,初步想到的解决方式就是在加锁的时候,就给这个锁设置一个过期时间。这样的话,即使我们由于各种原因没有成功的释放锁,redis也会根据过期时间,自动的帮助我们释放掉锁。
2.加锁的同时设置过期时间:
在成功获取到锁之后,执行删锁操作之前,给锁lock设置一个过期时间,例如30秒。
redisTemplate.expire( "lock" , 30, TimeUnit.SECONDS);
这样一来,即使我们自己没有删除掉锁,到到了过期时间后,redis也会帮我们自动删除掉。
注意:
由于加锁和设置锁的过期时间这两步操作不是原子性的,所以可能会在这之间出现问题,导致还没来得及设置锁的过期时间,程序就中断了。所以,需要加锁和设置过期时间这两步必须是原子性不可分割的操作。
Redis中的原子性命令,set lock 111 EX 30 NX ,表示key为lock,值为111,有效时间是30秒,是个NX的原子性加锁操作,可以保证加锁和过期时间这两个操作要么同时成功,要么同时失败。
Java中的代码是:
Boolean lock = redisTemp1ate.opsForValue().setIfAbsent("lock" , "111",30,TimeUnit.SECONDS);
二、模拟分布式锁的实现
(模拟抢票系统来实现分布式锁的实现)
2.1、创建数据库
2.2、导入对应的依赖文件
<!-- 数据库-->
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-boot-starter</artifactId>
<version>3.5.0</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.26</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid-spring-boot-starter</artifactId>
<version>1.2.8</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.8</version>
</dependency>
<!-- 引入redission-->
<dependency>
<groupId>com.github.hiwepy</groupId>
<artifactId>redisson-plus-spring-boot-starter</artifactId>
<version>2.0.0.RELEASE</version>
</dependency>
2.3、配置Yml文件信息
server:
port: 81
spring:
datasource:
url: jdbc:mysql:///db3
username: root
password: root
driver-class-name: com.mysql.jdbc.Driver
redis:
host: 192.168.247.130
port: 6379
jedis:
pool:
max-idle: 5
max-active: 10
max-wait: 5000
#模拟抢票线程
winnum: 1
#开启第二个进程服务
---
server:
port: 82
winnum: 2
spring:
profiles: win2
2.4、创建对应的pojo
@Data
public class Ticket {
private Integer id;
private Integer count;
private String identifier;
private String from1;
private String to1;
}
2.5、Mapper继承BaseMapper实现dao层的代码信息
public interface TicketMapper extends BaseMapper<Ticket> {
}
2.6、创建接口来测试分布式锁的实现
@RestController
@RequestMapping("/ticket")
public class TicketController {
@Autowired
private TicketMapper ticketMapper;
@Value("${winnum}")
private Integer winnum;
@Autowired
private StringRedisTemplate redisTemplate;
private Object lock = new Object();
private static final String LOCK_PREFIX = "ticket:lock:";
private static final String LOCK_VALUE_PREFIX = "TICKET:VALUE:";
@GetMapping("/sell/{id}")
public void sell(@PathVariable("id") Integer id) throws InterruptedException {
while (true){
//加一个分布式锁:(买不同票,不冲突,买相同票才会加锁)
//设置成功,加锁成功,设置失败,加锁失败(这个方法内部使用的是setnx指令)
//问题二:当业务没有执行完成,锁超时释放了--解决问题的方式,是给这个锁超时时间续时(开启一个守护线程,当程序中所有线程都是守护线程时,会自动退出)
//看门狗
Thread thread = new Thread(()->{
//续时
while (true) {
Long expire = redisTemplate.getExpire(LOCK_PREFIX + id);
//在续时时,要判断,当前业务是否由我负责
//获取当前获取锁的窗口,如果当前获取锁的窗口就是我们当前守护线程所在窗口,续时
String value = redisTemplate.opsForValue().get(LOCK_PREFIX + id);
if (expire != null) {
if (expire <= 2 && (LOCK_VALUE_PREFIX + winnum).equals(value)) {
redisTemplate.expire(LOCK_PREFIX + id, 3, TimeUnit.SECONDS);
}
}
}
});
//设置当前线程为守护线程
thread.setDaemon(true);
thread.start();
//问题一:添加过期时间,防止进程非正常退出,锁对象无法释放的问题 (setnx实现的分布式锁,是不可重入的 -- 实现可重入锁,需要使用hash结构)
//先获取key对应的值,判断值是否是当前进程拿到锁,是将这个value+1 -- lua脚本 (Redisson)
Boolean isLock = redisTemplate.opsForValue().setIfAbsent(LOCK_PREFIX + id, LOCK_VALUE_PREFIX+winnum,3, TimeUnit.SECONDS);
if (isLock) {
Ticket ticket = ticketMapper.selectById(id);
try {
//查询是否有票
if (ticket.getCount() > 0) {
//有票
System.out.println(winnum + "窗口正在卖出第" + ticket.getCount() + "张票");
//模拟卖票耗时
Thread.sleep(5000);
ticket.setCount(ticket.getCount() - 1);
//将新的票数设置到数据库
ticketMapper.updateById(ticket);
System.out.println(winnum + "窗口卖出票后,剩余票数为: " + ticket.getCount());
} else {
//没票
break;
}
}finally {
//模拟进程挂掉,让锁无法释放
if (winnum == 1 && ticket.getCount()<95){
int i = 1/0;
}
//释放锁
redisTemplate.delete(LOCK_PREFIX+id);
}
}
}
}
}
上面这个锁还有一个问题,不可重入的。如果我们要实现可重入锁,那么需要使用hash结构。redisson就是使用的hash结构实现可重入锁。但是原理和上面讲的一样。
三、衍生出创建分布式锁的整个流程
问题一:传统单进程,synchronized来实现加锁,当分布式进程如何实现加锁
答:采用redis在外部给程序进行上锁
redisTemplate.opsForValue().setIfAbsent();方法,返回布尔类型
如果已经存在值,返回flase,如果不存在,返回true
问题二:才锁redis的setifAbsent上锁之后,如何解锁
答:两种方式实现
方式一:
业务需要try finally 当业务完成时,在finally里面删除对应的key值
方式二:
设置锁的过期时间,来防止进程出错导致无法释放锁
问题三:业务时间大于key过期时间,如果处理
答:加上看门狗
当业务没有执行完成,锁超时释放了--解决问题的方式,是给这个锁超时时间续时(开启一个守护线程,当程序中所有线程都是**守护线程**时,会自动退出,推出后,就不会在给时间续时)
版权归原作者 西西o 所有, 如有侵权,请联系我们删除。