0


分布式 Redis & RabbitMQ 终极秒杀

♨️本篇文章记录的为RabbitMQ知识中

企业级项目

中秒杀相关内容,适合在学Java的小白,帮助新手快速上手,也适合复习中,面试中的大佬🙉🙉🙉。
♨️如果文章有什么需要改进的地方还请大佬不吝赐教❤️🧡💛

💖个人主页 : 阿千弟
💖点击这里👉👉👉: RabbitMQ专栏学习

上期我们使用阻塞队列分布式锁Redission分布式锁对业务功能进行优化,解决了在分布式环境下的秒杀安全问题,但是呢阻塞队列的确不够优雅, 而且还存在很多问题, 比如说这时候突然停电了,我们队列里的消息没来得及被消费就消失了.这就要出大问题.

现在我们要用一种更优雅的方式实现异步下单, 今天我们的主角就是

RabbitMQ

.

文章目录

🐇思路分析

  1. 我们先把MYSQL数据库中的订单数量在redis中保存一份.
  2. 在秒杀的业务中我们还是先执行Lua脚本, 判断我们是否具有购买资格, 如果没有购买资格我们直接返回结果;
  3. 如果有有购买资格,Lua脚本会直接进行redis中的订单扣减操作,进行秒杀
  4. 同时在redis的扣减操作完成后, 把下单信息保存到阻塞队列进行异步执行, 实现MYSQL数据库中订单扣减的同步操作,完美.

在这里插入图片描述

🐇代码操作

思路听起来很简单,下面我们来进行具体操作.

先引入RabbitMQ的核心依赖

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>

在yml文件中进行相关配置

spring:
  rabbitmq:
    host: localhost
    port:5672
    virtual-host:/
    username: guest
    password: guest
    publisher-confirm-type: correlated
    publisher-returns:true
    template:
      mandatory:true
      retry:
        #发布重试,默认false
        enabled:true
        #重试时间 默认1000ms
        initial-interval:1000
        #重试最大次数 最大3
        max-attempts:3
        #重试最大间隔时间
        max-interval:10000
        #重试的时间隔乘数,比如配2,0 第一次等于10s,第二次等于20s,第三次等于40s
        multiplier:1
    listener:
      # 默认配置是simple
      type: simple
      simple:
        # 手动ack Acknowledge mode of container. auto none
        acknowledge-mode: manual
        #消费者调用程序线程的最小数量
        concurrency:10
        #消费者最大数量
        max-concurrency:10
        #限制消费者每次只处理一条信息,处理完在继续下一条
        prefetch:1
        #启动时是否默认启动容器
        auto-startup:true
        #被拒绝时重新进入队列
        default-requeue-rejected:true

配置RabbitMQConfig文件

@Slf4j@ConfigurationpublicclassRabbitMQConfig{@Bean("RabbitTemplate")publicRabbitTemplatecreateRabbitTemplate(ConnectionFactory connectionFactory){RabbitTemplate rabbitTemplate =newRabbitTemplate();
        rabbitTemplate.setConnectionFactory(connectionFactory);//设置开启Mandatory,才能触发回调函数,无论消息推送结果怎么样都强制调用回调函数
        rabbitTemplate.setMandatory(true);//设置Json转换器
        rabbitTemplate.setMessageConverter(jsonMessageConverter());

        rabbitTemplate.setConfirmCallback(newRabbitTemplate.ConfirmCallback(){@Overridepublicvoidconfirm(CorrelationData correlationData,boolean ack,String cause){
                log.info("ConfirmCallback:     "+"相关数据:"+correlationData);
                log.info("ConfirmCallback:     "+"确认情况:"+ack);
                log.info("ConfirmCallback:     "+"原因:"+cause);}});

        rabbitTemplate.setReturnsCallback(newRabbitTemplate.ReturnsCallback(){@OverridepublicvoidreturnedMessage(ReturnedMessage returnedMessage){
                log.info("return 执行了....");
                log.info("ReturnCallback:     "+"消息:"+returnedMessage.getMessage().toString());
                log.info("ReturnCallback:     "+"回应码:"+returnedMessage.getReplyCode());
                log.info("ReturnCallback:     "+"回应信息:"+returnedMessage.getReplyText());
                log.info("ReturnCallback:     "+"交换机:"+returnedMessage.getExchange());
                log.info("ReturnCallback:     "+"路由键:"+returnedMessage.getRoutingKey());}});return rabbitTemplate;}/**
     * Json转换器
     */@BeanpublicJackson2JsonMessageConverterjsonMessageConverter(){returnnewJackson2JsonMessageConverter();}//交换机名称publicstaticfinalStringSECKILL_EXCHANGE="seckill_exchange";//队列名称publicstaticfinalStringORDER_QUEUE="order_queue";@Bean("ORDER_QUEUE")QueueconfirmQueue(){returnQueueBuilder.durable(ORDER_QUEUE).build();}/**
     * 创建一个交换机
     * @return
     */@Bean("SECKILL_EXCHANGE")ExchangeconfirmExchange(){returnExchangeBuilder.topicExchange(SECKILL_EXCHANGE).durable(true).build();}@BeanBindingconfirmExchange(@Qualifier("SECKILL_EXCHANGE")Exchange exchange,@Qualifier("ORDER_QUEUE")Queue queue){//bind(queue) 绑定队列 to(exchange) 绑定交换机 with("") routingKey这里绑定的是空白可以按照自己的要求绑定  noargs() 没有参数returnBindingBuilder.bind(queue).to(exchange).with("order.#").noargs();}}

补充 : RabbitMQ的Json转换器尤为重要, 因为rabbitMQ中发送和接收的都是字符串/字节数组类型的消息, 所以我们要把java对象

转成json串

进行发送.

Lua脚本

--1.参数列表
--1.1.优惠券id
local voucherId =ARGV[1]--1.2.用户id
local userId =ARGV[2]--2.数据key
--2.1.库存key
local stockKey = 'seckill:stock:' .. voucherId
--2.2.订单key
local orderKey = 'seckill:order:' .. voucherId

--3.脚本业务
--3.1.判断库存是否充足 get stockKey
if(tonumber(redis.call('get', stockKey))<=0) then
    --3.2.库存不足,返回1return1
end
--3.2.判断用户是否下单 SISMEMBER orderKey userId
if(redis.call('sismember', orderKey, userId)==1) then
    --3.3.存在,说明是重复下单,返回2return2
end
--3.4.扣库存 incrby stockKey -1
redis.call('incrby', stockKey,-1)--3.5.下单(保存用户)sadd orderKey userId
redis.call('sadd', orderKey, userId)return0

下面是秒杀业务代码

@ServicepublicclassVoucherOrderServiceImplextendsServiceImpl<VoucherOrderMapper,VoucherOrder>implementsIVoucherOrderService{@ResourceprivateISeckillVoucherService seckillVoucherService;@ResourceprivateRedisIdWorker redisIdWorker;@ResourceprivateStringRedisTemplate stringRedisTemplate;@ResourceprivateRedissonClient redissonClient;@Resource(description ="RabbitTemplate")privateRabbitTemplate rabbitTemplate;privatestaticfinalDefaultRedisScript<Long>SECKILL_SCRIPT;static{SECKILL_SCRIPT=newDefaultRedisScript<>();SECKILL_SCRIPT.setLocation(newClassPathResource("seckill.lua"));SECKILL_SCRIPT.setResultType(Long.class);}privateIVoucherOrderService proxy;publicvoidhandleVoucherOrder(VoucherOrder voucherOrder){//1.获取用户Long userId = voucherOrder.getUserId();// 2.创建锁对象RLock redisLock = redissonClient.getLock("lock:order:"+ userId);// 3.尝试获取锁boolean isLock = redisLock.tryLock();// 4.判断是否获得锁成功if(!isLock){// 获取锁失败,直接返回失败或者重试
            log.error("不允许重复下单!");return;}try{//注意:由于是spring的事务是放在threadLocal中,此时的是多线程,事务会失效createVoucherOrder(voucherOrder);}finally{// 释放锁
            redisLock.unlock();}}@OverridepublicResultseckillVoucher(Long voucherId){//获取用户Long userId =UserHolder.getUser().getId();// 1.执行lua脚本Long result = stringRedisTemplate.execute(SECKILL_SCRIPT,Collections.emptyList(),
                voucherId.toString(), userId.toString());int r = result.intValue();// 2.判断结果是否为0if(r !=0){// 2.1.不为0 ,代表没有购买资格returnResult.fail(r ==1?"库存不足":"不能重复下单");}// 2.2.为0 ,有购买资格,把下单信息保存到阻塞队列VoucherOrder voucherOrder =newVoucherOrder();// 2.3.订单idlong orderId = redisIdWorker.nextId("order");
        voucherOrder.setId(orderId);// 2.4.用户id
        voucherOrder.setUserId(userId);// 2.5.代金券id
        voucherOrder.setVoucherId(voucherId);// 2.6.放入mq
        rabbitTemplate.convertAndSend("seckill_exchange","order.voucher", voucherOrder);// 3.获取代理对象
        proxy =(IVoucherOrderService)AopContext.currentProxy();// 4.返回订单idreturnResult.ok(orderId);}@Transactional@OverridepublicvoidcreateVoucherOrder(VoucherOrder voucherOrder){Long userId = voucherOrder.getUserId();// 5.1.查询订单int count =Math.toIntExact(query().eq("user_id", userId).eq("voucher_id", voucherOrder.getVoucherId()).count());// 5.2.判断是否存在if(count >0){// 用户已经购买过了
            log.error("不允许重复下单!");return;}// 6.扣减库存boolean success = seckillVoucherService.update().setSql("stock = stock - 1")// set stock = stock - 1.eq("voucher_id", voucherOrder.getVoucherId()).gt("stock",0)// where id = ? and stock > 0.update();if(!success){// 扣减失败
            log.error("库存不足!");return;}// 7.创建订单save(voucherOrder);}}

总得有个消费者来监听我们的队列

消费者代码

@Slf4j@Component@RabbitListener(queues ="order_queue")publicclassVoucherOrderListenerimplementsChannelAwareMessageListener{@AutowiredVoucherOrderServiceImpl voucherOrderService;@RabbitHandler(isDefault=true)@OverridepublicvoidonMessage(Message message,Channel channel)throwsException{long deliveryTag = message.getMessageProperties().getDeliveryTag();try{//1.接收转换消息ObjectMapper mapper =newObjectMapper();VoucherOrder voucherOrder = mapper.readValue(message.getBody(),VoucherOrder.class);//2. 处理业务逻辑try{// 2.创建订单

                voucherOrderService.handleVoucherOrder(voucherOrder);System.out.println("订单已执行");}catch(Exception e){
                log.error("处理订单异常", e);}//3. 手动签收
            channel.basicAck(deliveryTag,true);}catch(Exception e){//4.拒绝签收/*
            第三个参数:requeue:重回队列。如果设置为true,则消息重新回到queue,broker会重新发送该消息给消费端
             */
            channel.basicNack(deliveryTag,true,true);}}}

代码部分就这么些, 思路并不算复杂

在这里插入图片描述

🐇展示结果

在这里插入图片描述

这里显示抢购成功了

我们再看看redis和mysql中情况如何

在这里插入图片描述

在这里插入图片描述
显而易见, 表中对应的订单数量也都扣减了, 完美.

🐇补充

RabbitMQ中JSON格式转换为实体对象

//将JSON格式数据转换为实体对象ObjectMapper mapper =newObjectMapper();UserInfo userInfo = mapper.readValue(message.getBody(),VoucherOrder.class);

RabbitMQ中JSON格式转换为Map对象

@Testpublicvoidsender()throwsAmqpException{//创建用户信息MapMap<String,Object> userMap =newHashMap<>();
        userMap.put("userId","1");
        userMap.put("userName","阿千弟的博客");
        userMap.put("blogUrl","https://blog.csdn.net/qq_51033936");
        userMap.put("userRemark","您好,欢迎访问 阿千弟的博客");/**
         * 发送消息,参数说明:
         * String exchange:交换器名称。
         * String routingKey:路由键。
         * Object object:发送内容。
         */
        rabbitTemplate.convertAndSend(RabbitMqConfig.DIRECT_EXCHANGE_NAME,RabbitMqConfig.DIRECT_ROUTING_KEY, userMap);
//将JSON格式数据转换为Map对象@RabbitListener(queues ="order_queue")publicclassVoucherOrderListenerimplementsChannelAwareMessageListener{@AutowiredVoucherOrderServiceImpl voucherOrderService;@RabbitHandler(isDefault=true)@OverridepublicvoidonMessage(Message message,Channel channel)throwsException{long deliveryTag = message.getMessageProperties().getDeliveryTag();//将JSON格式数据转换为Map对象ObjectMapper mapper =newObjectMapper();JavaType javaType = mapper.getTypeFactory().constructMapType(Map.class,String.class,Object.class);Map<String,Object> resultMap = mapper.readValue(message.getBody(),javaType);System.out.println("接收者收到Map格式消息:");System.out.println("用户编号:"+ resultMap.get("userId"));System.out.println("用户名称:"+ resultMap.get("userName"));System.out.println("博客地址:"+ resultMap.get("blogUrl"));System.out.println("博客信息:"+ resultMap.get("userRemark"));//确认消息
            channel.basicAck(deliveryTag,true)}catch(Exception e){//4.拒绝签收/*
            第三个参数:requeue:重回队列。如果设置为true,则消息重新回到queue,broker会重新发送该消息给消费端
             */
            channel.basicNack(deliveryTag,true,true);}}}

注意!!!如果@RabbitListener加在类上面,需要有一个默认的处理方法@RabbitHandler(isDefault=true),默认是false。不设置一个true,消费mq消息的时候会出现“Listener method ‘no match’ threw exception”异常。
在这里插入图片描述

如果这篇【文章】有帮助到你💖,希望可以给我点个赞👍,创作不易,如果有对Java后端或者对redis感兴趣的朋友,请多多关注💖💖💖
💖个人主页:阿千弟
如果大家对redis相关知识感兴趣请点击这里👉👉👉redis专栏学习


本文转载自: https://blog.csdn.net/qq_51033936/article/details/127703125
版权归原作者 阿千弟 所有, 如有侵权,请联系我们删除。

“分布式 Redis & RabbitMQ 终极秒杀”的评论:

还没有评论