0


Spring Boot 整合 RabbitMQ 详解

前言:

在消息中间件领域中 RabbitMQ 也是一种非常常见的消息中间件了,本篇简单分享一下 Spring Boot 项目集成 RabbitMQ 的过程。

RabbitMQ 系列文章传送门

RabbitMQ 的介绍及核心概念讲解

@RabbitListener 注解详解

Spring Boot 集成 RabbitMQ 可以分为三大步,如下:

  • 在 proerties 或者 yml 文件中添加 RabbitMQ 配置。
  • 项目 pom.xml 文件中引入 spring-boot-starter-amqp 依赖。
  • 注入 RabbitTemplate 开始使用 RabbitMQ ,其实这步以及算是使用了,不能算作集成了,但是集成了总归是要使用的,我把这里也算作一步了。

在 proerties 或者 yml 文件中添加 RabbitMQ 配置如下:

spring.rabbitmq.host= xxx.xxx.xxx
spring.rabbitmq.port= 5672
spring.rabbitmq.username= admin
spring.rabbitmq.password= admin
spring.rabbitmq.virtual-host = /study

项目 pom.xml 文件中引入 spring-boot-starter-amqp 依赖如下:

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId><version>2.4.5</version></dependency>

RabbitMQ 使用

前文我们在分享 RabbitMQ 核心概念的时候,我们知道了 RabbitMQ 有六种交换机类型,下面我们就针对六种交换机来分享 RabbitMQ 的使用。

Direct Exchange(直连交换机)

直连交换机是 RoutingKey 完全匹配模式,也就是我们常说的点对点模式,消息会传送给 RoutingKey 完全匹配的队列。

直连交换机 Direct Exchange 和队列 Queue 的路由配置代码如下:

packagecom.user.service.rabbitmq;importorg.springframework.amqp.core.Binding;importorg.springframework.amqp.core.BindingBuilder;importorg.springframework.amqp.core.DirectExchange;importorg.springframework.amqp.core.Queue;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;@ConfigurationpublicclassRabbitDirectConfig{//注入队列@BeanpublicQueuequeue(){returnnewQueue("direct-buget-queue");}//注入交换机@BeanpublicDirectExchangedirectExchange(){//durable:重启后是否有效 autodelete: 长期未使用是否删除掉returnnewDirectExchange("direct-buget-exchange",true,true);}//绑定队列和交换机@BeanpublicBindingbinding(){returnBindingBuilder.bind(queue()).to(directExchange()).with("direct-buget-exchange-routing-key");}}

直连交换机 Direct Exchange 消息生产代码如下:

//direct 模式消息发送publicvoidsendDirectMessage(String message){
    rabbitTemplate.convertAndSend("direct-buget-exchange","direct-buget-exchange-routing-key", message);}

直连交换机 Direct Exchange 消息消费代码如下:

//direct 直连模式消费端@RabbitListener(queues ="direct-buget-queue")publicvoiddirectConsumer(String message){System.out.println("direct 消息消费成功,message内容为:"+ message);}

直连交换机 Direct Exchange 消息测试(触发消息生产及消费)代码代码如下:

@GetMapping("/send-direct-buget-message")privateStringsendDirectBugetMessage(@RequestParamString message){
    myRabbitProducer.sendDirectMessage(message);return"OK";}

直连交换机 Direct Exchange 消息测试(触发消息生产及消费)结果如下:

direct 消息消费成功,message内容为:hello message

直连交换机,一对一模式,结果符合预期。

Topic Exchange(主题交换机)

主题交换机支持路由模糊匹配,可以使用星号和井号(#)作为通配符进行匹配,其中 ”*“ 可以代替一个单词,(#) 可以代替任意个单词。

主题交换机 Topic Exchange 和队列 Queue 的路由配置代码如下:

packagecom.user.service.rabbitmq;importorg.springframework.amqp.core.*;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;@ConfigurationpublicclassRabbitTopicConfig{//注入队列@Bean("topicQueue")publicQueuequeue1(){returnnewQueue("topic-buget-queue");}//注入队列@Bean("topicQueue2")publicQueuequeue2(){returnnewQueue("topic-buget-queue2");}//注入交换机@BeanpublicTopicExchangetopicExchange(){//durable:重启后是否有效 autodelete: 长期未使用是否删除掉returnnewTopicExchange("topic-buget-exchange",true,true);}//绑定队列和交换机@Bean("topicBinding")publicBindingbinding(){returnBindingBuilder.bind(queue1()).to(topicExchange()).with("topic.buget.exchange.routing.key.*");}//绑定队列和交换机@Bean("topicBinding2")publicBindingbinding2(){returnBindingBuilder.bind(queue2()).to(topicExchange()).with("topic.buget.exchange.routing.key.#");}}

主题交换机 Topic Exchange 消息生产代码如下:

//topic 模式消息发送publicvoidsendTopicMessage(String message){
    rabbitTemplate.convertAndSend("topic-buget-exchange","topic.buget.exchange.routing.key.1", message);}//topic 模式消息发送publicvoidsendTopicMessage2(String message){
    rabbitTemplate.convertAndSend("topic-buget-exchange","topic.buget.exchange.routing.key.1.1", message);}

主题交换机 Topic Exchange 消息消费代码如下:

//topic 模式消费端@RabbitListener(queues ="topic-buget-queue")publicvoidtopicConsumer(String message){System.out.println("topic topic-buget-queue 消息消费成功,message内容为:"+ message);}//topic 模式消费端@RabbitListener(queues ="topic-buget-queue2")publicvoidtopicConsumer2(String message){System.out.println("topic topic-buget-queue2 消息消费成功,message内容为:"+ message);}

主题交换机 Topic Exchange 消息测试(触发消息生产及消费)代码代码如下:

@GetMapping("/send-topic-buget-message")privateStringsendTopicBugetMessage(@RequestParamString message){
    myRabbitProducer.sendTopicMessage(message);
    myRabbitProducer.sendTopicMessage2(message);return"OK";}

主题交换机 Topic Exchange 消息测试(触发消息生产及消费)结果如下:

topic topic-buget-queue 消息消费成功,message内容为:hello topic
topic topic-buget-queue2 消息消费成功,message内容为:hello topic
topic topic-buget-queue2 消息消费成功,message内容为:hello topic

主题交换机 Topic Exchange 我们声明了两个 topic.buget.exchange.routing.key.* 和 topic.buget.exchange.routing.key.#,其中星号可以代替一个单词,(#) 可以代替任意个单词,因此 topic-buget-queue 只能匹配到 topic.buget.exchange.routing.key.1 的消息,topic topic-buget-queue2 则可以匹配到两个路由的消息,结果符合预期。

Fanout Exchange(扇形交换机)

扇形交换机,一个交换器可以绑定多个队列,只要交换机接收到消息就会发送给所有和它绑定的队列,不再进行 RoutingKey 判断,也就是我们常说的发布订阅模式。

扇形交换机 Fanout Exchange 和队列 Queue 的路由配置代码如下:

packagecom.user.service.rabbitmq;importorg.springframework.amqp.core.*;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;@ConfigurationpublicclassRabbitFanoutConfig{//注入队列@Bean("fanoutQueue")publicQueuequeue1(){returnnewQueue("fanout-buget-queue");}//注入队列@Bean("fanoutQueue2")publicQueuequeue2(){returnnewQueue("fanout-buget-queue2");}//注入交换机@BeanpublicFanoutExchangefanoutExchange(){//durable:重启后是否有效 autodelete: 长期未使用是否删除掉returnnewFanoutExchange("fanout-buget-exchange",true,true);}//绑定队列和交换机@Bean("fanoutBinding1")publicBindingbinding(){returnBindingBuilder.bind(queue1()).to(fanoutExchange());}//绑定队列和交换机@Bean("fanoutBinding2")publicBindingbinding2(){returnBindingBuilder.bind(queue2()).to(fanoutExchange());}}

扇形交换机 Fanout Exchange 消息生产代码如下:

//Fanout 模式消息发送publicvoidsendFanoutMessage(String message){
    rabbitTemplate.convertAndSend("fanout-buget-exchange","", message);}

扇形交换机 Fanout Exchange 消息消费代码如下:

//Fanout 模式消费端@RabbitListener(queues ="fanout-buget-queue")publicvoidfanoutConsumer1(String message){System.out.println("fanout fanout-buget-queue 消息消费成功,message内容为:"+ message);}//Fanout 模式消费端@RabbitListener(queues ="fanout-buget-queue2")publicvoidfanoutConsumer2(String message){System.out.println("fanout fanout-buget-queue2 消息消费成功,message内容为:"+ message);}

扇形交换机 Fanout Exchange 消息测试(触发消息生产及消费)代码代码如下:

@GetMapping("/send-fanout-buget-message")privateStringsendFanoutBugetMessage(@RequestParamString message){
    myRabbitProducer.sendFanoutMessage(message);return"OK";}

扇形交换机 Fanout Exchange 消息测试(触发消息生产及消费)结果如下:

fanout fanout-buget-queue2 消息消费成功,message内容为:hello fanout
fanout fanout-buget-queue 消息消费成功,message内容为:hello fanout

扇形交换机 Fanout Exchange 队列 Queue 只要与其绑定,就可以把消息路由到对应的 Queue 上并完成消费,结果符合预期。

Headers Exchange(头交换机)

Header Exchange 不依赖 RoutingKey 的判断,而是根据发送的消息内容中的 headers 属性进行匹配,当消息投递到首部交换器时,RabbitMQ 会获取到该消息的 headers,对比其中的键值对是否完全匹配队列和交换器绑定时指定的键值对,如果完全匹配,则消息会路由到该队列,否则不会路由到该队列。

头交换机 Headers Exchange 和队列 Queue 的路由配置代码如下:

packagecom.user.service.rabbitmq;importorg.springframework.amqp.core.*;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;importjava.util.HashMap;importjava.util.Map;@ConfigurationpublicclassRabbitHeadersConfig{//注入队列@Bean("headersQueue")publicQueuequeue1(){returnnewQueue("headers-buget-queue");}//注入队列@Bean("headersQueue2")publicQueuequeue2(){returnnewQueue("headers-buget-queue2");}//注入交换机@BeanpublicHeadersExchangeheadersExchange(){//durable:重启后是否有效 autodelete: 长期未使用是否删除掉returnnewHeadersExchange("headers-buget-exchange",true,true);}//绑定队列和交换机 绑定 Header中 header-key1 = a 的队列。@Bean("headersBinding1")publicBindingbinding(){returnBindingBuilder.bind(queue1()).to(headersExchange()).where("header-key1").matches("a");}//绑定队列和交换机 绑定 Header中  header-key2 = b 的队列。@Bean("headersBinding2")publicBindingbinding2(){Map<String,Object> map =newHashMap<>();
        map.put("header-key2","b");returnBindingBuilder.bind(queue2()).to(headersExchange()).whereAny(map).match();}}

头交换机 Headers Exchange 消息生产代码如下:

//Headers 模式消息发送publicvoidsendHeadersMessage(String message){MessageProperties messageProperties =newMessageProperties();
    messageProperties.setHeader("header-key1","a");
    rabbitTemplate.convertAndSend("headers-buget-exchange","",newMessage(message.getBytes(StandardCharsets.UTF_8), messageProperties));MessageProperties messageProperties2 =newMessageProperties();
    messageProperties2.setHeader("header-key2","b");
    rabbitTemplate.convertAndSend("headers-buget-exchange","",newMessage(message.getBytes(StandardCharsets.UTF_8), messageProperties2));}

头交换机 Headers Exchange 消息消费代码如下:

//headers 模式消费端@RabbitListener(queues ="headers-buget-queue")publicvoidheadersConsumer(String message){System.out.println("headers headers-buget-queue 消息消费成功,message内容为:"+ message);}//headers 模式消费端@RabbitListener(queues ="headers-buget-queue2")publicvoidheadersConsumer2(String message){System.out.println("headers headers-buget-queue2 消息消费成功,message内容为:"+ message);}

头交换机 Headers Exchange 消息测试(触发消息生产及消费)代码代码如下:

@GetMapping("/send-headers-buget-message")privateStringsendHeadersBugetMessage(@RequestParamString message){
    myRabbitProducer.sendHeadersMessage(message);return"OK";}

头交换机 Headers Exchange 消息测试(触发消息生产及消费)结果如下:

headers headers-buget-queue2 消息消费成功,message内容为:hello headers
headers headers-buget-queue 消息消费成功,message内容为:hello headers
headers headers-buget-queue2 消息消费成功,message内容为:hello headers

结果符合预期。

Backup Exchange(备份交换机)

RabbitMQ 本身是不存在备份交换机类型的,备份交换机是抽象出来的一个概念,通过设置交换机的 alternate-exchange 的屬性,属性值是交换机的名称,设置当前交换机的备份交换机,当消息路由无法在当前交换机匹配到合适的队列投递时,将会把消息转到备份交换机,分发到其绑定的备份队列中,备份交换机一般使用扇形交换机,因为其不需要进行路由匹配。

备份交换机 Backup Exchange 和队列 Queue 的路由配置代码如下:

packagecom.user.service.rabbitmq;importorg.springframework.amqp.core.*;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;importjava.util.HashMap;importjava.util.Map;@ConfigurationpublicclassRabbitBackupConfig{//注入队列 durable:重启后是否有效 exclusive : 是否独自的 autodelete: 长期未使用是否删除掉@Bean("backupQueue")publicQueuebackupQueue(){returnnewQueue("backup-buget-queue",true,false,false);}//注入队列@Bean("noBbackupQueue")publicQueuenoBbackupQueue(){returnnewQueue("nobackup-buget-queue",true,false,false);}//注入 direct 交换机@Bean("nonBackupExchange")publicDirectExchangenonBackupExchange(){Map<String,Object> map =newHashMap<>();//当消息路由无法在当前交换机匹配到合适的队列投递时 将消息转到备份交换机 backup-buget-exchange 分发到其绑定的备份队列中
        map.put("alternate-exchange","backup-buget-exchange");returnnewDirectExchange("nobackup-buget-exchange",true,false, map);}//注入 Fanout 交换机@Bean("backupExchange")publicFanoutExchangebackupExchange(){returnnewFanoutExchange("backup-buget-exchange",true,false);}//绑定非备份队列 direct交换机@BeanpublicBindingnoBindBackupQueue(){returnBindingBuilder.bind(noBbackupQueue()).to(nonBackupExchange()).with("noback-buget-exchange-routing-key");}//扇形交换机的特性决定了它适合做备份交换机 (只要扇形交换机收到消息后就会被转发到与之绑定的队列中 不进行路由判断)@BeanpublicBindingbindBackupQueue(){returnBindingBuilder.bind(backupQueue()).to(backupExchange());}}

备份交换机 Backup Exchange 消息生产代码如下:

//备份模式 消息发送publicvoidsendBackupMessage(){//路由正确匹配 消息投递到非备份队列中
    rabbitTemplate.convertAndSend("nobackup-buget-exchange","noback-buget-exchange-routing-key","hello noBackup");//路由无法匹配 消息投递到备份队列中 nobackup-buget-exchange 交换机中没有路由 noback-buget-exchange-routing-key-1
    rabbitTemplate.convertAndSend("nobackup-buget-exchange","noback-buget-exchange-routing-key-1","hello backup");}

备份交换机 Backup Exchange 消息消费代码如下:

//backup 模式消费端(能够正确匹配到的队列)@RabbitListener(queues ="nobackup-buget-queue")publicvoidnoBackupConsumer(String message){System.out.println("nobackup-buget-queue 消息消费成功,message内容为:"+ message);}//backup 模式消费端 (没有匹配到全部来到 backup-buget-queue 队列) 也就是备份队列@RabbitListener(queues ="backup-buget-queue")publicvoidnackupConsumer(String message){System.out.println("backup-buget-queue 消息消费成功,message内容为:"+ message);}

备份交换机 Backup Exchange 消息测试(触发消息生产及消费)代码代码如下:

@GetMapping("/send-backup-buget-message")privateStringsendBackupBugetMessage(){
    myRabbitProducer.sendBackupMessage();return"OK";}

备份交换机 Backup Exchange 消息测试(触发消息生产及消费)结果如下:

nobackup-buget-queue 消息消费成功,message内容为:hello noBackup
backup-buget-queue 消息消费成功,message内容为:hello backup

我们通过 alternate-exchange 属性给 nobackup-buget-exchange 交换机设置了备份交换机 backup-buget-exchange,我们分别发了两个路由信息 noback-buget-exchange-routing-key 和 noback-buget-exchange-routing-key-1(找不到的路由),最终两条消息被消费了,结果符合预期。

Dead Exchange(死信交换机)

同备份交换机一样 RabbitMQ 本身是不存在死信交换机类型的,死信交换机可以理解成一个拥有特殊意义的直连交换机,通过设置队列中的 x-dead-letter-exchange 和 x-dead-letter-routing-key 属性来设置绑定死信交换机,当消费者拒绝消费、消息积压队列达到最大长度或者消息过期时,消息从正常队列转到死信队列,死信在转移到死信队列时,它的路由是会保存下来,但是如果配置了 x-dead-letter-routing-key 参数的话,路由就会被替换为配置的这个值。

死信交换机 Dead Exchange 和队列 Queue 的路由配置代码如下:

packagecom.user.service.rabbitmq;importorg.springframework.amqp.core.*;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;importjava.util.HashMap;importjava.util.Map;@ConfigurationpublicclassRabbitDeadConfig{//注入 死信队列 durable:重启后是否有效 exclusive : 是否独自的 autodelete: 长期未使用是否删除掉@Bean("deadQueue")publicQueuedeadQueue(){returnnewQueue("dead-buget-queue",true,false,false);}//注入正常队列队列@Bean("normalQueue")publicQueuenormalQueue(){Map<String,Object> map =newHashMap<>();//当前队列中 message 过期时间
        map.put("x-message-ttl",5000);//给当前队列绑定死信交换机
        map.put("x-dead-letter-exchange","dead-buget-exchange");//绑定 Routing key
        map.put("x-dead-letter-routing-key","dead-buget-exchange-routing-key");returnnewQueue("normal-buget-queue",true,false,false, map);}//注入 normalExchange 交换机@Bean("normalExchange")publicDirectExchangenormalExchange(){returnnewDirectExchange("normal-buget-exchange",true,false);}//注入 死信交换机@Bean("deadExchange")publicDirectExchangedeadExchange(){returnnewDirectExchange("dead-buget-exchange",true,false);}//死信队列和交换器绑定@BeanpublicBindingbindDeadQueue(){returnBindingBuilder.bind(deadQueue()).to(deadExchange()).with("dead-buget-exchange-routing-key");}//正常队列的交换器和队列绑定@BeanpublicBindingbindNormalQueue(){returnBindingBuilder.bind(normalQueue()).to(normalExchange()).with("normal-buget-exchange-routing-key");}}

死信交换机 Dead Exchange 消息生产代码如下:

//死信 模式消息发送publicvoidsendDeadMessage(String message){System.out.println("normal-buget-queue 开始发送,当前时间:"+System.currentTimeMillis());
    rabbitTemplate.convertAndSend("normal-buget-exchange","normal-buget-exchange-routing-key", message);}

死信交换机 Dead Exchange 消息消费代码如下:

//死信 模式消费端@RabbitListener(queues ="dead-buget-queue")publicvoiddeadConsumer(String message){System.out.println("dead-buget-queue 开始消费,当前时间:"+System.currentTimeMillis());System.out.println("dead-buget-queue 消息消费成功,message内容为:"+ message);}

死信交换机 Dead Exchange 消息测试(触发消息生产及消费)代码代码如下:

@GetMapping("/send-dead-buget-message")privateStringsendBackupBugetMessage(@RequestParamString message){
    myRabbitProducer.sendDeadMessage(message);return"OK";}

死信交换机 Dead Exchange 消息测试(触发消息生产及消费)结果如下:

normal-buget-queue 开始发送,当前时间:1725281520323
dead-buget-queue 开始消费,当前时间:1725281525327
dead-buget-queue 消息消费成功,message内容为:dead

我们对 normalQueue 设置了 TTL 为 5秒钟,并为之设置了 死信交换机 dead-buget-exchange 和路由 dead-buget-exchange-routing-key,然后在 normalQueue 上发送了消息,但是没有对 normalQueue 进行监听消费,我们发现过了(1725281525327-1725281520323=5004)5秒后,dead-buget-queue 完成了消息消费,结果符合预期。

消息生产完整代码如下:

packagecom.user.service.rabbitmq;importorg.springframework.amqp.core.Message;importorg.springframework.amqp.core.MessageProperties;importorg.springframework.amqp.rabbit.core.RabbitTemplate;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.stereotype.Component;importjava.nio.charset.StandardCharsets;@ComponentpublicclassMyRabbitProducer{@AutowiredprivateRabbitTemplate rabbitTemplate;//direct 模式消息发送publicvoidsendDirectMessage(String message){
        rabbitTemplate.convertAndSend("direct-buget-exchange","direct-buget-exchange-routing-key", message);}//topic 模式消息发送publicvoidsendTopicMessage(String message){
        rabbitTemplate.convertAndSend("topic-buget-exchange","topic.buget.exchange.routing.key.1", message);}//topic 模式消息发送publicvoidsendTopicMessage2(String message){
        rabbitTemplate.convertAndSend("topic-buget-exchange","topic.buget.exchange.routing.key.1.1", message);}//Fanout 模式消息发送publicvoidsendFanoutMessage(String message){
        rabbitTemplate.convertAndSend("fanout-buget-exchange","", message);}//Headers 模式消息发送publicvoidsendHeadersMessage(String message){MessageProperties messageProperties =newMessageProperties();
        messageProperties.setHeader("header-key1","a");
        rabbitTemplate.convertAndSend("headers-buget-exchange","",newMessage(message.getBytes(StandardCharsets.UTF_8), messageProperties));MessageProperties messageProperties2 =newMessageProperties();
        messageProperties2.setHeader("header-key2","b");
        rabbitTemplate.convertAndSend("headers-buget-exchange","",newMessage(message.getBytes(StandardCharsets.UTF_8), messageProperties2));}//备份模式 消息发送publicvoidsendBackupMessage(){//路由正确匹配 消息投递到非备份队列中
        rabbitTemplate.convertAndSend("nobackup-buget-exchange","noback-buget-exchange-routing-key","hello noBackup");//路由无法匹配 消息投递到备份队列中 nobackup-buget-exchange 交换机中没有路由 noback-buget-exchange-routing-key-1
        rabbitTemplate.convertAndSend("nobackup-buget-exchange","noback-buget-exchange-routing-key-1","hello backup");}//死信 模式消息发送publicvoidsendDeadMessage(String message){System.out.println("normal-buget-queue 开始发送,当前时间:"+System.currentTimeMillis());
        rabbitTemplate.convertAndSend("normal-buget-exchange","normal-buget-exchange-routing-key", message);}}

消息消费完整代码如下:

packagecom.user.service.rabbitmq;importorg.springframework.amqp.rabbit.annotation.RabbitListener;importorg.springframework.stereotype.Component;@ComponentpublicclassMyRabbitConsumer{//direct 直连模式消费端@RabbitListener(queues ="direct-buget-queue")publicvoiddirectConsumer(String message){System.out.println("direct 消息消费成功,message内容为:"+ message);}//topic 模式消费端@RabbitListener(queues ="topic-buget-queue")publicvoidtopicConsumer(String message){System.out.println("topic topic-buget-queue 消息消费成功,message内容为:"+ message);}//topic 模式消费端@RabbitListener(queues ="topic-buget-queue2")publicvoidtopicConsumer2(String message){System.out.println("topic topic-buget-queue2 消息消费成功,message内容为:"+ message);}//Fanout 模式消费端@RabbitListener(queues ="fanout-buget-queue")publicvoidfanoutConsumer1(String message){System.out.println("fanout fanout-buget-queue 消息消费成功,message内容为:"+ message);}//Fanout 模式消费端@RabbitListener(queues ="fanout-buget-queue2")publicvoidfanoutConsumer2(String message){System.out.println("fanout fanout-buget-queue2 消息消费成功,message内容为:"+ message);}//headers 模式消费端@RabbitListener(queues ="headers-buget-queue")publicvoidheadersConsumer(String message){System.out.println("headers headers-buget-queue 消息消费成功,message内容为:"+ message);}//headers 模式消费端@RabbitListener(queues ="headers-buget-queue2")publicvoidheadersConsumer2(String message){System.out.println("headers headers-buget-queue2 消息消费成功,message内容为:"+ message);}//backup 模式消费端(能够正确匹配到的队列)@RabbitListener(queues ="nobackup-buget-queue")publicvoidnoBackupConsumer(String message){System.out.println("nobackup-buget-queue 消息消费成功,message内容为:"+ message);}//backup 模式消费端 (没有匹配到全部来到 backup-buget-queue 队列) 也就是备份队列@RabbitListener(queues ="backup-buget-queue")publicvoidnackupConsumer(String message){System.out.println("backup-buget-queue 消息消费成功,message内容为:"+ message);}//死信 模式消费端@RabbitListener(queues ="dead-buget-queue")publicvoiddeadConsumer(String message){System.out.println("dead-buget-queue 开始消费,当前时间:"+System.currentTimeMillis());System.out.println("dead-buget-queue 消息消费成功,message内容为:"+ message);}}

消息测试(触发消息生产及消费)代码代码如下:

packagecom.user.service.controller;importcom.user.service.rabbitmq.MyRabbitProducer;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.web.bind.annotation.GetMapping;importorg.springframework.web.bind.annotation.RequestParam;importorg.springframework.web.bind.annotation.RestController;@RestControllerpublicclassRabbitController{@AutowiredprivateMyRabbitProducer myRabbitProducer;@GetMapping("/send-direct-buget-message")privateStringsendDirectBugetMessage(@RequestParamString message){
        myRabbitProducer.sendDirectMessage(message);return"OK";}@GetMapping("/send-topic-buget-message")privateStringsendTopicBugetMessage(@RequestParamString message){
        myRabbitProducer.sendTopicMessage(message);
        myRabbitProducer.sendTopicMessage2(message);return"OK";}@GetMapping("/send-fanout-buget-message")privateStringsendFanoutBugetMessage(@RequestParamString message){
        myRabbitProducer.sendFanoutMessage(message);return"OK";}@GetMapping("/send-headers-buget-message")privateStringsendHeadersBugetMessage(@RequestParamString message){
        myRabbitProducer.sendHeadersMessage(message);return"OK";}@GetMapping("/send-backup-buget-message")privateStringsendBackupBugetMessage(){
        myRabbitProducer.sendBackupMessage();return"OK";}@GetMapping("/send-dead-buget-message")privateStringsendBackupBugetMessage(@RequestParamString message){
        myRabbitProducer.sendDeadMessage(message);return"OK";}}

尚硅谷 RabbitMQ 学习视频

总结:本篇简单分享了 RabbitMQ 六种交换机的消息生产和消费的使用,只是 Demo 案例,在实际项目中替换成业务代码即可,希望可以帮助到有需要的朋友。

如有不正确的地方请各位指出纠正。


本文转载自: https://blog.csdn.net/weixin_42118323/article/details/141829003
版权归原作者 码农爱java 所有, 如有侵权,请联系我们删除。

“Spring Boot 整合 RabbitMQ 详解”的评论:

还没有评论