0


Redis - 分布式锁、Redisson

分布式锁

分布式锁是控制分布式系统间同步访问共享资源的一种方式,其可以保证共享资源在并

发场景下的数据一致性。

当有多个线程要访问某一个共享资源(DBMS 中的数据或 Redis 中的数据,或共享文件

等)时,为了达到协调多个线程的同步访问,此时就需要使用分布式锁了。

为了达到同步访问的目的,规定,让这些线程在访问共享资源之前先要获取到一个令牌

token,只有具有令牌的线程才可以访问共享资源。这个令牌就是通过各种技术实现的分布

式锁。而这个分布锁是一种“互斥资源”,即只有一个。只要有线程抢到了锁,那么其它线

程只能等待,直到锁被释放或等待超时。

在对某一资源操作之前,程序先在Redis中拿到锁:

  1. setnx 命令,在finally里面释放锁。
  2. 为了防止执行完“添加锁”语句后突然宕机,其 finally 中的释放锁代码不执行,为锁添加过期时间。
  3. 为了防止本线程的业务还没处理完锁就过期了,导致另一个线程B拿到锁,结果本线程继续执行误把线程B设置的锁给删了,所以要为锁添加标识,生成UUID作为锁的 value
  4. 在finally语句块里面要判断是不是自己的锁,取得锁+判断+删除是非原子性的,在并发场景下可能会出问题。例如,客户端 a 在节点主机 A 中添加了锁后,执行业务逻辑用时 6 秒,此时锁已过期,然后执行到了 finally{}中的判断,并判断结果为真,然后时间片到了,暂停执行。由于节点主机 A 中的锁已经过期,客户端 b 在节点主机 B 中添加锁成功,然后很快执 行到了业务逻辑(未超过锁的过期时间),此时客户端 b 的处理进程时间片到了。 此时主机 A 中的代码又获得了处理机,继续执行。此时就会执行对锁的删除语句,删除 成功。也就是说主机 A 删除了由主机 B 添加的锁。这就是很严重的问题。解决方法是加Lua脚本

对客户端身份的判断与删除锁操作的合并,是没有专门的原子性命令的。此时可以通过

Lua 脚本来实现它们的原子性。而对 Lua 脚本的执行,可以通过 eval 命令来完成。

不过,eval 命令在 RedisTemplate 中没有对应的方法,而 Jedis 中具有该同名方法。所以,

需要在代码中首先获取到 Jedis 客户端,然后才能调用 jedis.eval()。

@GetMapping("/sk5")
public String seckillHandler5() {
    // 为每一个访问的客户端随机生成一个客户端唯一标识
    String clientId = UUID.randomUUID().toString();
    try {
        // 在添加锁的同时为锁指定过期时间,该操作具有原子性
        // 将锁的value设置为clientId
        Boolean lockOK = srt.opsForValue().setIfAbsent(REDIS_LOCK, clientId, 5, TimeUnit.SECONDS);

        if (!lockOK) {
            return "没有抢到锁";
        }
        // 添加锁成功
        // 从Redis中获取库存
        String stock = srt.opsForValue().get("sk:0008");
        int amount = stock == null ? 0 : Integer.parseInt(stock);
        if (amount > 0) {
            // 修改库存后再写回Redis
            srt.opsForValue().set("sk:0008", String.valueOf(--amount));
            return "库丰剩余" + amount + "台";
        }
    } finally {
        // 锁续约,或锁续命
        JedisPool jedisPool = new JedisPool(redisHost, redisPort);
        try (Jedis jedis = jedisPool.getResource()) {
            // 定义Lua脚本。注意,每行最后要有一个空格
            // redis.call()是Lua中对Redis命令的调用函数
            String script = "if redis.call('get', KEYS[1]) == ARGV[1] " +
                    "then return redis.call('del', KEYS[1]) " +
                    "end " +
                    "return 0";

            // eval()方法的返回值为脚本script的返回值
            Object eval = jedis.eval(script, Collections.singletonList(REDIS_LOCK), Collections.singletonList(clientId));
            if ("1".equals(eval.toString())) {
                System.out.println("释放锁成功");
            } else {
                System.out.println("释放锁时发生异常");
            }
        }
    }

    return "抱歉,您没有抢到";
}

以上代码仍然是存在问题的:请求 a 的锁过期,但其业务还未执行完毕;请求 b 申请到

了锁,其也正在处理业务。如果此时两个请求都同时修改了共享的库存数据,那么就又会出

现数据不一致的问题,即仍然存在并发问题。在高并发场景下,问题会被无限放大。

对于该问题,可以采用“锁续约”方式解决。即,在当前业务进程开始执行时,fork 出

一个子进程,用于启动一个定时任务。该定时任务的定时时间小于锁的过期时间,其会定时

查看处理当前请求的业务进程的锁是否已被删除。如果已被删除,则子进程结束;如果未被

删除,说明当前请求的业务还未处理完毕,则将锁的时间重新设置为“原过期时间”。这种

方式称为锁续约,也称为锁续命。


**Redisson **可重入锁 - 单机Redis下分布式锁

使用 Redisson 的可重入锁可以解决上述问题。

Redisson 内部使用 Lua 脚本实现了对可重入锁的添加、重入、续约(续命)、释放。Redisson

需要用户为锁指定一个 key,但无需为锁指定过期时间,因为它有默认过期时间(当然,也可

指定)。由于该锁具有“可重入”功能,所以 Redisson 会为该锁生成一个计数器,记录一个

线程重入锁的次数。 hash -> field

**导入 ****Redisson **依赖

<dependency>
 <groupId>org.redisson</groupId>
 <artifactId>redisson</artifactId>
 <version>3.17.6</version>
</dependency>

Spring中添加一个由单 Redis 节点构建的 Redisson 的 Bean。

@Bean
public Redisson redisson() {
    Config Config = new Config();
    Config.useSingleServer()
            .setAddress(redisHost + ":" + redisPort)
            .setDatabase(0);
    return (Redisson) Redisson.create(Config);
}

在需要使用的类中注入

使用:

@GetMapping("/sk6")
public String seckillHandler6() {
    RLock rLock = redisson.getLock(REDIS_LOCK);
    try {
        // 添加分布式锁
        // Boolean lockOK = rLock.tryLock();
        // 指定锁的过期时间为5秒
        // Boolean lockOK = rLock.tryLock(5, TimeUnit.SECONDS);
        // 指定锁的过期时间为5秒。如果申请锁失败,则最长等待20秒
        Boolean lockOK = rLock.tryLock(20, 5, TimeUnit.SECONDS);

        if (!lockOK) {
            return "没有抢到锁";
        }
        // 添加锁成功
        // 从Redis中获取库存
        String stock = srt.opsForValue().get("sk:0008");
        int amount = stock == null ? 0 : Integer.parseInt(stock);
        if (amount > 0) {
            // 修改库存后再写回Redis
            srt.opsForValue().set("sk:0008", String.valueOf(--amount));
            return "库丰剩余" + amount + "台";
        }
    } catch (InterruptedException e) {
        e.printStackTrace();
    } finally {
        // 释放锁
        rLock.unlock();
    }

    return "抱歉,您没有抢到";
}

在 Redis 单机情况下,以上代码是没有问题的。但如果是在 Redis 主从集群中,那么其

还存在锁丢失问题。

在 Redis 主从集群中,假设节点 A 为 master,节点 B、C 为 slave。如果一个请求 a 在处

理时申请锁,即向节点 A 添加一个 key。当节点 A 收到请求后写入 key 成功,然后会立即向

处理 a 请求的应用服务器 Sa 响应,然后会向 slave 同步该 key。不过,在同步还未开始时,

节点 A 宕机,节点 B 晋升为 master。此时正好有一个请求 b 申请锁,由于节点 B 中并没有

该 key,所以该 key 写入成功,然后会立即向处理 b 请求的应用服务器 Sb 响应。由于 Sa 与

Sb 都收到了 key 写入成功的响应,所以它们都可同时对共享数据进行处理。这就又出现了

并发问题。

只所以新的 master 节点 B 同意请求 b 的锁申请,是因为主从集群丢失了请求 a 的锁申

请,即对于节点 B 来说,其根本就不知道有过请求 a 的锁申请。所以,该问题称为主从集群

的锁丢失问题。


**Redisson - **红锁 - 集群Redis下分布式锁

Redisson 红锁可以防止主从集群锁丢失问题。Redisson 红锁要求,必须要构建出至少三

个 Redis 主从集群。若一个请求要申请锁,必须向所有主从集群中提交 key 写入请求,只有

当大多数集群锁写入成功后,该锁才算申请成功。

容器中放入三个 Sentinel集群构建的 Redisson 的 Bean

@Bean("redisson-1")
public Redisson redisson1() {
    Config Config = new Config();
    Config.useSentinelServers()
            .setMasterName("mymaster1")
            .addSentinelAddress("redis:16380", "redis:16381", "redis:16382");
    return (Redisson) Redisson.create(Config);
}

@Bean("redisson-2")
public Redisson redisson2() {
    Config Config = new Config();
    Config.useSentinelServers()
            .setMasterName("mymaster2")
            .addSentinelAddress("redis:26380", "redis:26381", "redis:26382");
    return (Redisson) Redisson.create(Config);
}

@Bean("redisson-3")
public Redisson redisson3() {
    Config Config = new Config();
    Config.useSentinelServers()
            .setMasterName("mymaster3")
            .addSentinelAddress("redis:36380", "redis:36381", "redis:36382");
    return (Redisson) Redisson.create(Config);
}

在需要使用的类注入使用

@GetMapping("/sk7")
public String seckillHandler7() {
    // 定义三个可重入锁
    RLock rLock1 = redisson1.getLock(REDIS_LOCK + "-1");
    RLock rLock2 = redisson2.getLock(REDIS_LOCK + "-2");
    RLock rLock3 = redisson3.getLock(REDIS_LOCK + "-3");

    // 定义红锁
    RLock rLock = new RedissonRedLock(rLock1, rLock2, rLock3);
    try {
        // 添加分布式锁
        Boolean lockOK = rLock.tryLock();

        if (!lockOK) {
            return "没有抢到锁";
        }
        // 添加锁成功
        // 从Redis中获取库存
        String stock = srt.opsForValue().get("sk:0008");
        int amount = stock == null ? 0 : Integer.parseInt(stock);
        if (amount > 0) {
            // 修改库存后再写回Redis
            srt.opsForValue().set("sk:0008", String.valueOf(--amount));
            return "库丰剩余" + amount + "台";
        }
    } finally {
        // 释放锁
        rLock.unlock();
    }

    return "抱歉,您没有抢到";
}

分段锁

无论前面使用的是哪种锁,它们解决并发问题的思路都是相同的,那就将所有请求通过

锁实现串行化。而串行化在高并发场景下势必会引发性能问题。

解决锁的串行化引发的性能问题的方案就是,使访问并行化。将要共享访问的一个资源,

拆分为多个共享访问资源,这样就会将一把锁的需求转变为多把锁,实现并行化。

例如,对于秒杀商品 sk:0008,其有 1000 件。现在将其拆分为 10 份,每份 100 件。即

将秒杀商品变为了 10 件,分别为 sk:0008:01,sk:0008:02,sk:0008:03,„„,sk:0008:10。

这样的话,就需要 10 把锁来控制所有请求的并发。由原来的因为只有一把锁而导致的每个

时刻只能处理 1 个请求,变为了现在有了 10 把锁,每个时刻可以同时处理 10 个请求。并发

提高了 10 倍。


**Redisson **详解

Redisson 底层采用的是 Netty 框架

Redisson 提供了使 用 Redis 的最简单和最便捷的方法。在生产中,对于 Redisson 使用最多的场景就是其分布式锁 RLock。

Redisson 的分布式锁 RLock

是一种可重入锁

是一种非公平锁,但也支持可重入公平锁 FailLock。

**联锁 **

Redisson 分布式锁可以实现联锁 MultiLock。当一个线程需要同时处理多个共享资源时,

可使用联锁。即一次性申请多个锁,同时锁定多个共享资源。联锁可预防死锁。相当于对共

享资源的申请实现了原子性:要么都申请到,只要缺少一个资源,则将申请到的所有资源全

部释放。其是 OS 底层原理中 AND 型信号量机制的典型应用。

**红锁 **

Redisson 分布式锁可以实现红锁 RedLock。红锁由多个锁构成,只有当这些锁中的大部

分锁申请成功时,红锁才申请成功。红锁一般用于解决 Redis 主从集群锁丢失问题。

红锁与联锁的区别是,红锁实现的是对一个共享资源的同步访问控制,而联锁实现的是

多个共享资源的同步访问控制。

**读写锁 **

通过 Redisson 可以获取到读写锁 RReadWriteLock。通过 RReadWriteLock 实例可分别获

取到读锁 RedissonReadLock 与写锁 RedissonWriteLock。读锁与写锁分别是实现了 RLock 的可

重入锁。

一个共享资源,在没有写锁的情况下,允许同时添加多个读锁。只要添加了写锁,任何

读锁与写锁都不能再次添加。即读锁是共享锁,写锁为排他锁。

**信号量 **

通过Redisson可以获取到信号量RSemaphore。RSemaphore的常用场景有两种:一种是,

无论谁添加的锁,任何其它线程都可以解锁,就可以使用 RSemaphore。另外,当一个线程

需要一次申请多个资源时,可使用 RSemaphore。RSemaphore 是信号量机制的典型应用。

@GetMapping("/sk8")
public String seckillHandler8() {
    RSemaphore rs = redisson.getSemaphore("redis_semaphore");
    try {
        int buy = ThreadLocalRandom.current().nextInt(5) + 1;
        Boolean lockOK = rs.tryAcquire(buy, 10, TimeUnit.SECONDS);

        if (!lockOK) {
            return "没有抢到锁";
        }
        // 添加锁成功
        // 从Redis中获取库存
        String stock = srt.opsForValue().get("sk:0008");
        int amount = stock == null ? 0 : Integer.parseInt(stock);
        if (amount > 0) {
            // 修改库存后再写回Redis
            srt.opsForValue().set("sk:0008", String.valueOf(--amount));
            return "库丰剩余" + amount + "台";
        }
    } catch (InterruptedException e) {
        e.printStackTrace();
    }

    return "抱歉,您没有抢到";
}

**可过期信号量 **

通过 Redisson 可以获取到可过期信号量 PermitExpirableSemaphore。该信号量是在

RSemaphore 基础上,为每个信号增加了一个过期时间,且每个信号都可以通过独立的 ID 来

辨识。释放时也只能通过提交该 ID 才能释放。

不过,一个线程每次只能申请一个信号量,当然每次了只会释放一个信号量。这是与

RSemaphore 不同的地方。

该信号量为互斥信号量时,其就等同于可重入锁。或者说,可重入锁就相当于信号量为

1 的可过期信号量。

注意,可过期信号量与可重入锁的区别:

 可重入锁:相当于用户每次只能申请 1 个信号量,且只有一个用户可以申请成功

 可过期信号量:用户每次只能申请 1 个信号量,但可以有多个用户申请成功

    @GetMapping("/test2")
    public String test2() {

        RPermitExpirableSemaphore rs = redisson.getPermitExpirableSemaphore("redis_semaphore");
        String permitId = null;
        try {
            // 对信号量的申请(P操作)
            // 申请1个信号,返回辨识ID
            permitId = rs.acquire();
            // 申请1个信号,若没有成功,则最多等待10秒,返回辨识ID
            permitId = rs.tryAcquire(10, TimeUnit.SECONDS);

            // 业务逻辑
            // ……

        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // 对信号量的释放(V操作)
            // 释放1个信号量,需要携带辨识ID
            rs.release(permitId);
            boolean releaseOK = rs.tryRelease(permitId);
        }

        return null;
    }

**分布式闭锁 **

通过 Redisson 可以获取到分布式闭锁 RCountDownLatch,其与 JDK 的 JUC 中的闭锁

CountDownLatch 原理相同,用法类似。其常用于一个或者多个线程的执行必须在其它某些

任务执行完毕的场景。例如,大规模分布式并行计算中,最终的合并计算必须基于很多并行

计算的运行完毕。

闭锁中定义了一个计数器和一个阻塞队列。阻塞队列中存放着待执行的线程。每当一个

并行任务执行完毕,计数器就减 1。当计数器递减到 0 时就会唤醒阻塞队列的所有线程。

通常使用 Barrier 队列解决该问题,而 Barrier 队列通常使用 Zookeeper 实现。

    @GetMapping("/test3")
    public String test3() {
        // 获取闭锁对象(合并线程与条件线程中都需要该代码)
        RCountDownLatch latch = redisson.getCountDownLatch("countDownLatch");

        // 设置闭锁计数器初值,使用该语句的场景:
        // 1)Redis中没有设置该值
        // 2)Redis中设置了该值,但已经变为了0,需要重置
        latch.trySetCount(10);

        // 在合并线程中要等待着闭锁的打开
        try {
            // 阻塞合并线程,直到锁打开
            latch.await();
            // 阻塞合并线程,直到锁打开或5秒后
            latch.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        // 条件线程代码
        // 使闭锁计数器减一
        latch.countDown();

        return null;
    }

本文转载自: https://blog.csdn.net/m0_65228708/article/details/134980987
版权归原作者 SphereX 所有, 如有侵权,请联系我们删除。

“Redis - 分布式锁、Redisson”的评论:

还没有评论