一、RabbitMQ的构造:
1、构造
RabbitMQ 是 AMQP 协议的一个开源实现,所以其内部实际上也是 AMQP 中的基本概念:
(1)生产者Publisher:生产消息,就是投递消息的一方。消息一般包含两个部分:消息体(payload)和标签(Label)
(2)消费者Consumer:消费消息,也就是接收消息的一方。消费者连接到RabbitMQ服务器,并订阅到队列上。消费消息时只消费消息体,丢弃标签。
(3)Broker服务节点:表示消息队列服务器实体。一般情况下一个Broker可以看做一个RabbitMQ服务器。
(4)Queue:消息队列,用来存放消息。一个消息可投入一个或多个队列,多个消费者可以订阅同一队列,这时队列中的消息会被平摊(轮询)给多个消费者进行处理。
(5)Exchange:交换器,接受生产者发送的消息,根据路由键将消息路由到绑定的队列上。
(6)Routing Key: 路由关键字,用于指定这个消息的路由规则,需要与交换器类型和绑定键(Binding Key)联合使用才能最终生效。
(7)Binding:绑定,通过绑定将交换器和队列关联起来,一般会指定一个BindingKey,通过BindingKey,交换器就知道将消息路由给哪个队列了。
(8)Connection :网络连接,比如一个TCP连接,用于连接到具体broker
(9)Channel: 信道,AMQP 命令都是在信道中进行的,不管是发布消息、订阅队列还是接收消息,这些动作都是通过信道完成。因为建立和销毁 TCP 都是非常昂贵的开销,所以引入了信道的概念,以复用一条 TCP 连接,一个TCP连接可以用多个信道。客户端可以建立多个channel,每个channel表示一个会话任务。
(10)Message:消息,由消息头和消息体组成。消息体是不透明的,而消息头则由一系列的可选属性组成,这些属性包括routing-key(路由键)、priority(相对于其他消息的优先权)、delivery-mode(指出该消息可能需要持久性存储)等。
(11)Virtual host:虚拟主机,用于逻辑隔离,表示一批独立的交换器、消息队列和相关对象。一个Virtual host可以有若干个Exchange和Queue,同一个Virtual host不能有同名的Exchange或Queue。最重要的是,其拥有独立的权限系统,可以做到 vhost 范围的用户控制。当然,从 RabbitMQ 的全局角度,vhost 可以作为不同权限隔离的手段
2、Exchange交换器的类型:
Exchange分发消息时根据类型的不同分发策略有区别,目前共四种类型:direct、fanout、topic、headers
(1)direct:消息中的路由键(RoutingKey)如果和 Bingding 中的 bindingKey 完全匹配,交换器就将消息发到对应的队列中。是基于完全匹配、单播的模式。
(2)fanout:把所有发送到fanout交换器的消息路由到所有绑定该交换器的队列中,fanout 类型转发消息是最快的。
(3)topic:通过模式匹配的方式对消息进行路由,将路由键和某个模式进行匹配,此时队列需要绑定到一个模式上。
匹配规则:
- ① RoutingKey 和 BindingKey 为一个 点号 '.' 分隔的字符串。 比如: java.xiaoka.show
- ② BindingKey可使用 * 和 # 用于做模糊匹配:*匹配一个单词,#匹配多个或者0个单词
(4)headers:不依赖于路由键进行匹配,是根据发送消息内容中的headers属性进行匹配,除此之外 headers 交换器和 direct 交换器完全一致,但性能差很多,目前几乎用不到了
二、过程
1、生产者消息的过程:
(1)Producer 先连接到 Broker,建立连接 Connection,开启一个信道 channel
(2)Producer 声明一个交换器并设置好相关属性
(3)Producer 声明一个队列并设置好相关属性
(4)Producer 通过绑定键将交换器和队列绑定起来
(5)Producer 发送消息到 Broker,其中包含路由键、交换器等信息
(6)交换器根据接收到的路由键查找匹配的队列
(7)如果找到,将消息存入对应的队列,如果没有找到,会根据生产者的配置丢弃或者退回给生产者。
(8)关闭信道
2、消费者接收消息过程:
(1)Producer 先连接到 Broker,建立连接 Connection,开启一个信道 channel
(2)向 Broker 请求消费相应队列中消息,可能会设置响应的回调函数。
(3)等待 Broker 回应并投递相应队列中的消息,接收消息。
(4)消费者确认收到的消息,ack。
(5)RabbitMQ从队列中删除已经确定的消息。
(6)关闭信道
3、消息重复消费问题
正常情况下,消费者在消费消息后,会给消息队列发送一个确认,消息队列接收后就知道消息已经被成功消费了,然后就从队列中删除该消息,也就不会将该消息再发送给其他消费者了。不同消息队列发出的确认消息形式不同,RabbitMQ是通过发送一个ACK确认消息。但是因为网络故障,消费者发出的确认并没有传到消息队列,导致消息队列不知道该消息已经被消费,然后就再次消息发送给了其他消费者,从而造成重复消费的情况。
重复消费问题的解决思路是:保证消息的唯一性,即使多次传输,也不让消息的多次消费带来影响,也就是保证消息等幂性;幂等性指一个操作执行任意多次所产生的影响均与一次执行的影响相同。具体解决方案如下:
(1)改造业务逻辑,使得在重复消费时也不影响最终的结果。例如对SQL语句: update t1 set money = 150 where id = 1 and money = 100; 做了个前置条件判断,即 money = 100 的情况下才会做更新,更通用的是做个 version 即版本号控制,对比消息中的版本号和数据库中的版本号。
(2)基于数据库的的唯一主键进行约束。消费完消息之后,到数据库中做一个 insert 操作,如果出现重复消费的情况,就会导致主键冲突,避免数据库出现脏数据。
(3)通过记录关键的key,当重复消息过来时,先判断下这个key是否已经被处理过了,如果没处理再进行下一步。
① 通过数据库:比如处理订单时,记录订单ID,在消费前,去数据库中进行查询该记录是否存在,如果存在则直接返回。
② 使用全局唯一ID,再配合第三组主键做消费记录,比如使用 redis 的 set 结构,生产者发送消息时给消息分配一个全局ID,在每次消费者开始消费前,先去redis中查询有没有消费记录,如果消费过则不进行处理,如果没消费过,则进行处理,消费完之后,就将这个ID以k-v的形式存入redis中(过期时间根据具体情况设置)。
三、可靠性传输
对于消息的可靠性传输,每种MQ都要从三个角度来分析:生产者丢数据、消息队列丢数据、消费者丢数据。在RabbitMQ中:
1、生产者丢数据:
RabbitMQ提供事务机制(transaction)和确认机制(confirm)两种模式来确保生产者不丢消息。
1.1、事务机制
送消息前,开启事务(channel.txSelect()),然后发送消息,如果发送过程中出现什么异常,事务就会回滚(channel.txRollback()),如果发送成功则提交事务(channel.txCommit())
该方式的缺点是生产者发送消息会同步阻塞等待发送结果是成功还是失败,导致生产者发送消息的吞吐量降下降。
// 开启事务
channel.txSelect
try {
// 发送消息
} catch(Exception e){
// 回滚事务
channel.txRollback;
//再次重试发送这条消息
....
}
//提交事务
channel.txCommit;
1.2、确认机制:
生产环境常用的是confirm模式。生产者将信道 channel 设置成 confirm 模式,一旦 channel 进入 confirm 模式,所有在该信道上发布的消息都将会被指派一个唯一的ID,一旦消息被投递到所有匹配的队列之后,rabbitMQ就会发送一个确认给生产者(包含消息的唯一ID),这样生产者就知道消息已经正确到达目的队列了。如果rabbitMQ没能处理该消息,也会发送一个Nack消息给你,这时就可以进行重试操作。
Confirm模式最大的好处在于它是异步的,一旦发布消息,生产者就可以在等信道返回确认的同时继续发送下一条消息,当消息最终得到确认之后,生产者便可以通过回调方法来处理该确认消息。
处理Ack和Nack的代码如下所示:
channel.addConfirmListener(new ConfirmListener() {
@Override
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
System.out.println("nack: deliveryTag = "+deliveryTag+" multiple: "+multiple);
}
@Override
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
System.out.println("ack: deliveryTag = "+deliveryTag+" multiple: "+multiple);
}
});
2、消息队列丢数据:
处理消息队列丢数据的情况,一般是开启持久化磁盘。持久化配置可以和生产者的 confirm 机制配合使用,在消息持久化磁盘后,再给生产者发送一个Ack信号。这样的话,如果消息持久化磁盘之前,即使 RabbitMQ 挂掉了,生产者也会因为收不到Ack信号而再次重发消息。
持久化设置如下(必须同时设置以下 2 个配置):
(1)创建queue的时候,将queue的持久化标志durable在设置为true,代表是一个持久的队列,这样就可以保证 rabbitmq 持久化 queue 的元数据,但是不会持久化queue里的数据;
(2)发送消息的时候将 deliveryMode 设置为 2,将消息设置为持久化的,此时 RabbitMQ 就会将消息持久化到磁盘上去。
这样设置以后,RabbitMQ 就算挂了,重启后也能恢复数据。在消息还没有持久化到硬盘时,可能服务已经死掉,这种情况可以通过引入镜像队列,但也不能保证消息百分百不丢失(整个集群都挂掉)
3、消费者丢数据:
消费者丢数据一般是因为采用了自动确认消息模式。该模式下,虽然消息还在处理中,但是消费中者会自动发送一个确认,通知 RabbitMQ 已经收到消息了,这时 RabbitMQ 就会立即将消息删除。这种情况下,如果消费者出现异常而未能处理消息,那就会丢失该消息。
解决方案就是采用手动确认消息,设置 autoAck = False,等到消息被真正消费之后,再手动发送一个确认信号,即使中途消息没处理完,但是服务器宕机了,那 RabbitMQ 就收不到发的ack,然后 RabbitMQ 就会将这条消息重新分配给其他的消费者去处理。
但是 RabbitMQ 并没有使用超时机制,RabbitMQ 仅通过与消费者的连接来确认是否需要重新发送消息,也就是说,只要连接不中断,RabbitMQ 会给消费者足够长的时间来处理消息。另外,采用手动确认消息的方式,我们也需要考虑一下几种特殊情况:
- 如果消费者接收到消息,在确认之前断开了连接或取消订阅,RabbitMQ 会认为消息没有被消费,然后重新分发给下一个订阅的消费者,所以存在消息重复消费的隐患
- 如果消费者接收到消息却没有确认消息,连接也未断开,则RabbitMQ认为该消费者繁忙,将不会给该消费者分发更多的消息
需要注意的点:
1、消息可靠性增强了,性能就下降了,因为写磁盘比写 RAM 慢的多,两者的吞吐量可能有 10 倍的差距。所以,是否要对消息进行持久化,需要综合考虑业务场景、性能需要,以及可能遇到的问题。若想达到单RabbitMQ服务器 10W 条/秒以上的消息吞吐量,则要么使用其他的方式来确保消息的可靠传输,要么使用非常快速的存储系统以支持全持久化,例如使用 SSD。或者仅对关键消息作持久化处理,且应该保证关键消息的量不会导致性能瓶颈。
2、当设置 autoAck = False 时,如果忘记手动 ack,那么将会导致大量任务都处于 Unacked 状态,造成队列堆积,直至消费者断开才会重新回到队列。解决方法是及时 ack,确保异常时 ack 或者拒绝消息。
3、启用消息拒绝或者发送 nack 后导致死循环的问题:如果在消息处理异常时,直接拒绝消息,消息会重新进入队列。这时候如果消息再次被处理时又被拒绝 。这样就会形成死循环。
版权归原作者 w_t_y_y 所有, 如有侵权,请联系我们删除。