0


SpringBoot 整合RabbitMQ 之延迟队列实验

系列文章目录

第一章 Java线程池技术应用
第二章 CountDownLatch和Semaphone的应用
第三章 Spring Cloud 简介
第四章 Spring Cloud Netflix 之 Eureka
第五章 Spring Cloud Netflix 之 Ribbon
第六章 Spring Cloud 之 OpenFeign
第七章 Spring Cloud 之 GateWay
第八章 Spring Cloud Netflix 之 Hystrix
第九章 代码管理gitlab 使用
第十章 SpringCloud Alibaba 之 Nacos discovery
第十一章 SpringCloud Alibaba 之 Nacos Config
第十二章 Spring Cloud Alibaba 之 Sentinel
第十三章 JWT
第十四章 RabbitMQ应用
第十五章 RabbitMQ 延迟队列

在这里插入图片描述


文章目录

前言

实际业务中,例如秒杀系统,秒杀商品成功会有截止时间,这时需要用到RabbitMQ延迟服务。

1、RabbitMQ延迟队列

1.1、方式1:RabbitMQ通过死信机制来实现延迟队列的功能

  • TTL ,即 Time-To-Live,存活时间,消息和队列都可以设置存活时间
  • Dead Letter,即死信,若给消息设置了存活时间,当超过存活时间后消息还没有被消费,则该消息变成了死信
  • Dead Letter Exchanges(DLX),即死信交换机
  • Dead Letter Routing Key (DLK),死信路由键
/***********************延迟队列*************************///创建立即消费队列@BeanpublicQueueimmediateQueue(){returnnewQueue("immediateQueue");}//创建立即消费交换机@BeanpublicDirectExchangeimmediateExchange(){returnnewDirectExchange("immediateExchange");}@BeanpublicBindingbindingImmediate(@Qualifier("immediateQueue")Queue queue,@Qualifier("immediateExchange")DirectExchange directExchange){returnBindingBuilder.bind(queue).to(directExchange).with("immediateRoutingKey");}//创建延迟队列@BeanpublicQueuedelayQueue(){Map<String,Object> params =newHashMap<>();//死信队列转发的死信转发到立即处理信息的交换机
    params.put("x-dead-letter-exchange","immediateExchange");//死信转化携带的routing-key
    params.put("x-dead-letter-routing-key","immediateRoutingKey");//设置消息过期时间,单位:毫秒
    params.put("x-message-ttl",60*1000);returnnewQueue("delayQueue",true,false,false,params);}@BeanpublicDirectExchangedelayExchange(){returnnewDirectExchange("delayExchange");}@BeanpublicBindingbindingDelay(@Qualifier("delayQueue")Queue queue,@Qualifier("delayExchange")DirectExchange directExchange){returnBindingBuilder.bind(queue).to(directExchange).with("delayRoutingKey");}
@TestpublicvoidsendDelay(){this.rabbitTemplate.convertAndSend("delayExchange","delayRoutingKey","Hello world topic");}

1.2、方式二:安装延迟队列插件

1.2.1、安装延迟队列插件:

https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/3.11.1/rabbitmq_delayed_message_exchange-3.11.1.ez
下载解压,到plugins目录,执行以下的命令:

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

/**************延迟队列一个单一queue*******************/@BeanpublicQueuedelayNewQueue(){returnnewQueue("delayNewQueue");}@BeanpublicCustomExchangedelayNewExchange(){Map<String,Object> args =newHashMap<>();// 设置类型,可以为fanout、direct、topic
    args.put("x-delayed-type","direct");returnnewCustomExchange("delayNewExchange","x-delayed-message",true,false,args);}@BeanpublicBindingbindingNewDelay(@Qualifier("delayNewQueue")Queue queue,@Qualifier("delayNewExchange")CustomExchange customExchange){returnBindingBuilder.bind(queue).to(customExchange).with("delayNewRoutingKey").noargs();}
@TestpublicvoidsendDelay(){//生产端写完了UserInfo userInfo =newUserInfo();
    userInfo.setPassword("13432432");
    userInfo.setUserAccount("kelvin");this.rabbitTemplate.convertAndSend("delayNewExchange","delayNewRoutingKey", userInfo
            , a ->{//单位毫秒
                a.getMessageProperties().setDelay(30000);return a;});}

2、消息确认机制

消息确认分为两部分: 生产确认 和 消费确认。

生产确认: 生产者生产消息后,将消息发送到交换机,触发确认回调;交换机将消息转发到绑定队列,若失败则触发返回回调。
消费确认: 默认情况下消息被消费者从队列中获取后即发送确认,不管消费者处理消息时是否失败,不需要额外代码,但是不能保证消息被正确消费。我们增加手动确认,则需要代码中明确进行消息确认。

2.1、生产确认

@BeanpublicRabbitTemplategetTemplate(ConnectionFactory connectionFactory){RabbitTemplate template =newRabbitTemplate(connectionFactory);//消息发送到交换器Exchange后触发回调
    template.setConfirmCallback(newRabbitTemplate.ConfirmCallback(){@Overridepublicvoidconfirm(CorrelationData correlationData,boolean ack,String cause){//  可以进行消息入库操作
            log.info("消息唯一标识 correlationData = {}", correlationData);
            log.info("确认结果 ack = {}", ack);
            log.info("失败原因 cause = {}", cause);}});// 配置这个,下面的ReturnCallback 才会起作用
    template.setMandatory(true);// 如果消息从交换器发送到对应队列失败时触发(比如 根据发送消息时指定的routingKey找不到队列时会触发)
    template.setReturnsCallback(newRabbitTemplate.ReturnsCallback(){@OverridepublicvoidreturnedMessage(ReturnedMessage returnedMessage){//  可以进行消息入库操作
            log.info("消息主体 message = {}", returnedMessage.getMessage());
            log.info("回复码 replyCode = {}", returnedMessage.getReplyCode());
            log.info("回复描述 replyText = {}", returnedMessage.getReplyText());
            log.info("交换机名字 exchange = {}", returnedMessage.getExchange());
            log.info("路由键 routingKey = {}", returnedMessage.getRoutingKey());}});return template;}
spring:cloud:nacos:discovery:server-addr: localhost:8848application:name: drp-user-service  #微服务名称datasource:username: root
    password: root
    url: jdbc:mysql://127.0.0.1:3306/drp
    driver-class-name: com.mysql.cj.jdbc.Driver
  rabbitmq:host: 127.0.0.1
    port:5672username: root
    password: root
    virtual-host: root_vh
    # 确认消息已发送到交换机(Exchange)publisher-confirm-type: correlated
    # 确认消息已发送到队列publisher-returns:truelistener:simple:acknowledge-mode: manual # 开启消息消费手动确认retry:enabled:true

2.2、消费确认

@RabbitHandlerpublicvoidprocess(UserInfo data,Message message,Channel channel){
    log.info("收到directQueue队列信息:"+ data);long deliveryTag = message.getMessageProperties().getDeliveryTag();try{//成功消费确认
        channel.basicAck(deliveryTag,true);
        log.info("消费成功确认完毕。。。。。");}catch(IOException e){
        log.error("确认消息时抛出异常 ", e);// 重新确认,成功确认消息try{Thread.sleep(50);
            channel.basicAck(deliveryTag,true);}catch(IOException|InterruptedException e1){
            log.error("确认消息时抛出异常 ", e);// 可以考虑入库}}catch(Exception e){
        log.error("业务处理失败", e);try{// 失败确认
            channel.basicNack(deliveryTag,false,false);}catch(IOException e1){
            log.error("消息失败确认失败", e1);}}}

本文转载自: https://blog.csdn.net/s445320/article/details/134280266
版权归原作者 青花锁 所有, 如有侵权,请联系我们删除。

“SpringBoot 整合RabbitMQ 之延迟队列实验”的评论:

还没有评论