一、简介
RabbitMQ是一个基于AMQP(Advanced Message Queuing Protocol)协议的可复用的企业消息系统。它是使用Erlang编写的,用于大型软件系统各个模块之间的高效通信,支持高并发、支持可扩展、支持多种客户端,持久化,用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面都有很好的性能。特别的,它实现了一个Broker框架,这意味着消息在发送给客户端时先在中心队列排队,对路由、负载均衡或者数据持久化都有很好的支持。具体的特点介绍如下:
**可靠性**:使用一些机制来保证可靠性,如持久化、传输确认、发布确认等。
**灵活的路由**:在消息进入队列之前,通过交换机来路由消息。典型的路由功能由内置交换机来实现,复杂的路由功能,可以将多个交换机绑定在一起,也可以通过插件机制来实现自己的交换机。
**扩展性:**多个RabbitMQ节点可以组成一个集群,动态扩展集群节点。
**高可用性:**队列可以在集群中的机器上设置镜像,保证高可用性。
二、模型架构
1. 整体模型架构
生产者(Producer)、消费者(Consumer)、交换机(Exchange)、队列(Queue)
消息一般有两部分组成:消息头和消息体。消息体也可以称为payLoad,消息体是不透明的,而消息头则由一系列的可选属性组成,包括:routing-key、priority、delivery-mode(指出该消息可能需要持久性存储)等。
2.交换机
** Exchange(交换器)** 用来接收生产者发送的消息并将这些消息路由给服务器中的队列中,如果路由不到,或许会返回给 Producer(生产者) ,或许会被直接丢弃掉 。这里可以将 RabbitMQ 中的交换器看作一个简单的实体。RabbitMQ的交换机有4种类型,不同的类型对应不同的路由策略:
direct(默认)、fanout、topic、headers。
生产者将消息发送给交换机的时候,一般会指定一个**RoutingKey(路由键)**,用来指定这个消息的路由规则,这个RoutingKey需要与交换机类型**BindingKey(绑定键)**联合使用才能生效。
RabbitMQ 中通过 **Binding(绑定)** 将 **Exchange(交换器)** 与 **Queue(消息队列)** 关联起来,在绑定的时候一般会指定一个 **BindingKey(绑定建)** ,当生产者发送消息的RoutingKey和BindingKey匹配时, RabbitMQ 就能正确将消息路由到队列了。
(1)Fanout
Fanout交换机会将接收到的消息路由到每一个跟其绑定的队列中,称为广播模式。是所有交换机类型里面速度最快的,不需要指定RoutingKey。实现思路如下:
(2)Direct
direct类型的交换机会把消息路由到Bindingkey和RoutingKey完全匹配的Queue中。具体的实现思路如下:
①每一个Queue都与Exchange设置一个BindingKey,多个队列可以设置同一个BindingKey,一个队列可以绑定多个key。
②发布者发送消息时,指定消息的RoutingKey。
③Exchange将消息路由到BindingKey与消息RoutingKey一致的队列。
direct类型常用于处理有优先级的任务,根据任务的优先级把消息发送到对应的队列,这样可以指派更多的资源去处理优先级的队列。
(3)Topic
Topic交换机也是基于RoutingKey做消息路由的,但是RoutingKey通常是由多个单词的组合,并且以“·”分隔。Queue与Exchange指定BindingKey时,可以使用通配符,可以做模糊匹配,扩展性更强,其中,“*”用于匹配一个单词,“#”用于匹配0个或者多个单词。
(4)headers(不推荐)
headers类型的交换机不依赖于路由键的匹配规则来路由消息,而是根据发送内容中的headers属性进行匹配。在绑定队列和交换机时候指定一组键值对,当发送消息到交换机时候,RabbitMQ会获取到该信息的headers,如果完全匹配队列绑定的键值对才会正确路由。headers交换机的性能很差,不推荐使用。
3.消息队列(Queue)
消息队列是存储消息的容器。一个消息可以投入一个或者多个队列。
RabbitMQ中消息只能存储在队列中,这一点与Kafka相反。Kafka将消息存储在topic这个逻辑层面,而相应的队列逻辑只是topic实际存储文件中的位移标识。
多个消费者可以订阅同一个队列,这是队列中的消息会被平均分摊(轮询)给多个消费者进行处理,避免消息被重复消费。RabbitMQ不支持队列层面的广播消费。
4.Broker
RabbitMQ的Broker包括交换机和队列两部分,可以看作一个RabbitMQ服务节点/实例/服务器。以下是RabbitMQ消息队列的运转过程。![](https://i-blog.csdnimg.cn/direct/e161ca699cd945b2bbe677e0c59c59bc.png)
三、实践
1.快速入门
(1)安装部署(Docker)
docker run \
-e RABBITMQ_DEFAULT_USER=ray\
-e RABBITMQ_DEFAULT_PASS=123456 \
-v mq-plugins:/plugins \
--name mq \
--hostname mq \
-p 15672:15672 \
-p 5672:5672 \
--network net0\
-d rabbitmq:3.8-management
登录http://192.168.19.128:15672(这个是自己的ip)进行RabbitMQ管理。
(2)快速入门
publisher: 消息发送者 queue:队列,存储信息
consumer:消息的消费者 exchange:交换机,负责路由消息
为了实现不同项目交换机和队列的隔离效果,引入virtual-host虚拟主机。
消息发送的注意事项:①队列必须与交换机进行绑定,才能正确的进行消息路由发送与转发。
②交换机只能路由消息,无法存储消息。
(3)数据隔离
在RabbitMQ的控制台完成下列操作:①新建一个用户hmall ②为hmall用户创建一个virtual host ③测试不同virtual host之间的数据隔离现象。
2.Java客户端
Spring AMQP是基于AMQP协议定义的一套API规范,提供了模板来发送和接收消息,包括两部分,spring-amqp是基础抽象,spring-rabbit是底层的默认实现。
(1)基本步骤
引入spring-amqp依赖 ![](https://i-blog.csdnimg.cn/direct/6f8a536784694cf79379050997858176.png)
配置RabbitMQ服务端消息
引入RabbitTemplate进行消息发送
@Resource
private RabbitTemplate rabbitTemplate;
@Test
public void testSimpleQueue(){
String queueName="simple.queue";
String message="hello,amqp";
rabbitTemplate.convertAndSend(queueName,message);
}
消费者
@Slf4j
@Component
public class SpringRabbitListener {
@RabbitListener(queues = "simple.queue")
public void listenSimpleQueue(String message){
log.info("监听到simple.queue的消息:[{}]",message);
}
}
(2)Work Queues
多消费示例
Work Queues 任务模型。让多个消费者监听同一个队列。解决消息堆积的问题,提升消息处理速率,进行负载均衡,默认情况下采用轮询的方式分配。基本思路如下:
@RabbitListener(queues = "work.queue")
public void listenWorkQueue1(String message) throws InterruptedException {
log.info("消费者1-监听到work.queue的消息:[{}]",message+LocalDate.now());
//通过与配置文件中prefetch进行配合,实现每秒处理40条消息的处理速率
Thread.sleep(25);
}
@RabbitListener(queues = "work.queue")
public void listenWorkQueue2(String message) throws InterruptedException {
log.info("消费者2-监听到work.queue的消息:[{}]",message+LocalDate.now());
//通过与配置文件中prefetch进行配合,实现每秒处理10条消息的处理速率
Thread.sleep(200);
}
消费者消息推送限制
修改application.yml,设置preFetch值为1,确保同一时刻最多投递给消费者1条消息,每次只能获取一条消息,处理完成才能获取下一个消息,可以按需分配了。
3.Java中声明交换机
(1)基于Bean声明队列交换机
Queue用于声明队列,可以用工厂类 QueueBuilder构建、
Exchange用于声明交换机,可以用工厂类ExchangeBuilder构建
Binding用于声明队列和交换机的绑定关系,可以用工厂类BindingBuilder构建
@Configuration
public class DirectConfiguration {
//声明交换机
@Bean
public DirectExchange directExchange(){
return new DirectExchange("hmall.direct");
}
//声明队列
@Bean
public Queue directQueue1(){
return new Queue("direct.queue1");
}
//声明队列
@Bean
public Queue directQueue2(){
return new Queue("direct.queue2");
}
//声明绑定关系,每次只能声明一个routingKey,麻烦
@Bean
public Binding directQueue1BindingRed(Queue directQueue1,DirectExchange directExchange){
return BindingBuilder.bind(directQueue1).to(directExchange).with("red");
}
@Bean
public Binding directQueue1BindingBlue(Queue directQueue1,DirectExchange directExchange){
return BindingBuilder.bind(directQueue1).to(directExchange).with("yellow");
}
@Bean
public Binding directQueue2Binding(Queue directQueue2,DirectExchange directExchange){
return BindingBuilder.bind(directQueue2).to(directExchange).with("blue");
}
}
(2)基于注解声明队列交换机
@Slf4j
@Component
public class SpringRabbitListener {
//一个注解同时完成了队列、交换机、绑定关系的声明
@RabbitListener(bindings = @QueueBinding(
value=@Queue(name="direct.queue1"),
exchange=@Exchange(name="hmall.direct",type= ExchangeTypes.DIRECT),
key={"red","blue"}
))
public void listenDirectQueue1(String msg){
System.out.println("消费者1接收到Direct消息:"+msg);
}
}
4. 消息转换器
(1)JDK序列化
convertAndSend该函数可以将Object对象自动序列化之后发送给消息队列,这里在底层的实现是基于jdk的序列化,但是这种序列化的弊端在于:①jdk序列化有安全风险,容易被代码注入 ②jdk序列化的消息太大③jdk序列化之后的可读性差。
(2)JSON序列化
在producer和consumer中要引入jackson依赖
<dependencies>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
</dependencies>
在producer和consumer中配置MessageConverter
@Configuration
public class MessageConverterConfiguration {
@Bean
public MessageConverter messageConverter(){
return new Jackson2JsonMessageConverter();
}
}
消息发送
@Test
public void testObjectQueue(){
//1.准备消息
Map<String,Object> msg=new HashMap<>(2);
msg.put("name","jack");
msg.put("age",2);
//2.发送消息
rabbitTemplate.convertAndSend("object.queue",msg);
}
四、可靠性保障
1.发送者可靠性
(1)发送者重连
通过配置,开启retry的enabled: true开启重连机制
这种重连机制是阻塞式的重试,如果对业务性能要求高,建议禁止启动重连机制。也可以重试采用异步的方式进行代码的执行。
(2)发送者确认
①开启确认机制之后,当消息发送到MQ后,MQ会返回确认结果给发送者进行反馈。
(消息投递到MQ,但是路由失败,会返回路由异常原因,然后返回ACK) ![](https://i-blog.csdnimg.cn/direct/7013de94276a4ac3b461be3b89445db6.png)
②开启步骤
** 在application.yml中添加配置**
publisher-confirm-type的三种类型:none(关闭confirm机制),simple(同步阻塞),correlated(MQ异步回调方式返回回执消息),一般使用correlated。
③配置一个ReturnCallback(项目启动过程中进行配置),每次发送消息配置ConfirmCallback。(该部分内容一般情况下不会使用到)
![](https://i-blog.csdnimg.cn/direct/071a259ed8b84e528e5ca46d13ae9e8a.png)
该部分要设置休眠时间让其有足够的时间来回调,但是要设置日志级别为debug级别。
2.MQ可靠性
消息持久化和LazyQueue 改变消息持久化的模式,来提升MQ的消息处理速率。
(1)数据持久化
交换机持久化:默认开启
** 队列持久化:默认开启**
** 消息持久化**
在发送消息时候,选择消息的发送模式为 Persistent。 目的:为了防止消息太多之后需要pageout时mq阻塞的情况。 消息持久化和非持久化过程中,MQ处理速度的演示,发送100w
非持久化,在内存不足,paged out的时候,mq处于阻塞状态,消息处理速率为0;
持久化,每次发消息都会做持久化,处理消息的峰值一直很高。没有page out的过程。
(2)Lazy Queue
因为消息持久化导致在内存和磁盘中双重写入导致处理每一条消息的耗时增加。
特点: ①接收到消息后直接存入磁盘,不再存储到内存
②消息读取的优化:a.消费者要消费消息时才会从磁盘中读取并加载到内存 b.同时会监测消费消息的速率,提前缓存部分消息到内存,最多2048条消息。 ③在3.12,所有的队列都是LazyQueue模式
LazyQueue 手动队列设置
(1)控制台添加
(2)代码添加
(3)演示——直接PageOut
lazy-queue的设置显著增加了mq的处理速率,一旦设置了确认机制和持久化,在整个消息持久化结束之后才会进行ack的返回。
3.消费者可靠性
(1)消费者确认机制
消费者在消息处理之后会返回给RabbitMQ做确认,有以下三种方式:
处理成功 ack
处理失败(业务异常)nack
处理失败并且不想再次接收 reject(这种情况下可能是消息本身的内容出现了问题,消息处理或者校验异常,可以这样返回reject)
开启消费者确认机制的方式:直接在yml文件里面设置acknowledge为auto:
![](https://i-blog.csdnimg.cn/direct/f9ddcad5d38949248d47a90a059cf5e2.png)
(2)失败重试机制
在消费者出现异常时利用本地重试(也就是在消费者这里进行重试,不会把消息返回来),而不是无限的入队出队(回复nack就会出现这种情况)
** 设置重试机制**:
失败消息处理策略
失败重试次数达到上限后,默认消息队列并不会再重新发送消息到消费者,消息删除。
MessageRecoverer接口用于处理消息在消费过程中可能出现的异常情况,如消息被拒绝或消费失败。它包括三种不同的实现:
- RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,丢弃消息。默认。
- ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队。
- RepublishMessageRecover: 重试耗尽后,将失败消息投递到指定的交换机
改变默认的策略(声明bean的方式进行处理
(3)业务幂等性
- 问题阐述:当消费者发送ack给MQ不成功,可能会导致重复消费问题。所谓幂等性就是在程序开发中,同一个业务,执行一次或者多次效果对业务状态影响是一致的。
- 解决方案:
方法一:给每个消息都设置一个唯一id,利用id区分是否是重复消费:
①每一条消息都生成一个唯一的id,与消息一起投递给消费者
②消费者在消费了该消息后,会将信息的id保存到自己的数据库中(用Message对象进行接收,可以获得消息的id)
③如果下次又收到相同的消息,会根据数据库中的查询结果来判断是否是重复消息。
方法二:业务判断
结合业务逻辑,基于业务本身做判断,如果业务状态已经是消息处理后的状态,比如说已经标记为已支付状态了,那么不执行操作,说明是重复消息。以订单支付为例,加入了以下的逻辑。
(4)问题:如何保证支付服务与交易服务之间的订单状态一致性?
首先,我们是采用MQ异步通知的方式在用户支付完成后,进行交易服务订单状态更新的。
其次,为了增强MQ的可靠性,我们采用了生产者确认机制,消费者确认机制,消费者重试机制等策略,确保消息投递和处理的可靠性,同时还开启了消息持久化,避免因服务宕机导致消息丢失。
最后,我们还在交易服务更新订单状态时候做了业务幂等性判断,避免消息的重复消费问题。
五、延迟消息
延迟消息:发送者发送消息时指定一个时间,消费者不会立刻收到消息,而是在指定时间之后才收到消息。(比如,在用户下单后,我们的订单状态是未支付,但是由于某种原因,我们无法获得支付服务发过来的通知消息让我们来修改订单的状态,此时,我们就可以采用延迟消息进行检查,在下单后发送消息到MQ,延时时间到之后才会发送消息给消费者。)
(1)死信交换机
- 死信,以下三种情况被称之为死信:①消费者使用reject或者nack声明消费失败,并且消息的requeue参数设置为false。②消息是一个过期消息(达到了队列或者消息本身设置的过期时间),无人消费。③要投递的队列消息堆积满了,最早的消息可能会被设置成为死信。
- 原理:上述情况二,故意设置一个无人消费的队列,并指定它的死信交换机。
操作步骤:
①设置普通的队列和交换机
②设置死信队列和交换机——这个是包含消费者的
③发送消息,并设置过期时间(SetExpiration)
(2)延迟消息插件——DelayExchange插件
这个插件可以将普通交换机改造为支持延迟消息功能的交换机,当消息投递到交换机后可以暂存一定时间,到期后再投递到队列。
大致步骤为:①下载插件②将插件挂载到对应的数据卷目录,安装插件并启动。③交换机设置部分,把delayed()属性改为true即可。
④发送消息出的设置(和死信交换机的设置类似,但是这个是setDelay())
版权归原作者 Rayiiel 所有, 如有侵权,请联系我们删除。