一、投递失败的消息怎么处理
首先投递失败存在如下两个情况
- 交换器无法根据自身的类型和路由键匹配到队列
- 交换器将消息路由到队列时,发现队列上并不存在任何消费者
解决方案:
生产者投递消息时指定 mandatory 或者 immediate 参数设为 true ,RabbitMQ 会把无法投递的消息通过 Basic.Return 命令将消息返回给生产者,此时生产者需要调用 channel.addReturnListener 来添加 ReturnListener 监昕器实现监听投递失败的消息
如果设置了上方两个参数就要添加 ReturnListener 逻辑,使生产者的逻辑变得复杂,RabbitMQ 中的备份交换机也可以处理这个问题
通过在声明交换器(调用 channel.exchangeDeclare 方法)的时候添加 alternate-exchange 参数来实现
对于备份交换器,使用时包含几种特殊情况:
- 如果设置的备份交换器不存在,客户端和 RabbitMQ 服务端都不会有异常出现,此时消息会丢失
- 如果备份交换器没有绑定任何队列,客户端和 RabbitMQ 服务端都不会有异常出现,此时消息会丢失
- 如果备份交换器没有任何匹配的队列,客户端和 RabbitMQ 服务端都不会有异常出现,此时消息会丢失
如果备份交换器和 mandatory 参数一起使用,那么 mandatory 参数无效
二、如何设置消息的过期时间
- 设置队列属性,队列中所有消息都有相同的过期时间
- 对消息本身进行单独设置,每条消息的 TTL 可以不同
如果两种方法一起使用,则消息的 TTL 以两者之间较小的那个数值为准
三、如何实现延时队列
答案:使用死信交换机
将消息投递到一个没有消费者的队列中,并为这个队列指定对应的死信队列,当消息达到设置过期时间还没有被消费时,将会被发布到死信队列中,而消费者订阅死信队列直接消费到了一份延时消息
延时消息插件
使用延时队列需要为 RabbitMQ 添加额外插件
延时插件 rabbitmq_delayed_message_exchange 下载地址
https://www.rabbitmq.com/community-plugins.html
把下载好的插件放到以下目录
/usr/lib/rabbitmq/lib/rabbitmq_server3.6.4/plugins
启动插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
声明交换器类型为 x-delayed-message 来标示此交换机为延时交换机
发送消息时在 header 中添加 x-delay 参数来控制消息的延时时间
四、如何指定消息的优先级
设置队列的 max priority 参数,RabbitMQ中消息的优先级默认是0,最大值是10
五、消息的持久化是如何实现的
RabbitMQ的持久化分为:交换器的持久化、队列的持久化和消息的持久化
交换器和队列的持久化都是通过在声明时将 durable 参数置为 true 实现的
消息的持久化是在发送消息指定deliveryMode为2实现的
六、如何保证消息不丢失
生产者开启事务或者发送方确认机制,交换机、队列和消息全部设置持久化,消费者开启消费方确认机制
从图中可以看到一共有以下三种可能出现消息丢失的情况:
1:生产者丢消息
生产者在将数据发送到MQ的时候,可能由于网络等原因造成消息投递失败
2:MQ自身丢消息
未开启RabbitMQ的持久化,数据存储于内存,服务挂掉后队列数据丢失;
开启了RabbitMQ持久化,消息写入后会持久化到磁盘,但是在落盘的时候挂掉了,不过这种概率很小
3:消费者弄丢了消息
消费者刚接收到消息还没处理完成,结果消费者挂掉了。
针对以上三种情况,每种情况都有对应的处理方法:
1》生产者弄丢消息的解决方法
方法一:开启RabbitMQ的事务
rabbitmq提供了与三个事务相关的命令:select开启事务、commit提交事务、rollback回滚事务
采用该种方法由于事务机制,会导致吞吐量下降,太消耗性能。
方法二:开启confirm模式
使用springboot时在application.yml配置文件中做如下配置
spring:
rabbitmq:
addresses: 127.0.0.1
port: 5672
username: guest
password: guest
# 发送者开启 confirm 确认机制
publisher-confirm-type: correlated
实现confirm回调接口
@Slf4j
public class ConfirmCallbackService implements RabbitTemplate.ConfirmCallback {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (!ack) {
log.error("消息发送异常!");
//可以进行重发等操作
} else {
log.info("发送者已经收到确认,correlationData={} ,ack={}, cause={}", correlationData, ack, cause);
}
}
}
生产者发送消息时设置confirm回调
@Slf4j
@Configuration
public class RabbitMqConfig {
@Bean
public ConfirmCallbackService confirmCallbackService() {
return new ConfirmCallbackService();
}
@Bean
public RabbitTemplate rabbitTemplate(@Autowired CachingConnectionFactory factory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(factory);
/**
* 消费者确认收到消息后,手动ack回执回调处理
*/
rabbitTemplate.setConfirmCallback(confirmCallbackService());
return rabbitTemplate;
}
//其他配置代码
......
小结: 事务机制和 confirm机制最大的不同在于,事务机制是同步的,你提交一个事务之后会阻塞在那儿,但是 confirm机制是异步的,你发送个消息之后就可以发送下一个消息,RabbitMQ 接收了之后会异步回调confirm接口通知你这个消息接收到了。一般在生产者这块避免数据丢失,建议使用用 confirm 机制。
2》MQ自身弄丢消息时的解决方法
使用持久化队列,发送消息时做持久化到磁盘处理。
同时设置queue和message持久化以后,RabbitMQ 挂了再次重启,也会从磁盘上重启恢复 queue,恢复这个 queue 里的数据,保证数据不会丢失。
但是就算开启持久化机制,也有可能出现消息落盘时服务挂掉的情况。这时可以考虑结合生产者的confirm机制来处理,持久化机制开启后消息只有成功落盘时才会通过confirm回调通知生产者,所以可以考虑生产者在生产消息时维护一个正在等待消息发送确认的队列,如果超过一定时间还没从confirm中收到对应消息的反馈,自动进行重发处理。
3》消费者自身弄丢消息时的解决方法
关闭自动ACK,使用手动ACK。RabbitMQ中有一个ACK机制,默认情况下消费者接收到到消息,RabbitMQ会自动提交ACK,之后这条消息就不会再发送给消费者了。我们可以更改为手动ACK模式,每次处理完消息之后,再手动ack一下。不过这样可能会出现刚处理完还没手动ack确认,消费者挂了,导致消息重复消费,不过我们只需要保证幂等性就好了,重复消费也不会造成问题。
在springboot中修改application.yml配置文件更改为手动ack模式
spring:
rabbitmq:
addresses: 127.0.0.1
port: 5672
username: guest
password: guest
# 发送者开启 confirm 确认机制
publisher-confirm-type: correlated
# 发送者开启 return 确认机制
publisher-returns: true
listener:
simple:
concurrency: 10
max-concurrency: 10
prefetch: 1
auto-startup: true
default-requeue-rejected: true
# 设置消费端手动 ack
acknowledge-mode: manual
# 是否支持重试
retry:
enabled: true
**消费端手动ack参考代码: **
@RabbitHandler
public void handlerMq(String msg, Channel channel, Message message) throws IOException {
try {
//业务处理代码
......
//手动ACK
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
if (message.getMessageProperties().getRedelivered()) {
log.error("消息已重复处理失败,拒绝再次接收...", e);
channel.basicReject(message.getMessageProperties().getDeliveryTag(), false); // 拒绝消息
} else {
log.error("消息即将再次返回队列处理...", e);
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
}
}
}
六、消息重复消费问题
出现消息重复消费的情况:
原因一
1、生产者发送给消息队列以后,消息队列会应达给生产者,但是这个过程中,消息队列出问题了没有收到消息,那么生产者就会重复发生消息,这时就产生了重复消息。
2、生产者发生消息给消息队列,消息队列由于数量太大延迟了,生产者等待响应超时了,这时生产者又会从新发生消息给消息队列。
3、生产者和消息队列因网络问题引起,生产者会发起重试。这样也会产生重复消息。
4、其实主要原因就是,消息成功进入了消息队列,但是由于各种原因消息队列没有给生产者成功的返回值,而生产者又有重试机制这种情况下就会产生重复消息。
原因二
1、消息队列推送给消费者,消费者处理消息这个过程中消费出现了问题,消息队列不知道消费者处理结果,就会在次投递。
2、消费者处理完,网络出现问题,这时没有给中间件消息队列返回结果,消息队列会在次投递消费者。
3、消费者处理超时,超过了消息队列的超时时间,这时消息队列也会再次投递。
4、消费者处理完结果返回给消息中间件,但是消息中间件出现问题,处理结果丢失了,重启后,消息中间件内部检查发现这个消息还没有处理也会在次投递给消费者。
针对该问题,一般是在消费者端做幂等处理。
七、如何保证消息队列消费的幂等性
这一块应该还是要结合业务来选择合适的方法,有以下几个方案:
消费数据为了单纯的写入数据库,可以先根据主键查询数据是否已经存在,如果已经存在了就没必要插入了。或者直接插入也没问题,因为可以利用主键的唯一性来保证数据不会重复插入,重复插入只会报错,但不会出现脏数据。
消费数据只是为了缓存到redis当中,这种情况就是直接往redis中set value了,天然的幂等性。
针对复杂的业务情况,可以在生产消息的时候给每个消息加一个全局唯一ID,消费者消费消息时根据这个ID去redis当中查询之前是否消费过。如果没有消费过,就进行消费并将这个消息的ID写入到redis当中。如果已经消费过了,就无需再次消费了。
八、消息保证顺序消费
消息在投入到queue的时候是有顺序,如果只是单个消费者来处理对应的单个queue,是不会出现消息错乱的问题。但是在消费的时候有可能多个消费者消费同一个queue,由于各个消费者处理消息的时间不同,导致消息未能按照预期的顺序处理。其实根本的问题就是如何保证消息按照预期的顺序处理完成。
出现消费顺序错乱的情况
为了提高处理效率,一个queue存在多个consumer
一个queue只存在一个consumer,但是为了提高处理效率,consumer中使用了多线程进行处理
保证消息顺序性的方法
将原来的一个queue拆分成多个queue,每个queue都有一个自己的consumer。该种方案的核心是生产者在投递消息的时候根据业务数据关键值(例如订单ID哈希值对订单队列数取模)来将需要保证先后顺序的同一类数据(同一个订单的数据) 发送到同一个queue当中。
一个queue就一个consumer,在consumer中维护多个内存队列,根据业务数据关键值(例如订单ID哈希值对内存队列数取模)将消息加入到不同的内存队列中,然后多个真正负责处理消息的线程去各自对应的内存队列当中获取消息进行消费。
RabbitMQ保证消息顺序性总结:
核心思路就是根据业务数据关键值划分成多个消息集合,而且每个消息集合中的消息数据都是有序的,每个消息集合有自己独立的一个consumer。多个消息集合的存在保证了消息消费的效率,每个有序的消息集合对应单个的consumer也保证了消息消费时的有序性。
版权归原作者 程序猿七度 所有, 如有侵权,请联系我们删除。