RabbitMQ 概述
生活中的案例生产中的问题为什么要使用 MQ
- 学生问问题的例子
- 2,分布式项目中 RPC 的调用处理时间过长的问题
为什么要使用 MQ
微服务架构后,链式调用是我们在写程序时候的一般流程,为了这完成一个整体功能会把它拆分成多
个函数(或子模块)比如模块 A 调用模块 B,模块 B 调用模块 C,模块 C 调用模块 D。但是大型分布式应用
中,系统间的 RPC 交互复杂,一个功能后面要调用上百个接口并非不可能,从单机架构过渡到分布式微服
务架构,这样的架构有没有问题呢?有
根据上面的几个问题,在设置系统时可以明确要达到的目标
1,要做到系统解耦,当新的模块进来时,可以做到代码改动最小; 能够解耦
2,设置流程缓冲池,可以让后端系统按自身吞吐能力进行消费,不被冲垮; 能够削峰
3,强弱依赖梳理能把非关键调用链路的操作异步化并提升整体系统的吞吐能力;能够异步
什么是 MQ
定义
面向消息的中间件(message-oriented middleware0) MOM 能够很好的解决以上的问题。
是指利用高效可靠的消息传递机制进行与平台无关的数据交流,并基于数据通信来进行分布式系统的集成。
通过提供消息传递和消息排队模型在分布式环境下提供应用解耦,弹性伸缩,冗余存储,流量削峰,异步
通信,数据同步等
大致流程
发送者把消息发给消息服务器,消息服务器把消息存放在若干队列/主题中,在合适的时候,消息服
务器会把消息转发给接受者。在这个过程中,发送和接受是异步的,也就是发送无需等待,发送者和接受者
的生命周期也没有必然关系在发布 pub/订阅 sub 模式下,也可以完成一对多的通信,可以让一个消息有多
个接受者[微信订阅号就是这样的
异步处理模式
消息发送者可以发送一个消息而无需等待响应。消息发送者把消息发送到一条虚拟的通道(主题或队
列)上;
消息接收者则订阅或监听该通道。一条信息可能最终转发给一个或多个消息接收者,这些接收者都无
需对消息发送者做出回应。整个过程都是异步的。
案例:
也就是说,一个系统和另一个系统这间进行通信的时候,假如系统 A 希望发送一个消息给系统 B,让
它去处理,但是系统 A 不关注系统 B 到底怎么处理或者有没有处理好,所以系统 A 把消息发送给 MQ,然后
就不管这条消息的“死活” 了,接着系统 B 从 MQ 里面消费出来处理即可。至于怎么处理,是否处理完毕,
什么时候处理,都是系统 B 的事,与系统 A 无关。
这样的一种通信方式,就是所谓的“异步”通信方式,对于系统 A 来说,只要把消息发给 MQ,然后系统 B
就会异步处去进行处理了,系统 A 不能“同步”的等待系统 B 处理完。这样的好处是什么呢?解耦
应用系统的解耦
发送者和接收者不必了解对方,只需要确认消息
发送者和接收者不必同时在线
现实中的业务
4.1. 安装 Docker
请执行以下命令:
u 安装 Docker:
yum -y install docker
u 开机自启:
systemctl enable docker
u 启动 Docker:
systemctl start docker
u 查看 Docker 当前的版本
docker version
至此,Docker 已经安装完成
Docker 加速仓库配置
Docker 的镜像仓库默认在国外的服务器上,导致我们拉取镜像的速度很慢,因此,我
们需要配置它的加速地址来加快镜像的拉取速度。
执行以下命令:
Ø 切换到 Docker 的配置目录里面:
cd /etc/docker/
Ø 查询当前目录所有的文件:
ls
daemon.json 就是我们要修改的配置文件。
Ø 修改 daemon.json:
vi daemon.json
按 a 键进入编辑模式:
填写以下内容:
"registry-mirrors": ["https://32xw0apq.mirror.aliyuncs.com"]
完毕后如下:
按 wq 保存退出。
Ø 配置文件修改后,重启 Docker:
systemctl restart docker
至此,Docker 已经安装配置完毕了。
安装rabbitmq用docker拉取
docker pull rabbitmq
再安装控制台界面
Docker pull rabbitmq:management
放行端口的配置
15672:图形化界面的端口
5672 :数据的端口
docker run --name rabbitmq -p 15672:15672 -p 5672:5672 -e RABBITMQ_DEFAULT_USER=user
-e RABBITMQ_DEFAULT_PASS=123456 -d rabbitmq:management
RabbitMq名词解析
- Message消息,消息是不具名的,它由消息头和消息体组成。消息体是不透明的,而消息头则由一系列的可选属性
- 组成,这些属性包括 routing-key(路由键)、priority(相对于其他消息的优先权)、delivery-mode
- (指出该消息可能需要持久性存储)等。
- publisher消息的生产者,也是一个向交换器发布消息的客户端应用程序。
- exchange交换器,用来接收生产者发送的消息并将这些消息路由给服务器中的队列。
- binding绑定,用于消息队列和交换器之间的关联。一个绑定就是基于路由键将交换器和消息队列连接起来的路由
- 规则,所以可以将交换器理解成一个由绑定构成的路由表。
- queue消息队列,用来保存消息到发送给消费者。它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直在队列里面,等待消费者连接到这个队列将其取走。
- Connection网络连接,比如一个 TCP 连接。
- channel信道,多路复用连接中的一条独立的双向数据流通道。信道是建立在真实的 TCP 连接内地虚拟连接,AMQP
- 命令都是通过信道发出去的,不管是发布消息、订阅队列还是接收消息,这些动作都是通过信道完成。因
- 为对于操作系统来说建立和销毁 TCP 都是非常昂贵的开销,所以引入了信道的概念,以复用一条 TCP 连
- 接。
- consumer消息的消费者,表示一个从消息队列中取得消息的客户端应用程序。
- Virtual host虚拟主机,表示一批交换器、消息队列和相关对象。虚拟主机是共享相同的身份认证和加密环境的独立服
- 务器域。每个 vhost 本质上就是一个 mini 版的 RabbitMQ 服务器,拥有自己的队列、交换器、绑定和
- 权限机制。vhost 是 AMQP 概念的基础,必须在连接时指定,RabbitMQ 默认的 vhost 是 /
- broker表示消息队列服务器实体
web管理界面
connections:无论生产者还是消费者,都需要与 RabbitMQ 建立连接后才可以完成消息的生产和消费,
在这里可以查看连接情况
channels:通道,建立连接后,会形成通道,消息的投递获取依赖通道。
Exchanges:交换机,用来实现消息的路由
Queues:队列,即消息队列,消息存放在队列中,等待消费,消费后被移除队列。
Admin 用户和虚拟主机管理
添加用户
上面的 Tags 选项,其实是指定用户的角色,可选的有以下几个:
超级管理员(administrator)
可登陆管理控制台,可查看所有的信息,并且可以对用户,策略(policy)进行操作。
监控者(monitoring)
可登陆管理控制台,同时可以查看 rabbitmq 节点的相关信息(进程数,内存使用情况,磁盘使用情况等)
策略制定者(policymaker) 可登陆管理控制台, 同时可以对 policy 进行管理。但无法查看节点的相关信息(上图红框标识的部分)。
普通管理者(management)
仅可登陆管理控制台,无法看到节点信息,也无法对策略进行管理。
其他
无法登陆管理控制台,通常就是普通的生产者和消费者。
创建虚拟主机
虚拟主机是什么
为了让各个用户可以互不干扰的工作,RabbitMQ 添加了虚拟主机(Virtual Hosts)的概念。其实
就是一个独立的访问路径,不同用户使用不同路径,各自有自己的队列、交换机,互相不会影响。
绑定虚拟主机和用户
创建好虚拟主机,我们还要给用户添加访问权限:
点击添加好的虚拟主机:
进入虚拟机设置界面:
创建 v-sxt 的虚拟主机和 sxt 用户并分配权限
创建 v-sxt 虚拟主机
添加成功
创建 sxt 用户
添加成功,但是现在不能访问任何虚拟主机
分配权限
AMQP 协议的回顾
RabbitMQ 支持的消息模型
SpringBoot 中 使 用RabbitMQ
在实际开发中,我们都是使用 springboot 的技术栈来完成 RabbitMQ 的开发
这个测试案例中,我们使用两个项目,一个生产者的项目,一个消费者的项目
第一种模型(直连)
导入依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
修改 yml
server:
port: 8002
spring:
application:
name: producer
rabbitmq:
host: 116.62.44.5
port: 5672
username: root
password: 123456
virtual-host: /v-root
生产者消费发送
/**
* 第一种模型的消息发送
*/
@Test
void testHello(){
rabbitTemplate.convertAndSend("hello","hello world");
System.out.println("消息发成功");
}
生产者配置类 HelloConfig
@Configuration
public class HelloConfig {
/**
* 创建一个队列
*/
@Bean50
public Queue hello(){
//这里面可以和之前的 Hello 项目里面一样, 进行 5 个参数的配置
Queue hello = new Queue("hello");
return hello;
}
}
消费者配置
消费者接收消息两种方式都可以
@Component
@RabbitListener(queuesToDeclare = {@Queue("hello")})
public class HelloConsumer {
@RabbitHandler
public void receive1(String message){
System.out.println("接收到消息:"+message);
}
}
或者
@Component
public class HelloConsumer {
@RabbitListener(queuesToDeclare = {@Queue("hello")})
public void receive1(String message){
System.out.println("接收到消息:"+message);
}
}
启动测试
先使用生产者发消息,再让消费者上线,再使用生产者发消息
第二种模型(Workquene)
生产者消费发送
/**
* 第二模型的消息发送
*/
@Test
void testWork(){
for (int i = 1; i <=10 ; i++) {
rabbitTemplate.convertAndSend("work","hello work----"+i);
} S
ystem.out.println("消息发成功");
}
生产者配置类 WorkConfig
@Configuration
public class WorkConfig {
@Bean
public Queue work(){
return new Queue("work");
}
}
消费者接收消息 WorkConsumer
@Component
public class WorkConsumer {
/*
第一个消费者
*/
@RabbitListener(queuesToDeclare = {@Queue("work")})
public void receive1(String message){
System.out.println("接收到消息【1】 :"+message);
} /
*
*第二个消费者
*/
@RabbitListener(queuesToDeclare = {@Queue("work")})
public void receive2(String message){
System.out.println("接收到消息【2】 :"+message);
}
}
启动测试
先使用生产者发消息,再让消费者上线
第三种模型(fanout)广播
生产者消费发送
/**
* 第三种模型(fanout)广播
*/
@Test
public void testFanout(){
rabbitTemplate.convertAndSend("logs","","这是日志广播");
System.out.println("消费发送成功");
}
生产者配置类 FanoutConfig
@Configuration
public class FanoutConfig {
/**
* 声明交换机
*/
@Bean
public FanoutExchange fanoutExchange(){
FanoutExchange fanoutExchange=new FanoutExchange("logs");
return fanoutExchange;
} /
**
* 声明队列
*/
@Bean
public Queue fanoutQueue1(){
Queue queue=new Queue("fanout_queue1");
return queue;
} /
**
* 声明队列
*/
@Bean
public Queue fanoutQueue2(){
Queue queue=new Queue("fanout_queue2");
return queue;
} /
**
*把 fanout_queue1 队列绑定到交换机
*/
@Bean54
public Binding bindingQ1(){
Binding binding= BindingBuilder.bind(fanoutQueue1())
.to(fanoutExchange());
return binding;
} /
**
*把 fanout_queue2 队列绑定到交换机
*/
@Bean
public Binding bindingQ2(){
Binding binding= BindingBuilder.bind(fanoutQueue2())
.to(fanoutExchange());
return binding;
}
}
消费者接收消息 FanoutCustomer
@Component
public class FanoutCustomer {
@RabbitListener(bindings = @QueueBinding( //绑定队表和交换机
value = @Queue("fanout_queue1"),//绑定队列
exchange = @Exchange(name="logs",type = ExchangeTypes.FANOUT) //绑定交换机和声明交换机类型
))
public void receive1(String message){
System.out.println("receive1 = " + message);
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue("fanout_queue2"),55
exchange = @Exchange(name="logs",type = ExchangeTypes.FANOUT)
))
public void receive2(String message){
System.out.println("receive2 = " + message);
}
}
启动测试
先启动消费者,再让使用生产者发消息,结果是两个方法都接到到消息
第四种模型(routing-Direct)
生产者消费发送
/**
* 第四种模型(routing-Direct)
*/
@Test
public void testRoutingDirect() {
rabbitTemplate.convertAndSend("directs", "info", "info 的日志信息");
rabbitTemplate.convertAndSend("directs", "warm", "warm 的日志信息");
rabbitTemplate.convertAndSend("directs", "debug", "debug 的日志信息");
rabbitTemplate.convertAndSend("directs", "error", "error 的日志信息");
System.out.println("消息发送成功");
}
生产者配置类
* @Description:
*/
@Configuration
public class RoutingDirectConfig {
/**
* 声明交换机
*/
@Bean
public DirectExchange directExchange(){
DirectExchange directExchange=new DirectExchange("directs");
return directExchange;
} /
**
* 声明队列 1 绑定 info 和 warm
*/
@Bean
public Queue queue1(){
return new Queue("queue1");
} /
**
* 声明队列 2
*/
@Bean
public Queue queue2(){
return new Queue("queue2");
} /
**
* 把队列 1 绑定到交换机里面指定 info 的路由 key
*/
@Bean
public Binding binding11(){
Binding binding= BindingBuilder.bind(queue1())
.to(directExchange()).with("info");
return binding;
} /
**
* 把队列 1 绑定到交换机里面指定 warm 的路由 key
*/
@Bean
public Binding binding12(){57
Binding binding= BindingBuilder.bind(queue1())
.to(directExchange()).with("warm");
return binding;
} /
**
* 把队列 2 绑定到交换机里面指定 debug 的路由 key
*/
@Bean
public Binding binding21(){
Binding binding= BindingBuilder.bind(queue2())
.to(directExchange()).with("debug");
return binding;
} /
**
* 把队列 2 绑定到交换机里面指定 error 的路由 key
*/
@Bean
public Binding binding22(){
Binding binding= BindingBuilder.bind(queue2())
.to(directExchange()).with("error");
return binding;
}
}
消费者接收消息 RoutingDirectConsumer
@Component
public class RoutingDirectConsumer {
@RabbitListener(bindings ={58
@QueueBinding(
value = @Queue("queue1"),
key={"info","warm"},
exchange = @Exchange(name="directs",type = ExchangeTypes.DIRECT)
)})
public void receive1(String message){
System.out.println("消费者【1】 接收到消息: " + message);
}
@RabbitListener(bindings ={
@QueueBinding(
value =@Queue("queue2"),
key={"debug","error"},
exchange = @Exchange(name="directs",type = ExchangeTypes.DIRECT)
)})
public void receive2(String message){
System.err.println("消费者【2】 接收到消息: " + message);
}
}
启动测试
第五种模型(routing-Topic)
生产者消费发送
/**
* 第五种模型(routing-Topic)
*/
@Test59
public void testTopic(){
rabbitTemplate.convertAndSend("topics","user.save","user.save 的消息");
rabbitTemplate.convertAndSend("topics","user.save.findAll","user.save.findA
ll 的消息");
rabbitTemplate.convertAndSend("topics","user","user 的消息");
}
生产者配置类 RoutingTopicConfig
@Configuration
public class RoutingTopicConfig {
/**
* 声明交换机
*/
@Bean
public TopicExchange topicExchange(){
TopicExchange topicExchange=new TopicExchange("topics");
return topicExchange;
} /
**
* 声明队列 1 绑定 info 和 warm
*/
@Bean
public Queue topicQueue1(){
return new Queue("topicQueue1");
} /
**
* 声明队列 2
*/60
@Bean
public Queue topicQueue2(){
return new Queue("topicQueue2");
} /
**
* 把队列 1 绑定到交换机里面指定 user.*的路由 key
*/
@Bean
public Binding binding111(){
Binding binding= BindingBuilder.bind(topicQueue1())
.to(topicExchange()).with("user.*");
return binding;
} /
**
* 把队列 2 绑定到交换机里面指定 user.#的路由 key
*/
@Bean
public Binding binding222(){
Binding binding= BindingBuilder.bind(topicQueue2())
.to(topicExchange()).with("user.#");
return binding;
}
}
消费者接收消息 RoutingTopicConsumer
@Component
public class RoutingTopicConsumer {61
@RabbitListener(bindings ={
@QueueBinding(
value = @Queue("topicQueue1"),
key={"user.*"},
exchange = @Exchange(name="topics",type = ExchangeTypes.TOPIC)
)})
public void receive1(String message){
System.out.println("消费者 user.*【1】 接收到消息: " + message);
}
@RabbitListener(bindings ={
@QueueBinding(
value =@Queue("topicQueue2"),
key={"user.#"},
exchange = @Exchange(name="topics",type = ExchangeTypes.TOPIC)
)})
public void receive2(String message){
System.err.println("消费者 user.#【2】 接收到消息: " + message);
}
}
启动测试
boot 中发送消息的监视
概述及准备
刚才我们发送消息,不管成功还是失败,都不报错,结果看效果时,发现有的没有发进去,那么如何
知道消息是否发送成功呢,RabbitMQ 提供了一个消费监视功能
注意:RabbitMQ 发送消息分为 2 个阶段
消息发送到交互机里面,可以监视
消息由交互机到队列里面,也可以监视
修改 yaml
server:
port: 8080
spring:
application:
name: springboot_rabbitmq
rabbitmq:
host: 116.62.44.5
port: 5672
username: sxt
password: 123456
virtual-host: /v-sxt
publisher-confirm-type: correlated #开启消息到达交互机的确认机制
publisher-returns: true #消息由交互机到达队列时失败触发
消息到交互机和未到达队列里面的监视
/**
* 第三种模型的消息发送 fanout
*/
@Test
void testFanout(){
rabbitTemplate.setConfirmCallback(new ConfirmCallback() {
/**
* 当消息到达交换机之后该方法会被回调
* @param correlationData 相关的数据
* @param ack 交换机接收消息是否成功
* @param cause 如果没有成功, 返回原因
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if(ack){
System.out.println("交换机接收消息成功");
}else{
System.out.println("交换机接收消息失败: 原因:"+cause);
}
}63
});
rabbitTemplate.setReturnCallback(new ReturnCallback() {
/**
* 消息如未正常到达队列里面会回调
* @param message 消息内容
* @param replyCode 回调的响应码
* @param replyText 响应文本
* @param exchange 交换机
* @param routingKey 路由 key
*/
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String
exchange,
String routingKey) {
System.out.println(message);
System.out.println("消息未正常到达队列-replyCode:"+replyCode);
System.out.println("消息未正常到达队列-replyText:"+replyText);
System.out.println("消息未正常到达队列-exchange:"+exchange);
System.out.println("消息未正常到达队列-routingKey:"+routingKey);
}
});
rabbitTemplate.convertAndSend("logs","","这是一个 fanout 的日志消息");
System.out.println("消息发送成功");
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
测试
先去掉交换机的声明 队列的声明,队列和交换机的绑定【注释 FantoutConfig 里面的所有内容】
发消息测试
打开交换机的声明
发消息测试
打开队列的声明
发消息测试
打开队列和交换机绑定的声明
发消息测试
优化消息监视和到达队列的监视
@Component
public class WatchMessageImpl implements
RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnCallback {
@Autowired
private RabbitTemplate rabbitTemplate;
@PostConstruct
public void initRabbitTemplate(){
rabbitTemplate.setConfirmCallback(this);
rabbitTemplate.setReturnCallback(this);
} /
**
* 消息到达交换机的回调
* @param correlationData
* @param ack 是否到达交换机
* @param cause
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if(ack){
System.out.println("消息正常到达交换机");
}else{
System.out.println("消息没有到达交换机原因:"+cause);
}
}65
/**
* 消息由交换机到达队列失败之后的回调
* @param message 消息体
* @param replyCode 回调的状态码
* @param replyText 文本
* @param exchange 交换机的名称
* @param routingKey 路由名
*/
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String
routingKey) {
System.out.println(new String(message.getBody())+"到达队列失败"+replyCode+" "+replyText+"
交换机为:"+exchange+" 路由 key:"+routingKey);
//处理重新发送的问题
}
}
消息转换参数的问题
SpringBoot 中消息的消费及签收机制
消息消费成功后,我们在客户端签收后,消息就从 MQ 服务器里面删除了
若消息没有消费成功,我们让他回到 MQ 里面,让别人再次重试消费
自动签收
消息只要被客户端接收到,无论你客户端发生了什么,我们服务器都不管你了,直接把你删除了,这是它是默认的行为
手动签收
创建 yml 配置文件
server:
port: 8001
spring:
application:
name: producer
rabbitmq:
host: 116.62.44.5
port: 5672
username: sxt
password: 123456
virtual-host: /v-sxt
# publisher-confirms: true 老版本里面的用法
publisher-confirm-type: correlated #开启消息到达交换机的确认机制
publisher-returns: true #消息由交互机到达队列时失败触发
listener:
simple:
acknowledge-mode: manual #手动签收
#acknowledge-mode: auto #自动签收 这个是默认行为
direct:
acknowledge-mode: manual #设置直连交互机的签收类型
消息投送的 ID 的说明【重点】
怎么获取投递的 ID【重点】
@Component
public class MessageReceive1 {
@RabbitListener(bindings = {
@QueueBinding(value = @Queue("queue"),
key = "error",
exchange = @Exchange(value = "directs"))//默认的直连交换机
})
public void receiveMessage(String content, Message message, Channel
channel) throws IOException {
System.out.println("消费者收到消息-内容为:"+content);
System.out.println("消费者收到消息-消息对象:"+message);
System.out.println("消费者收到消息-信道:"+channel);
long deliveryTag = message.getMessageProperties().getDeliveryTag();//
消息投递 ID
System.out.println("消息投递 ID:"+deliveryTag);
String messageId = message.getMessageProperties().getMessageId();
System.out.println("消息自定义 ID:"+messageId);
/**
* 参数说明:
* deliveryTag 消息投递 ID, 要签收的投递 ID 是多少
* multiple:是否批量签收
*/
channel.basicAck(deliveryTag,false);
System.out.println("消息签收成功");
System.out.println("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~");
}
投递 ID 存在的问题及消息的永久 ID 设置的问题【重点】
什么能代表消息的唯一的标识,显然投送的 id 不行,因为一个消息可能会有多个投送的 id,我们就
需要给消息一个唯一的值,这个伴随消息终身,不会变化!
我们需要发生消息时,给消息设置一个 Id,然后保证该 Id 唯一就可以!
@Test
public void sendMessage() throws Exception {
for (int i = 1; i <=5; i++) {
// rabbitTemplate.convertAndSend("directs","error","我是一个日志消息
-"+i);
rabbitTemplate.convertAndSend("directs", "error", "我是一个日志消息-" +
i,
new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws
AmqpException {
//自己给消息设置自定义的 ID
String messageId= UUID.randomUUID().toString().replace("-","");
message.getMessageProperties().setMessageId(messageId);
return message;
}
});
} S
ystem.out.println("消息发送成功");
System.in.read();
}
13.3.6. 关于批量的签
关于批量的签收
若我们此时签收 4 了,但是前面 0,1,2,3 都没有签收,则 MQ 若是批量的签收,它会把 0,1,2,3 都签收,因为 MQ 认为,比他晚投递的已经签收,前面的肯定已经消费成功了
发送者
static int a=1;
@Test71
public void sendMessage2() throws Exception {
for (int i = 1; i <=3; i++) {
rabbitTemplate.convertAndSend("directs", "error", "ABC-" + i,
new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
//自己给消息设置自定义的 ID
message.getMessageProperties().setMessageId((a++)+"");
return message;
}
});
} S
ystem.out.println("消息发送成功");
System.in.read();
}
消费者
@Component
public class MessageReceive2 {
@RabbitListener(bindings = {
@QueueBinding(value = @Queue("queue"),
key = "error",
exchange = @Exchange(value = "directs"))//默认的直连交换机72
})
public void receiveMessage(String content, Message message, Channel channel) throws
IOException {
long deliveryTag = message.getMessageProperties().getDeliveryTag();//消息投递 ID
System.out.println("消息投递 ID:"+deliveryTag);
String messageId = message.getMessageProperties().getMessageId();
System.out.println("消息自定义 ID:"+messageId);
if(content.equals("ABC-3")){
/**
* 参数说明:
* deliveryTag 消息投递 ID, 要签收的投递 ID 是多少
* multiple:是否批量签收
*/
channel.basicAck(deliveryTag,true);
System.out.println("消息签收成功-内容为:"+content);
} S
ystem.out.println("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~");
}
}
效果
可以发现只签收了 ABC-3 但是队列里面没有消息了,说明前面的 12 都被批量签收了
不签收
当我们认为消息不合格时,或不是我们要的消息时,我们可以选择不签收它
生产者
@Test
public void sendMessage2() throws Exception {
rabbitTemplate.convertAndSend("directs", "error", "1234567",
new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
//自己给消息设置自定义的 ID
String messageId= UUID.randomUUID().toString().replace("-","");
message.getMessageProperties().setMessageId(messageId);
return message;
}
});
System.out.println("消息发送成功");
System.in.read();
}
消费者
@Component
public class MessageReceive3 {
@RabbitListener(bindings = {
@QueueBinding(value = @Queue("queue"),
key = "error",
exchange = @Exchange(value = "directs"))//默认的直连交换机
})
public void receiveMessage(String content, Message message, Channel channel) throws
IOException {
long deliveryTag = message.getMessageProperties().getDeliveryTag();//消息投递 ID
System.out.println("消息投递 ID:"+deliveryTag);
String messageId = message.getMessageProperties().getMessageId();
System.out.println("消息自定义 ID:"+messageId);
if(content.equals("123456")){//如果消息内容为 123456 就签收它
/**
* 参数说明:
* deliveryTag 消息投递 ID, 要签收的投递 ID 是多少
* multiple:是否批量签收
*/
channel.basicAck(deliveryTag,false);
System.out.println("消息签收成功");
}else{
//如果不是 123456 就拒绝签收
/**
* 参数说明:
* deliveryTag 消息投递 ID, 要签收的投递 ID 是多少
* multiple:是否批量签收
* requeue: true 代表拒绝签收并把消息重新放回到队列里面 false 就直接拒绝
*/
channel.basicNack(deliveryTag,false,true);
System.out.println("消息被拒绝签收");
}75
System.out.println("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~");
}
}
我们选择不签收,其实是为了保护消息,当消费消息发送异常时,我们可以把消息放在队列里面,让它重新投递,重新让别人消费!而不是丢了它!
版权归原作者 未来, 所有, 如有侵权,请联系我们删除。