0


SpringBoot 集成 RabbitMQ

概要

SpringBoot 集成 RabbitMQ

整体架构流程

1、安装并运行RabbitMQ服务器
2、创建 Spring Boot 项目并添加依赖
3、配置 RabbitMQ
4、生产者、消费者、队列、交换机和路由键

技术细节

Docker 安装 RabbitMQ

docker pull rabbitmq:3-management

运行RabbitMQ容器

docker run -d --name rabbitmq -p 5672:5672-p 15672:15672 rabbitmq:3-management

Spring Boot 项目添加依赖

<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency></dependencies>

配置 RabbitMQ

spring:
  rabbitmq:
    host: localhost
    port:5672
    username: guest
    password: guest
    listener:
      simple:
        acknowledge-mode: manual #将消息确认模式设置为手动

配置队列、交换机和路由键

@ConfigurationpublicclassRabbitMQConfig{/**
     * 定义主题交换机
     * 根据消息的路由键和绑定的路由模式进行消息路由。
     */@BeanpublicExchangetopicExchange(){returnnewTopicExchange("topic.exchange",true,false);}/**
     * 绑定主题交换机
     */@BeanpublicBindingqueueBindingTopic(){returnBindingBuilder.bind("your-queue").to(topicExchange()).with("topic.#");}/**
     * 定义扇形交换机
     * 会将收到的所有消息广播到与其绑定的所有队列。
     * 不管消息的路由键是什么,所有绑定的队列都会收到相同的消息
     */@BeanpublicFanoutExchangefanoutExchange(){returnnewFanoutExchange("fanout.exchange");}/**
     * 绑定扇形交换机
     */@BeanpublicBindingqueueBindingFanout(){returnBindingBuilder.bind("your-queue").to(fanoutExchange());}/**
     * 定义直连交换机
     * 直连交换机 (Direct Exchange) 是一种 RabbitMQ 交换机类型,它根据消息的路由键路由消息。
     * 消息会附带特定的路由键发送到交换机。
     * 然后交换机尝试找到一个与其绑定的队列,并具有匹配的路由键。
     * 如果找到匹配的队列,则消息将被投递到该队列。
     */@BeanpublicDirectExchangedirectExchange(){returnnewDirectExchange("direct.exchange");}/**
     * 绑定直连交换机
     */@BeanpublicBindingqueueBindingDirect(){returnBindingBuilder.bind("your-queue").to(directExchange()).with("routing.key");}/**
    * 优先级队列
    */@BeanpublicQueuepriorityQueue(){Map<String,Object> args =newHashMap<>();
        args.put("x-max-priority",10);returnnewQueue("priority.queue",true,false,false, args);}/**
    * 持久化队列
    */@BeanpublicQueuedurableQueue(){returnnewQueue("durable.queue",true);}/**
    * 消息 TTL(Time to Live)
    */@BeanpublicQueuettlQueue(){Map<String,Object> args =newHashMap<>();
        args.put("x-message-ttl",60000);// 60秒returnnewQueue("ttl.queue",true,false,false, args);}/**
     * 延时队列
     * 需要安装RabbitMQ的延时消息插件(RabbitMQ Delayed Message Plugin)
     */@BeanpublicQueuedelayQueue(){//延时队列的消息过期了,会自动触发消息的转发,//根据routingKey发送到指定的exchange中,exchange路由到死信队列Map<String,Object> args =newHashMap<>();//表示队列的最大长度为 1000 条消息。超过此限制的消息将会被丢弃。
        args.put("x-max-length",1000);
        args.put("x-dead-letter-exchange",topicExchange());// 死信路由Key
        args.put("x-dead-letter-routing-key","dlx.close");// 单位:毫秒,1分钟测试使用
        args.put("x-message-ttl",60000);returnnewQueue("delay.queue",true,false,false, args);}/**
     * 延时队列绑定交换机
     */@BeanpublicBindingdelayQueueBinding(){returnBindingBuilder.bind(delayQueue()).to(topicExchange()).with("delay.create");}/**
    * 死信队列
    */@BeanpublicQueuedlxQueue(){Map<String,Object> args =newHashMap<>();
        args.put("x-dead-letter-exchange","dlx.exchange");returnnewQueue("dlx.queue",true,false,false, args);}/**
     * 死信队列绑定交换机
     */@BeanpublicBindingdlxQueueBinding(){returnBindingBuilder.bind(dlxQueue()).to(directExchange()).with("dlx.close").noargs();;}}

生产者

@ServicepublicclassRabbitMQProducer{@AutowiredprivateRabbitTemplate rabbitTemplate;@Value("${spring.rabbitmq.exchange}")privateString exchange;@Value("${spring.rabbitmq.routingkey}")privateString routingKey;@Value("${spring.rabbitmq.queue}")privateString queue;publicvoidsend(String message){
        rabbitTemplate.convertAndSend(queue, message, msg ->{
               msg.getMessageProperties()//当将消息的投递模式设置为 PERSISTENT 时,RabbitMQ 会将消息持久化存储到磁盘上。//即使 RabbitMQ 服务重启,消息也不会丢失。//相反,如果投递模式为 TRANSIENT (默认),则消息仅保存在内存中,RabbitMQ 服务重启后将会丢失。.setDeliveryMode(MessageDeliveryMode.PERSISTENT)//RabbitMQ 将根据消息的优先级进行投递,优先级高的消息将优先被消费者消费。.setPriority(priority);return msg;});}}

消费者

@ServicepublicclassRabbitMQConsumer{@RabbitListener(queues ="${spring.rabbitmq.queue}")publicvoidreceivedMessage(String message,Channel channel,@Header(AmqpHeaders.DELIVERY_TAG)long deliveryTag){try{// 处理消息System.out.println("Received message: "+ message);// 手动确认消息
            channel.basicAck(deliveryTag,false);}catch(Exception e){// 处理异常,拒绝消息
            channel.basicNack(deliveryTag,false,true);//或者重新入队
            channel.basicReject(deliveryTag,true);}}}

小结

生产环境中,可以增加消费者并发数,提升消息处理能力。
可以将消息进行压缩,减少网络开销
配置合理的消息 TTL,避免消息积压。


本文转载自: https://blog.csdn.net/qq_35091185/article/details/140789157
版权归原作者 Aaron丶琦琦 所有, 如有侵权,请联系我们删除。

“SpringBoot 集成 RabbitMQ”的评论:

还没有评论