分布式锁
分布式锁是控制分布式系统间同步访问共享资源的一种方式,其可以保证共享资源在并
发场景下的数据一致性。
当有多个线程要访问某一个共享资源(DBMS 中的数据或 Redis 中的数据,或共享文件
等)时,为了达到协调多个线程的同步访问,此时就需要使用分布式锁了。
为了达到同步访问的目的,规定,让这些线程在访问共享资源之前先要获取到一个令牌
token,只有具有令牌的线程才可以访问共享资源。这个令牌就是通过各种技术实现的分布
式锁。而这个分布锁是一种“互斥资源”,即只有一个。只要有线程抢到了锁,那么其它线
程只能等待,直到锁被释放或等待超时。
在对某一资源操作之前,程序先在Redis中拿到锁:
- setnx 命令,在finally里面释放锁。
- 为了防止执行完“添加锁”语句后突然宕机,其 finally 中的释放锁代码不执行,为锁添加过期时间。
- 为了防止本线程的业务还没处理完锁就过期了,导致另一个线程B拿到锁,结果本线程继续执行误把线程B设置的锁给删了,所以要为锁添加标识,生成UUID作为锁的 value
- 在finally语句块里面要判断是不是自己的锁,取得锁+判断+删除是非原子性的,在并发场景下可能会出问题。例如,客户端 a 在节点主机 A 中添加了锁后,执行业务逻辑用时 6 秒,此时锁已过期,然后执行到了 finally{}中的判断,并判断结果为真,然后时间片到了,暂停执行。由于节点主机 A 中的锁已经过期,客户端 b 在节点主机 B 中添加锁成功,然后很快执 行到了业务逻辑(未超过锁的过期时间),此时客户端 b 的处理进程时间片到了。 此时主机 A 中的代码又获得了处理机,继续执行。此时就会执行对锁的删除语句,删除 成功。也就是说主机 A 删除了由主机 B 添加的锁。这就是很严重的问题。解决方法是加Lua脚本
对客户端身份的判断与删除锁操作的合并,是没有专门的原子性命令的。此时可以通过
Lua 脚本来实现它们的原子性。而对 Lua 脚本的执行,可以通过 eval 命令来完成。
不过,eval 命令在 RedisTemplate 中没有对应的方法,而 Jedis 中具有该同名方法。所以,
需要在代码中首先获取到 Jedis 客户端,然后才能调用 jedis.eval()。
@GetMapping("/sk5")
public String seckillHandler5() {
// 为每一个访问的客户端随机生成一个客户端唯一标识
String clientId = UUID.randomUUID().toString();
try {
// 在添加锁的同时为锁指定过期时间,该操作具有原子性
// 将锁的value设置为clientId
Boolean lockOK = srt.opsForValue().setIfAbsent(REDIS_LOCK, clientId, 5, TimeUnit.SECONDS);
if (!lockOK) {
return "没有抢到锁";
}
// 添加锁成功
// 从Redis中获取库存
String stock = srt.opsForValue().get("sk:0008");
int amount = stock == null ? 0 : Integer.parseInt(stock);
if (amount > 0) {
// 修改库存后再写回Redis
srt.opsForValue().set("sk:0008", String.valueOf(--amount));
return "库丰剩余" + amount + "台";
}
} finally {
// 锁续约,或锁续命
JedisPool jedisPool = new JedisPool(redisHost, redisPort);
try (Jedis jedis = jedisPool.getResource()) {
// 定义Lua脚本。注意,每行最后要有一个空格
// redis.call()是Lua中对Redis命令的调用函数
String script = "if redis.call('get', KEYS[1]) == ARGV[1] " +
"then return redis.call('del', KEYS[1]) " +
"end " +
"return 0";
// eval()方法的返回值为脚本script的返回值
Object eval = jedis.eval(script, Collections.singletonList(REDIS_LOCK), Collections.singletonList(clientId));
if ("1".equals(eval.toString())) {
System.out.println("释放锁成功");
} else {
System.out.println("释放锁时发生异常");
}
}
}
return "抱歉,您没有抢到";
}
以上代码仍然是存在问题的:请求 a 的锁过期,但其业务还未执行完毕;请求 b 申请到
了锁,其也正在处理业务。如果此时两个请求都同时修改了共享的库存数据,那么就又会出
现数据不一致的问题,即仍然存在并发问题。在高并发场景下,问题会被无限放大。
对于该问题,可以采用“锁续约”方式解决。即,在当前业务进程开始执行时,fork 出
一个子进程,用于启动一个定时任务。该定时任务的定时时间小于锁的过期时间,其会定时
查看处理当前请求的业务进程的锁是否已被删除。如果已被删除,则子进程结束;如果未被
删除,说明当前请求的业务还未处理完毕,则将锁的时间重新设置为“原过期时间”。这种
方式称为锁续约,也称为锁续命。
**Redisson **可重入锁 - 单机Redis下分布式锁
使用 Redisson 的可重入锁可以解决上述问题。
Redisson 内部使用 Lua 脚本实现了对可重入锁的添加、重入、续约(续命)、释放。Redisson
需要用户为锁指定一个 key,但无需为锁指定过期时间,因为它有默认过期时间(当然,也可
指定)。由于该锁具有“可重入”功能,所以 Redisson 会为该锁生成一个计数器,记录一个
线程重入锁的次数。 hash -> field
**导入 ****Redisson **依赖
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.17.6</version>
</dependency>
Spring中添加一个由单 Redis 节点构建的 Redisson 的 Bean。
@Bean
public Redisson redisson() {
Config Config = new Config();
Config.useSingleServer()
.setAddress(redisHost + ":" + redisPort)
.setDatabase(0);
return (Redisson) Redisson.create(Config);
}
在需要使用的类中注入
使用:
@GetMapping("/sk6")
public String seckillHandler6() {
RLock rLock = redisson.getLock(REDIS_LOCK);
try {
// 添加分布式锁
// Boolean lockOK = rLock.tryLock();
// 指定锁的过期时间为5秒
// Boolean lockOK = rLock.tryLock(5, TimeUnit.SECONDS);
// 指定锁的过期时间为5秒。如果申请锁失败,则最长等待20秒
Boolean lockOK = rLock.tryLock(20, 5, TimeUnit.SECONDS);
if (!lockOK) {
return "没有抢到锁";
}
// 添加锁成功
// 从Redis中获取库存
String stock = srt.opsForValue().get("sk:0008");
int amount = stock == null ? 0 : Integer.parseInt(stock);
if (amount > 0) {
// 修改库存后再写回Redis
srt.opsForValue().set("sk:0008", String.valueOf(--amount));
return "库丰剩余" + amount + "台";
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
// 释放锁
rLock.unlock();
}
return "抱歉,您没有抢到";
}
在 Redis 单机情况下,以上代码是没有问题的。但如果是在 Redis 主从集群中,那么其
还存在锁丢失问题。
在 Redis 主从集群中,假设节点 A 为 master,节点 B、C 为 slave。如果一个请求 a 在处
理时申请锁,即向节点 A 添加一个 key。当节点 A 收到请求后写入 key 成功,然后会立即向
处理 a 请求的应用服务器 Sa 响应,然后会向 slave 同步该 key。不过,在同步还未开始时,
节点 A 宕机,节点 B 晋升为 master。此时正好有一个请求 b 申请锁,由于节点 B 中并没有
该 key,所以该 key 写入成功,然后会立即向处理 b 请求的应用服务器 Sb 响应。由于 Sa 与
Sb 都收到了 key 写入成功的响应,所以它们都可同时对共享数据进行处理。这就又出现了
并发问题。
只所以新的 master 节点 B 同意请求 b 的锁申请,是因为主从集群丢失了请求 a 的锁申
请,即对于节点 B 来说,其根本就不知道有过请求 a 的锁申请。所以,该问题称为主从集群
的锁丢失问题。
**Redisson - **红锁 - 集群Redis下分布式锁
Redisson 红锁可以防止主从集群锁丢失问题。Redisson 红锁要求,必须要构建出至少三
个 Redis 主从集群。若一个请求要申请锁,必须向所有主从集群中提交 key 写入请求,只有
当大多数集群锁写入成功后,该锁才算申请成功。
容器中放入三个 Sentinel集群构建的 Redisson 的 Bean
@Bean("redisson-1")
public Redisson redisson1() {
Config Config = new Config();
Config.useSentinelServers()
.setMasterName("mymaster1")
.addSentinelAddress("redis:16380", "redis:16381", "redis:16382");
return (Redisson) Redisson.create(Config);
}
@Bean("redisson-2")
public Redisson redisson2() {
Config Config = new Config();
Config.useSentinelServers()
.setMasterName("mymaster2")
.addSentinelAddress("redis:26380", "redis:26381", "redis:26382");
return (Redisson) Redisson.create(Config);
}
@Bean("redisson-3")
public Redisson redisson3() {
Config Config = new Config();
Config.useSentinelServers()
.setMasterName("mymaster3")
.addSentinelAddress("redis:36380", "redis:36381", "redis:36382");
return (Redisson) Redisson.create(Config);
}
在需要使用的类注入使用
@GetMapping("/sk7")
public String seckillHandler7() {
// 定义三个可重入锁
RLock rLock1 = redisson1.getLock(REDIS_LOCK + "-1");
RLock rLock2 = redisson2.getLock(REDIS_LOCK + "-2");
RLock rLock3 = redisson3.getLock(REDIS_LOCK + "-3");
// 定义红锁
RLock rLock = new RedissonRedLock(rLock1, rLock2, rLock3);
try {
// 添加分布式锁
Boolean lockOK = rLock.tryLock();
if (!lockOK) {
return "没有抢到锁";
}
// 添加锁成功
// 从Redis中获取库存
String stock = srt.opsForValue().get("sk:0008");
int amount = stock == null ? 0 : Integer.parseInt(stock);
if (amount > 0) {
// 修改库存后再写回Redis
srt.opsForValue().set("sk:0008", String.valueOf(--amount));
return "库丰剩余" + amount + "台";
}
} finally {
// 释放锁
rLock.unlock();
}
return "抱歉,您没有抢到";
}
分段锁
无论前面使用的是哪种锁,它们解决并发问题的思路都是相同的,那就将所有请求通过
锁实现串行化。而串行化在高并发场景下势必会引发性能问题。
解决锁的串行化引发的性能问题的方案就是,使访问并行化。将要共享访问的一个资源,
拆分为多个共享访问资源,这样就会将一把锁的需求转变为多把锁,实现并行化。
例如,对于秒杀商品 sk:0008,其有 1000 件。现在将其拆分为 10 份,每份 100 件。即
将秒杀商品变为了 10 件,分别为 sk:0008:01,sk:0008:02,sk:0008:03,„„,sk:0008:10。
这样的话,就需要 10 把锁来控制所有请求的并发。由原来的因为只有一把锁而导致的每个
时刻只能处理 1 个请求,变为了现在有了 10 把锁,每个时刻可以同时处理 10 个请求。并发
提高了 10 倍。
**Redisson **详解
Redisson 底层采用的是 Netty 框架
Redisson 提供了使 用 Redis 的最简单和最便捷的方法。在生产中,对于 Redisson 使用最多的场景就是其分布式锁 RLock。
Redisson 的分布式锁 RLock
是一种可重入锁
是一种非公平锁,但也支持可重入公平锁 FailLock。
**联锁 **
Redisson 分布式锁可以实现联锁 MultiLock。当一个线程需要同时处理多个共享资源时,
可使用联锁。即一次性申请多个锁,同时锁定多个共享资源。联锁可预防死锁。相当于对共
享资源的申请实现了原子性:要么都申请到,只要缺少一个资源,则将申请到的所有资源全
部释放。其是 OS 底层原理中 AND 型信号量机制的典型应用。
**红锁 **
Redisson 分布式锁可以实现红锁 RedLock。红锁由多个锁构成,只有当这些锁中的大部
分锁申请成功时,红锁才申请成功。红锁一般用于解决 Redis 主从集群锁丢失问题。
红锁与联锁的区别是,红锁实现的是对一个共享资源的同步访问控制,而联锁实现的是
多个共享资源的同步访问控制。
**读写锁 **
通过 Redisson 可以获取到读写锁 RReadWriteLock。通过 RReadWriteLock 实例可分别获
取到读锁 RedissonReadLock 与写锁 RedissonWriteLock。读锁与写锁分别是实现了 RLock 的可
重入锁。
一个共享资源,在没有写锁的情况下,允许同时添加多个读锁。只要添加了写锁,任何
读锁与写锁都不能再次添加。即读锁是共享锁,写锁为排他锁。
**信号量 **
通过Redisson可以获取到信号量RSemaphore。RSemaphore的常用场景有两种:一种是,
无论谁添加的锁,任何其它线程都可以解锁,就可以使用 RSemaphore。另外,当一个线程
需要一次申请多个资源时,可使用 RSemaphore。RSemaphore 是信号量机制的典型应用。
@GetMapping("/sk8")
public String seckillHandler8() {
RSemaphore rs = redisson.getSemaphore("redis_semaphore");
try {
int buy = ThreadLocalRandom.current().nextInt(5) + 1;
Boolean lockOK = rs.tryAcquire(buy, 10, TimeUnit.SECONDS);
if (!lockOK) {
return "没有抢到锁";
}
// 添加锁成功
// 从Redis中获取库存
String stock = srt.opsForValue().get("sk:0008");
int amount = stock == null ? 0 : Integer.parseInt(stock);
if (amount > 0) {
// 修改库存后再写回Redis
srt.opsForValue().set("sk:0008", String.valueOf(--amount));
return "库丰剩余" + amount + "台";
}
} catch (InterruptedException e) {
e.printStackTrace();
}
return "抱歉,您没有抢到";
}
**可过期信号量 **
通过 Redisson 可以获取到可过期信号量 PermitExpirableSemaphore。该信号量是在
RSemaphore 基础上,为每个信号增加了一个过期时间,且每个信号都可以通过独立的 ID 来
辨识。释放时也只能通过提交该 ID 才能释放。
不过,一个线程每次只能申请一个信号量,当然每次了只会释放一个信号量。这是与
RSemaphore 不同的地方。
该信号量为互斥信号量时,其就等同于可重入锁。或者说,可重入锁就相当于信号量为
1 的可过期信号量。
注意,可过期信号量与可重入锁的区别:
可重入锁:相当于用户每次只能申请 1 个信号量,且只有一个用户可以申请成功
可过期信号量:用户每次只能申请 1 个信号量,但可以有多个用户申请成功
@GetMapping("/test2")
public String test2() {
RPermitExpirableSemaphore rs = redisson.getPermitExpirableSemaphore("redis_semaphore");
String permitId = null;
try {
// 对信号量的申请(P操作)
// 申请1个信号,返回辨识ID
permitId = rs.acquire();
// 申请1个信号,若没有成功,则最多等待10秒,返回辨识ID
permitId = rs.tryAcquire(10, TimeUnit.SECONDS);
// 业务逻辑
// ……
} catch (Exception e) {
e.printStackTrace();
} finally {
// 对信号量的释放(V操作)
// 释放1个信号量,需要携带辨识ID
rs.release(permitId);
boolean releaseOK = rs.tryRelease(permitId);
}
return null;
}
**分布式闭锁 **
通过 Redisson 可以获取到分布式闭锁 RCountDownLatch,其与 JDK 的 JUC 中的闭锁
CountDownLatch 原理相同,用法类似。其常用于一个或者多个线程的执行必须在其它某些
任务执行完毕的场景。例如,大规模分布式并行计算中,最终的合并计算必须基于很多并行
计算的运行完毕。
闭锁中定义了一个计数器和一个阻塞队列。阻塞队列中存放着待执行的线程。每当一个
并行任务执行完毕,计数器就减 1。当计数器递减到 0 时就会唤醒阻塞队列的所有线程。
通常使用 Barrier 队列解决该问题,而 Barrier 队列通常使用 Zookeeper 实现。
@GetMapping("/test3")
public String test3() {
// 获取闭锁对象(合并线程与条件线程中都需要该代码)
RCountDownLatch latch = redisson.getCountDownLatch("countDownLatch");
// 设置闭锁计数器初值,使用该语句的场景:
// 1)Redis中没有设置该值
// 2)Redis中设置了该值,但已经变为了0,需要重置
latch.trySetCount(10);
// 在合并线程中要等待着闭锁的打开
try {
// 阻塞合并线程,直到锁打开
latch.await();
// 阻塞合并线程,直到锁打开或5秒后
latch.await(5, TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 条件线程代码
// 使闭锁计数器减一
latch.countDown();
return null;
}
版权归原作者 SphereX 所有, 如有侵权,请联系我们删除。