RabbitMq消息丢失主要是有三种情况:生产者消息未发送到服务端、服务端消息没有做持久化导致丢失、消费端未收到消息。解决方案依次如下:
- 开启事务或使用确认机制。对于一些重要的消息,生产者可以开启事务,确保消息发送成功后再提交事务。或者使用确认机制,等待 MQ 服务器的确认响应,确认消息已成功接收
- 服务端消息持久化以及高可用机制:开启消息的持久化机制,正常消息是存储在内存当中的,开启持久化机制之后,会将消息存储在磁盘当中。还可以开启镜像集群以及自从同步(Raft协议)机制,保证服务宕机之后我的消息不会丢失
- 消费者确认机制:开启消费者确认机制为auto,由spring确认消息处理成功后完成 ack,如果多次没有收到ack就会重复发送,多次没有就会将消息发送到一场交换机
消息重复消费问题,主要就是一个幂等性的一个问题(无论是一个操作执行多少次,产生的结果合执行一次是相同的),解决方案有几个方面:数据库层面、业务层面、分布式系统层面
- 数据库层面:- 唯一索引:对于数据库层面可以使用唯一索引,对于插入操作,可以在数据表中设置唯一索引,确保相同的数据不会被重复插入。- 乐观锁:可以对插入数据添加一个版本号字段,每一次更新数据时,先检查版本号是否与预期一致,如果一致就进行增加版本号,,否则不进行操作
- 业务层面:- 状态控制:对于一些像是点赞的问题,可以通过判断它的状态来进行判断是否执行,就比如我当前点赞那么就将状态设置为1,表示已点赞状态。当再次执行点赞操作时,通过检查当前状态,如果已经是 1,则不进行重复的点赞操作,直接返回已点赞的结果或者忽略此次请求,从而保证了无论点赞操作被执行多少次,最终的效果都是一致的- 去重表:创建一个去重表,用于记录已经处理过的操作。在执行关键操作之前,先检查去重表中是否存在相应的记录。如果存在,则说明该操作已经执行过,直接返回结果;如果不存在,则执行操作并在去重表中插入一条记录。
- 分布式系统层面:- 分布式锁:在分布式的环境当中使用分布式锁来保证同一时间只有一个节点能够执行特定操作,就可以避免重复执行- 全局唯一ID:为每个操作生成一个全局唯一的 ID,在执行操作时将这个 ID 作为参数传递给服务。服务在处理操作时,先检查是否已经处理过具有相同 ID 的操作。如果已经处理过,则直接返回结果;如果没有处理过,则执行操作并记录这个 ID
消费顺序性问题,主要有以下几个层面来解决:生成者层面,消费队列层面、消费者层面
- 生产者层面:单线程发送,确保同一个业务流程的消息由同一个生产者线程发送,这样可以保证消息到达队列的顺序性
- 消费的队列层面:分区或者队列划分,对于支持分区或多个队列的消息队列系统,可以将需要保证顺序的消息发送到同一个分区或队列中
- 消费者层面:- 单线程消费,消费者使用给单线程来消费特定的分区合队列中的消息,可以确保消息按照消息队列顺序进行消费- 业务层面判断:消费者可以在本地存储已处理消息的状态,例如记录已处理的消息的序号或唯一标识。在消费下一个消息之前,检查该消息的序号是否符合预期顺序,如果不符合则等待或进行相应的处理- 重试机制:当消费消息出现错误需要重试时,要确保重试的消息不会打乱顺序。可以将重试的消息放入一个单独的队列或延迟处理,等前面的消息都处理完成后再进行重试
版权归原作者 lose_rose777 所有, 如有侵权,请联系我们删除。