0


【黑马点评】 使用RabbitMQ实现消息队列——2.使用RabbitMQ监听秒杀下单

2 使用RabbitMQ实现消息队列

2.1 修改\hm-dianping\pom.xml文件

添加RabbitMQ的环境

<!-- RabbitMQ-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2.2 修改Resource下的application.yaml文件

添加RabbitMQ的配置信息

spring:
  rabbitmq:
    host: 127.0.0.1 # IP地址
    port: 5672 # 端口号
    username: hmdianping # 用户名
    password: 123456 # 密码
    listener:
      simple:
        concurrency: 1
        max-concurrency: 1
        acknowledge-mode: manual
        prefetch: 1

主启动类标注@EnableRabbit开启消息队列的监听功能

在这里插入图片描述

2.3 配置RabbitMQ,创建交换机和消息队列,将二者绑定

创建一个名为hmdianping.direct的direct类型交换机以及一个名为direct.seckill.queue的消息队列,然后将二者绑定。之后发往该交换机且路由键为direct.seckill的消息都会被转发到direct.seckill.queue

具体而言,在hm-dianping.src.main.java.com.hmdp.config下新建RabbitMQConfig文件

文件内容如下

package com.hmdp.config;

import com.hmdp.entity.VoucherOrder;
import com.hmdp.service.IVoucherOrderService;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import javax.annotation.Resource;
import java.io.IOException;

/**
 * RabbitMQ configuration for the seckill flow: registers the JSON message
 * converter and a listener bound to the seckill queue.
 */
@Configuration
public class RabbitMQConfig {

    @Resource
    IVoucherOrderService voucherOrderService;

    /**
     * Registers a Jackson-based converter as the {@link MessageConverter} bean.
     *
     * @return a new {@link Jackson2JsonMessageConverter}
     */
    @Bean
    public MessageConverter messageConverter() {
        return new Jackson2JsonMessageConverter();
    }

    /**
     * Prints every message received from queue {@code direct.seckill.queue},
     * which is bound to the direct exchange {@code hmdianping.direct} with
     * routing key {@code direct.seckill}.
     *
     * @param message the received message
     */
    @RabbitListener(bindings = @QueueBinding(
            exchange = @Exchange(name = "hmdianping.direct", type = ExchangeTypes.DIRECT),
            value = @Queue(name = "direct.seckill.queue"),
            key = "direct.seckill"))
    public void recieveMessage(Object message) {
        String line = "监听到了" + message;
        System.out.println(line);
    }
}

2.3.1 测试监听消息

在Test中添加发送消息的方法,指定交换机为 hmdianping.direct,路由键为 direct.seckill

@ResourceRabbitTemplate rabbitTemplate;@TestpublicvoidtestSendMessage(){
        rabbitTemplate.convertAndSend("hmdianping.direct","direct.seckill","测试发送消息");}

先运行Test方法

在这里插入图片描述

之后运行启动类HmDianPingApplication

发现监听到了,说明连接成功。

在这里插入图片描述

2.3.2 修改秒杀下单业务(VoucherOrderServiceImpl中的seckillVoucher方法)

  • 注入RabbitTemplate类
  • 在认定有抢购资格后,直接向hmdianping.direct交换机发送消息(路由键为direct.seckill),内容包含voucherId、userId、orderId
@ResourceRabbitTemplate rabbitTemplate;@OverridepublicResultseckillVoucher(Long voucherId){//1.执行lua脚本,判断当前用户的购买资格Long userId =UserHolder.getUser().getId();Long result = stringRedisTemplate.execute(
                SECKILL_SCRIPT,Collections.emptyList(),
                voucherId.toString(), userId.toString());if(result !=0){//2.不为0说明没有购买资格returnResult.fail(result==1?"库存不足":"不能重复下单");}//3.走到这一步说明有购买资格,将订单信息存到消息队列VoucherOrder voucherOrder =newVoucherOrder();long orderId = redisIdWorker.nextId("order");
        voucherOrder.setId(orderId);
        voucherOrder.setUserId(UserHolder.getUser().getId());
        voucherOrder.setVoucherId(voucherId);//存入消息队列等待异步消费
        rabbitTemplate.convertAndSend("hmdianping.direct","direct.seckill",voucherOrder);returnResult.ok(orderId);}

此时VoucherOrderServiceImpl文件内容如下

package com.hmdp.service.impl;

import com.hmdp.dto.Result;
import com.hmdp.entity.VoucherOrder;
import com.hmdp.mapper.VoucherOrderMapper;
import com.hmdp.service.ISeckillVoucherService;
import com.hmdp.service.IVoucherOrderService;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.hmdp.utils.RedisIdWorker;
import com.hmdp.utils.UserHolder;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.aop.framework.AopContext;
import org.springframework.core.io.ClassPathResource;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.util.Collections;
import java.util.concurrent.*;

import org.springframework.amqp.rabbit.core.RabbitTemplate;

/**
 * Voucher order service: checks seckill eligibility with a Redis Lua script,
 * publishes eligible orders to RabbitMQ, and persists orders handed back by
 * the queue listener.
 *
 * @author 虎哥
 * @since 2021-12-22
 */
@Service
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {

    @Resource
    private ISeckillVoucherService seckillVoucherService;

    @Resource
    private IVoucherOrderService iVoucherOrderService;

    @Resource
    private RedisIdWorker redisIdWorker;

    @Resource
    private StringRedisTemplate stringRedisTemplate;

    @Resource
    private RedissonClient redissonClient;

    @Resource
    private RabbitTemplate rabbitTemplate;

    /** Lua script loaded from {@code seckill.lua} on the classpath; returns a Long status code. */
    private static final DefaultRedisScript<Long> SECKILL_SCRIPT;

    static {
        SECKILL_SCRIPT = new DefaultRedisScript<>();
        SECKILL_SCRIPT.setLocation(new ClassPathResource("seckill.lua"));
        SECKILL_SCRIPT.setResultType(Long.class);
    }

    // The in-memory BlockingQueue and single-thread executor of the earlier
    // implementation were never used here (RabbitMQ carries the orders now),
    // so they have been removed.

    /**
     * Persists an order taken from the message queue: decrements the voucher
     * stock and, if that succeeds, saves the order.
     *
     * @param voucherOrder the order carried by the message
     */
    @Transactional
    public void handleVoucherOrder(VoucherOrder voucherOrder) {
        // 1. Everything needed comes from the message entity.
        Long voucherId = voucherOrder.getVoucherId();

        // 2. Decrement stock; the stock > 0 condition makes the update fail
        //    instead of driving the stock negative.
        boolean success = seckillVoucherService.update()
                .setSql("stock=stock-1")
                .eq("voucher_id", voucherId)
                .gt("stock", 0)
                .update();

        // 3. Create the order only when stock was actually decremented.
        if (!success) {
            // Previously this case was dropped silently.
            log.error("库存不足!");
            return;
        }
        save(voucherOrder);
    }

    /**
     * Checks purchase eligibility with the seckill Lua script and, when the
     * script returns 0, publishes the order to RabbitMQ.
     *
     * @param voucherId id of the voucher being purchased
     * @return a failure result when the script returns non-zero, otherwise an
     *         OK result carrying the generated order id
     */
    @Override
    public Result seckillVoucher(Long voucherId) {
        // 1. Run the Lua script to check the current user's eligibility.
        Long userId = UserHolder.getUser().getId();
        Long result = stringRedisTemplate.execute(
                SECKILL_SCRIPT,
                Collections.emptyList(),
                voucherId.toString(), userId.toString());
        if (result != 0) {
            // 2. Non-zero means the user may not buy.
            return Result.fail(result == 1 ? "库存不足" : "不能重复下单");
        }

        // 3. Eligible: build the order and hand it to the message queue.
        VoucherOrder voucherOrder = new VoucherOrder();
        long orderId = redisIdWorker.nextId("order");
        voucherOrder.setId(orderId);
        voucherOrder.setUserId(userId);
        voucherOrder.setVoucherId(voucherId);

        // Publish for asynchronous consumption.
        rabbitTemplate.convertAndSend("hmdianping.direct", "direct.seckill", voucherOrder);
        return Result.ok(orderId);
    }

    /**
     * Creates an order after enforcing one order per user per voucher and
     * decrementing stock.
     *
     * @param voucherOrder the order to create
     */
    @Transactional
    public void createVoucherOrder(VoucherOrder voucherOrder) {
        // 5. One order per user.
        // 5.1 The lookup must use the user id and voucher id; the previous
        //     code used the order id and the whole entity, so the check never
        //     matched the intended rows.
        Long userId = voucherOrder.getUserId();
        Long voucherId = voucherOrder.getVoucherId();
        int count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();

        // 5.2 The user already bought this voucher.
        if (count > 0) {
            log.error("用户已经购买过一次!");
            return;
        }

        // 6. Decrement stock: set stock = stock - 1 where voucher_id = ? and stock > 0.
        boolean success = seckillVoucherService.update()
                .setSql("stock= stock -1")
                .eq("voucher_id", voucherId)
                .gt("stock", 0)
                .update();
        if (!success) {
            // Stock could not be decremented.
            log.error("库存不足!");
            return;
        }
        save(voucherOrder);
    }
}

2.3.3 监听秒杀成功订单

  • 监听direct.seckill.queue队列的信息并且创建订单到数据库,当创建完成时手动ack

RabbitMQConfig中的recieveMessage修改为

publicvoidrecieveMessage(Message message,Channel channel,VoucherOrder voucherOrder){try{
            voucherOrderService.handleVoucherOrder(voucherOrder);
            channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);}catch(IOException e){thrownewRuntimeException(e);}System.out.println("监听到了"+message);}

此时,RabbitMQConfig文件内容如下。

package com.hmdp.config;

import com.hmdp.entity.VoucherOrder;
import com.hmdp.service.IVoucherOrderService;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import javax.annotation.Resource;
import java.io.IOException;

/**
 * RabbitMQ configuration for the seckill flow: registers the JSON message
 * converter and the listener that persists orders taken from the seckill
 * queue.
 */
@Configuration
public class RabbitMQConfig {

    @Resource
    private IVoucherOrderService voucherOrderService;

    /**
     * Consumes one seckill order message from queue
     * {@code direct.seckill.queue} (bound to the direct exchange
     * {@code hmdianping.direct} with routing key {@code direct.seckill}):
     * persists the order, then acknowledges the delivery manually.
     *
     * <p>This tutorial configures {@code acknowledge-mode: manual}, so the
     * listener has to settle every delivery itself. If persisting fails, the
     * delivery is nacked before the exception is rethrown: requeued on the
     * first attempt, discarded once it has already been redelivered, so a
     * message that always fails cannot loop forever. Configure a dead-letter
     * exchange on the queue if discarded orders must be kept.
     *
     * @param message      raw AMQP message, used for the delivery tag and the redelivered flag
     * @param channel      channel the message arrived on, used to ack or nack
     * @param voucherOrder the converted message payload
     * @throws RuntimeException if the order cannot be persisted, or wrapping
     *                          the {@link IOException} raised when the ack fails
     */
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "direct.seckill.queue"),
            key = "direct.seckill",
            exchange = @Exchange(name = "hmdianping.direct", type = ExchangeTypes.DIRECT)))
    public void recieveMessage(Message message, Channel channel, VoucherOrder voucherOrder) {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try {
            voucherOrderService.handleVoucherOrder(voucherOrder);
        } catch (RuntimeException e) {
            // Without this nack the delivery would stay unacknowledged and,
            // with prefetch 1, no further messages would reach this consumer.
            boolean requeue = !Boolean.TRUE.equals(message.getMessageProperties().isRedelivered());
            try {
                channel.basicNack(deliveryTag, false, requeue);
            } catch (IOException nackFailure) {
                e.addSuppressed(nackFailure);
            }
            throw e;
        }
        try {
            channel.basicAck(deliveryTag, false);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
        System.out.println("监听到了" + message);
    }

    /**
     * Registers a Jackson-based converter as the {@link MessageConverter} bean.
     *
     * @return a new {@link Jackson2JsonMessageConverter}
     */
    @Bean
    public MessageConverter messageConverter() {
        return new Jackson2JsonMessageConverter();
    }
}

2.3.4 测试

使用Apifox测试

在这里插入图片描述

成功

在这里插入图片描述

标签: rabbitmq ruby 分布式

本文转载自: https://blog.csdn.net/m0_52031708/article/details/142725792
版权归原作者 -$_$- 所有, 如有侵权,请联系我们删除。

“【黑马点评】 使用RabbitMQ实现消息队列——2.使用RabbitMQ监听秒杀下单”的评论:

还没有评论