0


Spring Boot 集成 RabbitMQ

依赖与配置

在 pom.xml 中引入 RabbitMQ 相关依赖

  1. <!--AMQP 依赖,RabbitMq--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId><version>3.2.7</version></dependency>

在 application.yml 中添加配置

  1. spring:
  2. rabbitmq:
  3. host:127.0.0.1
  4. port:5672
  5. username: guest
  6. password: guest

RabbitMQ使用

扇形交换机

扇形交换机,顾名思义,就是像一把扇子一样,一个交换器可以绑定多个队列,只要交换机接收到消息就会发送给所有和它绑定的队列。

假设扇形交换机 fanoutExchange 绑定了队列 fanoutQueue1 和 fanoutQueue2 ,那么我们往 fanoutExchange 发送一条消息,fanoutQueue1 和 fanoutQueue2 都会收到一条相同的消息,如果消息未被消费我们可以在 RabbitMQ 管理端看到这两个队列和队列内积压的一条相同的消息。

  1. @ConfigurationpublicclassFanoutExchangeConfig{publicstaticfinalStringFANOUT_QUEUE1="fanout.queue1";publicstaticfinalStringFANOUT_QUEUE2="fanout.queue2";publicstaticfinalStringFANOUT_EXCHANGE="fanout.exchange";//声明队列Q1@Bean("fanoutQ1")publicQueuefanoutQ1(){returnnewQueue(FANOUT_QUEUE1);}//声明队列Q1@Bean("fanoutQ2")publicQueuefanoutQ2(){returnnewQueue(FANOUT_QUEUE2);}//声明扇形交换机@BeanpublicFanoutExchangefanoutExchange(){returnnewFanoutExchange(FANOUT_EXCHANGE);}//队列Q1绑定扇形交换机@Bean("bindFanoutQ1")publicBindingbindFanoutQ1(){returnBindingBuilder.bind(fanoutQ1()).to(fanoutExchange());}//队列Q2绑定扇形交换机@Bean("bindFanoutQ2")publicBindingbindFanoutQ2(){returnBindingBuilder.bind(fanoutQ2()).to(fanoutExchange());}}

编写 junit 测试, 发送消息

  1. @TestpublicvoidfanoutTest(){
  2. rabbitTemplate.convertAndSend(FanoutExchangeConfig.FANOUT_EXCHANGE,"","fantoutTest");}

发送消息后由于没有消费者,可以在管理端看到积压在队列中的消息。

在这里插入图片描述

在这里插入图片描述

直连交换机

一个直连交换机可以有多个队列,但每个队列都有一个路由一一匹配,交换机根据路由将消息投递到对应队列中。当同一个队列有多个消费者时,消息不会被重复消费,直连交换机能够轮询公平的将消息分发给每个消费者。

假设直连交换机 directExchange 与队列 directQueue1 通过路由 directRoute1 绑定, 与directQueue2 通过路由 directRoute2 绑定。当生产者发送路由为 directRoute1 的消息给 directExchange 时,消息会被投递到 directQueue1 ,directQueue2 则接收不到消息。

  1. @ConfigurationpublicclassDirectExchangeConfig{publicstaticfinalStringDIRECT_QUEUE1="direct.queue1";publicstaticfinalStringDIRECT_QUEUE2="direct.queue2";publicstaticfinalStringDIRECT_EXCHANGE="direct.exchange";publicstaticfinalStringDIRECT_ROUTE_KEY1="direct.route.key1";//声明队列Q1@Bean("directQ1")publicQueuedirectQ1(){returnnewQueue(DIRECT_QUEUE1);}//声明队列Q1@Bean("directQ2")publicQueuedirectQ2(){returnnewQueue(DIRECT_QUEUE2);}//声明直连交换机@BeanpublicDirectExchangedirectExchange(){returnnewDirectExchange(DIRECT_EXCHANGE);}//队列Q1绑定直连交换机@Bean("bindDirectQ1")publicBindingbindDirectQ1(){returnBindingBuilder.bind(directQ1()).to(directExchange()).with(DIRECT_ROUTE_KEY1);}//队列Q2绑定直连交换机@Bean("bindDirectQ2")publicBindingbindDirectQ2(){returnBindingBuilder.bind(directQ2()).to(directExchange()).with("");}}

编写 junit 测试,投递消息给交换机。

  1. @TestpublicvoiddirectTest(){//消息被投递给 bindDirectQ2
  2. rabbitTemplate.convertAndSend(DirectExchangeConfig.DIRECT_EXCHANGE,"","directTest");//消息被投递给 bindDirectQ1
  3. rabbitTemplate.convertAndSend(DirectExchangeConfig.DIRECT_EXCHANGE,DIRECT_ROUTE_KEY1,"directTest-key1");//没有匹配的路由,bindDirectQ1 和 bindDirectQ2 都无法接收
  4. rabbitTemplate.convertAndSend(DirectExchangeConfig.DIRECT_EXCHANGE,"test","directTest-test");}

主题交换机

一个主题交换机可以有多个绑定队列,支持路由模糊匹配,可以使用星号(

  1. *

)和井号(

  1. #

)作为通配符进行匹配。其中,

  1. *

可以代替一个单词,

  1. #

可以代替任意个单词。

假设主题交换机 topicExchange 通过路由 topic.route.* 绑定队列 topicQueue1 , 通过路由 topic.route.# 绑定队列 topicQueue2。当生产者通过路由 topic.route.1 和 topic.route.1.1 投递消息给 topicExchange 时, topicQueue2 会接收到两条不同路由的消息, 而 topicQueue1 仅能接收到路由为 topic.route.1 的消息。

  1. @ConfigurationpublicclassTopicExchangeConfig{publicstaticfinalStringTOPIC_QUEUE1="topic.queue1";publicstaticfinalStringTOPIC_QUEUE2="topic.queue2";publicstaticfinalStringTOPIC_EXCHANGE="topic.exchange";publicstaticfinalStringTOPIC_ROUTE_KEY1="topic.route.*";publicstaticfinalStringTOPIC_ROUTE_KEY2="topic.route.#";//声明队列Q1@Bean("topicQ1")publicQueuetopicQ1(){returnnewQueue(TOPIC_QUEUE1);}//声明队列Q1@Bean("topicQ2")publicQueuetopicQ2(){returnnewQueue(TOPIC_QUEUE2);}//声明主题交换机@BeanpublicTopicExchangetopicExchange(){returnnewTopicExchange(TOPIC_EXCHANGE);}//队列Q1绑定主题交换机@Bean("bindTopicQ1")publicBindingbindTopicQ1(){returnBindingBuilder.bind(topicQ1()).to(topicExchange()).with(TOPIC_ROUTE_KEY1);}//队列Q2绑定主题交换机@Bean("bindTopicQ2")publicBindingbindTopicQ2(){returnBindingBuilder.bind(topicQ2()).to(topicExchange()).with(TOPIC_ROUTE_KEY2);}}

编写 junit 测试,投递两条不同路由的消息给主题交换机

  1. @TestpublicvoidtopicTest(){//消息被投递给 topicQ1 和 topicQ2
  2. rabbitTemplate.convertAndSend(TopicExchangeConfig.TOPIC_EXCHANGE,"topic.route.1","topicTest-*");//消息仅投递给 topicQ2
  3. rabbitTemplate.convertAndSend(TopicExchangeConfig.TOPIC_EXCHANGE,"topic.route.1.1","topicTest-#");}

可以从管理端看到 topicQueue2 由于绑定的是通配符

  1. #

,故此两条消息都有被投递到队列中,topicQueue1 由于绑定的是通配符

  1. *

只匹配到一条消息,故此只有一条消息被投递到队列中。

在这里插入图片描述

首部交换机

首部交换机通过设置消息的头部信息来进行绑定队列的分发,它不依赖于路由键的匹配规则来分发消息,而是根据发送的消息内容中的headers属性进行匹配。当消息投递到首部交换器时,RabbitMQ会获取到该消息的headers(一个键值对的形式),并且对比其中的键值对是否完全匹配队列和交换器绑定时指定的键值对。如果完全匹配,则消息会路由到该队列,否则不会路由到该队列。

  1. @ConfigurationpublicclassHeadExchangeConfig{publicstaticfinalStringHEADER_QUEUE1="header.queue1";publicstaticfinalStringHEADER_QUEUE2="header.queue2";publicstaticfinalStringHEADER_QUEUE3="header.queue3";publicstaticfinalStringHEADER_EXCHANGE="header.exchange";publicstaticfinalStringHEADER_KEY1="headerKey1";publicstaticfinalStringHEADER_KEY2="headerKey2";//声明queue@Bean("headerQueue1")publicQueueheaderQueue1(){returnnewQueue(HEADER_QUEUE1);}@Bean("headerQueue2")publicQueueheaderQueue2(){returnnewQueue(HEADER_QUEUE2);}@Bean("headerQueue3")publicQueueheaderQueue3(){returnnewQueue(HEADER_QUEUE3);}//声明首部交换机@BeanpublicHeadersExchangeheaderExchange(){returnnewHeadersExchange(HEADER_EXCHANGE);}//声明Binding,绑定Header(消息头部参数)中 HEADER_KEY1 = a的队列。header的队列匹配可以用mathces和exisits@BeanpublicBindingbindHeaderQueue1(){returnBindingBuilder.bind(headerQueue1()).to(headerExchange()).where(HEADER_KEY1).matches("a");}//绑定Header中 HEADER_KEY2 =1的队列。@BeanpublicBindingbindHeaderBusTyp1(){returnBindingBuilder.bind(headerQueue2()).to(headerExchange()).where(HEADER_KEY2).matches("b");}//绑定Header中 HEADER_KEY1 = a 或者 HEADER_KEY2 = b 的队列。@BeanpublicBindingbindHeaderTxBusTyp1(){Map<String,Object> condMap =newHashMap<>();
  2. condMap.put(HEADER_KEY1,"a");
  3. condMap.put(HEADER_KEY2,"b");returnBindingBuilder.bind(headerQueue3()).to(headerExchange()).whereAny(condMap).match();}}

编写 junit 测试, 往首部交换机投递信息

  1. @TestpublicvoidheaderTest(){MessageProperties properties =newMessageProperties();
  2. properties.setHeader(HeadExchangeConfig.HEADER_KEY1,"a");//消息被投递到 headerQueue1 和 headerQueue3
  3. rabbitTemplate.convertAndSend(HeadExchangeConfig.HEADER_EXCHANGE,"",newMessage("headerTest-a".getBytes(), properties));
  4. properties.setHeader(HeadExchangeConfig.HEADER_KEY1,"");
  5. properties.setHeader(HeadExchangeConfig.HEADER_KEY2,"b");//消息被投递到 headerQueue2 和 headerQueue3
  6. rabbitTemplate.convertAndSend(HeadExchangeConfig.HEADER_EXCHANGE,"",newMessage("headerTest-b".getBytes(), properties));}

备份交换机

通过设置交换机的

  1. alternate-exchange

的参数设置备份交换机,当消息路由无法在当前交换机匹配到合适的队列投递时,将消息转到备份交换机,分发到其绑定的备份队列中。

  1. @ConfigurationpublicclassBackupExchangeConfig{publicstaticfinalStringBACKUP_QUEUE="backup.queue";publicstaticfinalStringBACKUP_EXCHANGE="backup.exchange";publicstaticfinalStringBACKUP_ROUTE_KEY="backup.key";publicstaticfinalStringNON_BACKUP_QUEUE="nonbackup.queue";publicstaticfinalStringNON_BACKUP_EXCHANGE="nonbackup.exchange";publicstaticfinalStringNON_BACKUP_ROUTE_KEY="nonbackup.key";@Bean("backupQueue")publicQueuebackupQueue(){returnnewQueue(BACKUP_QUEUE,true,false,false);}@Bean("nonBackupQueue")publicQueuenonBackupQueue(){returnnewQueue(NON_BACKUP_QUEUE,true,false,false);}@Bean("nonBackupExchange")publicDirectExchangenonBackupExchange(){Map<String,Object> args =newHashMap<>(2);
  2. args.put("alternate-exchange",BACKUP_EXCHANGE);returnnewDirectExchange(NON_BACKUP_EXCHANGE,true,false, args);}@Bean("backupExchange")publicFanoutExchangebackupExchange(){returnnewFanoutExchange(BACKUP_EXCHANGE,true,false);}@Bean("bindNonBackupQueue")publicBindingbindNonBackupQueue(){returnBindingBuilder.bind(nonBackupQueue()).to(nonBackupExchange()).with(NON_BACKUP_ROUTE_KEY);}@Bean("bindBackupQueue")publicBindingbindBackupQueue(){returnBindingBuilder.bind(backupQueue()).to(backupExchange());}}

编写测试用例, 投递消息给备份交换机

  1. @TestpublicvoidbackupTest(){//路由正确匹配,消息投递到非备份队列中
  2. rabbitTemplate.convertAndSend(BackupExchangeConfig.NON_BACKUP_EXCHANGE,BackupExchangeConfig.NON_BACKUP_ROUTE_KEY,"nonBackupTest");//路由无法匹配,消息投递到备份队列中
  3. rabbitTemplate.convertAndSend(BackupExchangeConfig.NON_BACKUP_EXCHANGE,BackupExchangeConfig.NON_BACKUP_ROUTE_KEY+"123","backupTest");}

可以看到备份队列和非备份队列中各有一条消息。

在这里插入图片描述

死信交换机

死信交换机其实可以理解成一个拥有特殊意义的直连交换机,正常队列通过设置队列中的

  1. x-dead-letter-exchange

  1. x-dead-letter-routing-key

参数来设置绑定死信交换机,当消费者拒绝消费、消息积压队列达到最大长度或者消息过期时,消息从正常队列转到死信队列。

死信在转移到死信队列时,它的路由也会保存下来。但是如果配置了

  1. x-dead-letter-routing-key

参数的话,路由就会被替换为配置的这个值。另外,死信在转移到死信队列的过程中,是没有经过消息发送者确认的,所以并不能保证消息的安全性。

消息被作为死信转移到死信队列后,会在Header当中增加一些消息。比如时间、原因(rejected,expired,maxlen)、队列等。另外header中还会加上第一次成为死信的三个属性(

  1. x-first-death-reason

  1. x-first-death-queue

  1. x-first-death-exchange

),并且这三个属性在以后的传递过程中都不会更改。

死信队列也可以向其它队列一样被消费者正常订阅消费。

  1. @ConfigurationpublicclassDeadLetterExchangeConfig{publicstaticfinalStringDEAD_QUEUE="dead.queue";publicstaticfinalStringDEAD_EXCHANGE="dead.exchange";publicstaticfinalStringDEAD_ROUTE_KEY="dead.key";publicstaticfinalStringNON_DEAD_QUEUE="nondead.queue";publicstaticfinalStringNON_DEAD_EXCHANGE="nondead.exchange";publicstaticfinalStringNON_DEAD_ROUTE_KEY="nondead.key";@Bean("deadQueue")publicQueuedeadQueue(){returnnewQueue(DEAD_QUEUE,true,false,false);}@Bean("nonDeadQueue")publicQueuenonDeadQueue(){Map<String,Object> args =newHashMap<>(2);
  2. args.put("x-message-ttl",10000);
  3. args.put("x-dead-letter-exchange",DEAD_EXCHANGE);
  4. args.put("x-dead-letter-routing-key",DEAD_ROUTE_KEY);returnnewQueue(NON_DEAD_QUEUE,true,false,false, args);}@Bean("deadExchange")publicDirectExchangedeadExchange(){returnnewDirectExchange(DEAD_EXCHANGE,false,false);}@Bean("nonDeadExchange")publicDirectExchangenonDeadExchange(){returnnewDirectExchange(NON_DEAD_EXCHANGE,true,false);}@Bean("bindDeadQueue")publicBindingbindDeadQueue(){returnBindingBuilder.bind(deadQueue()).to(deadExchange()).with(DEAD_ROUTE_KEY);}@Bean("bindNonDeadQueue")publicBindingbindNonDeadQueue(){returnBindingBuilder.bind(nonDeadQueue()).to(nonDeadExchange()).with(NON_DEAD_ROUTE_KEY);}}

编写 junit 测试, 投递一条10秒过期的消息,刚投递时消息存在于正常队列,10秒过期后转到死信队列。 投递消息和队列同时设置过期时间时,以时间更短的为准。

  1. @TestpublicvoiddeadTest(){
  2. rabbitTemplate.convertAndSend(DeadLetterExchangeConfig.NON_DEAD_EXCHANGE,DeadLetterExchangeConfig.NON_DEAD_ROUTE_KEY,"deadTest 1");}

延时队列

通过TTL+死信队列实现延时队列,与上述死信交换机使用大同小异,核心就是创建队列的时候设置如下三个参数:

  • x-message-ttl (必要) :当前队列消息多久未消费进入死信队列
  • x-dead-letter-exchange (必要):消息过期后进入的死信队列交换机
  • x-dead-letter-routing-key (非必要):消息的路由, 未设置时保留原队列的路由

TTL 消息可以通过以下方式创建

方式一:在队列中设置

  1. x-message-ttl

参数

  1. @Bean("nonDeadQueue")publicQueuenonDeadQueue(){Map<String,Object> args =newHashMap<>(2);
  2. args.put("x-message-ttl",10000);
  3. args.put("x-dead-letter-exchange",DEAD_EXCHANGE);
  4. args.put("x-dead-letter-routing-key",DEAD_ROUTE_KEY);returnnewQueue(NON_DEAD_QUEUE,true,false,false, args);}

方式二: 在投递消息时设置消息的过期时间

  1. MessageProperties properties =newMessageProperties();
  2. properties.setExpiration("10000");
  3. rabbitTemplate.convertAndSend(DeadLetterExchangeConfig.NON_DEAD_EXCHANGE,DeadLetterExchangeConfig.NON_DEAD_ROUTE_KEY,newMessage("ttl test".getBytes(), properties));

两种方式同时都有设置时,时间短的设置生效。

动态创建队列、交换机及绑定关系

Spring Boot 封装了一些类用于对 RabbitMQ 的管理

  • AmqpAdmin 用于管理队列、交换机及绑定关系 。
  • RabbitTemplate 对消息操作的一些封装。
  1. @AutowiredprivateAmqpAdmin amqpAdmin;publicvoidcreateComponents(){String queueName ="amqp.queue";String exchangeName ="amqp.exchange";//声明(创建)队列Queue queue =newQueue(queueName,false,false,false,null);
  2. amqpAdmin.declareQueue(queue);//声明交换机FanoutExchange fanoutExchange =newFanoutExchange(exchangeName,false,false,null);
  3. amqpAdmin.declareExchange(fanoutExchange);//声明绑定
  4. amqpAdmin.declareBinding(newBinding(queueName,Binding.DestinationType.QUEUE, exchangeName,"",null));}

消息监听

启动类添加

  1. @EnableRabbit

注解启用RabbitMQ ,通过注解

  1. @RabbitListener

  1. @RabbitHandler

进行消息的消费

  1. @Component@RabbitListener(queues ="fanout.queue1")publicclassFanoutQueueRecever{@RabbitHandlerpublicvoidhandle(String msg){System.out.println("收到来自 fanout.queue1 的消息 :"+ msg);}}

消息确认

保证发送方消息不丢失

开启生产端确认, 消息发送成功后回调,获得预期结果后才认为消息发送成功。

  • 交换机收到消息进行回调,ConfirmCallbackspring.rabbitmq.publisher-confirm-type: correlated (高版本Spring使用) spring.rabbitmq.publisher-confirms: true(低版本Spring使用)
  • 消息正确抵达队列进行回调,ReturnsCallbackspring.rabbitmq.publisher-returns: true spring.rabbitmq.template.mandatory: true, 只要抵达队列,以异步形式优先发送回调 ReturnCallback

保证消费者消息不丢失

开启消费端确认(保证每个消息都被正确消费,此时才可以删除这个消息)

  • 手动ack消息 spring.rabbitmq.listener.simple.acknowledge-mode: manual- AcknowledgeMode.NONE:自动确认 自动确认会在消息发送给消费者后立即确认,但存在丢失消息的可能,如果消费端消费逻辑抛出异常,也就是消费端没有处理成功这条消息,那么就相当于丢失了消息。 如果消息已经被处理,但后续出现异常导致事务回滚,也同样造成了实际意义的消息丢失。- AcknowledgeMode.AUTO:根据情况确认- AcknowledgeMode.MANUAL:手动确认 如果手动确认则当消费者调用 ack、nack、reject 几种方法进行确认,手动确认可以在业务失败后进行一些操作,如果消息未被 ACK 则会发送到下一个消费者。 如果某个服务忘记 ACK 了,则 RabbitMQ 不会再发送数据给它,因为 RabbitMQ 认为该服务的处理能力有限,不足以处理后续投递的消息。
  • ACK的几种方法- channel.basicNack(deliveryTag, multiple, requeue); 拒绝消费。deliveryTag(唯一标识 ID):当一个消费者向 RabbitMQ 注册后,会建立起一个 Channel ,RabbitMQ 会用 basic.deliver 方法向消费者推送消息,这个方法携带了一个 delivery tag, 它代表了 RabbitMQ 向该 Channel 投递的这条消息的唯一标识 ID,是一个单调递增的正整数,delivery tag 的范围仅限于 Channel。multiple:为了减少网络流量,手动确认可以被批处理,当该参数为 true 时,则可以一次性确认 delivery_tag 小于等于传入值的所有消息。requeue: 是否重新入队- channel.basicAck(deliveryTag, multiple); 确认消费,参数解释同上。- channel.basicReject(deliveryTag, requeue); 拒绝消费,不支持批量操作,用法与basicNack()类似。

代码实现

yaml 文件配置

  1. spring:
  2. rabbitmq:
  3. host:127.0.0.1
  4. port:5672
  5. username: guest
  6. password: guest
  7. # 开启发送端确认
  8. #publisher-confirms:true
  9. publisher-confirm-type: correlated
  10. #开启发送端消息抵达确认
  11. publisher-returns:true
  12. #只要抵达队列。以异步发送优先回调returnconfirm
  13. template:
  14. mandatory:true
  15. # 手动ack消息
  16. listener:
  17. simple:
  18. acknowledge-mode: manual

配置RabbitMQ, 设置发送者消息确认逻辑

  1. @ConfigurationpublicclassRabbitMqConfigimplementsRabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnsCallback{@AutowiredRabbitTemplate rabbitTemplate;@PostConstructpublicvoidinit(){
  2. rabbitTemplate.setReturnsCallback(this);
  3. rabbitTemplate.setConfirmCallback(this);}/**
  4. * 发送消息触发confirmCallback回调, 无论是否到达队列,只要有到达交换机都会触发这个回调
  5. * @param correlationData:当前消息的唯一关联数据(如果发送消息时未指定此值,则回调时返回null)
  6. * @param ack:消息是否成功收到(ack=true,消息抵达Broker)
  7. * @param cause:失败的原因
  8. */@Overridepublicvoidconfirm(CorrelationData correlationData,boolean ack,String cause){System.out.println("发送消息触发confirmCallback回调");System.out.println(String.format("correlationData:%s\nack:%s\ncause:%s", correlationData, ack, cause));}/**
  9. * 消息未到达队列触发returnCallback回调,只要消息没有投递给指定的队列,就触发这个失败回调
  10. * @param returnedMessage 返回的消息,包含
  11. * message:投递失败的消息详细信息
  12. * replyCode:回复的状态码
  13. * replyText:回复的文本内容
  14. * exchange:接收消息的交换机
  15. * routingKey:接收消息的路由键
  16. */@OverridepublicvoidreturnedMessage(ReturnedMessage returnedMessage){System.out.println("消息未到达队列触发returnCallback回调");System.out.println(returnedMessage.toString());}}

实现消费者确认

  1. @Component@RabbitListener(queues ="fanout.queue1")publicclassFanoutQueueRecever{@RabbitHandlerpublicvoidhandle(String msg,Channel channel,@HeadersMap<String,Object> headers){System.out.println("收到来自 fanout.queue1 的消息 :"+ msg);System.out.println("tag:"+ headers.get(AmqpHeaders.DELIVERY_TAG));try{if("ack".equalsIgnoreCase(msg)){
  2. channel.basicAck((long)headers.get(AmqpHeaders.DELIVERY_TAG),false);}if("nack".equalsIgnoreCase(msg)){
  3. channel.basicNack((long)headers.get(AmqpHeaders.DELIVERY_TAG),false,true);}if("reject".equalsIgnoreCase(msg)){
  4. channel.basicReject((long)headers.get(AmqpHeaders.DELIVERY_TAG),false);}}catch(IOException e){
  5. e.printStackTrace();}}/**
  6. * @Payload 注解的对象需要实现序列化
  7. * @Headers 获取所有头部信息
  8. * @Header 获取单个头部信息
  9. */@RabbitHandler(isDefault =true)publicvoidhandleMap(@PayloadMyMessage message,Channel channel,@HeadersMap<String,Object> headers){System.out.println("收到来自 fanout.queue1 的消息 :"+ message.toString());System.out.println("tag:"+ headers.get(AmqpHeaders.DELIVERY_TAG));try{
  10. channel.basicAck((long)headers.get(AmqpHeaders.DELIVERY_TAG),false);}catch(IOException e){
  11. e.printStackTrace();}}}

编写 junit 测试, 发送测试消息

  1. @Data@AllArgsConstructor@NoArgsConstructor@EqualsAndHashCodepublicclassMyMessageimplementsSerializable{privateString id;privateString name;}
  1. @TestpublicvoidconfirmTest(){//未知交换机, 触发 confirmCallback 回调//rabbitTemplate.convertAndSend("unknow", "unknow", "confirmTest");//未知路由, 消息到达交换机但是无法到达队列, 触发 confirmCallback 回调和 returnCallback 回调//rabbitTemplate.convertAndSend(TopicExchangeConfig.TOPIC_EXCHANGE, "topic", "topicTest-*");//正常到达队列,触发confirmCallback回调
  2. rabbitTemplate.convertAndSend(FanoutExchangeConfig.FANOUT_EXCHANGE,"","fantoutTest",newCorrelationData("1"));
  3. rabbitTemplate.convertAndSend(FanoutExchangeConfig.FANOUT_EXCHANGE,"",newMyMessage("1","张三"),newCorrelationData("2"));}

本文转载自: https://blog.csdn.net/l15570068307/article/details/140327136
版权归原作者 50070 所有, 如有侵权,请联系我们删除。

“Spring Boot 集成 RabbitMQ”的评论:

还没有评论