0


【Java项目推荐】值得写到简历上的项目--黑马点评

优惠卷秒杀

前言

项目是b站黑马程序员的redis教程中的案例,建议所有java程序员做一下!

这篇博客会从最简单的实现优惠卷秒杀到加分布式锁、对秒杀优化、使用消息队列异步下单做详细介绍!

优惠券秒杀

实现优惠券秒杀下单

image-20221224200421157

@Override@TransactionalpublicResultseckillVoucher(Long voucherId){/**
     * 秒杀基本实现一:
     *  1.查询优惠卷
     *  2.判断秒杀是否开始
     *  3.判断是否结束
     *  4.判断库存是否充足
     *  5.扣减库存
     *  6.创建订单
     */SeckillVoucher voucher = seckillVoucherService.getById(voucherId);if(voucher.getBeginTime().isAfter(LocalDateTime.now())){//尚未开始returnResult.fail("秒杀尚未开始!");}if(voucher.getEndTime().isBefore(LocalDateTime.now())){returnResult.fail("秒杀已经结束!");}if(voucher.getStock()<1){returnResult.fail("库存不足!");}boolean success = seckillVoucherService.update().setSql("stock = stock - 1").eq("voucher_id", voucherId).update();if(!success){returnResult.fail("库存不足!");}VoucherOrder voucherOrder =newVoucherOrder();long orderId = redisIdWorker.nextId("order");
    voucherOrder.setId(orderId);Long userId =UserHolder.getUser().getId();
    voucherOrder.setUserId(userId);

    voucherOrder.setVoucherId(voucherId);save(voucherOrder);returnResult.ok(orderId);}

超卖问题

超卖情况

image-20221105225642655

image-20221227204752096

加锁解决超卖问题

image-20221105230150840

乐观锁

两种方式是心爱乐观锁:

  • 版本号法
  • CAS法(Compare And swap)

实现方式:

每当数据做一次修改,版本号加1,所以判断一个数据有没有被修改过就看它的版本有没有变化过

image-20221105231012183

CAS法:

Compare and Swap,即比较再交换。

也就是我不在判断库存有没有被修改过了,我每次都去比较看库存是否小于0

image-20221105231124576

乐观锁解决超卖问题

乐观锁更新操作的时候使用

@Override@TransactionalpublicResultseckillVoucher(Long voucherId){/**
     * 秒杀基本实现二:
     *  1.查询优惠卷
     *  2.判断秒杀是否开始
     *  3.判断是否结束
     *  4.判断库存是否充足
     *  5.扣减库存(乐观锁解决超卖问题)
     *  6.创建订单
     */SeckillVoucher voucher = seckillVoucherService.getById(voucherId);if(voucher.getBeginTime().isAfter(LocalDateTime.now())){//尚未开始returnResult.fail("秒杀尚未开始!");}if(voucher.getEndTime().isBefore(LocalDateTime.now())){returnResult.fail("秒杀已经结束!");}if(voucher.getStock()<1){returnResult.fail("库存不足!");}boolean success = seckillVoucherService.update().setSql("stock = stock - 1").eq("voucher_id", voucherId).gt("stock",0).update();if(!success){returnResult.fail("库存不足!");}VoucherOrder voucherOrder =newVoucherOrder();long orderId = redisIdWorker.nextId("order");
    voucherOrder.setId(orderId);Long userId =UserHolder.getUser().getId();
    voucherOrder.setUserId(userId);

    voucherOrder.setVoucherId(voucherId);save(voucherOrder);returnResult.ok(orderId);}

一人一单

重复下单情况

image-20221105194348762

解决思路

@OverridepublicResultseckillVoucher(Long voucherId){/**
       * 秒杀基本实现三:
       * 悲观锁实现一人一单
       */SeckillVoucher voucher = seckillVoucherService.getById(voucherId);if(voucher.getBeginTime().isAfter(LocalDateTime.now())){//尚未开始returnResult.fail("秒杀尚未开始!");}if(voucher.getEndTime().isBefore(LocalDateTime.now())){returnResult.fail("秒杀已经结束!");}if(voucher.getStock()<1){returnResult.fail("库存不足!");}Long userId =UserHolder.getUser().getId();synchronized(userId.toString().intern()){//获取事务代理对象IVoucherOrderService proxy =(IVoucherOrderService)AopContext.currentProxy();return proxy.createVoucherOrder(voucherId);}}@TransactionalpublicResultcreateVoucherOrder(Long voucherId){Long userId =UserHolder.getUser().getId();int count =query().eq("user_id", userId).eq("voucher_id", voucherId).count();if(count >0){returnResult.fail("您已经购买过一次了!");}boolean success = seckillVoucherService.update().setSql("stock = stock - 1").eq("voucher_id", voucherId).gt("stock",0).update();if(!success){returnResult.fail("库存不足!");}VoucherOrder voucherOrder =newVoucherOrder();long orderId = redisIdWorker.nextId("order");
        voucherOrder.setId(orderId);

        voucherOrder.setUserId(userId);
        voucherOrder.setVoucherId(voucherId);save(voucherOrder);returnResult.ok(orderId);}

防止事务失效

synchronized(userId.toString().intern()){//获取事务代理对象IVoucherOrderService proxy =(IVoucherOrderService)AopContext.currentProxy();return proxy.createVoucherOrder(voucherId);}

事务的生效其实是Spring拿到当前对象的代理对象,这里如果直接调用就不是Spring的代理对象了事务就会失效,所以要获取当前对象的代理对象

通过

AopContext.currentProxy()

API去获取

为什么把生成订单单独提取出来?

因为只有生成订单才会对数据库有插入操作,这个时候才需要事务。

需要事务的同时一人一单也需要加锁(悲观锁)

而且要在事务提交之后在释放锁!

测试

image-20230107150811523

分布式锁

集群下的线程并发安全问题

加锁的原理就是在

JVM内部维护了一个锁监视器

,如果是集群模式下的话那就是多个JVM,悲观锁就失效了

image-20230107151108120

通过加锁可以解决在单机情况下的一人一单安全问题,但是在集群模式下就不行了。

模拟集群环境:

  • 在开一个服务,端口是8082,通过在Vm option里添加-Dserver.port=8082
  • nginx做负载均衡

image-20221227224135074

image-20221227224333180

此时测试,还是会发现会出现一人多单的情况

什么是分布式锁?

满足

分布式系统

下或者

集群模式

下的

多线程可见并且互斥的锁

image-20230107153319260

分布式锁的实现方式

image-20230107153820308

redis分布式锁测试

# 实现分布式锁时需要实现的两个基本方法# 1.添加锁,利用setnx的互斥特性
SETNX lock thread1
# 2.添加锁过期时间,避免服务宕机引起的死锁
EXPIRE lock 10# 释放锁,删除即可127.0.0.1:6379> DEL lock
(integer)1

redis中加锁的一些特殊情况

如果下订单的线程在redis中加了锁,这时如果redis宕机了,那么其他线程就会一直处于等待状态,这时就出现了死锁的现象。

如何解决?

利用redis中key过期时间,自动释放锁能避免服务宕机引起的死锁

如果服务在加锁和过期释放期间宕机怎么办?

保证加锁和过期释放的原子性!

redis的set命令可以跟上很多参数,可以同时保证加锁和设置过期时间

  • EX:设置键的过期时间为 second 秒。 SET key value EX second 效果等同于 SETEX key second value
  • NX:只在键不存在时,才对键进行设置操作。 SET key value NX 效果等同于 SETNX key value 。

因为 SET 命令可以通过参数来实现和 SETNX 、 SETEX 和 PSETEX 三个命令的效果,所以将来的 Redis 版本可能会废弃并最终移除 SETNX 、 SETEX 和 PSETEX 这三个命令。

127.0.0.1:6379> SET lock thread1 NX EX 10
OK
127.0.0.1:6379> ttl lock
(integer) 5
127.0.0.1:6379> ttl lock
(integer) -2

手动实现分布式锁

实现分布式锁的流程

image-20230108120024394

impl

实现分布式锁的思路就是通过redis中的setnx(如果不存在就创建key,存在就不创建)这样的互斥命令

每当有线程过来抢购的时候,首先会获取锁,也就是执行redis中的setnx命令。如果再有线程过来抢购那么就会被阻塞,只有等该锁被释放其他线程才能再次获取

@OverridepublicResultseckillVoucher(Long voucherId){SeckillVoucher voucher = seckillVoucherService.getById(voucherId);if(voucher.getBeginTime().isAfter(LocalDateTime.now())){//尚未开始returnResult.fail("秒杀尚未开始!");}if(voucher.getEndTime().isBefore(LocalDateTime.now())){returnResult.fail("秒杀已经结束!");}if(voucher.getStock()<1){returnResult.fail("库存不足!");}Long userId =UserHolder.getUser().getId();//生成分布式锁对象,传入当前用户id和stringRedisTemplate对象SimpleRedisLock lock =newSimpleRedisLock("order:"+ userId, stringRedisTemplate);//boolean isLock = lock.tryLock(1200);if(!isLock){//            获取锁失败,返回错误或重试returnResult.fail("不允许重复下单!");}try{//获取事务代理对象IVoucherOrderService proxy =(IVoucherOrderService)AopContext.currentProxy();return proxy.createVoucherOrder(voucherId);}finally{
    lock.unlock();}}publicclassSimpleRedisLockimplementsILock{//不同的业务有不同的名称privateString name;privateStringRedisTemplate stringRedisTemplate;//对锁定义一个统一的前缀privatestaticfinalStringKEY_PREFIX="lock:";//锁的名称要求用户传递给我们,所以这里我们定义一个构造函数publicSimpleRedisLock(String name,StringRedisTemplate stringRedisTemplate){this.name = name;this.stringRedisTemplate = stringRedisTemplate;}@OverridepublicbooleantryLock(long timeoutSec){/**
         * 版本一:
         * 基础实现
         * key就是固定前缀+锁的名称,value就是线程标识
         * SET lock thread1 NX EX 10
         */String threadId =Thread.currentThread().getId();Boolean success = stringRedisTemplate.opsForValue().setIfAbsent(KEY_PREFIX+ name, threadId, timeoutSec,TimeUnit.SECONDS);returnBoolean.TRUE.equals(success);}@Overridepublicvoidunlock(){
        stringRedisTemplate.delete(ID_PREFIX+ name);}

分布式锁误删情况1

分布式锁误删情况说明

获取锁之后,线程A的业务出现了阻塞,直到锁到了超时时间被自动释放,业务还在处于阻塞状态。

这时线程B获取锁开始执行自己的业务,此时线程A阻塞的业务完成后,会把锁给删掉!这样就是分布式锁误删的情况。

所以我们在删除锁的时候需要进行一个判断,看看删除的是不是当前线程所持有的锁

image-20230108202201918

@OverridepublicResultseckillVoucher(Long voucherId){SeckillVoucher voucher = seckillVoucherService.getById(voucherId);if(voucher.getBeginTime().isAfter(LocalDateTime.now())){//尚未开始returnResult.fail("秒杀尚未开始!");}if(voucher.getEndTime().isBefore(LocalDateTime.now())){returnResult.fail("秒杀已经结束!");}if(voucher.getStock()<1){returnResult.fail("库存不足!");}Long userId =UserHolder.getUser().getId();//生成分布式锁对象,传入当前用户id和stringRedisTemplate对象SimpleRedisLock lock =newSimpleRedisLock("order:"+ userId, stringRedisTemplate);//boolean isLock = lock.tryLock(1200);if(!isLock){//            获取锁失败,返回错误或重试returnResult.fail("不允许重复下单!");}try{//获取事务代理对象IVoucherOrderService proxy =(IVoucherOrderService)AopContext.currentProxy();return proxy.createVoucherOrder(voucherId);}finally{
    lock.unlock();}}publicclassSimpleRedisLockimplementsILock{//不同的业务有不同的名称privateString name;privateStringRedisTemplate stringRedisTemplate;//对锁定义一个统一的前缀privatestaticfinalStringKEY_PREFIX="lock:";//锁的名称要求用户传递给我们,所以这里我们定义一个构造函数publicSimpleRedisLock(String name,StringRedisTemplate stringRedisTemplate){this.name = name;this.stringRedisTemplate = stringRedisTemplate;}privatestaticfinalStringID_PREFIX=UUID.randomUUID().toString(true)+"-";@OverridepublicbooleantryLock(long timeoutSec){/**
         * 版本一:
         * 基础实现
         * key就是固定前缀+锁的名称,value就是线程标识
         * SET lock thread1 NX EX 10
         */String threadId =ID_PREFIX+Thread.currentThread().getId();Boolean success = stringRedisTemplate.opsForValue().setIfAbsent(KEY_PREFIX+ name, threadId, timeoutSec,TimeUnit.SECONDS);returnBoolean.TRUE.equals(success);}@Overridepublicvoidunlock(){/**
         * 版本二:
         * 释放锁的时候判断是不是当前线程的锁
         *///获取线程idString threadId =ID_PREFIX+Thread.currentThread().getId();//获取keyString id = stringRedisTemplate.opsForValue().get(KEY_PREFIX+ name);if(threadId.equals(id)){
            stringRedisTemplate.delete(KEY_PREFIX+ name);}}

分布式锁误删情况2

情况说明

如果JVM发送FULL GC时会阻塞所有的代码,因为

判断标识是否一致和释放锁是两步

,所以在判断成功之后如果发生FUll GC那么其他线程再次获取锁的时候,还是可能发生误删的情况

image-20230108202723108

为了避免这种情况的发生,我们必须保证判断锁标识的动作和释放锁的动作是原子性的!

这就是下面我们要学习的lua脚本

lua脚本解决多条命令的原子性问题

什么是lua脚本?

Redis提供了Lua脚本功能,在一个脚本中编写多条Redis命令,确保多条命令执行时的原子性。Lua是一种编程语言,它的基本语法大家可以参考网站:

https://www.runoob.com/lua/lua-tutorial.html

lua脚本的基本使用

在lua脚本中调用函数如下:

redis.call('命令名称','key','其它参数',...)

例如,我们要执行set name jack,则脚本是这样:

# 执行 set name jack
redis.call('set', 'name', 'jack')

例如,我们要先执行set name Rose,再执行get name,则脚本如下

# 先执行 set name jack
redis.call('set', 'name', 'jack')# 再执行 get namelocal name = redis.call('get', 'name')# 返回return name

写好脚本以后,需要用Redis命令来调用脚本,调用脚本的常见命令如下:

127.0.0.1:6379> help @scripting

  EVAL script numkeys [key [key ...]][arg [arg ...]]
  summary: Execute a Lua script server side
  since:2.6.0

例如,我们要执行 redis.call(‘set’, ‘name’, ‘jack’) 这个脚本,语法如下:

127.0.0.1:6379> EVAL "return redis.call('set','name','Jack')"0

如果脚本中的key、value不想写死,可以作为参数传递。key类型参数会放入KEYS数组,其它参数会放入ARGV数组,在脚本中可以从KEYS和ARGV数组获取这些参数

127.0.0.1:6379> EVAL "return redis.call('set',KEYS[1],ARGV[1])"1 name Tom
OK
127.0.0.1:6379> get name
"Tom"

Java调用lua脚本改造分布式锁

释放锁的业务流程是这样的:

  • 获取锁中的线程标示
  • 判断是否与指定的标示(当前线程标示)一致
  • 如果一致则释放锁(删除)
  • 如果不一致则什么都不做
  • 如果用Lua脚本来表示则是这样的
------ Generated by EmmyLua(https://github.com/EmmyLua)--- Created by qiang.--- DateTime: 2023/1/8 21:24---if(redis.call('get', KEYS[1])== ARGV[1])then--释放锁return redis.call('del', KEYS[1])end--如果不匹配return0

RedisTemplate调用Lua脚本的API如下:

image-20230108213936297

@OverridepublicResultseckillVoucher(Long voucherId){SeckillVoucher voucher = seckillVoucherService.getById(voucherId);if(voucher.getBeginTime().isAfter(LocalDateTime.now())){//尚未开始returnResult.fail("秒杀尚未开始!");}if(voucher.getEndTime().isBefore(LocalDateTime.now())){returnResult.fail("秒杀已经结束!");}if(voucher.getStock()<1){returnResult.fail("库存不足!");}Long userId =UserHolder.getUser().getId();//生成分布式锁对象,传入当前用户id和stringRedisTemplate对象SimpleRedisLock lock =newSimpleRedisLock("order:"+ userId, stringRedisTemplate);//boolean isLock = lock.tryLock(1200);if(!isLock){//            获取锁失败,返回错误或重试returnResult.fail("不允许重复下单!");}try{//获取事务代理对象IVoucherOrderService proxy =(IVoucherOrderService)AopContext.currentProxy();return proxy.createVoucherOrder(voucherId);}finally{
    lock.unlock();}}publicclassSimpleRedisLockimplementsILock{//不同的业务有不同的名称privateString name;privateStringRedisTemplate stringRedisTemplate;//对锁定义一个统一的前缀privatestaticfinalStringKEY_PREFIX="lock:";//锁的名称要求用户传递给我们,所以这里我们定义一个构造函数publicSimpleRedisLock(String name,StringRedisTemplate stringRedisTemplate){this.name = name;this.stringRedisTemplate = stringRedisTemplate;}privatestaticfinalStringID_PREFIX=UUID.randomUUID().toString(true)+"-";privatestaticfinalDefaultRedisScript<Long>UNLOCK_SCRIPT;static{UNLOCK_SCRIPT=newDefaultRedisScript<>();UNLOCK_SCRIPT.setLocation(newClassPathResource("unlock.lua"));UNLOCK_SCRIPT.setResultType(Long.class);}@OverridepublicbooleantryLock(long timeoutSec){/**
         * 版本一:
         * 基础实现
         * key就是固定前缀+锁的名称,value就是线程标识
         * SET lock thread1 NX EX 10
         */String threadId =ID_PREFIX+Thread.currentThread().getId();Boolean success = stringRedisTemplate.opsForValue().setIfAbsent(KEY_PREFIX+ name, threadId, timeoutSec,TimeUnit.SECONDS);returnBoolean.TRUE.equals(success);}@Overridepublicvoidunlock(){/**
         * 版本三
         * 通过lua脚本来释放锁
         */
        stringRedisTemplate.execute(UNLOCK_SCRIPT,Collections.singletonList(KEY_PREFIX+ name),ID_PREFIX+Thread.currentThread().getId());}}}

Redisson

setnx实现的分布式锁存在的问题

image-20230108221629597

什么是Redisson?

Redisson基于redis实现了一套分布式工具的集合

image-20230108221802187

官网地址: https://redisson.org
GitHub地址: https://github.com/redisson/redisson

Redisson快速入门

一、引入依赖:

<dependency><groupId>org.redisson</groupId><artifactId>redisson</artifactId><version>3.11.1</version></dependency>

二、配置Redisson客户端

@ConfigurationpublicclassRedissonConfig{@BeanpublicRedissonClientredissonClient(){// 配置Config config =newConfig();
        config.useSingleServer().setAddress("redis://172.20.10.2:6379");// 创建RedissonClient对象returnRedisson.create(config);}}

三、使用Redisson的分布式锁

@OverridepublicResultseckillVoucher(Long voucherId){SeckillVoucher voucher = seckillVoucherService.getById(voucherId);if(voucher.getBeginTime().isAfter(LocalDateTime.now())){//尚未开始returnResult.fail("秒杀尚未开始!");}if(voucher.getEndTime().isBefore(LocalDateTime.now())){returnResult.fail("秒杀已经结束!");}if(voucher.getStock()<1){returnResult.fail("库存不足!");}Long userId =UserHolder.getUser().getId();RLock lock = redissonClient.getLock("lock:order:"+ userId);/**
         * tryLock参数说明:
         * long waitTime 超时等待时间  默认是-1,也就是不等待,获取不到就直接返回false
         * long leaseTime 超时释放时间 默认是30s,如果该锁超过30s会自动释放
         */boolean isLock = lock.tryLock();if(!isLock){//            获取锁失败,返回错误或重试returnResult.fail("不允许重复下单!");}try{//获取事务代理对象IVoucherOrderService proxy =(IVoucherOrderService)AopContext.currentProxy();return proxy.createVoucherOrder(voucherId);}finally{
    lock.unlock();}}

秒杀优化

异步秒杀思路

将原先逻辑的串行执行改为异步执行,也就是将

判断用户有没有秒杀资格交给redis去做

,如果有那就把

用户信息

订单信息

保存到

阻塞队列

中交给其他线程去执行

image-20230109151329636

基于redis完成秒杀资格判断

redis数据结构的选择

判断库存是否充足:可以选用String结构,key是优惠卷的信息,value是库存的数量

判断一人一单:可以选用set结构,将下过单的用户保存到set集合当中

流程如下:

image-20230109152002579

实现

1.新增秒杀优惠券的同时,将优惠券信息保存到Redis中

@Override@TransactionalpublicvoidaddSeckillVoucher(Voucher voucher){......
    stringRedisTemplate.opsForValue().set(SECKILL_STOCK_KEY+ voucher.getId(), voucher.getStock().toString());}

2.基于Lua脚本,判断秒杀库存、一人一单,决定用户是否抢购成功

------ Generated by EmmyLua(https://github.com/EmmyLua)--- Created by qiang.--- DateTime: 2023/1/9 14:41----- 1.参数列表--判断库存是否充足需要去redis中去查,所以需要知道优惠卷id-- 1.1优惠卷idlocal voucherId = ARGV[1]--判断一人一单需要知道用户id-- 1.2用户idlocal userId = ARGV[2]-- 1.3.订单id--local orderId = ARGV[3]-- 2.数据库key-- 2.1库存keylocal stockKey ='seckill:stock:'.. voucherId
-- 2.2订单keylocal orderKey ='seckill:order:'.. voucherId

-- 3.脚本业务-- 3.1判断库存是否大于0 get stockif(tonumber(redis.call('get', stockKey))<=0)then-- 3.2.库存不足,返回1return1end-- 判断用户是否下单 SISMEMBER orderKey userId--127.0.0.1:6379> sadd setCollection1 1 2 3--(integer) 3--127.0.0.1:6379> SISMEMBER setCollection1 2--(integer) 1--127.0.0.1:6379> SISMEMBER setCollection1 0--(integer) 0if(redis.call('SISMEMBER', orderKey, userId)==1)then--存在,说明是重复下单return2end-- 3.4扣库存 incrby stockKey - 1
redis.call('incrby', stockKey,-1)-- 3.5下单
redis.call('sadd', orderKey, userId)return0

3.java代码如下

PS:秒杀卷必须已经被保存到redis当中

因为lua脚本在执行的时候会去redis中读取优惠卷的库存,不然会出现如下错误:

user_script:26: attempt to compare nil with number

@ServicepublicclassVoucherOrderServiceImplextendsServiceImpl<VoucherOrderMapper,VoucherOrder>implementsIVoucherOrderService{@ResourceprivateISeckillVoucherService seckillVoucherService;@ResourceprivateRedisIdWorker redisIdWorker;@ResourceprivateRedissonClient redissonClient;@ResourceprivateStringRedisTemplate stringRedisTemplate;//加载seckill.lua文件privatestaticfinalDefaultRedisScript<Long>SECKILL_SCRIPT;static{SECKILL_SCRIPT=newDefaultRedisScript<>();SECKILL_SCRIPT.setLocation(newClassPathResource("seckill.lua"));SECKILL_SCRIPT.setResultType(Long.class);}@OverridepublicResultseckillVoucher(Long voucherId){/**
         * 秒杀实现六
         * 通过redis的lua脚本对秒杀进行优化
         */Long userId =UserHolder.getUser().getId();//        执行lua脚本Long result = stringRedisTemplate.execute(SECKILL_SCRIPT,Collections.emptyList(),
                voucherId.toString(), userId.toString());//        判断结果是否为0int r = result.intValue();if(r !=0){//            不为0,代表没有购买资格returnResult.fail(r ==1?"库存不足":"不能重复下单");}//        为0,有购买资格,把下单信息保存到阻塞队列long orderId = redisIdWorker.nextId("order");//        返回订单idreturnResult.ok(orderId);}}

基于阻塞队列实现异步下单

防止事务失效

事务的生效其实是Spring拿到当前对象的代理对象,这里如果直接调用(直接创建订单)就不是Spring的代理对象了事务就会失效,所以要获取当前对象的代理对象通过

AopContext.currentProxy()

API去获取

@Slf4j@ServicepublicclassVoucherOrderServiceImplextendsServiceImpl<VoucherOrderMapper,VoucherOrder>implementsIVoucherOrderService{@ResourceprivateISeckillVoucherService seckillVoucherService;@ResourceprivateRedisIdWorker redisIdWorker;@ResourceprivateRedissonClient redissonClient;@ResourceprivateStringRedisTemplate stringRedisTemplate;//加载seckill.lua文件privatestaticfinalDefaultRedisScript<Long>SECKILL_SCRIPT;static{SECKILL_SCRIPT=newDefaultRedisScript<>();SECKILL_SCRIPT.setLocation(newClassPathResource("seckill.lua"));SECKILL_SCRIPT.setResultType(Long.class);}//阻塞队列privateBlockingQueue<VoucherOrder> orderTasks =newArrayBlockingQueue<>(1024*1024);//线程池privatestaticfinalExecutorServiceSECKILL_ORDER_EXECUTOR=Executors.newSingleThreadExecutor();//在类初始化之后就执行init方法//init方法会去执行创建订单线程VoucherOrderHandler(新的线程,通过实现Runnable接口创建)@PostConstructprivatevoidinit(){SECKILL_ORDER_EXECUTOR.submit(newVoucherOrderHandler());}//VoucherOrderHandler为线程任务//该线程会去读阻塞队列中的订单信息,然后再去调用创建订单方法,完成异步下单privateclassVoucherOrderHandlerimplementsRunnable{@Overridepublicvoidrun(){while(true){try{//                获取队列中的订单信息VoucherOrder voucherOrder = orderTasks.take();//                    创建订单handleVoucherOrder(voucherOrder);}catch(Exception e){
          log.error("处理订单异常",e);}}}}/**
     * 异步创建订单
     */privatevoidhandleVoucherOrder(VoucherOrder voucherOrder){//这里创建订单的线程不是主线程,所以不能从userHolder里获取用户,只能从订单对象中获取用户idLong userId = voucherOrder.getUserId();RLock lock = redissonClient.getLock("lock:order:"+ userId);/**
         * tryLock参数说明:
         * long waitTime 超时等待时间  默认是-1,也就是不等待,获取不到就直接返回false
         * long leaseTime 超时释放时间 默认是30s,如果该锁超过30s会自动释放
         */boolean isLock = lock.tryLock();if(!isLock){//兜底方案,其实不用再去获取锁,因为在lua脚本中已经判断过一人一单
      log.error("不允许重复下单!");return;}try{//获取事务代理对象//这里不能通过AopContext.currentProxy()去获取代理对象,因为创建优惠卷订单(createVoucherOrder)是在主线程执行的//而当前方法是新的线程执行的代码,我们必须用主线程才能防止创建优惠卷订单(createVoucherOrder)事务失效
      proxy.createVoucherOrder(voucherOrder);}finally{
      lock.unlock();}}IVoucherOrderService proxy;@OverridepublicResultseckillVoucher(Long voucherId){/**
         * 秒杀实现六
         * 通过redis的lua脚本对秒杀进行优化
         */Long userId =UserHolder.getUser().getId();//      1.执行lua脚本Long result = stringRedisTemplate.execute(SECKILL_SCRIPT,Collections.emptyList(),
      voucherId.toString(), userId.toString());//       2 判断结果是否为0int r = result.intValue();if(r !=0){//            2.1 不为0,代表没有购买资格returnResult.fail(r ==1?"库存不足":"不能重复下单");}//        2.2 为0,有购买资格,把下单信息保存到阻塞队列VoucherOrder voucherOrder =newVoucherOrder();//        2.3 订单idlong orderId = redisIdWorker.nextId("order");
    voucherOrder.setId(orderId);//        2.4 用户id
    voucherOrder.setUserId(userId);//        2.5 代金劵id
    voucherOrder.setVoucherId(voucherId);//        2.6 放入阻塞队列
    orderTasks.add(voucherOrder);//        返回订单idreturnResult.ok(orderId);}@TransactionalpublicvoidcreateVoucherOrder(VoucherOrder voucherOrder){Long userId = voucherOrder.getUserId();int count =query().eq("user_id", userId).eq("voucher_id", voucherOrder).count();if(count >0){
      log.error("您已经购买过一次了!");return;}boolean success = seckillVoucherService.update().setSql("stock = stock - 1").eq("voucher_id", voucherOrder.getVoucherId()).gt("stock",0).update();if(!success){
      log.error("库存不足!");return;}save(voucherOrder);}}

总结

  • 新增秒杀优惠券的同时,将优惠券信息保存到Redis中
  • 基于Lua脚本,判断秒杀库存、一人一单,决定用户是否抢购成功
  • 如果抢购成功,将优惠券id和用户id封装后存入阻塞队列
IVoucherOrderService proxy;@OverridepublicResultseckillVoucher(Long voucherId){/**
         * 秒杀实现六
         * 通过redis的lua脚本对秒杀进行优化
         */Long userId =UserHolder.getUser().getId();//      1.执行lua脚本Long result = stringRedisTemplate.execute(SECKILL_SCRIPT,Collections.emptyList(),
    voucherId.toString(), userId.toString());//       2 判断结果是否为0int r = result.intValue();if(r !=0){//            2.1 不为0,代表没有购买资格returnResult.fail(r ==1?"库存不足":"不能重复下单");}//        2.2 为0,有购买资格,把下单信息保存到阻塞队列VoucherOrder voucherOrder =newVoucherOrder();//        2.3 订单idlong orderId = redisIdWorker.nextId("order");
  voucherOrder.setId(orderId);//        2.4 用户id
  voucherOrder.setUserId(userId);//        2.5 代金劵id
  voucherOrder.setVoucherId(voucherId);//        2.6 放入阻塞队列
  orderTasks.add(voucherOrder);//        返回订单idreturnResult.ok(orderId);}
  • 开启线程任务,不断从阻塞队列中获取信息,实现异步下单功能
//加载seckill.lua文件privatestaticfinalDefaultRedisScript<Long>SECKILL_SCRIPT;static{SECKILL_SCRIPT=newDefaultRedisScript<>();SECKILL_SCRIPT.setLocation(newClassPathResource("seckill.lua"));SECKILL_SCRIPT.setResultType(Long.class);}//阻塞队列privateBlockingQueue<VoucherOrder> orderTasks =newArrayBlockingQueue<>(1024*1024);//线程池privatestaticfinalExecutorServiceSECKILL_ORDER_EXECUTOR=Executors.newSingleThreadExecutor();//在类初始化之后就执行init方法//init方法会去执行创建订单线程VoucherOrderHandler(新的线程,通过实现Runnable接口创建)@PostConstructprivatevoidinit(){SECKILL_ORDER_EXECUTOR.submit(newVoucherOrderHandler());}//VoucherOrderHandler为线程任务//该线程会去读阻塞队列中的订单信息,然后再去调用创建订单方法,完成异步下单privateclassVoucherOrderHandlerimplementsRunnable{@Overridepublicvoidrun(){while(true){try{//                获取队列中的订单信息VoucherOrder voucherOrder = orderTasks.take();//                    创建订单handleVoucherOrder(voucherOrder);}catch(Exception e){
          log.error("处理订单异常",e);}}}}/**
     * 异步创建订单
     */privatevoidhandleVoucherOrder(VoucherOrder voucherOrder){//这里创建订单的线程不是主线程,所以不能从userHolder里获取用户,只能从订单对象中获取用户idLong userId = voucherOrder.getUserId();RLock lock = redissonClient.getLock("lock:order:"+ userId);/**
         * tryLock参数说明:
         * long waitTime 超时等待时间  默认是-1,也就是不等待,获取不到就直接返回false
         * long leaseTime 超时释放时间 默认是30s,如果该锁超过30s会自动释放
         */boolean isLock = lock.tryLock();if(!isLock){//兜底方案,其实不用再去获取锁,因为在lua脚本中已经判断过一人一单
      log.error("不允许重复下单!");return;}try{//获取事务代理对象//这里不能通过AopContext.currentProxy()去获取代理对象,因为创建优惠卷订单(createVoucherOrder)是在主线程执行的//而当前方法是新的线程执行的代码,我们必须用主线程才能防止创建优惠卷订单(createVoucherOrder)事务失效
      proxy.createVoucherOrder(voucherOrder);}finally{
      lock.unlock();}}

Redis消息队列

把订单信息放到阻塞队列的缺点是什么?

  • 无法持久化
  • 受JVM内存大小影响

那为什么消息队列能解决这些问题?

  • 消息队列属于JVM以外的独立服务,不受JVM内存的限制
  • 消息队列里的消息可以持久化,并且消息队列会保证消息至少被消费一次

现在有哪些消息队列可以用呢?

Kafka、RabbitMQ、RocketMQ…

但是我们都不用这些,因为我们项目规模比较小,我们用reids去实现消息队列就可以胜任

Redis提供了三种不同的方式来实现消息队列:

  • list结构:基于List结构模拟消息队列
  • PubSub:基本的点对点消息模型
  • Stream:比较完善的消息队列模型

基于List实现消息队列

# lpush命令在l1队列中放入两个元素127.0.0.1:6379> LPUSH l1 element1 element2
(integer)2# brpop命令在l1队列右侧取出第一个元素,等待时间20s127.0.0.1:6379> BRPOP l1 201)"l1"2)"element1"# brpop命令在l1队列右侧取出第二个元素,等待时间20s127.0.0.1:6379> BRPOP l1 201)"l1"2)"element2"# brpop命令在l1队列右侧取出第三个元素,等待时间20s,元素被取完,命令处于阻塞状态127.0.0.1:6379> BRPOP l1 20

基于List的消息队列有哪些优缺点?

优点:

  • 利用Redis存储,不受限于JVM内存上限
  • 基于Redis的持久化机制,数据安全性有保证
  • 可以满足消息有序性

缺点:

  • 无法避免消息丢失
  • 只支持单消费者

PubSub实现消息队列

# 向频道发送消息
127.0.0.1:6379>PUBLISH order.q1 hello
(integer)2127.0.0.1:6379>PUBLISH order.q2 hello
(integer)1

# 订阅一个或多个频道
127.0.0.1:6379>SUBSCRIBE order.q1
Reading messages...(press Ctrl-Ctoquit)1)"subscribe"2)"order.q1"3)(integer)11)"message"2)"order.q1"3)"hello"

# 订阅与pattern格式匹配的所有频道
127.0.0.1:6379>PSUBSCRIBE order.*
Reading messages...(press Ctrl-Ctoquit)1)"psubscribe"2)"order.*"3)(integer)11)"pmessage"2)"order.*"3)"order.q1"4)"hello"1)"pmessage"2)"order.*"3)"order.q2"4)"hello"

基于PubSub的消息队列有哪些优缺点

优点:

  • 采用发布订阅模型,支持多生产、多消费

缺点:

  • 不支持数据持久化(向频道发送消息如果没人接受,那么消息会被直接丢掉)
  • 无法避免消息丢失
  • 消息堆积有上限,超出时数据丢失

Stream消息队列

单消费模式

Stream 是 Redis 5.0 引入的一种新数据类型,可以实现一个功能非常完善的消息队列。

发送消息

image-20230110155934302

读取消息的方式之一:XREAD

如果不指定阻塞时长

image-20230110160155190

测试
# 写消息127.0.0.1:6379> XADD users * name zs age 12"1673337915109-0"# 读消息127.0.0.1:6379> XREAD count 1 streams user 0(nil)127.0.0.1:6379> XREAD count 1 streams users01)1)"users"2)1)1)"1673337915109-0"2)1)"name"2)"zs"3)"age"4)"12"# 消息可重复读127.0.0.1:6379> XREAD count 1 streams users01)1)"users"2)1)1)"1673337915109-0"2)1)"name"2)"zs"3)"age"4)"12"# 再次发送消息127.0.0.1:6379> XADD users * name ls age 21"1673338159927-0"# 阻塞等待消息            127.0.0.1:6379> XREAD COUNT 1 BLOCK 0 STREAMS users $
1)1)"users"2)1)1)"1673338159927-0"2)1)"name"2)"ls"3)"age"4)"21"(13.00s)

消息漏读情况

当我们指定起始ID为$时,代表读取最新的消息,如果我们处理一条消息的过程中,又有超过1条以上的消息到达队列,则下次获取时也只能获取到最新的一条,会出现漏读消息的问题

image-20230110161449209

消费组模式

image-20230110164956512

创建消费者组

image-20230110165317457

从消费者组读取消息

image-20230110165440794

测试

创建消费者组

127.0.0.1:6379> XGROUP create s1 g1 0
OK

读取消费者组

127.0.0.1:6379> XREADGROUP group g1 c1 count 1 block 2000 streams s1 >1)1)"s1"2)1)1)"1673336869091-0"2)1)"k1"2)"v1"

在s1队列中添加多条消息

127.0.0.1:6379> XADD s1 * k2 v2
"1673354707296-0"127.0.0.1:6379> XADD s1 * k3 v3
"1673354711159-0"

读取消费者组中的数据

  • 通过>号获取没有被确认的消息
127.0.0.1:6379> XREADGROUP group g1 c1 count 1 block 2000 streams s1 >1)1)"s1"2)1)1)"1673354707296-0"2)1)"k2"2)"v2"

读取消费者组中的数据

  • 通过>号获取没有被确认的消息
127.0.0.1:6379> XREADGROUP group g1 c1 count 1 block 2000 streams s1 >1)1)"s1"2)1)1)"1673354711159-0"2)1)"k3"2)"v3"

查看消息队列中那一条消息没有被处理

  • -,+号代表id范围,表示所有id
  • 10代表获取消息的数量
127.0.0.1:6379> XPENDING s1 g1 - + 101)1)"1673336869091-0"2)"c1"3)(integer)11137364)(integer)1

读取

pending-list

中没有被确认的消息

  • 0,代表读取pending-list中第一条没有被确认的消息
127.0.0.1:6379> XREADGROUP group g1 c1 count 1 block 2000 streams s1 01)1)"s1"2)1)1)"1673336869091-0"2)1)"k1"2)"v1"

确认消息

127.0.0.1:6379> XACK s1 g1 1673336869091-0
(integer)1

再次查看pending-list中没有被处理的消息

127.0.0.1:6379> XREADGROUP group g1 c1 count 1 block 2000 streams s1 01)1)"s1"2)(empty array)
impl

stream消息队列业务流程处理思路:

  • 1.初始化stream消息队列,如果没有的话就去创建一个,如果stream存在,判断group(消费者组)是否存在
  • 2.加载seckill.lua脚本,去做秒杀资格和一人一单判断,最后把订单相关信息保存到redis的消息队列中
  • 3.开始读取stream消息队列中的消息,然后进行解析,从而获得订单信息进而把订单信息异步保存到数据库中
  • 4.进行消息队列的确认
  • 5.如果遇见没有处理的消息,会被捕捉异常进入handlePendingList方法,重新去pending-list中获取未处理的第一条消息,同样会在数据库中被保存
/**
 * @author qiang
 * @since 2022-12-27
 */@Slf4j@ServicepublicclassVoucherOrderServiceImplextendsServiceImpl<VoucherOrderMapper,VoucherOrder>implementsIVoucherOrderService{@ResourceprivateISeckillVoucherService seckillVoucherService;@ResourceprivateRedisIdWorker redisIdWorker;@ResourceprivateRedissonClient redissonClient;@ResourceprivateStringRedisTemplate stringRedisTemplate;//加载seckill.lua文件privatestaticfinalDefaultRedisScript<Long>SECKILL_SCRIPT;static{SECKILL_SCRIPT=newDefaultRedisScript<>();SECKILL_SCRIPT.setLocation(newClassPathResource("seckill.lua"));SECKILL_SCRIPT.setResultType(Long.class);}//线程池privatestaticfinalExecutorServiceSECKILL_ORDER_EXECUTOR=Executors.newSingleThreadExecutor();//在类初始化之后就执行init方法//init方法会去执行创建订单线程VoucherOrderHandler(新的线程,通过实现Runnable接口创建)@PostConstructprivatevoidinit(){SECKILL_ORDER_EXECUTOR.submit(newVoucherOrderHandler());}//VoucherOrderHandler为线程任务//该线程会去读消息队列中的订单信息,然后再去调用创建订单方法,完成异步下单privateclassVoucherOrderHandlerimplementsRunnable{privatefinalString queueName ="stream.orders";@Overridepublicvoidrun(){while(true){try{// 0.初始化streaminitStream();// 1.获取消息队列中的订单信息 XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS s1 >List<MapRecord<String,Object,Object>> list = stringRedisTemplate.opsForStream().read(Consumer.from("g1","c1"),StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),StreamOffset.create(queueName,ReadOffset.lastConsumed()));// 2.判断订单信息是否为空if(list ==null|| list.isEmpty()){// 如果为null,说明没有消息,继续下一次循环continue;}// 解析数据MapRecord<String,Object,Object> record = list.get(0);Map<Object,Object> value = record.getValue();VoucherOrder voucherOrder =BeanUtil.fillBeanWithMap(value,newVoucherOrder(),true);// 3.创建订单handleVoucherOrder(voucherOrder);// 4.确认消息 XACK stream.orders g1 id
                    stringRedisTemplate.opsForStream().acknowledge(queueName,"g1", record.getId());}catch(Exception e){
                    log.error("处理订单异常", e);handlePendingList();}}}publicvoidinitStream(){Boolean exists = stringRedisTemplate.hasKey(queueName);if(BooleanUtil.isFalse(exists)){
                log.info("stream不存在,开始创建stream");// 不存在,需要创建
                stringRedisTemplate.opsForStream().createGroup(queueName,ReadOffset.latest(),"g1");
                log.info("stream和group创建完毕");return;}// stream存在,判断group是否存在StreamInfo.XInfoGroups groups = stringRedisTemplate.opsForStream().groups(queueName);if(groups.isEmpty()){
                log.info("group不存在,开始创建group");// group不存在,创建group
                stringRedisTemplate.opsForStream().createGroup(queueName,ReadOffset.latest(),"g1");
                log.info("group创建完毕");}}privatevoidhandlePendingList(){while(true){try{// 1.获取消息队列中的订单信息 XREADGROUP GROUP g1 c1 COUNT 1 STREAMS s1 0List<MapRecord<String,Object,Object>> list = stringRedisTemplate.opsForStream().read(Consumer.from("g1","c1"),StreamReadOptions.empty().count(1),StreamOffset.create(queueName,ReadOffset.from("0")));// 2.判断订单信息是否为空if(list ==null|| list.isEmpty()){// 如果为null,说明没有消息,继续下一次循环break;}// 解析数据MapRecord<String,Object,Object> record = list.get(0);Map<Object,Object> value = record.getValue();VoucherOrder voucherOrder =BeanUtil.fillBeanWithMap(value,newVoucherOrder(),true);// 3.创建订单handleVoucherOrder(voucherOrder);// 4.确认消息 XACK stream.orders g1 id
                    stringRedisTemplate.opsForStream().acknowledge(queueName,"g1", record.getId());}catch(Exception e){
                    log.error("处理订单异常", e);}}}}/**
     * 异步创建订单
     */privatevoidhandleVoucherOrder(VoucherOrder voucherOrder){//这里创建订单的线程不是主线程,所以不能从userHolder里获取用户,只能从订单对象中获取用户idLong userId = voucherOrder.getUserId();RLock lock = redissonClient.getLock("lock:order:"+ userId);/**
         * tryLock参数说明:
         * long waitTime 超时等待时间  默认是-1,也就是不等待,获取不到就直接返回false
         * long leaseTime 超时释放时间 默认是30s,如果该锁超过30s会自动释放
         */boolean isLock = lock.tryLock();if(!isLock){//兜底方案,其实不用再去获取锁,因为在lua脚本中已经判断过一人一单
            log.error("不允许重复下单!");return;}try{//获取事务代理对象//这里不能通过AopContext.currentProxy()去获取代理对象,因为创建优惠卷订单(createVoucherOrder)是在主线程执行的//而当前方法是新的线程执行的代码,我们必须用主线程才能防止创建优惠卷订单(createVoucherOrder)事务失效
            proxy.createVoucherOrder(voucherOrder);}finally{
            lock.unlock();}}//通过主线程获取代理对象privateIVoucherOrderService proxy;@OverridepublicResultseckillVoucher(Long voucherId){/**
         * 秒杀实现七:
         * 使用redis提供的Stream消息队列优化秒杀
         */Long userId =UserHolder.getUser().getId();//获取订单idlong orderId = redisIdWorker.nextId("order");//      1.执行lua脚本Long result = stringRedisTemplate.execute(SECKILL_SCRIPT,Collections.emptyList(),
                voucherId.toString(), userId.toString(),String.valueOf(orderId));//       2 判断结果是否为0int r = result.intValue();if(r !=0){//            2.1 不为0,代表没有购买资格returnResult.fail(r ==1?"库存不足":"不能重复下单");}

        proxy =(IVoucherOrderService)AopContext.currentProxy();//        返回订单idreturnResult.ok(orderId);}@TransactionalpublicvoidcreateVoucherOrder(VoucherOrder voucherOrder){Long userId = voucherOrder.getUserId();int count =query().eq("user_id", userId).eq("voucher_id", voucherOrder).count();if(count >0){
            log.error("您已经购买过一次了!");return;}boolean success = seckillVoucherService.update().setSql("stock = stock - 1").eq("voucher_id", voucherOrder.getVoucherId()).gt("stock",0).update();if(!success){
            log.error("库存不足!");return;}save(voucherOrder);}}
lua脚本
------ Generated by EmmyLua(https://github.com/EmmyLua)--- Created by qiang.--- DateTime: 2023/1/9 14:41----- 1.参数列表--判断库存是否充足需要去redis中去查,所以需要知道优惠卷id-- 1.1优惠卷idlocal voucherId = ARGV[1]--判断一人一单需要知道用户id-- 1.2用户idlocal userId = ARGV[2]-- 1.3.订单idlocal orderId = ARGV[3]-- 2.数据库key-- 2.1库存keylocal stockKey ='seckill:stock:'.. voucherId
-- 2.2订单keylocal orderKey ='seckill:order:'.. voucherId

-- 3.脚本业务-- 3.1判断库存是否大于0 get stockif(tonumber(redis.call('get', stockKey))<=0)then-- 3.2.库存不足,返回1return1end-- 判断用户是否下单 SISMEMBER orderKey userId--127.0.0.1:6379> sadd setCollection1 1 2 3--(integer) 3--127.0.0.1:6379> SISMEMBER setCollection1 2--(integer) 1--127.0.0.1:6379> SISMEMBER setCollection1 0--(integer) 0if(redis.call('SISMEMBER', orderKey, userId)==1)then--存在,说明是重复下单return2end-- 3.4扣库存 incrby stockKey - 1
redis.call('incrby', stockKey,-1)-- 3.5下单
redis.call('sadd', orderKey, userId)-- 3.6发送消息到队列中,XADD stream.orders * k1 v1 k2 v2 ...-- *代表消息的id自动生成,orderId的key为id是因为订单实体类中订单id的字段属性为id
redis.call('xadd','stream.orders','*','userId', userId,'voucherId', voucherId,'id', orderId)return0

总结

STREAM类型消息队列的XREADGROUP命令特点:

  • 消息可回溯
  • 可以多消费者争抢消息,加快消费速度
  • 可以阻塞读取
  • 没有消息漏读的风险
  • 有消息确认机制,保证消息至少被消费一次

总结

STREAM类型消息队列的XREADGROUP命令特点:

  • 消息可回溯
  • 可以多消费者争抢消息,加快消费速度
  • 可以阻塞读取
  • 没有消息漏读的风险
  • 有消息确认机制,保证消息至少被消费一次
标签: java spring boot redis

本文转载自: https://blog.csdn.net/qq_45714272/article/details/128632144
版权归原作者 小蔡coding 所有, 如有侵权,请联系我们删除。

“【Java项目推荐】值得写到简历上的项目--黑马点评”的评论:

还没有评论