在发送消息时,如果消息发送失败,发送方会对消息进行重发,这就会产生重复的消息。如果我们不对重复消息进行处理,可能会对系统造成一定的影响。
如果消息队列本身能保证消息不会重复,那我们在消费端的实现逻辑就会变得很简单。
一、如何通过消息队列保证消息不重复?
在
MQTT
协议中,对于消息队列给定了三种传递消息的质量标准:
- At most once:至多一次。消息在传递时,最多会被送达一次。也就是说允许丢消息,适合一些对消息可靠性要求不太高的监控场景使用。
- At least once:至少一次。消息在传递时,至少会被送达一次。也就是说允许有重复消息的出现。
- Exactly once:恰好一次。消息在传递时,只会被送达一次。这是最高的等级,消息不允许重复也不允许丢失。
事实上,常用的消息队列如RabbitMQ、RocketMQ、Kafka提供的服务质量都是
At least once
,也就是说很难通过消息队列本身保证消息不重复。
补充:Kafka实际是支持
Exactly once
的,但是不同于上文介绍的
Exactly once
,它主要是配合流计算来进行使用。
因此,我们需要在消费端来解决重复消息的问题。
二、消费端通过幂等性解决重复消费问题
一般解决重复消息的方法就是,让我们消费消息的操作具备幂等性。
- 幂等性:执行任意多次的操作与执行一次的操作造成的影响相同。
一个幂等的方法,使用同样的参数,执行一次和多次多系统的影响是相同的,因此就达到了:
At least once
+幂等方法=
Exactly once
。
实现幂等最好的方式就是从业务逻辑设计上入手,将消费的业务逻辑设计成具备幂等性的操作。举个例子,“将账户A的余额增加100元”,这是一个非幂等的操作,如何将其改为幂等性的?主要有以下三种方式:
1.利用数据库的唯一约束实现幂等
我们可以创建一张转账流水表,其中字段包括:转账单id,账户id和变更金额。然后给转账单id和用户id创建一个唯一约束,这样对于相同的转账单id和账户id,表里最多只能存在一条记录。
基于这个思路,除了关系型数据库,redis的setnx也可以实现幂等消费。
2.为更新的数据设置前置条件
在更新数据前,我们可以设置一个前置条件,只有满足了条件我们才对数据进行更新。例如我们可以把上述操作改为:“如果账户X的余额为500元,则将余额增加100元”。
对于比较复杂的更新操作,我们可以给数据增加一个版本号属性,每次更新数据前比较当前数据的版本号与消息中的版本号是否一致,如果不一致则拒绝更新数据。更新数据后将版本号+1,实现幂等。
3.设置全局唯一id
更通用的一种方法是,记录并检查操作,也称为全局唯一id机制。
在发送消息时,给每条消息指定一个全局唯一的id,消费时先根据id检查这条消息是否被消费过,如果没有消费国再更新数据,然后将消费状态设置为已消费。
但是在分布式系统中,这个方法很难实现。因为不太好同时满足简单、高可用和高性能,且必须保证
检查消费状态、更新数据、设置消费状态
三个操作为原子性,否则就会出现bug。
版权归原作者 lans_g 所有, 如有侵权,请联系我们删除。