RabbitMQ
RabbitMQ
RabbitMQ是基于Erlang语言开发的开源消息队列系统,基于
AMQP
协议。
AMQP协议
AMQP协议是具有现代特征的二进制协议,是一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。
AMQP协议中的几个重要概念
- Server:接收客户端的连接,实现AMQP实体服务。
- Connection:应用程序和Server的网络连接,TCP连接。
- Channel:信道,消息读写等操作在信道中进行,客户端可以建立多个信道,每个信道代表一个会话任务。
- Message:消息,应用程序和服务器之间传递的数据,消息可以非常简单,也可以非常复杂。由Properties和Body组成。Properties为外包装,可以对消息进行修饰,比如消息的优先级、延迟等高级特性;Body就是消息体内容。
- Virtual Host:虚拟主机,用于逻辑隔离。一个虚拟主机里可以有若干个Exchange和Queue,同一个虚拟主机里面不能有相同名称的Exchange或Queue。
- Exchange:交换机,接收消息,按照路由规则将消息路由到一个或多个队列。如果路由不到,或者返回给生产者,或者直接丢弃。RabbitMQ常用的交换机类型有direct、topic、fanout、headers四种。
- Bindings:绑定,交换机和消息队列之间的虚拟连接,绑定中可以包含一个或者多个RoutingKey。
- RoutingKey:路由键,生产者将消息发送给交换机的时候,会发送一个RountingKey,用来指定路由规则,这样交换机就知道把消息发送给哪个队列。路由键通常为一个“.” 分割的字符串,例如"com.rabbitmq’。
- Queue:消息队列,用来保存消息,供消费者消费。
工作原理
AMQP协议模型由三部分组成:生产者、消费者和服务端,执行流程如下:
- 生产者连接到Server,建立一个连接,开启一个信道。
- 生产者声明交换机和队列,设置相关属性,并通过路由键将交换机和队列进行绑定。
- 消费者也需要进行建立连接,开启信道等操作,便于接收消息。
- 生产者发送消息,发送到服务端中的虚拟主机。
- 虚拟主机中的交换机根据路由键选择路由规则,发送到不同的消息队列中。
- 订阅了消息队列的消费者就可以获取到消息,进行消费。
常用交换机
RabbitMQ常用的交换机有四种,分别是direct、topic、fanout和headers。
Direct Exchange
直连交换机,该交换机绑定一个队列,要求消息与一个特定的路由键完全匹配,进行一对一、点对点的发送。
fanout Exchange
fanout交换机,一个发送到fanout交换机的消息都会被转发到与该交换机绑定的所有队列上,类似子网广播,每个子网内的主机都获得一份复制的消息,简单点说就是发布订阅。
topic Exchange
topic交换机,该种交换机使用通配符进行消息匹配,路由到对应的队列。通配符有两种:“*”、“#”。需要注意的是通配符前面必须要加上"."符号。
*
:匹配且只匹配一个词,比如a.*
可以匹配到a.b
,a.c
,但是匹配不到a.b.c
。#
:匹配一个或多个词,比如rabbit.#
既可以匹配到rabbit.a.b
,也可以匹配到rabbit.a.b.c
。
headers Exchange
headers也就是头部,该种交换机的路由不是用RoutingKey进行路由匹配的,而是根据匹配请求头中所带的键值进行路由。创建队列需要设置绑定的头部信息,有两种模式,全部匹配和部分匹配。交换机会根据生产者发送过来的头部信息携带的键值去匹配队列绑定的键值,路由到对应的队列。
RabbitMQ代码示例
Spring AMQP
Spring AMQP是基于AMQP协议定义的一套API规范,提供了模板来发送和接收消息。包含两部分,其中spring-amqp是基础抽象,spring-rabbit是底层的默认实现。
在rabbitmq的官方文档中给出了使用Spring AMQP时的六种使用方式:
分别为:
- Hello World
- Word Queues
- Publish/Subscribe
- Routing
- Topics
- RPC
Demo 项目搭建
创建consumer服务作为消费者,创建publisher服务作为生产者
在pom文件中引入amqp的依赖:
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>
配置yml:
spring:rabbitmq:host: 175.24.180.***port:5672virtual-host: /
username: root
password:123456
简单的Hello World方式
直接创建一个消息队列,生产者和消费者通过队列直连
生产者:
packagecom.example.publisher;importorg.junit.jupiter.api.Test;importorg.springframework.amqp.rabbit.core.RabbitTemplate;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.boot.test.context.SpringBootTest;@SpringBootTestpublicclassSpringAmqpTest{@AutowiredprivateRabbitTemplate rabbitTemplate;@TestvoidtestSendMessage2Queue(){String queueName ="simple.queue";String msg ="hello, rabbit";
rabbitTemplate.convertAndSend(queueName, msg);}}
消费者:
packagecom.example.consumer.listener;importorg.springframework.amqp.rabbit.annotation.RabbitListener;importorg.springframework.stereotype.Component;@ComponentpublicclassMqListener{@RabbitListener(queues ="simple.queue")publicvoidlistenSimpleQueue(String msg){System.out.println("消费者收到消息: "+ msg);}}
效果:
生产者向队列中发送消息:
运行消费者程序,消费者从队列中取消息:
Work Queues
工作队列模式,一个生产者,多个消费者,一个队列
主要思想是避免排队等待,避免一个消息处理时间过久而无法处理下一个的问题。
rabbitmq中的工作队列模式默认采用轮询的方式,如果有两个消费者,消息逐一分给每个消费者进行消费。
生产者:
生成五十条消息,放入消息队列
@TestvoidtestWorkQueue()throwsInterruptedException{String queueName ="work.queue";for(int i =1;i <=50;i++){String msg ="test work queue_"+ i;
rabbitTemplate.convertAndSend(queueName ,msg);Thread.sleep(20);}}
消费者:
创建两个消费者监听同一个队列
@RabbitListener(queues ="work.queue")publicvoidlistenWorkQueue1(String msg){System.out.println("消费者1收到消息: "+ msg);}@RabbitListener(queues ="work.queue")publicvoidlistenWorkQueue2(String msg){System.out.println("消费者2收到消息: "+ msg);}
结果:
从这里也可以看出,每条消息只会被消费一次。
当两个消费者消费能力不同时(让消费者1休眠20毫秒,消费者2休眠200毫秒):
可以看出,这里没有考虑消费者的处理能力,导致处理慢的消费者最后堆积了很多消息,增长了处理时间。
解决方法:设置prefetch=1,即每次消费者预取一条消息。
效果:
拓展:如何解决消息堆积问题?
- 绑定多个消费者
- 优化业务代码,让消费者处理速度变快
Publish/Subscribe
发布订阅模式,类似于广播模式,通过Fanout交换机实现。生产者将消息发送给fanout交换机,fanout交换机将收到的消息发送给所有绑定他的队列。
这里通过控制台创建两个队列
创建一个交换机:
并进行绑定
生产者:
@TestvoidtestSendFanout(){String exchangeName ="root.fanout";String msg ="hello everyone";
rabbitTemplate.convertAndSend(exchangeName,"", msg);}
消费者:
@RabbitListener(queues ="fanout.queue1")publicvoidlistenFanoutQueue1(String msg){System.out.println("消费者1收到来自fanout queue1 的消息 "+ msg);}@RabbitListener(queues ="fanout.queue2")publicvoidlistenFanoutQueue2(String msg){System.out.println("消费者2收到来自fanout queue2 的消息 "+ msg);}
效果:
Routing
路由模式,通过指定路由规则,结合Direct 交换机,我们可以将交换机接收到的消息发送给指定的Queue,称为定向路由,比Fanout要更加灵活。
每个queue都与Exchange设置一个BindingKey,发布者发送消息时,指定消息的RoutingKey,交换机将消息路由到BindingKey与消息RoutingKey一致的队列。
不同的队列可以有相同的BindingKey,
创建两个队列:
创建direct交换机并进行绑定,并指定routingkey
生产者:
@TestvoidtestSendDirect(){String exchangeName ="root.direct";String routingKey ="blue";String msg ="hello "+ routingKey;
rabbitTemplate.convertAndSend(exchangeName, routingKey, msg);}
消费者:
@RabbitListener(queues ="direct.queue1")publicvoidlistenDirectQueue1(String msg){System.out.println("消费者1收到来自direct queue1 的消息 "+ msg);}@RabbitListener(queues ="direct.queue2")publicvoidlistenDirectQueue2(String msg){System.out.println("消费者2收到来自direct queue2 的消息 "+ msg);}
效果:
Topics
主题模式,routingkey可以是多个单词的列表,并且以.分割。
Queue和Exchange指定Bingdingkey时可以使用通配符:
#
:指代0个或多个单词
*
:指代1个单词
生产者:
@TestvoidtestTopic(){String exchangeName ="root.topic";String routingKey ="China.weather";String msg ="今天天气不错";
rabbitTemplate.convertAndSend(exchangeName, routingKey, msg);}
消费者:
@RabbitListener(queues ="topic.queue1")publicvoidlistenTopicQueue1(String msg){System.out.println("消费者1收到来自topic queue1 的消息 "+ msg);}@RabbitListener(queues ="topic.queue2")publicvoidlistenTopicQueue2(String msg){System.out.println("消费者2收到来自topic queue2 的消息 "+ msg);}
效果:
Spring AMQP声明队列和交换机
使用控制面板声明队列和交换机在实际的生产环境中很不方便且容易出错,Spring AMQP提供了声明队列、交换机和绑定关系对应的方式。
配置类的方式
- Queue:用于声明队列,可以用工厂类QueueBuilder构建
- Exchange:用于声明交换机,可以用工厂类ExchangeBuilder构建
- Binding:用于声明队列和交换机的绑定关系,可以用工厂类BindingBuilder构建
具体操作:
声明一个配置类,在该类中声明队列、交换机和绑定关系。
packagecom.example.consumer.config;importorg.springframework.amqp.core.*;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;@ConfigurationpublicclassFanoutConfiguration{@BeanpublicFanoutExchangefanoutExchange(){// ExchangeBuilder.fanoutExchange("root.fanout2").build();returnnewFanoutExchange("root.fanout2");}@BeanpublicQueuefanoutQueue3(){// QueueBuilder.durable("fanout.queue3").build();returnnewQueue("fanout.queue3");}@BeanpublicBindingfanoutBinding3(Queue fanoutQueue3,FanoutExchange fanoutExchange){returnBindingBuilder.bind(fanoutQueue3).to(fanoutExchange);}@BeanpublicQueuefanoutQueue4(){returnnewQueue("fanout.queue4");}@BeanpublicBindingfanoutBinding4(){returnBindingBuilder.bind(fanoutQueue4()).to(fanoutExchange());}}
缺点:例如当direct交换机绑定多个队列时,需要写多个Bean来声明绑定关系。
使用注解声明
使用
@RabbitListener
注解
注解源码:
@Target({ElementType.TYPE,ElementType.METHOD,ElementType.ANNOTATION_TYPE})@Retention(RetentionPolicy.RUNTIME)@MessageMapping@Documented@Repeatable(RabbitListeners.class)public@interfaceRabbitListener{Stringid()default"";StringcontainerFactory()default"";String[]queues()default{};Queue[]queuesToDeclare()default{};booleanexclusive()defaultfalse;Stringpriority()default"";Stringadmin()default"";QueueBinding[]bindings()default{};Stringgroup()default"";StringreturnExceptions()default"";StringerrorHandler()default"";Stringconcurrency()default"";StringautoStartup()default"";Stringexecutor()default"";StringackMode()default"";StringreplyPostProcessor()default"";StringmessageConverter()default"";StringreplyContentType()default"";StringconverterWinsContentType()default"true";Stringbatch()default"";}
示例:
@RabbitListener(bindings =@QueueBinding(
value =@Queue(value ="direct.queue1", durable ="true"),
exchange =@Exchange(name ="root.direct", type =ExchangeTypes.DIRECT),
key ={"red","blue"}))publicvoidlistenDirectQueue1(String msg){System.out.println("消费者1收到来自direct queue1 的消息 "+ msg);}
RabbitMQ特性
保证生产者可靠性–生产者重连
spring:
rabbitmq:
connection-timeout:1s # 设置MQ的连接超时时间
template:
retry:
enabled:true # 开启超时重试机制
initial-interval:1000ms # 失败后的初始等待时间
multiplier:1 # 失败后下次的等待时长倍数,下次等待时长 = initial-interval * multiplier
max-attempts:3 # 最大重试次数
注意:当网络不稳定的时候,利用重试机制可以有效提高消息发送的成功率。不过SpringAMQP提供的重试机制是阻塞式的重试,也就是说多次重试等待的过程中,当前线程是被阻塞的,会影响业务性能。
如果对于业务性能有要求,建议禁用重试机制。如果一定要使用,请合理配置等待时长和重试次数,当然也可以使用异步线程来执行发送消息的代码。
生产者确认
RabbitMQ提供了
Publisher Confirm
和
Publisher Return
两种确认机制,开启确认机制后,在MQ成功收到消息后会返回确认消息给生产者,返回的结果有以下几种情况:
- 消息投递到了MQ,但是路由失败。此时会通过
PublisherReturn
返回路由异常原因,然后返回ACK,告知投递成功。 - 临时消息投递到了MQ,并且入队成功,返回
ACK
,告知投递成功。 - 持久消息投递到了MQ,并且入队完成持久化,返回
ACK
,告知投递成功。 - 其他情况都会返回
NACK
,告知投递失败。 在yml中添加下列配置:
publisher-confirm-type: correlated # 开启publisher confirm机制,并设置confirm类型
publisher-returns:true # 开启publisher return机制
publisher-confirm-type三种取值:
none:关闭confirm机制
simple:同步阻塞等待MQ的回执消息
correlated:MQ异步回调方式返回回执消息
MQ的可靠性
在默认情况下,RabbitMQ会将接收到的消息保存在内存中以降低消息收发的延迟。这样会导致两个问题:
- 一旦MQ宕机,内存中的消息会丢失。
- 内存空间有限,当消费者故障或处理过慢时,会导致消息积压,引发MQ阻塞。(page out)
解决该问题的两种方式:
- 数据持久化
- Lazy Queue
Rabbit MQ的持久化
RabbitMQ
的持久化包括三个方面:
- 交换机的持久化
- 队列的持久化
- 消息的持久化
Spring在创建交换机和队列时,默认将其设置为
durable
。
消息的持久化:在控制面板中将
Delivery mode
修改为2.
Lazy Queue
从RabbitMQ的3.6.0版本开始,就增加了Lazy Queue的概念,也就是惰性队列。
惰性队列的特征如下:
- 接收到消息后直接存入磁盘而非内存(内存中只保留最近的消息,默认2048条)
- 消费者要消费消息时才会从磁盘中读取并加载到内存
- 支持数百万条的消息存储 在3.12版本后,所有队列都是Lazy Queue模式,无法更改。
RabbitMQ如何保证消息的可靠性
- 首先通过配置可以让交换机、队列以及发送的消息都持久化。这样队列中的消息会持久化到磁盘,MQ重启消息依然存在。
- RabbitMQ在3.6版本引入了Lazy Queue,并且在3.12版本后会作为队列的默认模式。Lazy Queue会将所有消息都持久化。
- 开启持久化和生产者确认时,RabbitMQ会将所有消息都持久化。
- 开启持久化和生产者确认时,RabbitMQ只有在消息持久化完成后才会给生产者返回ACK确认。
消费者可靠性
通过以下手段保证消费者的可靠性:
- 消费者确认机制
- 消息失败处理
- 业务幂等性
消费者确认机制
为了确认消费者是否成功处理消息,RabbitMQ提供了消费者确认机制(Consumer Acknowledgement)。当消费者处理消息结束后,应该向RabbitMQ发送一个回执,告知MQ自己消息处理的状态。回执有三种可选值:
ack
:成功处理消息,rabbitmq从队列中删除该消息。nack
:消息处理失败,rabbitmq需要再次投递该消息。reject
:消息处理失败并拒绝该消息,rabbitmq从队列中删除该消息。
SpringAMQP已经实现了消息确认功能,并允许我们通过配置文件选择ACK处理方式,有三种方式:
- none:不处理。即消息投递给消费者后立刻ack,消息会立刻从MQ中删除。非常不安全,不建议使用。
- manual:手动模式,需要自己从业务代码中调用api,发送ack或reject,存在业务入侵,但更加灵活。
- auto:自动模式。SpringAMQP利用AOP对我们的消息处理逻辑做了环绕增强,当业务正常执行时则自动返回ack,当业务出现异常时,根据异常判断返回不同结果: 如果是业务异常,会自动返回nack 如果是消息处理或者校验异常,自动返回reject。
消息失败处理
失败重试机制
当消费者出现异常后,消息会不断requeue(重新入队)到队列,再重新发送给消费者,然后再次异常,再次requeue,无限循环,导致mq的消息处理飙升,带来不必要的压力。
我们可以利用Spring的retry机制,在消费者出现异常是利用本地重试,而不是无限制的requeue到mq队列。
在开启重试模式后,重试次数耗尽,如果消息依然失败,则需要有MessageRecoverer接口来处理,它包含三种不同的实现:
RejectAndDontRequeueRecoverer
:重试耗尽后,直接reject,丢弃消息。默认就是这种方式。ImmediateRequeueMessageRecoverer
:重试耗尽后,返回nack,消息重新入队。RepublishMessageRecoverer
:重试耗尽后,将失败消息投递到指定的交换机。
将失败处理策略改为RepublishMessageRecoverer:
首先,定义接收失败消息的交换机、队列以及绑定关系
然后,定义RepublishMessageRecoverer:
消费者如何保证消息一定被消费?
开启消费者确认机制为auto,由Spring确认消息处理成功后返回ack,异常时返回nack
开启消费者失败重试机制,并设置MessageRecoverer,多次重试失败后将消息投递到异常交换机,交由人工处理。
业务幂等性
在实际使用时,可能由于网络波动等原因,消费者成功接收到了消息,但消息队列没有收到ack确认,触发重发机制,导致消费者处理了多次该消息。
解决:
方案一,是给每个消息都设置一个唯一id,利用id区分是否是重复消息:
每一条消息都生成一个唯一的id,与消息一起投递给消费者。
消费者接收到消息后处理自己的业务,业务处理成功后将消息ID保存到数据库中
如果下次又收到相同消息,去数据库中查询判断是否存在,存在则为重复消息放弃处理。
方案二,是结合业务逻辑,基于业务本身做判断。
延迟消息
延迟消息:生产者发送消息时指定一个时间,消费者不会立即收到消息,而是在指定时间后才收到消息。
死信交换机
当一个队列中的消息满足下列情况之一时,就会成为死信(dead letter):
- 消费者使用basic.reject或basic.nack声明消费失败,并且消息的requeue参数设置为false
- 消息是一个过期消息(达到了队列或消息本身设置的过期时间),超时无人消费
- 要投递的队列消息堆积满了,最早的消息可能成为死信
如果队列通过
dead-letter-exchange
属性指定了一个交换机,那么该队列中的死信就会投递到这个交换机中,这个交换机称为死信交换机(dead letter exchange,简称DLX).
延迟消息插件
RabbitMQ的官方推出了一个插件,原生支持延迟消息功能。该插件的原理是设计了一种支持延迟消息功能的交换机,当消息投递到交换机后可以暂存一定时间,到期后再投递到队列。
通过注解方式定义交换机时,设置
delayed
属性为
true
:
通过bean定义时:
@RabbitListener(bindings =@QueueBinding(
value =@Queue(value ="delay.queue", durable ="true"),
exchange =@Exchange(name ="delay.direct", delayed ="true"),
key ="hi"))publicvoidlistenDelayQueue(String msg){System.out.println("接收到delay.queue的消息,"+ msg);}
发送消息时设置过期时间:
@TestvoidtestSendDelayMessage(){
rabbitTemplate.convertAndSend("delay.direct","hi","hello",newMessagePostProcessor(){@OverridepublicMessagepostProcessMessage(Message message)throwsAmqpException{
message.getMessageProperties().setDelayLong(100L);return message;}});}
会造成系统性能的损耗,适合延迟时间较短的场景。
版权归原作者 _Sincerely 所有, 如有侵权,请联系我们删除。