概要
SpringBoot 集成 RabbitMQ
整体架构流程
1、安装并运行RabbitMQ服务器
2、创建 Spring Boot 项目并添加依赖
3、配置 RabbitMQ
4、生产者、消费者、队列、交换机和路由键
技术细节
Docker 安装 RabbitMQ
docker pull rabbitmq:3-management
运行RabbitMQ容器
docker run -d --name rabbitmq -p 5672:5672-p 15672:15672 rabbitmq:3-management
Spring Boot 项目添加依赖
<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency></dependencies>
配置 RabbitMQ
spring:
rabbitmq:
host: localhost
port:5672
username: guest
password: guest
listener:
simple:
acknowledge-mode: manual #将消息确认模式设置为手动
配置队列、交换机和路由键
@ConfigurationpublicclassRabbitMQConfig{/**
* 定义主题交换机
* 根据消息的路由键和绑定的路由模式进行消息路由。
*/@BeanpublicExchangetopicExchange(){returnnewTopicExchange("topic.exchange",true,false);}/**
* 绑定主题交换机
*/@BeanpublicBindingqueueBindingTopic(){returnBindingBuilder.bind("your-queue").to(topicExchange()).with("topic.#");}/**
* 定义扇形交换机
* 会将收到的所有消息广播到与其绑定的所有队列。
* 不管消息的路由键是什么,所有绑定的队列都会收到相同的消息
*/@BeanpublicFanoutExchangefanoutExchange(){returnnewFanoutExchange("fanout.exchange");}/**
* 绑定扇形交换机
*/@BeanpublicBindingqueueBindingFanout(){returnBindingBuilder.bind("your-queue").to(fanoutExchange());}/**
* 定义直连交换机
* 直连交换机 (Direct Exchange) 是一种 RabbitMQ 交换机类型,它根据消息的路由键路由消息。
* 消息会附带特定的路由键发送到交换机。
* 然后交换机尝试找到一个与其绑定的队列,并具有匹配的路由键。
* 如果找到匹配的队列,则消息将被投递到该队列。
*/@BeanpublicDirectExchangedirectExchange(){returnnewDirectExchange("direct.exchange");}/**
* 绑定直连交换机
*/@BeanpublicBindingqueueBindingDirect(){returnBindingBuilder.bind("your-queue").to(directExchange()).with("routing.key");}/**
* 优先级队列
*/@BeanpublicQueuepriorityQueue(){Map<String,Object> args =newHashMap<>();
args.put("x-max-priority",10);returnnewQueue("priority.queue",true,false,false, args);}/**
* 持久化队列
*/@BeanpublicQueuedurableQueue(){returnnewQueue("durable.queue",true);}/**
* 消息 TTL(Time to Live)
*/@BeanpublicQueuettlQueue(){Map<String,Object> args =newHashMap<>();
args.put("x-message-ttl",60000);// 60秒returnnewQueue("ttl.queue",true,false,false, args);}/**
* 延时队列
* 需要安装RabbitMQ的延时消息插件(RabbitMQ Delayed Message Plugin)
*/@BeanpublicQueuedelayQueue(){//延时队列的消息过期了,会自动触发消息的转发,//根据routingKey发送到指定的exchange中,exchange路由到死信队列Map<String,Object> args =newHashMap<>();//表示队列的最大长度为 1000 条消息。超过此限制的消息将会被丢弃。
args.put("x-max-length",1000);
args.put("x-dead-letter-exchange",topicExchange());// 死信路由Key
args.put("x-dead-letter-routing-key","dlx.close");// 单位:毫秒,1分钟测试使用
args.put("x-message-ttl",60000);returnnewQueue("delay.queue",true,false,false, args);}/**
* 延时队列绑定交换机
*/@BeanpublicBindingdelayQueueBinding(){returnBindingBuilder.bind(delayQueue()).to(topicExchange()).with("delay.create");}/**
* 死信队列
*/@BeanpublicQueuedlxQueue(){Map<String,Object> args =newHashMap<>();
args.put("x-dead-letter-exchange","dlx.exchange");returnnewQueue("dlx.queue",true,false,false, args);}/**
* 死信队列绑定交换机
*/@BeanpublicBindingdlxQueueBinding(){returnBindingBuilder.bind(dlxQueue()).to(directExchange()).with("dlx.close").noargs();;}}
生产者
@ServicepublicclassRabbitMQProducer{@AutowiredprivateRabbitTemplate rabbitTemplate;@Value("${spring.rabbitmq.exchange}")privateString exchange;@Value("${spring.rabbitmq.routingkey}")privateString routingKey;@Value("${spring.rabbitmq.queue}")privateString queue;publicvoidsend(String message){
rabbitTemplate.convertAndSend(queue, message, msg ->{
msg.getMessageProperties()//当将消息的投递模式设置为 PERSISTENT 时,RabbitMQ 会将消息持久化存储到磁盘上。//即使 RabbitMQ 服务重启,消息也不会丢失。//相反,如果投递模式为 TRANSIENT (默认),则消息仅保存在内存中,RabbitMQ 服务重启后将会丢失。.setDeliveryMode(MessageDeliveryMode.PERSISTENT)//RabbitMQ 将根据消息的优先级进行投递,优先级高的消息将优先被消费者消费。.setPriority(priority);return msg;});}}
消费者
@ServicepublicclassRabbitMQConsumer{@RabbitListener(queues ="${spring.rabbitmq.queue}")publicvoidreceivedMessage(String message,Channel channel,@Header(AmqpHeaders.DELIVERY_TAG)long deliveryTag){try{// 处理消息System.out.println("Received message: "+ message);// 手动确认消息
channel.basicAck(deliveryTag,false);}catch(Exception e){// 处理异常,拒绝消息
channel.basicNack(deliveryTag,false,true);//或者重新入队
channel.basicReject(deliveryTag,true);}}}
小结
生产环境中,可以增加消费者并发数,提升消息处理能力。
可以将消息进行压缩,减少网络开销
配置合理的消息 TTL,避免消息积压。
版权归原作者 Aaron丶琦琦 所有, 如有侵权,请联系我们删除。