之前文章:
兔老大的系统设计(一)健康度系统
一、背景
延迟队列的应用场景非常广泛,如客户主动操作:
- 股票定投
- 顾客预约场景
- 会员定时续费/缴费
- CSDN定时发布
或系统内部操作:
- 订单成功后,在30分钟内没有支付,自动取消订单
- 外卖平台发送订餐通知,下单成功后60s给用户推送短信。
- 如果订单一直处于某一个未完结状态时,及时处理关单,并退还库存
- 淘宝新建商户一个月内还没上传商品信息,将冻结商铺等
二、需求分析
场景多种多样,我们尽量做出一个通用的,功能完备的,能满足大部分场景的系统。
可以以顾客预约场景为例进行设计,假设会量大、量不稳定、存储时间长(比如几个月后执行),这样设计出来的系统就普遍适用。
三、目标明确
3.1功能
延时队列相比于普通队列最大的区别就体现在其延迟的属性上.
普通队列的元素是先进先出,按入队顺序进行处理,而延时队列中的元素在入队时会指定一个延迟时间,表示其希望能够在经过该指定时间后处理。
从某种意义上来讲,延迟队列的结构并不像一个队列,而更像是一种以时间为权重的有序堆结构。
要实现的功能如下:
1)可以设置定时任务
2)可以修改未到执行时间的任务属性(包括执行时间)
3)可以查询任务情况,人工干预
3.2设计重点
大部分系统的重点一般在:性能、可用性、安全三方面。
3.2.1性能
及时:时间到了立刻执行,不能延迟太久。(如crontab分钟粒度就太粗了)
3.2.2可用
可靠:保证任务不重不漏的执行,不能丢任务、不能重复执行
高可用可扩展:服务尽量不挂、可抗住突发的大量请求
可恢复:系统挂了或者任务失败/丢失等等,可以恢复
3.2.3其它
可撤回/修改:如果定时任务还没到执行时间,可以修改执行时间和其他内容,也可取消。
存时间长:有些场景甚至要保存一年以上,比如用户办理年卡后,要有一些策略诱导消费。
四、一些探索
本章不局限于实现所有的目标,提出一些业内常见的实现方案,供大家增长知识面,和最终方案可以有个对比。
请注意看每种方案下的分析
4.0 数据库
在小型项目中,通过一个线程定时扫数据库,通过执行时间字段来判断是否到时,然后进行操作
优点:简单,支持集群操作
缺点: (1)对服务器内存消耗大
(2)存在延迟,执行时间粒度和mysql本身的速度都会影响
(3)假设你的订单有几千万条,每隔几分钟这样扫描一次,数据库损耗极大
4.1 DelayQueue 延时队列
4.1.1 介绍
JDK 中提供了一组实现延迟队列的 API,位于Java.util.concurrent包下的 DelayQueue。
DelayQueue 是一个 BlockingQueue,本质就是封装了一个 PriorityQueue(优先队列),内部用堆来实现队列元素排序,向 DelayQueue 队列中添加元素时,会给元素一个 Delay(延迟时间)作为排序条件,队列中最小的元素会优先放在队首。队列中的元素只有到了 Delay 时间才允许从队列中取出。
4.1.2简单实现
1)实现 Delayed 接口,接口里只有一个 getDelay 方法,用于设置延期时间。
2)Order 类中compareTo()负责对队列中的元素进行排序。
public class Order implements Delayed {
//延迟时间
@JsonFormat(locale = "zh", timezone = "GMT+8", pattern = "yyyy-MM-dd HH:mm:ss")
private long time;
String name;
public Order(String name, long time, TimeUnit unit) {
this.name = name;
this.time = System.currentTimeMillis() + (time > 0 ? unit.toMillis(time) : 0);
}
@Override
public long getDelay(TimeUnit unit) {
return time - System.currentTimeMillis();
}
@Override
public int compareTo(Delayed o) {
Order Order = (Order) o;
long diff = this.time - Order.time;
if (diff <= 0) {
return -1;
} else {
return 1;
}
}
}
DelayQueue 的 put 方法是线程安全的,内部使用了ReentrantLock进行线程同步。
上边只是简单的实现,实际开发中会有专门的线程负责消息的入队与消费。
4.1.3 分析
分析:事实上,如无必要,我们应该尽可能使用语言自带的库,而非过度设计。从这方面考虑,DelayQueue无疑是一个简单优秀的实现,但是在大型项目中,本地存储的方案确实不太适用。
无论如何,我们仍然可以对它大根堆和线程控制的方法进行学习和借鉴。
4.2 RabbitMQ
RabbitMQ 本身并不直接提供对延迟队列的支持,我们依靠 RabbitMQ 的TTL以及死信队列功能,来实现延迟队列的效果。那就让我们首先来了解一下,RabbitMQ 的死信队列以及 TTL 功能。
4.2.1死信队列
死信队列实际上是一种 RabbitMQ 的消息处理机制,当 RabbmitMQ 在生产和消费消息的时候,消息遇到如下的情况,就会变成“死信”:
- 消息被拒绝并且不再重新投递
- 消息超时未消费,也就是 TTL 过期了
- 消息队列到达最大长度
消息一旦变成一条死信,便会被重新投递到死信交换机(Dead-Letter-Exchange),然后死信交换机根据绑定规则转发到对应的死信队列上,监听该队列就可以让消息被重新消费。
4.2.2消息生存时间 TTL
TTL(Time-To-Live)表示了一条消息的最大生存时间,单位为毫秒。如果一条消息在 TTL 设置的时间内没有被消费,那么它就会变成一条死信,进入我们上面所说的死信队列。
如何设置消息的 TTL 属性
- 一种方式是直接在创建队列的时候设置整个队列的 TTL 过期时间,所有进入队列的消息,都被设置成了统一的过期时间,一旦消息过期,马上就会被丢弃,进入死信队列,在延迟队列的延迟时间为固定值的时候,比较适合使用这种方式:
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-message-ttl", 6000);
channel.queueDeclare(queueName, durable, exclusive, autoDelete, args);
- 另一种方式是针对单条消息设置,参考代码如下,该消息被设置了 6 秒的过期时间:
AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
builder.expiration("6000");
AMQP.BasicProperties properties = builder.build();
channel.basicPublish(exchangeName, routingKey, mandatory, properties, "msg content".getBytes());
4.2.3实现延迟队列
我们利用死信队列的这个属性,把需要延迟的消息,将 TTL 设置为其延迟时间,投递到 RabbitMQ 的普通队列中,一直不去消费它,那么经过 TTL 的时间后,消息就会自动被投递到死信队列,这时候我们使用消费者进程实时地去消费死信队列中的消息,就实现了延迟队列的效果。
从下图可以直观的看出使用 RabbitMQ 实现延迟队列的整体流程:
使用 RabbitMQ 来实现延迟队列,具有很明显的一些优势:比如消息可靠发送、消息可靠投递、死信队列来保障消息至少被消费一次以及未被正确处理的消息不会被丢弃。另外,通过 RabbitMQ 集群的特性,可以很好的解决单点故障问题,不会因为单个节点挂掉导致延迟队列不可用或者消息丢失。
4.2.4存在的bug
RabbitMQ 在检查消息是否过期时,只会检查第一个消息是否过期,并不会校验后面消息过期的情况比如第一个消息设置了 20s 的 TTL,第二个消息设置了 10s 的 TTL,那么 RabbitMQ 会等到第一个消息过期之后,才会让第二个消息过期。
解决这个问题的方法也很简单,只需要安装 RabbitMQ 的一个插件即可:
Community Plugins — RabbitMQ
安装好这个插件后,所有的消息就都能按照被设置的 TTL 过期了。
4.2.5插件实现原理
上面使用 DLX + TTL 的模式,消息首先会路由到一个正常的队列,根据设置的 TTL 进入死信队列,与之不同的是通过 x-delayed-message 声明的交换机,它的消息在发布之后不会立即进入队列,先将消息保存至 Mnesia。
这个插件将会尝试确认消息是否过期,首先要确保消息的延迟范围是 Delay > 0, Delay =<ERL_MAX_T(在 Erlang 中可以被设置的范围为 (2^32)-1 毫秒),如果消息过期通过 x-delayed-type 类型标记的交换机投递至目标队列,整个消息的投递过程也就完成了。
4.2.6 分析
作为网上流传非常广的一种方案,它似乎真的是一种不错的实现。
尤其是,由于 RabbitMQ 本身的消息可靠发送、消息可靠投递、死信队列等特性,可以保障消息至少被消费一次以及未被正确处理的消息不会被丢弃,让消息的可靠性有了保障。
但是这方案有如下缺点:
1、为了解决一个问题,又引入了队列交换机+mq+私信队列交换机+私信队列+插件,我们并不希望引入如此复杂不可控的架构。
2、配置麻烦,额外增加死信交换机和死信队列等配置,不好维护
3、不可靠,实际测试环境延时插件有时收不到消息,不是很稳定。配置错误、生产者消费者连接的队列错误和其他未知因素都有可能造成延迟失效。
4、真实消费原因不唯一:消息被拒绝、消息过期、消息超长等等原因都会进入死信队列,这种不唯一也是我们无法忍受的。,我们无法知道死信队列中是否都是过期消息。
4.3 kafka-TimeWheel
TimeWheel 时间轮算法,是一种实现延迟队列的巧妙且高效的算法,被应用在 Netty,Zookeeper,Kafka 等各种框架中。
4.3.1时间轮
如上图所示,时间轮是一个存储延迟消息的环形队列,其底层采用数组实现,可以高效循环遍历。这个环形队列中的每个元素对应一个延迟任务列表,这个列表是一个双向环形链表,链表中每一项都代表一个需要执行的延迟任务。时间轮会有表盘指针,表示时间轮当前所指时间,随着时间推移,该指针会不断前进,并处理对应位置上的延迟任务列表。
4.3.2添加延迟任务
由于时间轮的大小固定,并且时间轮中每个元素都是一个双向环形链表,我们可以在
O(1)
的时间复杂度下向时间轮中添加延迟任务。
如下图,例如我们有一个这样的时间轮,在表盘指针指向当前时间为 2 时,我们需要新添加一个延迟 3 秒的任务,我们可以快速计算出延迟任务在时间轮中所对应的位置为 5,并添加到位置 5 上任务列表尾部。
4.3.3多层时间轮
上面的时间轮的大小是固定的,只有 12 秒。如果此时我们有一个需要延迟 200 秒的任务,我们应该怎么处理呢?直接扩充整个时间轮的大小吗?这显然不可取,因为这样做的话我们就需要维护一个非常非常大的时间轮,内存是不可接受的,而且底层数组大了之后寻址效率也会降低,影响性能。
为此,Kafka 引入了多层时间轮的概念。其实多层时间轮的概念和我们的机械表上时针、分针、秒针的概念非常类似,当仅使用秒针无法表示当前时间时,就使用分针结合秒针一起表示。同样的,当任务的到期时间超过了当前时间轮所表示的时间范围时,就会尝试添加到上层时间轮中,如下图所示:
第一层时间轮整个时间轮所表示时间范围是 0-12 秒,第二层时间轮每格能表示的时间范围是整个第一层时间轮所表示的范围也就是 12 秒,所以整个第二层时间轮能表示的时间范围即 12*12=144 秒,依次类推第三层时间轮能表示的范围是 1728 秒,第四层为 20736 秒等等。
比如现在我们需要添加一个延时为 200 秒的延迟消息,我们发现其已经超过了第一层时间轮能表示的时间范围,我们就需要继续往上层时间轮看,将其添加在第二层时间轮 200/12 = 17 的位置,然后我们发现 17 也超过了第二次时间轮的表示范围,那么我们就需要继续往上层看,将其添加在第三层时间轮的 17/12 = 2 的位置。
4.3.4具体实现
Kafka 中时间轮算法添加延迟任务以及推动时间轮滚动的核心流程如下,其中 Bucket 即时间轮中的延迟任务队列,并且 Kafka 引入的 DelayQueue 解决了多数 Bucket 为空导致的时间轮滚动效率低下的问题:
使用时间轮实现的延迟队列,能够支持大量任务的高效触发。并且在 Kafka 的时间轮算法的实现方案中,还引入了 DelayQueue,使用 DelayQueue 来推送时间轮滚动,而延迟任务的添加与删除操作都放在时间轮中,这样的设计大幅提升了整个延迟队列的执行效率。
4.3.5 分析
从复杂度上说:相比 DelayQueue ,时间轮在复杂度上有优势。DelayQueue 由于涉及到调整数据的位置,插入和移除复杂度是 O(lgn),而时间轮在插入和移除的复杂度都是 O(1)。
从实际上说:相比其它MQ,kafka在我的认知里是最优秀的,事实上在我的十万级压测中,它是唯一性能达标的MQ(有些MQ已经接近挂了它还很健康)。同时kafka也有一定的持久化方案。
但是这种方案依旧有一些问题:
1、正如我开头提到的,需求很可能是保存一个月甚至更长时间,超过了默认的log.retention.hours(168)的大小。
2、我们希望执行时间视可修改的,但是kafka的消息一旦由生产者发送,则不可变。关于这方面讨论我贴了一个链接,感兴趣的可以看看。stackoverflow问题https://stackoverflow.com/questions/60046428/what-is-kafka-message-tweaking
其实探索到想用众多MQ来实现延迟队列时,我越来越清晰的有一种感觉:非逼着众多MQ(比如kafka)做不擅长的事情本身就有问题,人家的定位就是消息队列,而不是替你保存动辄一个月才执行的消息然后精准执行。
4.4 一些其他方案
这里我不准备继续分析所有方案的优缺点了,因为这是很无聊的(而且影响接下来方案叙述的节奏),如Quartz、ActiveMQ、RocketMQ、nsq、pulsar等等,原因无非是性能不达标、时间粒度不够、存储时间不够等等,在这里放一张MQ的对比图:(如果满足你的要求,当然也可以用)
4.5 Redis ZSet
基于4.3结尾的考虑,首先要有地方做持久化,redis作为nosql的老大,呼之欲出。
4.5.1过期回调
只是提一嘴,这种歪门邪道的实现就不要想了,事实上容易出大问题,有兴趣可以了解。
4.5.2 正解介绍
Redis有一个有序集合的数据结构ZSet,ZSet中每个元素都有一个对应Score,ZSet 中所有元素是按照其 Score 进行排序的。我们利用它Score有序的属性,可以对入队的成员,按过期时间从小到大排列!
那么我们可以通过以下这几个操作使用 Redis 的 ZSet 来实现一个延迟队列:
- 入队操作:我们将需要处理的任务,按其需要延迟处理时间作为 Score 加入到 ZSet 中。Redis 的 ZAdd 的时间复杂度是
O(logN)
,N
是ZSet 中元素个数,因此我们能相对比较高效的进行入队操作。 - 起一个进程定时(比如每隔一秒)查询 ZSet 中 Score 最小的元素,查询结果有两种情况:
- 查询出的分数小于等于当前时间戳,说明到这个任务需要执行的时间了,则去异步处理该任务;
- 查询出的分数大于当前时间戳,说明 ZSet 中所有的任务都还没有到需要执行的时间,则休眠一秒后继续查询;
4.5.3 分析
从上述的讨论中我们可以看到,通过 Redis Zset 实现延迟队列是一种理解起来较为直观,可以快速落地的方案。并且我们可以依赖 Redis 自身的持久化来实现持久化,使用 Redis 集群来支持高并发和高可用,是一种不错的延迟队列的实现方案。
但是redis同样有缺点(但是被解决了):
1、定位问题:和上文提到的众多mq一样,redis的定位并不是延迟队列。
(经验告诉我们,如果硬要用与需求定位不符的东西,就是容易出问题)
但是由于这种方式实在过于简单好用,在业界确实非常太有市场,我记得redis作者本人都曾经谈过这个问题,告诉大家最好不要把redis用作消息队列之类的,只不过貌似没人听。
2、持久化不是百分百可靠:redis持久化两种方式我就不讲了,最高级的持久化配置就是每次操作都记录,但是由于性能问题,基本不可能这样配(事实上大公司有明确规定不能这样配)。
3、 真实案例,如果qps过高,虽然redis扛得住(和kafka一样,真男人啊),但是我们的服务扛不住。
五、方案
5.1 思考
读者紧接着4.5.3的想法不要断,我们想用redis,但是有三个问题,如何解决呢?
问题2:没的说,上mysql万事大吉。
问题3:redis接MQ,完美解决问题,这不就是mq天天吹的其中一大作用吗。
问题1:我们只是用来排个序,消息队列和持久化都不是redis做了,符合定位。
5.2 整体架构
基于这些考虑,最终我们的架构是这样的:
其他服务想使用定时功能,调用写接口,核心线路有两条:
第一条:
1)将延迟的消息任务通过 hash 算法路由至不同的 Redis Key 上
2)每个 Redis Key 都对应建立一个处理进程,称为 Event ,轮询,查询是否有待处理的延迟消息
3) Event 进程不负责业务,基本只负责分发消息,具体的业务逻辑通过kafka解耦,消费者实现。
4)我们规定消费者一定要上报执行结果,以便我们决定是否重复请求
第二条:
1)将消息写入DB(或更改)
2)event进程扫过期一段时间的任务(可配置)
3)主动请求消费者执行
5.3 细节补充
1)mysql
表结构
2)redis
如觉得写mysql这条链路也太麻烦,并且没有存储很久的需求,可以用redis自身的持久化功能,同时开启RDB和AOF,AOF设置everysec,即每秒异步刷盘一次。极端情况下,可能会丢失一秒的数据。
高可用使用的是redis的主从复制模式。服务高可用方面,在实现过程中考虑了服务节点的横向扩展,Timer、Cleaner等对同一个redis队列的操作都加了分布式锁。每个服务节点都是无状态的,不需要进行元数据同步等操作,少数服务节点宕机不影响整个服务的可用性。
3)监控
对于消息堆积,以及消息超过重试次数被丢弃等场景,说明消费端服务异常,没有正常消费及ack,需要及时上报并通知给业务方及服务提供方,方便快速发现并排查问题。
4)mq
mq的选择上文有对比图,详细分析以后补充吧。
六、总结和QA
分析架构
可能很多人会有些疑惑:你抛出来看起来这么复杂的图,实现起来是不是很麻烦?它真的很好用吗?
下面首先回忆第三章,看是否实现目标。
从性能出发,整条链路看:
1)redis key可以增加,不用担心量大影响性能
2)event定时任务每秒轮询,基本没延迟
3)event没业务逻辑,校验+转发个消息很快
3)消息队列选用的性能最强的kafka
实际测试也符合要求
从可用出发,整条链路看:
1)将延迟的消息任务通过 hash 算法路由至不同的 Redis Key 上,这样做有两大好处:
- 避免了当一个 KEY 在存储了较多的延时消息后,入队操作以及查询操作速度变慢的问题(两个操作的时间复杂度均为
O(logN)
)。 - 系统具有了更好的横向可扩展性,当数据量激增时,我们可以通过增加Redis Key 的数量来快速的扩展整个系统。
但是会存在一个问题,因为增加key的数量,必然涉及到hash算法范围的调整,那么原先集合中的元素就不能通过新的hash算法路由到,所以需要采用一致性hash算法。
2)所有的 Event 进程只负责分发消息,具体的业务逻辑通过MQ解耦,由消费者异步处理,这么做的好处也是显而易见的:
- Event 进程只负责分发消息,那么其处理消息的速度就会非常快,就不太会出现因为业务逻辑复杂而导致消息堆积的情况。
- 采用一个额外的消息队列后,消息处理的可扩展性也会更好,我们可以通过增加消费者进程数量来扩展整个系统的消息处理能力。
- Event 是多机多进程模型,保证整个系统的高可用性。采用 Zookeeper 选主的方式,保证同一时间只会有一个进程去处理消息,一旦 Zookeeper 的 leader 主机宕机,会自动选择新的 leader 来处理。
从其他功能看:
要求可恢复、可撤回、可修改、保存时间超长。
我们用mysql解决了大部分问题,修改时,记得把redis也改了(这也是比用kafka好的点,可修改)
到底如何设计?
这是本人一点粗浅的理解
回看我们的方案,第一,无论是基于死信队列还是数据先存储(mysql/redis)后投递(kafka),亦或是redis超时时间,本质上都是将延迟待发送的消息数据与正常订阅的队列分开存储,这么做一方面降低耦合度,另一方面也是为了降低数据不可控的时间。
这也是我想说的,经验告诉我们,大量数据存在不可操作、不可见的地方是一件很糟糕的事(本文提的很多方案有这个毛病),服务不是写完就完了,还要维护,所以我们尽量不要这么做。
第二,既然选择了数据分离,整条链路的存储组件和队列组件的选择,按需选择,十分重要。
本方案就是mysql/redis+kafka
第三、无论是检查队头消息TTL还是调度存储的数据,本质上都是通过定时任务来完成的,定时任务的触发策略也是决定你方案优劣的决定性因素:你是crontab配置,还是主备选举策略、还是大家一起抢分布式锁,也值得根据具体情况具体分析
还是觉得太复杂,能否简化一点?
可以,我的建议是,如果qps不高的话,去掉kafka会是一个简单方案。
多线程如何处理?
如果你指的是,高并发场景下存在同一条消息被多次消费的情况,你可以使用分布式锁,如zookpeer、redis的红锁、自己做一个等等。
本方案目前不存在这类问题
版权归原作者 兔老大RabbitMQ 所有, 如有侵权,请联系我们删除。