前言:大家好,我是小熊,25届毕业生,目前在一家满意的公司实习。本篇文章为RabbitMQ面试题,小编觉得网上的说的略有不全,并且有的地方很难懂,便写下了这篇文章,可能小编的文章本身刚开始写的也不是很全,但文章会一直更新的。
🧑个人简介:大家好,我是小熊,一个想吃鱼的男人😉😉
目前状况🎉:25届毕业生,在一家满意的公司实习👏👏💕欢迎大家:这里是CSDN,我用来快速回顾知识准备面试的地方,欢迎来到我的博客😘
1.如何保证消息不丢失(消息至少消费一次 or 可靠传输)
保证消息的可靠性要从三个地方出发分别是生产者,mq,消费者
消息怎么丢失的
第一种:生产者传递给mq的过程中因网络原因丢失 第二种:MQ还没有持久化自己挂了
第三种:消费端刚消费到,还没处理,结果进程挂了,比如重启了。
生产者
生产者保证消息不丢失可以从以下几点出发
重连mq
要想保证成功连接mq可以开启重连机制:
spring:
rabbitmq:
connection-timeout: 1s #设置MQ的连接超时时间
template:
retry:
enabled: true#开启超时重试机制
initial-interval: 1000ms #失败后的初始等待时间
multiplier: 1#失败后下次的等待时长倍数,下次等待时长=initial-interval*multiplier
max-attempts: 3#最大重试次数
这种连接可以设置重连次数,并且每次重连的间隔会以2倍的形式增长,倍数,以及时间都可以自定义。但是这种是阻塞式的,耗费性能
事务回滚
可以选择用 RabbitMQ 提供的事务功能,就是生产者发送数据之前开启 RabbitMQ 事务channel.txSelect,然后发送消息,如果消息没有成功被 RabbitMQ 接收到,那么生产者会收到异常报错,此时就可以回滚事务channel.txRollback,然后重试发送消息;如果收到了消息,那么可以提交事务channel.txCommit。
// 开启事务
channel.txSelect
try {
// 这里发送消息
} catch (Exception e) {
channel.txRollback
// 这里再次重发这条消息
}
// 提交事务
channel.txCommit
channel 可以来自Connection 也就是我们连接mq时配置的连接得到,也可以使用spring整合的RabbitTemplate.exute()在里面基于lambda表达式获取channel对象。
public Channel getChannel() {
return rabbitTemplate.execute(channel -> {
// 返回Channel对象
return channel;
});
}
缺点:RabbitMQ 事务机制是同步的,你提交一个事务之后会阻塞在那儿,采用这种方式基本上吞吐量会下来,因为太耗性能。
确认机制
SpringAMQP中生产者消息确认的几种返回值情况:
消息投递到了MQ,但是路由失败。会return路由异常原因,返回ACK
临时消息投递到了MQ,并且入队成功,返回ACK
持久消息投递到了MQ,并且入队完成持久化,返回ACK
其它情况都会返回NACK,告知投递失败 确认机制又分为以下两种
return机制
一般是因为路由原因而失败进行的确认的机制,一般都是程序员编码问题,可以不开启
1.配置yaml
spring:
rabbitmq:
publisher-returns: true #开启publisher return机制
2.编写,这里有个点是下面注释
//每个RabbitTemplate只能配置一个ReturnCallback,因此需要在项目启动过程中配置:
//因为调用的是ioc容器么,必须等待spring初始化完成之后才能用,所以是通知实现了aware接口的实现类
@Slf4j
@Configuration
public class CommonConfig implements ApplicationContextAware{
@Override
public void setApplicationContext(ApplicationContext applicationContext)throws BeansException
//获RabbitTemplate
RabbitTemplaterabbitTemplate applicationContext.getBean(RabbitTemplate.class);
//配置回调
rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback(){
@override
public void returnedMessage(ReturnedMessage returned){
log.debug("收到消息的return callback,exchange:{},key:{},msg:I},code:{},text:{}",
returned.getExchange(),returned.getRoutingKey(),returned.getMessage(),
returned.getReplyCode(),returned.getReplyText());
});
confirm机制(可以指定同步还是异步)
首先我们需要知道confirm机制是基于回调函数的,这个过程需要网络请求,那么在编写发送函数的时候就会有所不同
1.配置yaml
spring:
rabbitmq:
publisher-confirm-type: correlated
#开肩publisher confirm机制,并设置confirm类型
#none 关闭confirm机制,
#simple 同步,
#correlated 异步
2.直接编写发送代码
//correlationData
CorrelationData cd = new CorrelationData(); //消息id
//2.给future,添加ConfirmCallback 回调函数开启异步后,结果到了后回调
//关于future 异步执行,先提前返回一个结果,等到该方法执行完之后再到future中去取
cd.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>(){
@override
public void onFailure(Throwable ex){
//Future.发生future内部异常时的处理逻辑,基本不会触发
log.error("handle message ack fail",ex);
}
//接受成功的逻辑。
@Override
public void onSuccess(CorrelationData.Confirm result){
//2,2,Future.接收到回执的处理逻辑,参数中的result就是回执内容
if(result.isAck()){//result.isAck(),boolean.类型,true代表ack回执,false代表nack回执
log.debug("发送消息成功,收到ack!");
}else{
//result.getReason(),String类型,返回nack时的异常描述 这里是要重发的
//记得设置重发次数
log.error("发送消息失败,收到nack,reason:{}",result,getReason())};
);
}
}
注意:里面的失败情况是要重发消息的,这里就没有写。
MQ(持久化,集群)
mq是基于内存上操作的因此在宕机(持久化,集群),或者内存不足(消息堆积,消息堆积也是面试常考,请看后续文章)的时候就会导致消息丢失
说三点:
(1)要保证rabbitMQ不丢失消息,那么就需要开启rabbitMQ的持久化机制,即把消息持久化到硬盘上,这样即使rabbitMQ挂掉在重启后仍然可以从硬盘读取消息;
(2)如果rabbitMQ单点故障怎么办,这种情况倒不会造成消息丢失,这里就要提到rabbitMQ的3种安装模式,单机模式、普通集群模式、镜像集群模式,这里要保证rabbitMQ的高可用就要配合HAPROXY做镜像集群模式
(3)如果硬盘坏掉怎么保证消息不丢失
持久化
RabbitMQ 的消息默认存放在内存上面,如果不特别声明设置,消息不会持久化保存到硬盘上面的,如果节点重启或者意外crash掉,消息就会丢失。
所以就要对消息进行持久化处理。如何持久化,下面具体说明下:
要想做到消息持久化,必须满足以下三个条件,缺一不可。
1) Exchange 设置持久化
2)Queue 设置持久化 一般使用lazy queue模式,
3)Message持久化发送:发送消息设置发送模式deliveryMode=2,代表持久化消息
集群
这里为什么提集群的,单节点下开启持久化确实能保证消息不丢失,但是我们要保证高性能,高可用就得使用集群,在集群条件下仍会出现消息丢失的情况。
我们先来介绍下RabbitMQ三种集群模式:可以看这篇文章非常的好
也就是说只有主节点有数据,我在给从节点同步真正的数据时该节点挂掉了。消息不就又丢失了
如果想解决上面途中问题,保证消息不丢失,需要采用HA 镜像模式队列。
下面介绍下三种HA策略模式:
1)同步至所有的
2)同步最多N个机器
3)只同步至符合指定名称的nodes
但是:HA 镜像队列有一个很大的缺点就是: 系统的吞吐量会有所下降
(3)消息补偿机制
为什么还要消息补偿机制呢?难道消息还会丢失,没错,系统是在一个复杂的环境,不要想的太简单了,虽然以上的三种方案,基本可以保证消息的高可用不丢失的问题,
但是作为有追求的程序员来讲,要绝对保证我的系统的稳定性,有一种危机意识。
比如:持久化的消息,保存到硬盘过程中,当前队列节点挂了,存储节点硬盘又坏了,消息丢了,怎么办?
1)生产端首先将业务数据以及消息数据入库,需要在同一个事务中,消息数据入库失败,则整体回滚。
2)根据消息表中消息状态,失败则进行消息补偿措施,重新发送消息处理。
消费者(开启手动 ack确认机制)
ACK确认机制
分为自动ack 和手动ack
首先默认情况下消费者端是自动ack,接受到消息就回复ack了,因此我们得让他处理成功后才回复ack,也就是手动ack.这种方式是需要编写代码的。//todo 下次一定,二次元劳累抬头
怎么关闭呢
示例2个方法
配置类
public SimpleMessageListenerContainer container(RabbitTemplate rabbitTemplate, MessageListenerAdapter listenerAdapter) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(rabbitTemplate.getConnectionFactory());
container.setQueueNames("exampleQueue");
container.setMessageListener(listenerAdapter);
// 设置为手动确认
container.setAutoAck(false);
return container;
}
yaml
spring:
rabbitmq:
listener:
simple:
prefetch: 1
acknowledge-mode: none
#模式有none 无论是什么返回ack
#manual 手动调用api 返回ack或者reject
#auto springamq管理根据异常的类型返回不同的结果
重试机制
spring:
rabbitmq:
listener:
simple:
prefetch: 1
retry:
enabled: true#开启消费者失败重试
initial-interval: 1000ms #物始的失败等待时长为1秒
multiplier: 1#下次失败的等待时长倍数,下次等待时长=multiplier*last-interval
max-attempts: 3#最大重试次数
stateless: true#true无状态,:false有状态。如果业务中包含事务,这里改为False
retry: true #并非重新入mq,而是本地retry
retry开启后的retry策略:MEssageRecover接口
RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,丢弃消息。默认就是这种方式 ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队 RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机 --->这种方式需要在创建一个队列 定义为失败队列
//在原先的配置类基础上加上
@Bean
public MessageRecoverer republishMessageRecoverer(
RabbitTemplate rabbitTemplate){
return new RepublishMessageRecoverer(rabbitTemplate,"error.direct","error");}}
面试得时候按照目录去说即可。
2.如何保证消费仅消费一次
前面讲了如何让消息至少消费一次,保证了消息一定会被消费,接下来说说如何保证消息仅消费一次。 首先我们得了解这个问题得本质是为了保证消息得幂等性。什么是幂等性呢在业务里就是,同一条业务执行多次结果不变。比如:我要删除某某数据,那么执行一次和多次结果不变,不必特意关注,因为这条业务就是幂等的。但是如果是让你的银行卡里的钱-100,欸嘿嘿,就不一样了
问题的关键试试重复投递,导致的重复消费。消息系统本身不能保证消息仅被消费一次,因为消费本身可能重复、下游系统启动拉取重复、失败重试带来的重复、补偿逻辑导致的重复都有可能造重复消息,要保证消息仅被消费一次可以利用等幂性来实现。就是从消息的唯一性下手,给他一个标记id即可。
id法
保证生产者等幂性,在生产消息的时候,利用雪花算法给消息生成一个全局 ID,在消息系统中维护消息已 ID 映射关系,如果在映射表中已经存在相同 ID,这丢弃这条消息,虽然消息被投递了两次,但是实际上就保存了一条,避免了消息重复问题。
生产者等幂性跟所选者的消息中间件有关系,因为绝大数情况下消息系统不需要我们自己实现,所以等幂性是不太好控制的,消费者等幂性才是我们开发人员控制的重点方向。
在消费者端可以从通用层和业务层两个方面来做等幂操作,取决于我们的业务要求。
在通用层面中,利用好消息生成是产生的全局唯一ID,消息被处理成功后,把这个全局 ID 存入到数据中,在处理下一条消息之前,先从数据库中查询这个全局 ID 是否存在,如果已经存在,则直接放弃该消息。
关于id的生成方法我们可以自己编写,也可以通过配置spring内置的template的消息转换器,也就是我们在发送消息时使用的convertandsent()方法,其底层的消息转换器我们是可以设置的,默认的消息转换器性能不好,推荐自己设置,它可以自己用过UUID生成id,我们可以在消费者端获取,并验证存储
@Bean
public MessageConverter messageConverter(){
//1,定义消息转换器
Jackson2JsonMessageConverter jjmc = new Jackson2]sonMessageConverter();
//2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于id判断是否是重复消息
jjmc.setCreateMessageIds(true);
return jjmc;
}
利用这个全局唯一ID就实现了消息等幂性,伪代码如下:
boolean isIDExisted = selectByID(ID); // 判断ID是否存在
if(isIDExisted) {
return; //存在则直接返回
} else {
process(message); //不存在,则处理消息
saveID(ID); //存储ID
}
但是在极端情况下,这种方式还是会出问题,如果消息在处理之后,还没来得及保存到数据库,消费者就宕机重启了,重启之后还会再次获取该消息,执行时查询该消息并未被消费过,还是会执行两次消费。可以引入数据库事务来解决这个问题,但是会降低系统性能。如果对消息重复消费没有特别严格要求的话,直接使用这种没有引入事务的通用方案就好了,毕竟这也是极小概率的事情。
在业务层面上,我们可选择性就变多了,比如乐观锁、悲观锁、内存去重(github.com/RoaringBitm…
我们拿乐观锁来举例,比如我们要给一个用户加积分,因为加积分操作并不需要放在主业务中,所以就可以使用消息系统来异步通知,要使用乐观锁,就需要给积分表添加一个版本号字段。并且在生产消息的时候先查询这个账号的版本号并且连同消息一起发送到消息系统中。
消费者拿到消息和版本号后,在执行更新积分操作的 SQL 时带上版本号,类似于:
update score set score = score + 20, version=version+1 where userId=1 and version=1;
这条消息消费成功后,version 就变成了 2,那么如果有重复的 version=1 的消息再次被消费者拉取到,SQL 语句并不会执行成功,从而保证了消息的幂等性。
要保证消息仅被消费一次,我们需要把重点放在消费者这一段,利用等幂性来保证消息被消费一次。
基于业务
基于业务,例如我要以消息的方式修改业务状态,从未支付修改为已支付,那么我在修改的时候这个状态一定是未支付的,从而验证判断。保证消息幂等性。
3.如何实现顺序消费
什么是顺序消费
例如:业务上产生者发送三条消息, 分别是对同一条数据的增加、修改、删除操作, 如果没有保证顺序消费,执行顺序可能变成删除、修改、增加,这就乱了。
如何保证顺序性
一般我们讨论如何保证消息的顺序性,会从下面三个方面考虑
1:发送消息的顺序
2:队列中消息的顺序
3:消费消息的顺序
1.发送消息的顺序
消息发送端的顺序,大部分业务不做要求,谁先发消息无所谓,如果遇到业务一定要发送消息也确保顺序,那意味着,只能全局加锁一个个的操作,一个个的发消息,不能并发发送消息。
2.队列中消息的顺序
RabbitMQ 中,消息最终会保存在队列中,在同一个队列中,消息是顺序的,先进先出原则,这个由 RabbitMQ 保证,通常也不需要开发关心。
不同队列 中的消息顺序,是没有保证的,例如:进地铁站的时候,排了三个队伍,不同队伍之间的,不能确保谁先进站。
3.消费消息的顺序
我们说如何保证消息顺序性,通常说的就是消费者消费消息的顺序,在多个消费者消费同一个消息队列的场景,通常是无法保证消息顺序的,
虽然消息队列的消息是顺序的,但是多个消费者并发消费消息,获取的消息的速度、执行业务逻辑的速度快慢、执行异常等等原因都会导致消息顺序不一致。
例如:消息A、B、C按顺序进入队列,消费者A1拿到消息A、消费者B1拿到消息B, 结果消费者B执行速度快,就跑完了,又或者消费者A1挂了,都会导致消息顺序不一致。
解决消费顺序的问题, 通常就是一个队列只有一个消费者 , 这样就可以一个个消息按顺序处理, 缺点就是并发能力下降了,无法并发消费消息,这是个取舍问题。
方案
a.如果业务又要顺序消费,又要增加并发,通常思路就是开启多个队列,根据业务id进行hash运算根据规则将消息分发到不同的队列,每个队列对应一个consumer,队列内一定是单线程的。通过增加队列的数量来提高并发度,例如:电商订单场景,只需要保证同一个用户的订单消息的顺序性就行,不同用户之间没有关系,所以只要让同一个用户的订单消息进入同一个队列就行,其他用户的订单消息,可以进入不同的队列。
b.方案,一个队列,一个consumer,在consumer内去多线程处理,也是通过hash(业务id)去分发即可。
以下为代码设计过程实现
首先我们必须保证只有一个消费者 那么问题就来了,我们的项目一般是多副本的,如何保证只有一个副本在消费呢,就是说集群模式下,服务器的消费者订阅一个队列。
这时就会用到消费者 单活模式 x-single-active-consumer
使用下述配置实现
private Queue creatQueue(String name){
// 创建一个 单活模式 队列
HashMap<String, Object> args=new HashMap<>();
args.put("x-single-active-consumer",true);
return new Queue(name,true,false,false,args);
}
4.如何实现延时消息
延迟消息我推荐大家看这个文章,我只能说牛,真T*D的全。
RabbitMQ实现消息的延迟推送或延迟发送_rabbitmq实现延迟消息
补充惰性队列(Lazy Queues)-CSDN博客
再看亿遍,cry
5.消息堆积了怎么解决,消息丢失了怎么办?
没什么好说的,,本质就是生产者速度高于消费速度导致队列满了,要么提高消费速度,要么降低发送速度,要么扩充队列长度。rabbitmq的天生优势在3.12版本后引入了lazy queue. 扩充规模,或者直接丢弃,晚上再补。
消息积压处理办法:临时紧急扩容: 先修复 consumer 的问题,确保其恢复消费速度,然后将现有 cnosumer 都停掉。 新建一个 topic,partition 是原来的 10 倍,临时建立好原先 10 倍的 queue 数量。 然后写一个临时的分发 数据的 consumer 程序,这个程序部署上去消费积压的数据,消费之后不做耗时的处理,直接均匀 轮询写入临时建立好的 10 倍数量的 queue。 接着临时征用 10 倍的机器来部署 consumer,每一 批 consumer 消费一个临时 queue 的数据。这种做法相当于是临时将 queue 资源和 consumer 资 源扩大 10 倍,以正常的 10 倍速度来消费数据。 等快速消费完积压数据之后,得恢复原先部署的 架构,重新用原先的 consumer 机器来消费消息。 MQ中消息失效:假设你用的是 RabbitMQ, RabbtiMQ 是可以设置过期时间的,也就是 TTL。如果消息在 queue 中积压超过一定的时间就会被 RabbitMQ 给清理掉,这个数据就没了。那这就是第二个坑了。这就不是说数据会大量积压在 mq 里,而是大量的数据会直接搞丢。我们可以采取一个方案,就是批量重导,这个我们之前线上也有 类似的场景干过。就是大量积压的时候,我们当时就直接丢弃数据了,然后等过了高峰期以后,比 如大家一起喝咖啡熬夜到晚上12点以后,用户都睡觉了。这个时候我们就开始写程序,将丢失的那 批数据,写个临时程序,一点一点的查出来,然后重新灌入 mq 里面去,把白天丢的数据给他补回 来。也只能是这样了。假设 1 万个订单积压在 mq 里面,没有处理,其中 1000 个订单都丢了,你 只能手动写程序把那 1000 个订单给查出来,手动发到 mq 里去再补一次。 mq消息队列块满了:如果消息积压在 mq 里,你很长时间都没有处理掉,此时导致 mq 都快写满 了,咋办?这个还有别的办法吗?没有,谁让你第一个方案执行的太慢了,你临时写程序,接入数 据来消费,消费一个丢弃一个,都不要了,快速消费掉所有的消息。然后走第二个方案,到了晚上 再补数据吧。
【RabbitMQ】RabbitMQ 消息的堆积问题 —— 使用惰性队列解决消息的堆积问题_rabbitmq消息堆积-CSDN博客
at least
版权归原作者 坎坷er 所有, 如有侵权,请联系我们删除。