前言
在项目中经常有延迟业务处理的背景,此时可以借助于Rabbitmq的延迟队列进行实现,但Rabbitmq本身并不支持延迟队列,但可以通过安装插件的方式实现延迟队列
环境准备
- 首先确认目前项目使用的Rabbitmq的版本,这里博主的版本是3.9.15的。
- 访问 Rabbitmq的github网址,检索 delay 找到插件
rabbitmq-delayed-message-exchange
,如下图所示: - 找到延迟队列插件相应的版本并进行下载。 博主的Rabbitmq是3.9版本的,所以这里选择3.9版本即可如下图所示: 下载
rabbitmq_delayed_message_exchange-3.9.0.ez
安装延迟队列
上面我们已经成功下载了延迟队列插件,接下来我们需要将此插件上传到Linux系统服务器上,然后再将插件拷贝到Rabbitmq的容器中。
- 查看Rabbitmq容器进程
- 将Rabbitmq延迟队列插件拷贝到容器中
# c94f1e7dbfc0 代表容器Id,通过上面1的步骤即可查看dockercp rabbitmq_delayed_message_exchange-3.9.0.ez c94f1e7dbfc0:/opt/rabbitmq/plugins
- 进入容器并安装延迟队列插件
dockerexec-it c94f1e7dbfc0 /bin/bash
查看 /opt/rabbitmq/plugins,是否已经有了延迟队列插件。如下图所示:
安装插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
4. 安装消息管理插件
rabbitmq-plugins enable rabbitmq_shovel rabbitmq_shovel_management
4. 查看已经安装的Rabbitmq插件
5. 控制台查看验证延迟队列类型如下图:
延迟队列实战
SpringBoot项目中引入Rabbitmq的依赖项
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>
新增一个交换机、一个队列然后绑定队列与交换机如下图所示:
@ConfigurationpublicclassRabbitmqConfig{@Bean(name ="updateAmount")publicTopicExchangelazyExchange(){Map<String,Object> pros =newHashMap<>();//设置交换机支持延迟消息推送
pros.put("x-delayed-type","direct");TopicExchange exchange =newTopicExchange("updateAmount",true,false,pros);// 这一行是重点,指定交换机类型
exchange.setDelayed(true);return exchange;}@BeanpublicQueuelazyQueue(){returnnewQueue("LAZY_QUEUE",true);}@BeanpublicBindinglazyBinding(){returnBindingBuilder.bind(lazyQueue()).to(lazyExchange()).with("");}}
新增发送延迟队列的工具类代码逻辑:
publicclassRabbitmqUtils{/**
* 发送延迟消息
* @param rabbitTemplate rabbitTemplate
* @param millisecond 延迟毫秒
* @param messageContent 发送字符串
* @param busiId 业务主键id
*/publicstaticvoidsendDelayMessage(RabbitTemplate rabbitTemplate,Integer millisecond,Object messageContent,Long busiId){CorrelationData correlationData =newCorrelationData(busiId.toString()+System.currentTimeMillis());
rabbitTemplate.convertAndSend("updateAmount","", messageContent,
message ->{
message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
message.getMessageProperties().setDelay(millisecond);return message;}, correlationData);}}
新增一个监听器代码消费延迟队列的消息:
@Component@Slf4j@RequiredArgsConstructor@RabbitListener(queues ="LAZY_QUEUE")publicclassRabbitmqListener{privatefinalObjectMapper objectMapper;@RabbitHandlerpublicvoidonCustomBookingMessage(@PayloadString message,Channel channel,@HeadersMap<String,Object> headers){Long deliveryTag =(Long) headers.get(AmqpHeaders.DELIVERY_TAG);try{Map map = objectMapper.readValue(message,Map.class);
log.info("接收消息成功,消息内容:"+map);ack(channel,deliveryTag,"");}catch(JsonProcessingException e){nack(channel,deliveryTag,"");}}/**
* 确认消息成功
*/publicvoidack(Channel channel,long deliveryTag,String info){try{
channel.basicAck(deliveryTag,false);}catch(IOException e){
log.error(info +":消息应答出错", e);}}/**
* 确认消息失败
*/publicvoidnack(Channel channel,long deliveryTag,String info){try{
channel.basicNack(deliveryTag,false,true);}catch(IOException e){
log.error(info +":消息拒绝出错", e);}}}
新增一个生产者发送消息的代码:
@RequestMapping("/rabbit")@RequiredArgsConstructor@RestController@Slf4jpublicclassRabbitmqController{privatefinalRabbitTemplate rabbitTemplate;privatefinalObjectMapper objectMapper;@GetMapping("/sendMessage")publicvoidsendDeLayMessage()throwsJsonProcessingException{Map<String,Object> map =newHashMap<>();
map.put("username","zhangsan");
map.put("tranferMoney",newBigDecimal("1200.23"));
map.put("time",LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));// 发送消息RabbitmqUtils.sendDelayMessage(rabbitTemplate,7000,objectMapper.writeValueAsString(map),System.currentTimeMillis());
log.info("发送延迟队列消息成功:消息内容:"+map);};}
在浏览器访问:http://127.0.0.1:8080/rabbit/sendMessage 后可以看到控制台输出如下日志:
因为我们设定延迟时间为7s,可以清晰看到日志打印的时间刚好相差了7s,证明我们延迟队列发送消息成功。
版权归原作者 Galen-gao 所有, 如有侵权,请联系我们删除。