引言
RocketMq 作为一个成熟的消息中间件,自身保证了高可用性。学习RocketMq中的高可用性可以帮助我们自己在平时编写代码时能够编写处高可用的代码。
对于RocketMq,高可用主要在这四块进行保证
- 消息发送的高可用:在消息发送时可能会遇到网络问题、Broker 宕机等情况导致消息没有发送成功
- 消息存储的高可用:在 RocketMQ 中消息存储时Broker发生故障导致消息没有保存下来
- 消息消费的高可用:可能由于网络原因导致,也可能由于业务逻辑错误导致消息一直消费失败
- 集群管理的高可用:整一个生产者,broker,消费者的整体能够高可用,主要NameServer的高可用性如何保证
对于这四点,我们依次来学习一下RocketMq中是如何保证的。这一篇主要将 RocketMq 是如何做到存储高可用性的。
1、消息存储的高可用
在机器上的内存中,数据是不安全的,因为机器在断电后这些在内存中的数据就会全部丢失,那么我们保证消息能够不丢失,就需要对这些数据进行持久化到硬盘。
但是在恶劣环境下,可能机器都损坏了,例如火灾了,导致这些机器上的硬盘都损坏了。这样该如何处理呢?处理方式就是主从同步,可以这样说,实现了这两种方式,除非发生第三次世界大战,一般是不会造成数据丢失了。
1.1、RocketMq 数据持久化
1.1.1、数据是何时进行持久化的
org.apache.rocketmq.remoting.netty.NettyRemotingAbstract#processRequestCommand
上面的方法在 RocketMq 启动后就开启了,用来接收客户端发来的请求,包括生产者"发送消息"的请求。
在找到方法后就会由对应的方法来处理,其中
而这个
pair.getObject1
获取得到的处理方式是否是
AsyncNettyRequestProcessor
实现,这里接收生产者 “消息发送” 的请求时异步的,所以会调用
asyncProcessRequest
方法。再处理后进入到
SendMessageProcessor
流程中
org.apache.rocketmq.broker.processor.SendMessageProcessor#asyncProcessRequest
org.apache.rocketmq.broker.processor.SendMessageProcessor#asyncSendMessage
org.apache.rocketmq.store.DefaultMessageStore#asyncPutMessage
org.apache.rocketmq.store.CommitLog#asyncPutMessage
将消息同步到消息内存中
启动刷盘操作和同步操作
我们可以总结一下:消息的持久化是在接收到生产者发来的消息后就会触发的。在数据同步到
mappedFile
中后,系统就准备好了和
commitLog
一一对应的数据体,接下来就是进行数据刷盘和数据主从同步这两项操作,也正式这两项操作,保证了消息的高可用性。
1.1.2、数据是如何进行持久化的
在上面最后通过
submitFlushRequest
方法来异步唤醒刷盘操作,那么我们来一起看下这个函数
org.apache.rocketmq.store.CommitLog#submitFlushRequest
可以看到刷盘主要有三种
那具体的刷盘方式这里就不在具体展开了,主要聚集在两个类中
mappedFileQueue
,
mappedFile
其中
mappedFileQueue
对外统一处理类,其在
flush
时首先会选出对应的
MappedFile
,上面说过这个对象中就会存储具体的消息数据,然后MappedFile就会将对应的数据持久化到对应的commitLog文件上。
对于
MappedFileQueue
而言,其记录了两个属性:
flushedWhere
和
committedWhere
。前者代表着目前已经落盘的偏移量。后者代表已经提交到文件的偏移量。显然,后者这个属性只有在异步加速模式下才会使用到(也就是开启了
transientStorePoolEnable
)。
对于
MappedFile
而言,有三个属性:
wrotePosition
、
committedPosition
、
flushedPosition
。代表含义如下
- wrotePosition:已经写入的内容的偏移量。这个偏移量可能是写入到文件也可能是写入到内存。
- committedPosition:内存区域提交到文件的偏移量。该属性只有在异步加速模式下才会有用。
- flushedPosition:已经刷入磁盘的偏移量。
1.2、RocketMq 数据主从同步
在
broker
启动时就会开启接收从服务连接的请求,发生的地点为HAService类上
org.apache.rocketmq.store.ha.HAService#start
这里简单介绍一下这几个类的作用
- AcceptSocketService:职责初始化TCP通道,监听新的连接并创建HAConnection。
- GroupTransferService:职责判断主从同步是否完成,完成后唤醒消息发送线程。
- HAClient:是Slave封装实现类,负责与Master建立连接通道,并从通道中获取数据存储;并向Master上报Slave存储的最大物理偏移量。
所以在
AcceptSocketService
启动后就开启了监听,等待从服务来进行连接,RocketMq中的试下如下
org.apache.rocketmq.store.ha.HAService.AcceptSocketService#run
上面的
HAConnection#start()
便是唤醒了
WriteSocketService
和
ReadSocketService
这两个服务来控制服务的socket读写。
这里先来看下
WriteSocketService
,这个函数有点长,我们分段来分析
org.apache.rocketmq.store.ha.HAConnection.WriteSocketService#run
这里计算下一次同步数据的偏移量的方式是获取最新的MappedFile的初始偏移量,所以我们可以知道当如果有新的从服务器和主服务器建立连接后,数据同步时从最新的MappedFile文件开始同步的,并不是全部的数据都进行同步。接下来的程序是
最后就是数据的传输了
下面便是write事件的流程
从broker服务器的读事件同理,在等待主服务器写的信息到来后,就将其首先写入自己的内存中,然后根据配置来同步或者异步刷盘。
不知道大家还记得上面在生产者发送一条消息时,最终触发了两个方法
其中
submitFlushRequest
方法是用来唤醒刷盘操作的,那么这个
submitReplicaaRequest
方法看命名就知道是用来触发主从同步的,但是我们通过上面的分析,可以知道,是主broker服务开启监听,等待从broker服务器发来需要同步数据的请求,然后主broker根据从broker发过来的数据同步起点(如果第一次同步,则不需要,这时候主broker服务是同步的最新的CommitLog文件),将最新的数据同步给从broker。好像不需要这里进行触发呀?带着这个问题我们来一起看下这个方法中具体做了什么。
org.apache.rocketmq.store.CommitLog#submitReplicaRequest
我们可以知道,这个函数主要是作用于消息保证高安全性的情况。在这种情况下,在返回给生产者消息已经发送成功前,必须要保证消息已经持久化,并且在有主从时还需要保证消息已经主从同步了。这个
wakeupAll
能用来立即唤醒等待的线程,不然线程可能正在处于wait(100)的场景下,用了这个能够加快同步速度。
那么还有最后一个问题,就是从broker服务器是如何发起数据同不请求的呢?
org.apache.rocketmq.broker.BrokerController#handleSlaveSynchronize
答案就在这个函数上,在broker启动时就会判断当前的broker是否是slave,如果是的话,就会开启一个定时任务,间隔是10s,会不间断的向主broker服务器请求同步数据。
版权归原作者 曳凡 所有, 如有侵权,请联系我们删除。