一 、RabbitMQ概念
1 架构图
2 相关概念
Publisher - ⽣产者:发布消息到RabbitMQ中的Exchange
Consumer - 消费者:监听RabbitMQ中的Queue中的消息
Broker:接收和分发消息的应用,RabbitMQ Server就是 Message Broker,也就是我们的RabbitMQ服务器
Virtual host:出于多租户和安全因素设计的,在RabbitMQ中可以创建出多个虚拟消息服务器VirtualHost。
Connection:publisher/consumer 和 broker 之间的 TCP 连接
channel-信道: 网络信道,几乎所有操作都在channel中进行,channel是消息读写的通道。客户端可以建立多个channel,每个channel表示一个会话任务 , 信道有特定的功能,比如创建交换机,创建队列。
Exchange - 交换机:和⽣产者建⽴连接并接收⽣产者的消息 ,并且不能保存消息。
Queue - 队列:Exchange会将消息分发到指定的Queue,Queue和消费者进⾏交互 ,队列是可以保存消息的。
Routes - 路由:交换机以什么样的策略将消息发布到Queue。生产者发消息的时候,可以给消息贴一个标签,为了让指定的消费者接收消息。
- 结构解读:
首先安装好的RabbitMQ就是一个Broker,如果我们想将MQ给多个用户使用并且互不影响,那我们就需要将MQ通过虚拟化的方式分割成多个提供MQ的服务,也就是Virtual host,每个Virtual host都有独立的路径,并且和用户绑定。这样我们就可以在自己的世界里发消息了。
- 通信解读:一条消息到底是怎么从生产者到了消费者的?- 首先生产者通过连接的方式连接到MQ的一个虚拟机,需要知道MQ的ip,端口,虚拟机路径,用户名和密码,准备好了以后就可以建立连接了TCP 连接Connection连接,- 但是建立和关闭TCP连接是有代价的,频繁的建立关闭TCP连接对于系统的性能有很大的影响,而且TCP的连接数也有限制,这也限制了系统处理高并发的能力。但是,在TCP连接中建立Channel是没有上述代价的,所以我们使用信道changel的方式发送和接受消息。- 消息进入MQ的第一站是Exchange交换机,交换机的作用:① 接收生产者发送的消息 ②和队列绑定。交换机是不保存信息的。生产者发消息的时候可以指定一个路由键,路由键可以理解为就是给消息贴了一个标签(做标记作用,消费者接收消息的时候有用)- 消息进入第二站queue,消费者要接收消息,需要一直监听着queue,那么消费者在监听queue的时候需要先指定队列要和那个交换机绑定,绑定的时候也需要指定路由键,如果发消息时的路由键和接收消息时候路由键一样,那么这个消息就会进入到这个队列。- 最后消费者就拿到消息了。需要说明的一点,所有的交换机和队列创建的时候都是需要起名字的。
3 RabbitMQ的通讯
官网介绍:RabbitMQ Tutorials — RabbitMQ
二 案例解释
新建maven工程,Spring整合MQ。因为MQ中有很多概念在boot中是体会不到的,boot屏蔽了很多概念。
1 简单队列模式
这是RabbitMQ最简单的工作方式
- 生产者声明好队列,然后把信息给了MQ默认的交换机,交换机将信息发给队列
- 消费者也声明好队列,然后监听队列获取信息
2 work queue
队列模式: 能者多劳模式
在简单模式的基础上添加了多个消费者,每个消费者添加了等待时间。
生产者一次往队列里投放多条消息,消费者根据能力来消费这里面的所有消息,性能强的消费的消息多,所以是能者多劳
3 订阅发布
平分秋色
交换机类型 fanout
发布订阅,这次使用了交换机,之前的两种方式都是没有显式的声明使用交换机,之前其实用的系统默认的交换机。
这次使用了交换机,但是 没有使用路由键。只要和交换机绑定了的对了都可以接受到消息,也就是上图两个队列中可以收到相同的消息。
4 .路由 routing
暗送秋波
交换机direct
- 在⽣产者发送消息时指明routing-key
- 在消费者声明队列和交换机的绑定关系时,指明routing-key
- 解决的问题是:- 因为交换机和两个队列都绑定了,但是为了给队列里发送的消息不一样,也就是区分给那个队列发什么样 的消息,就有了routing key的概念。发消息的时候指定一下路由键,接收消息的时候队列要和交换机绑定,这时候也需要指定路由键,如果这两次的路由键一样,那么这个消息就放着这个队列里面
5 通配符模式
你的心思我要
交换机是 topic
- 因为路由模式里是精确匹配,比较局限,使用通配符方式,通配符,提⾼了匹配的范围,扩展业务。
- Routingkey 一般都是有一个或多个单词组成,多个单词之间以”.”分割,例如: item.insert
- 通配符规则:# 匹配一个或多个词,* 匹配不多不少恰好1个词,例如:item.# 能够匹配 item.insert.abc 或者 item.insert,item.* 只能匹配 item.insert。
三 消息的可靠性投递
1 什么是消息的可靠性投递
- 保证消息一定能发到消息队列中
- 细节- 保证mq节点成功接受消息- 消息发送端需要接受到mq服务端接收到消息的确认应答- 完善的消息补偿机制,发送失败的消息可以再感知并二次处理
- RabbitMQ消息投递路径- 生产者-->交换机-->队列-->消费者- 通过两个点的控制,保证消息的可靠性投递- 生产者到交换机 confirmCallback- 交换机到队列 returnCallbakc
- 建议- 开启消息确认机制以后,保证了消息的准确送达,但由于频繁的确认交互,RabbitMQ的整体效率变低,吞吐量下降严重,不是非常重要的消息不建议用消息确认机制
2 confirmCallback
- 机制:生产者投递消息以后,如果Broker收到消息以后,会给生产者一个ACK,生产者通过ACK可以确认这条消息是否成功发送到Broker。
- 开启confirmCallbackspring.rabbitmq.publisher-confirm-type: correlated
- 发送代码
@Test
void confirm(){
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
/**
* 消息到交换机的确认
* @param correlationData 配置信息
* @param ack 交换机确认 true消息接受成功 false消息接受失败
* @param cause 消息发送失败原因
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
System.out.println("ConfirmCallback==========>");
System.out.println("correlationData==========>"+correlationData);
System.out.println("ack==========>"+ack);
System.out.println("cause==========>"+cause);
if(ack){
System.out.println("发送成功");
// 更新数据库 成功
}else {
System.out.println("发送失败,日志或数据库纪录");
// 更新数据库 失败
}
}
});
rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_NAME,"xinzhi.15","老韩学it");
}
- 模拟失败场景,修改发送时候交换机名称
2 returnCallback
return机制保证消息在rabbitmq中能够成功的投递到队列⾥
两种模式:- 交换机到队列不成功,则丢弃消息(默认)- 交换机到队列不成功,返回生产者,触发returnCallback
开启returnCallback,交换机到队列的可靠性投递spring.rabbitmq.publisher-returns=true
修改投递到队列失败的策略spring.rabbitmq.template.mandatory=true
发送消息验证.>
> @Test> void returnCallback(){> rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {> @Override> public void returnedMessage(ReturnedMessage returned) {> int code = returned.getReplyCode();> System.out.println("code==>"+code);> System.out.println("returned==>"+returned);> }> });> rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_NAME," "," ");> }> >
发送消息以后,没有任何提示,我们修改路由键
四 消息确认
1 背景
保证消息从队列到消费者的过程。
2 ACK介绍
- 消费者从RabbitMQ中获取消息并且处理完成以后,反馈给RabbitMQ,RabbitMQ收到确认消息以后才能把消息从队列中删除
- 消费者在处理消息的时候出现了网络不稳定、服务器异常等情况,那么就不会有ACK反馈,RabbitMQ认为这个消息没有正常消费,就将这个消息放回队列里面
- 只有当消费者正确发送ack以后,RabbitMQ才会把消息从队列中删除
- 消息的ack确认机制默认是打开的,消息如果未被进行ack的消息确认机制,这条消息将被锁定
3 确认方式
自动
手动manualspring.rabbitmq.listener.simple.acknowledge-mode=manual
发送消息,并且开启监听模式,虽然消息被消费了,但是因为开启了手动确认模式配置,但是代码里没有手动确认所以队列里的消息不会删除
代码中开启确认机制
channel.basicAck(msgTag,false);
- 消息拒绝
// false 一次拒绝一条 true 重新回到队列
channel.basicNack(msgTag,false,true);
结果就会看到控制台一直接受消息,因为对列有消息就会被监听到,监听以后拒绝了又放到队列里面,然后 又监听...
- DeliveryTag表示消息投递的序号,每次消费消息或者消息重新投递以后,DeliveryTag都会+1
- basicReject也是消息拒绝的,一次只能拒绝一条消息,也可以设置是否重新回如队列
版权归原作者 夨落旳尐孩649 所有, 如有侵权,请联系我们删除。