0


Redisson分布式锁

文章目录

一、Redisson简单介绍

Redisson是一个在Redis的基础上实现的Java驻内存数据网格,可参考Redisson官方文档使用,它不仅提供了一系列的分布式的Java常用对象,还提供了许多分布式服务。

二、Redisson简单使用

1. maven引用

<dependency><groupId>org.redisson</groupId><artifactId>redisson</artifactId><version>3.19.1</version></dependency>

2. RedisConfig配置

@ConfigurationpublicclassRedissonConfig{@Value("${spring.redis.host}")privateString redisAddress;@BeanpublicRedissonClientredissonClient(){// 默认连接地址 127.0.0.1:6379RedissonClient redisson =Redisson.create();//        Config config = new Config();//        config.useSingleServer().setAddress("redis://" + redisAddress); //redis服务器地址//                .setDatabase(0) //制定redis数据库编号//                .setUsername("").setPassword() // redis用户名密码//                .setConnectionMinimumIdleSize(10) //连接池最小空闲连接数//                .setConnectionPoolSize(50) //连接池最大线程数//                .setIdleConnectionTimeout(60000) //线程的超时时间//                .setConnectTimeout(6000) //客户端程序获取redis链接的超时时间//                .setTimeout(60000) //响应超时时间//        RedissonClient redisson = Redisson.create(config);return redisson;}}

3. StockRedissonService

@ServicepublicclassStockRedissonService{@AutowiredprivateStringRedisTemplate redisTemplate;@AutowiredprivateRedissonClient redissonClient;publicvoiddeduct(){RLock rlock = redissonClient.getLock("lock");
        rlock.lock();try{// 1。 查询库存String stockStr = redisTemplate.opsForValue().get("stock");if(!StringUtil.isNullOrEmpty(stockStr)){int stock =Integer.parseInt(stockStr);// 2。 判断条件是否满足if(stock >0){// 3 更新redis
                    redisTemplate.opsForValue().set("stock",String.valueOf(stock -1));}}}finally{
            rlock.unlock();}}}

简单来说,就是直接

RLock rlock = redissonClient.getLock(“lock”);

获取到锁,然后lock()和unlock()即可。

4. 测试

在这里插入图片描述
并发达到了660,比之前自己写的redis LUA脚本还要高。

三、Redisson源码

1. 加锁

<T>RFuture<T>tryLockInnerAsync(long waitTime,long leaseTime,TimeUnit unit,long threadId,RedisStrictCommand<T> command){returnevalWriteAsync(getRawName(),LongCodec.INSTANCE, command,"if ((redis.call('exists', KEYS[1]) == 0) "+"or (redis.call('hexists', KEYS[1], ARGV[2]) == 1)) then "+"redis.call('hincrby', KEYS[1], ARGV[2], 1); "+"redis.call('pexpire', KEYS[1], ARGV[1]); "+"return nil; "+"end; "+"return redis.call('pttl', KEYS[1]);",Collections.singletonList(getRawName()), unit.toMillis(leaseTime),getLockName(threadId));}

这就是加锁的核心代码,其实本质上也就是LUA脚本+hash数据类型

// 如果这个key不存在,或者这个key是我自己的,那么state+1,并且重置失效时间"if ((redis.call('exists', KEYS[1]) == 0) "+"or (redis.call('hexists', KEYS[1], ARGV[2]) == 1)) then "+"redis.call('hincrby', KEYS[1], ARGV[2], 1); "+"redis.call('pexpire', KEYS[1], ARGV[1]); "+"return nil; "+"end; "+// 如果不满足,则以毫秒为单位,返回key的过期时间"return redis.call('pttl', KEYS[1]);",

pttl: 以毫秒为单位返回 key 的剩余过期时间。
当 key 不存在时,返回 -2 。
当 key 存在但没有设置剩余生存时间时,返回 -1 。
否则,以毫秒为单位,返回 key 的剩余生存时间。

这其实和我们之前手写的lua脚本并没有什么太大差别,只是它这里return nil表示的是加锁成功

2. 解锁

protectedRFuture<Boolean>unlockInnerAsync(long threadId){returnevalWriteAsync(getRawName(),LongCodec.INSTANCE,RedisCommands.EVAL_BOOLEAN,"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then "+"return nil;"+"end; "+"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); "+"if (counter > 0) then "+"redis.call('pexpire', KEYS[1], ARGV[2]); "+"return 0; "+"else "+"redis.call('del', KEYS[1]); "+"redis.call('publish', KEYS[2], ARGV[1]); "+"return 1; "+"end; "+"return nil;",Arrays.asList(getRawName(),getChannelName()),LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime,getLockName(threadId));}

解锁的核心代码,和加锁一样,本质上也是LUA脚本+hash数据类型

// 如果key不是我的,直接返回nil,解锁失败"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then "+"return nil;"+"end; "+// 如果key是我的,那么state-1"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); "+// state-1后的值,如果>0,说明重入过了,那更新下失效时间"if (counter > 0) then "+"redis.call('pexpire', KEYS[1], ARGV[2]); "+"return 0; "+// state-1后的值,如果=0,说明全部解锁成功了,删除锁,并且publish"else "+"redis.call('del', KEYS[1]); "+"redis.call('publish', KEYS[2], ARGV[1]); "+"return 1; "+"end; "+"return nil;",

3. 自动续期

privatevoidrenewExpiration(){ExpirationEntry ee =EXPIRATION_RENEWAL_MAP.get(getEntryName());if(ee ==null){return;}Timeout task = commandExecutor.getConnectionManager().newTimeout(newTimerTask(){@Overridepublicvoidrun(Timeout timeout)throwsException{ExpirationEntry ent =EXPIRATION_RENEWAL_MAP.get(getEntryName());if(ent ==null){return;}Long threadId = ent.getFirstThreadId();if(threadId ==null){return;}CompletionStage<Boolean> future =renewExpirationAsync(threadId);
                future.whenComplete((res, e)->{if(e !=null){
                        log.error("Can't update lock {} expiration",getRawName(), e);EXPIRATION_RENEWAL_MAP.remove(getEntryName());return;}if(res){// reschedule itselfrenewExpiration();}else{cancelExpirationRenewal(null);}});}}, internalLockLeaseTime /3,TimeUnit.MILLISECONDS);
        
        ee.setTimeout(task);}

它也是使用的new TimerTask() ,同样也是一次性续期,若res=1,再次续期。

官方文档:如果负责储存这个分布式锁的Redisson节点宕机以后,而且这个锁正好处于锁住的状态时,这个锁会出现锁死的状态。为了避免这种情况的发生,Redisson内部提供了一个监控锁的看门狗,它的作用是在Redisson实例被关闭前,不断的延长锁的有效期。默认情况下,看门狗的检查锁的超时时间是30秒钟,也可以通过修改Config.lockWatchdogTimeout来另行指定。

不同的是,即使redisson节点宕机,它还是可以自动续期,保证你的业务逻辑正常结束。

4. 总结

其实和我们之前手写实现的redis分布式锁原理差不多,但是它是配合着CompletableFuture去异步实现,更加高效
本质上还是LUA脚本+hash数据类型

四、Redisson公平锁

  1. 公平锁在这里插入图片描述

非公平锁:来个线程就先试试能不能插队,不能插队才去后面排队
公平锁:线程都乖乖去后面排队去,不准插队

  1. 代码

StockController新增一个接口,用于调用公平锁

@GetMapping("fair/lock/{id}")publicStringfairLock(@PathVariable("id")Long id){
        stockService.fairLock(id);return"hello fair lock";}

StockRedissonService

publicvoidfairLock(Long id){RLock fairLock = redissonClient.getFairLock("fairLock");
        fairLock.lock();try{TimeUnit.SECONDS.sleep(3);System.out.println("-----测试公平锁-----");}catch(InterruptedException e){thrownewRuntimeException(e);}finally{
            fairLock.unlock();}}
  1. 测试 访问http://localhost:10010/fair/lock/1 从1一直访问到5

在这里插入图片描述

redis中有个等待队列
在这里插入图片描述
访问是顺序执行。

  1. 那如果把公平锁修改为非公平锁呢
RLock fairLock = redissonClient.getLock("fairLock");
  1. 重新测试在这里插入图片描述 redis中并无等待队列在这里插入图片描述 锁访问顺序也并不是先进先出。

ps:若使用nginx转发会超时重发,修改conf即可
在这里插入图片描述

添加如下:

proxy_connect_timeout 12000;
proxy_send_timeout 12000;
proxy_read_timeout 12000;

五、Redisson读写锁

读写 不能并发 加锁
写写 不能并发 加锁
读读 能并发 不加锁

StockController

@GetMapping("read/lock")publicStringreadLock(){
        stockService.readLock();return"read read lock";}@GetMapping("write/lock")publicStringwriteLock(){
        stockService.writeLock();return"write write lock";}

StockRedissonService

publicvoidreadLock(){RReadWriteLock lock = redissonClient.getReadWriteLock("rwLock");
        lock.readLock().lock(10,TimeUnit.SECONDS);// TODO 疯狂的读//        lock.readLock().unlock();}publicvoidwriteLock(){RReadWriteLock lock = redissonClient.getReadWriteLock("rwLock");
        lock.writeLock().lock(10,TimeUnit.SECONDS);// TODO 疯狂的写//        lock.writeLock().unlock();}

简单测试下,两个页面分别访问
http://localhost:10010/write/lock 写操作
http://localhost:10010/read/lock 读操作

写写: 多次写操作明显等待
在这里插入图片描述

写读:明显等待
在这里插入图片描述
读写:明显等待
在这里插入图片描述
读锁有多个,但写锁只有一个

读读:无等待现象

五、Redisson的RSemaphore信号量

1. Semaphore:

  • 可以用来控制同时访问特定资源的线程数量,常用于限流场景。
  • Semaphore接收一个int整型值,表示 许可证数量。
  • 线程通过调用acquire()获取许可证,执行完成之后通过调用release()归还许可证。
  • 只有获取到许可证的线程才能运行,获取不到许可证的线程将会阻塞。
publicstaticvoidmain(String[] args){Semaphore semaphore =newSemaphore(3);for(int i =0; i <6; i++){newThread(()->{try{
                    semaphore.acquire();System.out.println(Thread.currentThread().getName()+":抢到了停车位");TimeUnit.SECONDS.sleep(newRandom().nextInt(10));System.out.println(Thread.currentThread().getName()+"停了会儿就开走了");
                    semaphore.release();}catch(InterruptedException e){thrownewRuntimeException(e);}}, i +"号车").start();}}

在这里插入图片描述
每次只会停三辆车,走一辆后才可以停下一辆,完美实现限流。

2. RSemaphore:实现分布式限流

StockRedissonService

publicvoidsemaphoreLock(){RSemaphore semaphore = redissonClient.getSemaphore("semaphore");
        semaphore.trySetPermits(3);//设置资源量,限流的线程数try{
            semaphore.acquire();// 获取资源,获取资源成功的线程可以继续处理业务操作,否则会被阻塞住
            redisTemplate.opsForList().rightPush("log",port +"端口获取了资源,开始处理业务逻辑"+Thread.currentThread().getName());TimeUnit.SECONDS.sleep(newRandom().nextInt(10));
            redisTemplate.opsForList().rightPush("log",port +"端口处理完,释放了资源"+Thread.currentThread().getName());
            semaphore.release();// 手动释放资源,后续请求线程可以获取该资源}catch(InterruptedException e){thrownewRuntimeException(e);}}

用redis的list数据类型来记录日志,方便集群部署的日志查看。

  • 注意:
semaphore.trySetPermits(3);//设置资源量,限流的线程数

这会在redis中成semaphore的key,如果要修改资源量,必须手动把redis中该key删除,否则只在代码中修改,重启后无法生效。
在这里插入图片描述

简单测试下,多次访问后
在这里插入图片描述

集群部署下,共用了信号量的限流,两个端口启动,同一时间限流了3个线程。

六、Redisson的RCountDownLatch

1. CountDownLatch:允许一个或者多个线程去等待其他线程完成操作。

  • CountDownLatch接收一个int型参数,表示要等待的工作线程的个数。
  • await(): 使当前线程进入同步队列进行等待,直到latch的值被减到0或者当前线程被中断,当前线程就会被唤醒。
  • countDown(): 使latch的值减1,如果减到了0,则会唤醒所有等待在这个latch上的线程。
publicstaticvoidmain(String[] args)throwsInterruptedException{// 班长线程CountDownLatch countDownLatch =newCountDownLatch(6);for(int i =0; i <6; i++){newThread(()->{System.out.println(Thread.currentThread().getName()+"准备出门了");try{TimeUnit.SECONDS.sleep(10);}catch(InterruptedException e){thrownewRuntimeException(e);}System.out.println(Thread.currentThread().getName()+"睡了一会,终于出门了");
                countDownLatch.countDown();}, i +"号同学").start();}

        countDownLatch.await();System.out.println("班长锁门了");}

班长等六个同学全部出门后才可以锁门
在这里插入图片描述

2. RCountDownLatch

  • StockController
@GetMapping("test/countDown")publicStringcountDown(){
        stockService.countDown();return"出来了一位同学";}@GetMapping("test/latch")publicStringtestLatch(){
        stockService.testLatch();return"班长锁门了";}
  • countDown 同学出门
publicvoidcountDown(){RCountDownLatch cdl = redissonClient.getCountDownLatch("cdl");// TODO 出门准备完成
        cdl.countDown();}
  • latch 等六位同学出门后,班长锁门
publicvoidtestLatch(){RCountDownLatch cdl = redissonClient.getCountDownLatch("cdl");
        cdl.trySetCount(6);try{
            cdl.await();}catch(InterruptedException e){thrownewRuntimeException(e);}// TODO 准备锁门}
RCountDownLatch cdl = redissonClient.getCountDownLatch("cdl");

其中的“cdl"即是存放在redis的key,两个方法必须命名一致。

分别访问接口
http://localhost/test/countDown 出门
http://localhost/test/latch 班长锁门
只有访问六次出门后,才可以成功锁门
在这里插入图片描述

redis中存放着cdl的key,也就是目前的等待线程数。

七、 总结

redisson: redis的java客户端,分布式锁
玩法:

  1. 引入依赖
  2. java配置类:RedissonConfig
  3. 代码使用
  • 可重入锁RLock对象: CompletableFuture + LUA脚本 + hash
RLock rlock = redissonClient.getLock("xxx");
rlock.lock()/unlock()
  • 公平锁:
RLock fairLock = redissonClient.getLock("xxx");
fairLock.lock()/unlock()
  • 联锁 和 红锁
  • 读写锁:
RReadWriteLock lock = redissonClient.getReadWriteLock("xxx");
lock.readLock().lock()/unlock();
lock.writeLock().lock()/unlock();
  • 信号量:
RSemaphore semaphore = redissonClient.getSemaphore("xxx");
semaphore.trySetPermits(3);//设置资源量,限流的线程数
semaphore.acquire()/release()
  • 闭锁:
RCountDownLatch cdl = redissonClient.getCountDownLatch("xxx");
cdl.trySetCount(6);
cdl.await()/countDown()
标签: 分布式 redis java

本文转载自: https://blog.csdn.net/ideaxx/article/details/128613433
版权归原作者 范大 所有, 如有侵权,请联系我们删除。

“Redisson分布式锁”的评论:

还没有评论