1、初识MQ
1.1 MQ是什么?
MQ(message queue),从字面意思上看,本质是个队列,FIFO先入先出,只不过队列中存放的内容是message而已,还是一种跨进程的通信机制,用于上下游传递消息。在互联网架构中,MQ是一种非常常见的上下游“逻辑解耦+物理解耦”的消息通信服务。使用了MQ之后,消息发送上游只需要依赖MQ,不用依赖其他服务。
1.2 为什么要用MQ
使用消息队列(MQ)的主要原因包括解耦、异步处理、削峰填谷、提高系统响应速度、支持分布式一致性需求、以及点对点消费模式。
- 解耦:MQ允许系统之间实现解耦,一个系统产生的数据可以通过MQ发布,其他系统可以订阅该消息并消费,而无需直接与数据产生系统进行交互。这种解耦方式降低了系统之间的依赖性,减少了代码维护成本。
- 异步处理:MQ支持异步通信模式,当一个系统调用其他系统的API时,通过使用MQ进行异步化,系统可以将调用请求发送到队列中,然后继续处理其他任务,从而大幅缩短调用时间,提高高延时接口的速度。
- 削峰填谷:在高峰期,大量请求涌入系统时,如果直接将请求发送到数据库等后端存储,可能会导致系统崩溃。通过使用MQ,系统可以将请求先写入队列中,在低谷时逐个消费请求,从而避免系统崩溃。
- 提高系统响应速度:对于同步接口调用导致响应时间长的问题,使用MQ之后,将同步调用改成异步,能够显著减少系统响应时间。
- 支持分布式一致性需求:MQ可以支持分布式系统中的一致性需求。通过使用合适的MQ,系统可以保证各个模块执行的结果一致性,避免不同模块之间的数据不一致问题。
- 点对点消费模式:MQ支持点对点的消息消费模式,可以确保消息只被一个消费者接收和处理。这对于一些需要确保消息只被一个接收者处理的场景非常有用。
然而,引入MQ也可能带来一些问题,如可用性降低和复杂性增加。因此,根据不同的业务需求和技术实力,选择适合的MQ是非常重要的。
1.3 MQ 的分类
目比较常见的MQ实现:
- ActiveMQ
- RabbitMQ
- RocketMQ
- Kafka
几种常见MQ的对比:
RabbitMQActiveMQRocketMQKafka公司/社区RabbitApache阿里Apache开发语言ErlangJavaJavaScala&Java协议支持AMQP,XMPP,SMTP,STOMPOpenWire,STOMP,REST,XMPP,AMQP自定义协议自定义协议可用性高一般高高单机吞吐量一般差高非常高消息延迟微秒级毫秒级毫秒级毫秒以内消息可靠性高一般高一般
追求可用性:Kafka、 RocketMQ 、RabbitMQ
追求可靠性:RabbitMQ、RocketMQ
追求吞吐能力:RocketMQ、Kafka
追求消息低延迟:RabbitMQ、Kafka
据统计,目前国内消息队列使用最多的还是RabbitMQ
2、安装RabbitMQ
这里使用docker安装RabbitMQ镜像
docker run \
-e RABBITMQ_DEFAULT_USER=admin \
-e RABBITMQ_DEFAULT_PASS=admin \
-v mq-plugins:/plugins \
--name mq \
--hostname mq \
-p 15672:15672 \
-p 5672:5672 \
-d \
rabbitmq:3.8-management
可以看到在安装命令中有两个映射的端口:
- 15672:RabbitMQ提供的管理控制台的端口
- 5672:RabbitMQ的消息发送处理接口
安装完成后,我们访问 http://localhost:15672即可看到管理控制台。首次访问需要登录,默认的用户名和密码在配置文件中已经指定了。
登录后即可看到管理控制台总览页面:
可以在控制台进行创建用户、虚拟机、队列等一系列操作
3、RabbitMQ的java客户端
3.1 前期准备
1、创建一个springboot项目
2、导入依赖坐标
<?xml version="1.0" encoding="UTF-8"?><projectxmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>cn.itcast.demo</groupId><artifactId>mq-demo</artifactId><version>1.0-SNAPSHOT</version><modules><module>publisher</module><module>consumer</module></modules><packaging>pom</packaging><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.7.12</version><relativePath/></parent><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target></properties><dependencies><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency><!--AMQP依赖,包含RabbitMQ--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><!--单元测试--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId></dependency></dependencies></project>
3、在springboot项目中创建两个模块、继承父项目依赖
3.2 小试牛刀
3.2.1 在publisher模块中创建测试类用于发送消息
注意:包路径需要再启动类所在包下 或启动类的子包下
packagecom.itheima.publisher.amqp;/*
*@Author:张泽阳
*@Date:2024年06月02日 21:14
*Software: IntelliJ IDEA
*@Description:
* */importorg.junit.jupiter.api.Test;importorg.springframework.amqp.rabbit.core.RabbitTemplate;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.boot.test.context.SpringBootTest;@SpringBootTestpublicclassSpringAmqpTest{@AutowiredprivateRabbitTemplate rabbitTemplate;@TestpublicvoidsimpleQueueTest(){// 队列名String queueName ="simple.queue";// 消息String message ="hhhhhhh";// 发送消息
rabbitTemplate.convertAndSend(queueName,message);}}
运行测试方法发送消息
3.2.2 在consumer模块监听消息
在consumer创建listener包在其中创建SpringRabbitListener类
packagecom.itheima.consumer.listener;/*
*@Author:张泽阳
*@Date:2024年06月02日 21:20
*Software: IntelliJ IDEA
*@Description:
* */importorg.springframework.amqp.rabbit.annotation.RabbitListener;importorg.springframework.stereotype.Component;@ComponentpublicclassSpringRabbitListener{// 方法体中的参数就是监听到的消息@RabbitListener(queues ="simple.queue")publicvoidspringListener(String msg){System.out.println("监听到的消息:"+ msg );}}
这样就完成了RabbitMQ简单的消息发送与监听
3.3 Works Queues 模型
Work queues,任务模型。简单来说就是让多个消费者绑定到一个队列,共同消费队列中的消息。
当消息处理比较耗时的时候,可能生产消息的速度会远远大于消息的消费速度。长此以往,消息就会堆积越来越多,无法及时处理。
此时就可以使用work 模型,多个消费者共同处理消息处理,消息处理的速度就能大大提高了。
接下来,我们就来模拟这样的场景。
首先,我们在控制台创建一个新的队列,命名为work.queue
3.3.1 发送消息
在publisher模块中用for循环发送50条消息
@TestpublicvoidworkQueueTest(){// 队列名String queueName ="work.queue";// 消息String msg ="消息体==> ";for(int i =1; i <=51; i++){// 发送消息
rabbitTemplate.convertAndSend(queueName,msg + i);}}
3.3.2 监听消息
在consumer模块中新增两个方法用来监听work.queue中的消息
@RabbitListener(queues ="work.queue")publicvoidworkQueueListener1(String msg){System.out.println("监听者 1 得到的消息:"+ msg );}@RabbitListener(queues ="work.queue")publicvoidworkQueueListener2(String msg){System.out.println("监听者 2 得到的消息:"+ msg );}
在RabbitMQ的workQueue模型中默认采用轮询方式将所有消息平均分配给每个监听者,这样一来两个机器都要消费相同数量的消息,如果有一个机器非常慢,速度快的机器就要等速度慢的机器消费完。
3.3.3 改变消费速度
@RabbitListener(queues ="work.queue")publicvoidworkQueueListener1(String msg)throwsInterruptedException{System.out.println("监听者 1 得到的消息:"+ msg );Thread.sleep(25);}@RabbitListener(queues ="work.queue")publicvoidworkQueueListener2(String msg)throwsInterruptedException{System.out.println("监听者 2 得到的消息:"+ msg );Thread.sleep(200);}
在RabbitMQ的workQueue模型中默认采用轮询方式将所有消息平均分配给每个监听者,这样一来两个机器都要消费相同数量的消息,如果有一个机器非常慢,速度快的机器就要等速度慢的机器消费完。
3.3.4 修改配置文件
在spring中有一个简单的配置,可以解决这个问题。我们修改consumer服务的application.yml文件,添加配置:
logging:pattern:dateformat: MM-dd HH:mm:ss:SSS
spring:rabbitmq:host: 192.168.204.20 # 你的虚拟机IPport:5672# 端口virtual-host: /hmall # 虚拟主机username: hmall # 用户名password:123# 密码listener:simple:prefetch:1# 每次只能获取一条消息,处理完成才能获取下一个消息
再次测试,发现结果如下:
监听者 1 得到的消息:消息体==> 1 2024-06-03T14:52:27.133554900
监听者 2 得到的消息:消息体==> 2 2024-06-03T14:52:27.133554900
监听者 1 得到的消息:消息体==> 3 2024-06-03T14:52:27.162577900
监听者 1 得到的消息:消息体==> 4 2024-06-03T14:52:27.190329800
监听者 1 得到的消息:消息体==> 5 2024-06-03T14:52:27.217329400
监听者 1 得到的消息:消息体==> 6 2024-06-03T14:52:27.244262600
监听者 1 得到的消息:消息体==> 7 2024-06-03T14:52:27.272310600
监听者 1 得到的消息:消息体==> 8 2024-06-03T14:52:27.299311
监听者 1 得到的消息:消息体==> 9 2024-06-03T14:52:27.327377900
监听者 2 得到的消息:消息体==> 10 2024-06-03T14:52:27.336259100
监听者 1 得到的消息:消息体==> 11 2024-06-03T14:52:27.354458700
监听者 1 得到的消息:消息体==> 12 2024-06-03T14:52:27.381099300
监听者 1 得到的消息:消息体==> 13 2024-06-03T14:52:27.409382100
监听者 1 得到的消息:消息体==> 14 2024-06-03T14:52:27.436237
监听者 1 得到的消息:消息体==> 15 2024-06-03T14:52:27.464754200
监听者 1 得到的消息:消息体==> 16 2024-06-03T14:52:27.491437900
监听者 1 得到的消息:消息体==> 17 2024-06-03T14:52:27.517241200
监听者 2 得到的消息:消息体==> 18 2024-06-03T14:52:27.537599400
监听者 1 得到的消息:消息体==> 19 2024-06-03T14:52:27.546405100
监听者 1 得到的消息:消息体==> 20 2024-06-03T14:52:27.574655400
监听者 1 得到的消息:消息体==> 21 2024-06-03T14:52:27.600656300
监听者 1 得到的消息:消息体==> 22 2024-06-03T14:52:27.627640400
监听者 1 得到的消息:消息体==> 23 2024-06-03T14:52:27.655109100
监听者 1 得到的消息:消息体==> 24 2024-06-03T14:52:27.682036500
监听者 1 得到的消息:消息体==> 25 2024-06-03T14:52:27.710680200
监听者 2 得到的消息:消息体==> 26 2024-06-03T14:52:27.738643100
监听者 1 得到的消息:消息体==> 27 2024-06-03T14:52:27.740556800
监听者 1 得到的消息:消息体==> 28 2024-06-03T14:52:27.769239800
监听者 1 得到的消息:消息体==> 29 2024-06-03T14:52:27.797490300
监听者 1 得到的消息:消息体==> 30 2024-06-03T14:52:27.824438700
监听者 1 得到的消息:消息体==> 31 2024-06-03T14:52:27.854432500
监听者 1 得到的消息:消息体==> 32 2024-06-03T14:52:27.880662
监听者 1 得到的消息:消息体==> 33 2024-06-03T14:52:27.908320800
监听者 1 得到的消息:消息体==> 34 2024-06-03T14:52:27.937883400
监听者 2 得到的消息:消息体==> 35 2024-06-03T14:52:27.941885700
监听者 1 得到的消息:消息体==> 36 2024-06-03T14:52:27.964396500
监听者 1 得到的消息:消息体==> 37 2024-06-03T14:52:27.990140600
监听者 1 得到的消息:消息体==> 38 2024-06-03T14:52:28.017677
监听者 1 得到的消息:消息体==> 39 2024-06-03T14:52:28.047443500
监听者 1 得到的消息:消息体==> 40 2024-06-03T14:52:28.074392800
监听者 1 得到的消息:消息体==> 41 2024-06-03T14:52:28.101285800
监听者 1 得到的消息:消息体==> 42 2024-06-03T14:52:28.127155800
监听者 2 得到的消息:消息体==> 43 2024-06-03T14:52:28.142776
监听者 1 得到的消息:消息体==> 44 2024-06-03T14:52:28.155621600
监听者 1 得到的消息:消息体==> 45 2024-06-03T14:52:28.182376
监听者 1 得到的消息:消息体==> 46 2024-06-03T14:52:28.209322300
监听者 1 得到的消息:消息体==> 47 2024-06-03T14:52:28.235621600
监听者 1 得到的消息:消息体==> 48 2024-06-03T14:52:28.263368300
监听者 1 得到的消息:消息体==> 49 2024-06-03T14:52:28.290364100
监听者 1 得到的消息:消息体==> 50 2024-06-03T14:52:28.318363400
监听者 1 得到的消息:消息体==> 51 2024-06-03T14:52:28.346327500
可以发现,由于消费者1处理速度较快,所以处理了更多的消息;消费者2处理速度较慢,而最终总的执行耗时也在1秒左右,大大提升。
正所谓能者多劳,这样充分利用了每一个消费者的处理能力,可以有效避免消息积压问题。
3.4 交换机类型
在之前的两个测试案例中,都没有交换机,生产者直接发送消息到队列。而一旦引入交换机,消息发送的模式会有很大变化:
可以看到,在订阅模型中,多了一个exchange角色,而且过程略有变化:
- Publisher:生产者,不再发送消息到队列中,而是发给交换机
- Exchange:交换机,一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。
- Queue:消息队列也与以前一样,接收消息、缓存消息。不过队列一定要与交换机绑定。
- Consumer:消费者,与以前一样,订阅队列,没有变化
Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!
交换机的类型有四种:
- Fanout:广播,将消息交给所有绑定到交换机的队列。我们最早在控制台使用的正是Fanout交换机
- Direct:订阅,基于RoutingKey(路由key)发送给订阅了消息的队列
- Topic:通配符订阅,与Direct类似,只不过RoutingKey可以使用通配符
- Headers:头匹配,基于MQ的消息头匹配,用的较少。
3.5 Fanout交换机
- 1) 可以有多个队列
- 2) 每个队列都要绑定到Exchange(交换机)
- 3) 生产者发送的消息,只能发送到交换机
- 4) 交换机把消息发送给绑定过的所有队列
- 5) 订阅队列的消费者都能拿到消息
创建一个名为
my.fanout
的交换机,类型是
Fanout
创建两个队列
fanout.queue1
和
fanout.queue2
,绑定到交换机
my.fanout
3.5.1创建交换机和队列
创建交换机
创建对列
将队列绑定到交换机上
3.5.2.消息发送
在publisher服务的SpringAmqpTest类中添加测试方法:
@TestpublicvoidfunoutExchangeTest(){// 交换机名String exchangeName ="my.fanout";// 消息String message ="hello";// 发送消息 三个参数:交换机名、队列名、消息
rabbitTemplate.convertAndSend(exchangeName,null,message);}
3.5.3 消息接收
@RabbitListener(queues ="fanout.queue1")publicvoidexchangeListener1(String msg)throwsInterruptedException{System.out.println("消费者1监听到fanout消息:"+ msg +" "+LocalDateTime.now());}@RabbitListener(queues ="fanout.queue2")publicvoidexchangeListener2(String msg)throwsInterruptedException{System.out.println("消费者2监听到fanout消息:"+ msg +" "+LocalDateTime.now());}
两个消费者都监听到了funoutExchangeTest()发布的消息
3.6 Direct 交换机
3.6.1 创建交换机和队列
创建两个队列:direct.queue1和direct.queue2
创建一个direct类型的交换机: my.direct
将两个队列绑定到my.direct交换机上并给队列指定Routing key
3.6.2 消息发送
@TestpublicvoiddirectExchangeTest(){// 交换机名String exchangeName ="my.direct";// Routing keyString routingKey ="red";// 发送消息 三个参数:交换机名、队列名、消息
rabbitTemplate.convertAndSend(exchangeName,routingKey,"红色请接收");}
3.6.2 消息接收
@RabbitListener(queues ="direct.queue1")publicvoiddirectListener1(String msg)throwsInterruptedException{System.out.println("Routing key:red、blue 接收到消息=======>"+ msg);}@RabbitListener(queues ="direct.queue2")publicvoiddirectListener2(String msg)throwsInterruptedException{System.out.println("Routing key:red、yellow 接收到消息=======>"+ msg);}
此时routing key 为 red 两个消费者都监听到了消息
@TestpublicvoiddirectExchangeTest(){// 交换机名String exchangeName ="my.direct";// Routing keyString routingKey ="blue";// 发送消息 三个参数:交换机名、队列名、消息
rabbitTemplate.convertAndSend(exchangeName,routingKey,"蓝色请接收");}
将routing key 改为blue后只有绑定key 为含blue的消费监听到了
3.7 Topic 交换机
Topic类型的Exchange与Direct相比,都是可以根据RoutingKey把消息路由到不同的队列。
只不过Topic类型Exchange可以让队列在绑定BindingKey的时候使用通配符!
BindingKey 一般都是有一个或多个单词组成,多个单词之间以 . 分割,例如: item.insert
通配符规则:
- #:匹配一个或多个词
- *:匹配不多不少恰好1个词
3.7.1 创建交换机和队列
创建两个队列分别为:topic.queue1 和 topic.queue2
创建topic类型的交换机 my.topic
利用通配符绑定队列和交换机。
3.7.2 发送消息
@TestpublicvoidtopicExchangeTest(){// 交换机名String exchangeName ="my.toptic";// Routing keyString routingKey ="china.news";// 发送消息 三个参数:交换机名、队列名、消息
rabbitTemplate.convertAndSend(exchangeName,routingKey,"台湾回归祖国怀抱!!!");}
3.7.3 消息接收
@RabbitListener(queues ="topic.queue1")publicvoidtopicListener1(String msg)throwsInterruptedException{System.out.println("china.# 接收到消息=======>"+ msg);}@RabbitListener(queues ="topic.queue2")publicvoidtopicListener2(String msg)throwsInterruptedException{System.out.println("#.news 接收到消息=======>"+ msg);}
3.8 基于bean声明队列交换机
SpringAMQP提供了一个Queue类,用来创建队列:
我们可以自己创建队列和交换机,不过SpringAMQP还提供了ExchangeBuilder来简化这个过程:
而在绑定队列和交换机时,则需要使用BindingBuilder来创建Binding对象:
3.8.1 fanout示例
先把之前在控制台创建的fanout.queue队列和my.fanout交换机删除
packagecom.itheima.consumer.config;/*
*@Author:张泽阳
*@Date:2024年06月03日 20:02
*Software: IntelliJ IDEA
*@Description:
* */importorg.springframework.amqp.core.*;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;@ConfigurationpublicclassFanoutConfig{/**
* 创建交换机
* @return
*/@BeanpublicFanoutExchangefanoutExchange(){// return new FanoutExchange("my.fanout");returnExchangeBuilder.fanoutExchange("my.fanout").build();}/**
* 创建队列
*/@BeanpublicQueuequeue1(){returnnewQueue("fanout.queue1");}/**
* 将队列绑定到交换机
*/@BeanpublicBindingbindingQueue1(Queue queue1,FanoutExchange fanoutExchange){returnBindingBuilder.bind(queue1).to(fanoutExchange);}}
3.8.2 基于注解声明
删除之前在控制台创建的direct.queue队列和my.direct交换机
创建队列
packagecom.itheima.consumer.config;/*
*@Author:张泽阳
*@Date:2024年06月03日 20:30
*Software: IntelliJ IDEA
*@Description:
* */importorg.springframework.amqp.core.DirectExchange;importorg.springframework.amqp.core.Queue;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;@ConfigurationpublicclassDirectConfig{@BeanpublicDirectExchangedirectExchange(){returnnewDirectExchange("my.direct");}@BeanpublicQueuedirectQueue1(){returnnewQueue("direct.queue1");}@BeanpublicQueuedirectQueue2(){returnnewQueue("direct.queue1");}}
消费者监听消息
@RabbitListener(bindings =@QueueBinding(
value =@Queue(name ="direct.queue1"),
exchange =@Exchange(name ="my.direct",type =ExchangeTypes.DIRECT),
key ={"red","blue"}))publicvoiddirectListener1(String msg)throwsInterruptedException{System.out.println("Routing key:red、blue 接收到消息=======>"+ msg);}@RabbitListener(bindings =@QueueBinding(
value =@Queue(name ="direct.queue2"),
exchange =@Exchange(name ="my.direct",type =ExchangeTypes.DIRECT),
key ={"red","yellow"}))publicvoiddirectListener2(String msg)throwsInterruptedException{System.out.println("Routing key:red、yellow 接收到消息=======>"+ msg);}
3.9 消息转换器
Spring的消息发送代码接收的消息体是一个Object:
而在数据传输时,它会把你发送的消息序列化为字节发送给MQ,接收消息的时候,还会把字节反序列化为Java对象。
只不过,默认情况下Spring采用的序列化方式是JDK序列化。众所周知,JDK序列化存在下列问题:
- 数据体积过大
- 有安全漏洞
- 可读性差
我们来测试一下。
3.9.1 测试默认转换器
创建队列
packagecom.itheima.consumer.config;/*
*@Author:张泽阳
*@Date:2024年06月03日 21:00
*Software: IntelliJ IDEA
*@Description:
* */importorg.springframework.amqp.core.Queue;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;@ConfigurationpublicclassObjectQueueConfig{@BeanpublicQueueobjectQueue(){returnnewQueue("object.queue");}}
发送消息
@TestpublicvoidobjectQueueTest(){// 队列名String queueName ="object.queue";// 创建消息Map<String,Integer> map =newHashMap<>(2);
map.put("jack",18);
map.put("tom",20);// 发送消息
rabbitTemplate.convertAndSend(queueName,map);}
在控制台查看消息
消息类型为:application/x-java-serialized-object
3.9.2 配置json消息转换器
显然,JDK序列化方式并不合适。我们希望消息体的体积更小、可读性更高,因此可以使用JSON方式来做序列化和反序列化。
在父工程的pom文件中引入依赖
<dependency><groupId>com.fasterxml.jackson.dataformat</groupId><artifactId>jackson-dataformat-xml</artifactId><version>2.9.10</version></dependency>
配置消息转换器,在publisher和consumer两个服务的启动类中添加一个Bean即可:
packagecom.itheima.consumer;importorg.springframework.amqp.support.converter.Jackson2JsonMessageConverter;importorg.springframework.amqp.support.converter.MessageConverter;importorg.springframework.boot.SpringApplication;importorg.springframework.boot.autoconfigure.SpringBootApplication;importorg.springframework.context.annotation.Bean;@SpringBootApplicationpublicclassConsumerApplication{publicstaticvoidmain(String[] args){SpringApplication.run(ConsumerApplication.class, args);}@BeanpublicMessageConvertermessageConverter(){// 1、定义消息转换器Jackson2JsonMessageConverter jackson2JsonMessageConverter =newJackson2JsonMessageConverter();// 2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息
jackson2JsonMessageConverter.setCreateMessageIds(true);return jackson2JsonMessageConverter;}}
消息转换器中添加的messageId可以便于我们将来做幂等性判断。
此时,我们到MQ控制台删除
object.queue
中的旧的消息。然后再次执行刚才的消息发送的代码,到MQ的控制台查看消息结构:
版权归原作者 不想吃辣堡 所有, 如有侵权,请联系我们删除。