上一篇已经讲述了实现死信队列的 RabbitMQ 服务配置,可以点击查看:RabbitMQ的延迟队列实现(笔记一)
目录
搭建一个新的springboot项目
1.相关核心依赖如下
<!-- Web starter -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- Test starter (test scope only) -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-test</artifactId>
    <scope>test</scope>
</dependency>
<!-- MQ (AMQP) dependency -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!-- Lombok dependency -->
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <optional>true</optional>
</dependency>
2.配置文件如下
server:
  port: 8080
spring:
  # MQ configuration
  rabbitmq:
    # "ip" is a placeholder for the broker address
    host: ip
    port: 5673
    username: root
    password: root+12345678
3.目录结构
模仿订单延迟支付过期操作
1.创建OrderMqConstant.java,设定常量,代码如下
package com.example.aboutrabbit.constant;

/**
 * Order queue constants: the exchange, queue and routing-key names shared by
 * the order messaging code.
 *
 * <p>Created 2024/2/7 17:05.
 *
 * @author lxh
 */
public interface OrderMqConstant {

    /** Exchange name. */
    String exchange = "order-event-exchange";

    /** Queue name. */
    String orderQueue = "order.delay.queue";

    /** Routing key. */
    String orderDelayRouting = "order.delay.routing";
}
2.创建OrderDelayConfig.java,配置绑定
package com.example.aboutrabbit.config;

import com.example.aboutrabbit.constant.OrderMqConstant;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.CustomExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

/**
 * Declares the exchange, the queue and the binding between them for the
 * order delay messages.
 *
 * <p>Created 2024/2/7 17:15.
 *
 * @author lxh
 */
@Configuration
public class OrderDelayConfig {

    /**
     * Declares the delay exchange.
     *
     * <p>Note the exchange class used here: {@link CustomExchange}, created
     * with the exchange type {@code x-delayed-message}.
     *
     * @return the exchange named {@link OrderMqConstant#exchange}
     */
    @Bean
    public CustomExchange maliceDelayExchange() {
        Map<String, Object> args = new HashMap<>();
        args.put("x-delayed-type", "direct");
        // Arguments: exchange name, exchange type, durable, auto-delete, extra arguments.
        return new CustomExchange(OrderMqConstant.exchange, "x-delayed-message", true, false, args);
    }

    /**
     * Declares the delay queue.
     *
     * @return the queue named {@link OrderMqConstant#orderQueue}
     */
    @Bean
    public Queue maliceDelayQueue() {
        // Arguments: queue name, durable.
        return new Queue(OrderMqConstant.orderQueue, true);
    }

    /**
     * Binds the delay queue to the delay exchange using
     * {@link OrderMqConstant#orderDelayRouting} as the routing key.
     *
     * @return the binding between the queue and the exchange
     */
    @Bean
    public Binding maliceDelayBinding() {
        return BindingBuilder.bind(maliceDelayQueue())
                .to(maliceDelayExchange())
                .with(OrderMqConstant.orderDelayRouting)
                .noargs();
    }
}
3.创建OrderMQReceiver.java,监听过期的消息
package com.example.aboutrabbit.config;

import com.example.aboutrabbit.constant.OrderMqConstant;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

/**
 * Receives expired-order messages from the queue named
 * {@link OrderMqConstant#orderQueue} and logs them.
 *
 * <p>Created 2024/2/7 17:21.
 *
 * @author lxh
 */
@Component
@Slf4j
public class OrderMQReceiver {

    /** Timestamp layout used in the log line; cached because the formatter is immutable. */
    private static final DateTimeFormatter TIMESTAMP_FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    /**
     * Logs the current time together with the received message.
     *
     * @param infoId the message payload as delivered to the listener
     */
    @RabbitListener(queues = OrderMqConstant.orderQueue)
    public void onDeadMessage(String infoId) {
        log.info("收到头框过期时间:{},消息是:{}", LocalDateTime.now().format(TIMESTAMP_FORMAT), infoId);
    }
}
4.分别创建MQService.java和MQServiceImpl.java,处理消息发送
package com.example.aboutrabbit.service;

/**
 * Service for sending MQ messages.
 *
 * <p>Created 2024/2/7 17:26.
 *
 * @author lxh
 */
public interface MQService {

    /**
     * Sends an order to the queue.
     *
     * @param orderId order primary key
     * @param time    time in milliseconds
     */
    void sendOrderAddInfo(Long orderId, Integer time);
}
package com.example.aboutrabbit.service.impl;

import com.example.aboutrabbit.constant.OrderMqConstant;
import com.example.aboutrabbit.service.MQService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Objects;

/**
 * {@link MQService} implementation that publishes through a {@link RabbitTemplate}.
 *
 * <p>Created 2024/2/7 17:26.
 *
 * @author lxh
 */
@Slf4j
@Service
public class MQServiceImpl implements MQService {

    /** Timestamp layout used in the log line; cached because the formatter is immutable. */
    private static final DateTimeFormatter TIMESTAMP_FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * Publishes the order id to {@link OrderMqConstant#exchange} with routing key
     * {@link OrderMqConstant#orderDelayRouting}, setting {@code time} as the
     * message's delay property.
     *
     * @param orderId order primary key; must not be {@code null}
     * @param time    delay in milliseconds; must not be {@code null} or negative
     * @throws NullPointerException     if {@code orderId} or {@code time} is {@code null}
     * @throws IllegalArgumentException if {@code time} is negative
     */
    @Override
    public void sendOrderAddInfo(Long orderId, Integer time) {
        // Reject unusable arguments up front so a missing or negative delay
        // cannot be mistaken for a valid expiry time.
        Objects.requireNonNull(orderId, "orderId");
        Objects.requireNonNull(time, "time");
        if (time < 0) {
            throw new IllegalArgumentException("time must not be negative: " + time);
        }

        log.info("过期队列添加|添加时间:{},内容是:{},过期毫秒数:{}",
                LocalDateTime.now().format(TIMESTAMP_FORMAT), orderId, time);

        rabbitTemplate.convertAndSend(
                OrderMqConstant.exchange,
                OrderMqConstant.orderDelayRouting,
                orderId,
                message -> {
                    message.getMessageProperties().setDelay(time);
                    return message;
                });
    }
}
5.创建控制层进行测试TestController.java
packagecom.example.aboutrabbit.controller;importcom.example.aboutrabbit.service.MQService;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.web.bind.annotation.GetMapping;importorg.springframework.web.bind.annotation.RequestMapping;importorg.springframework.web.bind.annotation.RequestParam;importorg.springframework.web.bind.annotation.RestController;/**
* @author lxh
* @description 测试
* @time 2024/2/7 17:36
**/@RestController@RequestMapping("/test")publicclassTestController{@AutowiredprivateMQService mqService;@GetMapping("/send")publicStringlist(@RequestParamLong orderId,@RequestParamInteger fenTime){//默认Integer time = fenTime *60*1000;
mqService.sendOrderAddInfo(orderId, time);return"success";}}
6.全部结构展示
启动项目进行测试
1.示例:localhost:8080/test/send?orderId=1&fenTime=1
订单id为1的订单延迟一分钟后过期,如下
2.查看日志
版权归原作者 Joe14103 所有, 如有侵权,请联系我们删除。