0


RocketMQ源码解析-Broker部分之Broker启动过程

目录

broker启动流程

借用一下【秃头爱健身】博主的的图,我觉得画的很好。
在这里插入图片描述

broker启动可配置参数

-n : 指定broker 的 namesrvAddr 地址;
 -h :打印命令;
 -c :指定配置文件的路径;
 -p :启动时候日志打印配置信息;
 -m :启动时候日志打印导入的配置信息。

启动入口

BrokerStartup

broker启动的入口是在

brokerStartup

,方法是

main
publicstaticvoidmain(String[] args){//创建BrokerControllerstart(createBrokerController(args));}

1.创建brokerController

publicstaticBrokerControllercreateBrokerController(String[] args){//设置RocketMq的版本号System.setProperty(RemotingCommand.REMOTING_VERSION_KEY,Integer.toString(MQVersion.CURRENT_VERSION));//设置broker的netty客户端的发送缓冲大小,默认是128 kbif(null==System.getProperty(NettySystemConfig.COM_ROCKETMQ_REMOTING_SOCKET_SNDBUF_SIZE)){NettySystemConfig.socketSndbufSize =131072;}//设置broker的netty客户端的接受缓冲大小,默认是128 kbif(null==System.getProperty(NettySystemConfig.COM_ROCKETMQ_REMOTING_SOCKET_RCVBUF_SIZE)){NettySystemConfig.socketRcvbufSize =131072;}try{//PackageConflictDetect.detectFastjson();//命令行选项解析Options options =ServerUtil.buildCommandlineOptions(newOptions());//解析命令行为 ‘mqbroker’的参数
            commandLine =ServerUtil.parseCmdLine("mqbroker", args,buildCommandlineOptions(options),newPosixParser());//如果为空,直接退出if(null== commandLine){System.exit(-1);}//创建broker,netty的相关配置对象finalBrokerConfig brokerConfig =newBrokerConfig();finalNettyServerConfig nettyServerConfig =newNettyServerConfig();finalNettyClientConfig nettyClientConfig =newNettyClientConfig();//是否使用TLS (TLS是SSL的升级版本,TLS是SSL的标准化后的产物,有1.0 1.1 1.2三个版本)
            nettyClientConfig.setUseTLS(Boolean.parseBoolean(System.getProperty(TLS_ENABLE,String.valueOf(TlsSystemConfig.tlsMode ==TlsMode.ENFORCING))));//设置netty的服务端监听的端口 10911,对外提供消息读写服务的端口
            nettyServerConfig.setListenPort(10911);//创建消息存储配置finalMessageStoreConfig messageStoreConfig =newMessageStoreConfig();//如果broker是slave节点if(BrokerRole.SLAVE == messageStoreConfig.getBrokerRole()){//比默认的40% 还要小 10int ratio = messageStoreConfig.getAccessMessageInMemoryMaxRatio()-10;//设置消息存储配置所能使用的最大内存比例,超过该内存,消息将被置换出内存,
                messageStoreConfig.setAccessMessageInMemoryMaxRatio(ratio);}//解析命令行参数'-c':指定broker的配置文件路径if(commandLine.hasOption('c')){String file = commandLine.getOptionValue('c');if(file !=null){
                    configFile = file;InputStream in =newBufferedInputStream(newFileInputStream(file));
                    properties =newProperties();
                    properties.load(in);properties2SystemEnv(properties);MixAll.properties2Object(properties, brokerConfig);MixAll.properties2Object(properties, nettyServerConfig);MixAll.properties2Object(properties, nettyClientConfig);MixAll.properties2Object(properties, messageStoreConfig);BrokerPathConfigHelper.setBrokerConfigPath(file);
                    in.close();}}MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), brokerConfig);if(null== brokerConfig.getRocketmqHome()){System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation",MixAll.ROCKETMQ_HOME_ENV);System.exit(-2);}//检查broker配置中的nameServer地址String namesrvAddr = brokerConfig.getNamesrvAddr();if(null!= namesrvAddr){try{String[] addrArray = namesrvAddr.split(";");for(String addr : addrArray){RemotingUtil.string2SocketAddress(addr);}}catch(Exception e){System.out.printf("The Name Server Address[%s] illegal, please set it as follows, \"127.0.0.1:9876;192.168.0.1:9876\"%n",
                        namesrvAddr);System.exit(-3);}}//检查broker的角色switch(messageStoreConfig.getBrokerRole()){case ASYNC_MASTER:case SYNC_MASTER://如果是master节点,则设置该节点brokerId=0
                    brokerConfig.setBrokerId(MixAll.MASTER_ID);break;case SLAVE:if(brokerConfig.getBrokerId()<=0){System.out.printf("Slave's brokerId must be > 0");System.exit(-3);}break;default:break;}//是否启用 DLedger,即是否启用 RocketMQ 主从切换,默认值为 false。如果需要开启主从切换,则该值需要设置为 trueif(messageStoreConfig.isEnableDLegerCommitLog()){
                brokerConfig.setBrokerId(-1);}//设置消息存储配置的高可用端口,10912
            messageStoreConfig.setHaListenPort(nettyServerConfig.getListenPort()+1);LoggerContext lc =(LoggerContext)LoggerFactory.getILoggerFactory();JoranConfigurator configurator =newJoranConfigurator();
            configurator.setContext(lc);
            lc.reset();
            configurator.doConfigure(brokerConfig.getRocketmqHome()+"/conf/logback_broker.xml");//解析命令行参数'-p':启动时候日志打印配置信息if(commandLine.hasOption('p')){InternalLogger console =InternalLoggerFactory.getLogger(LoggerName.BROKER_CONSOLE_NAME);MixAll.printObjectProperties(console, brokerConfig);MixAll.printObjectProperties(console, nettyServerConfig);MixAll.printObjectProperties(console, nettyClientConfig);MixAll.printObjectProperties(console, messageStoreConfig);System.exit(0);}//解析命令行参数'-m':启动时候日志打印导入的配置信息elseif(commandLine.hasOption('m')){InternalLogger console =InternalLoggerFactory.getLogger(LoggerName.BROKER_CONSOLE_NAME);MixAll.printObjectProperties(console, brokerConfig,true);MixAll.printObjectProperties(console, nettyServerConfig,true);MixAll.printObjectProperties(console, nettyClientConfig,true);MixAll.printObjectProperties(console, messageStoreConfig,true);System.exit(0);}

            log =InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);MixAll.printObjectProperties(log, brokerConfig);MixAll.printObjectProperties(log, nettyServerConfig);MixAll.printObjectProperties(log, nettyClientConfig);MixAll.printObjectProperties(log, messageStoreConfig);//创建BrokerControllerfinalBrokerController controller =newBrokerController(
                brokerConfig,
                nettyServerConfig,
                nettyClientConfig,
                messageStoreConfig);// 记住所有的配置以防止丢弃
            controller.getConfiguration().registerConfig(properties);//初始化BrokerControllerboolean initResult = controller.initialize();if(!initResult){
                controller.shutdown();System.exit(-3);}//注册关闭的钩子方法Runtime.getRuntime().addShutdownHook(newThread(newRunnable(){privatevolatileboolean hasShutdown =false;privateAtomicInteger shutdownTimes =newAtomicInteger(0);@Overridepublicvoidrun(){synchronized(this){
                        log.info("Shutdown hook was invoked, {}",this.shutdownTimes.incrementAndGet());if(!this.hasShutdown){this.hasShutdown =true;long beginTime =System.currentTimeMillis();//BrokerController的销毁方法
                            controller.shutdown();long consumingTimeTotal =System.currentTimeMillis()- beginTime;
                            log.info("Shutdown hook over, consuming total time(ms): {}", consumingTimeTotal);}}}},"ShutdownHook"));return controller;}catch(Throwable e){
            e.printStackTrace();System.exit(-1);}returnnull;}

broker启动会默认启动3个端口
端口说明10911接收消息推送的端口10912消息存储配置的高可用端口10909推送消息的VIP端口

2.

BrokerController

构造函数

构造入参说明
参数类型说明brokerConfigBrokerConfig封装Broker的基本配置信息nettyServerConfigNettyServerConfig封装了broker作为对外提供消息读写操作的MQ服务器信息nettyClientConfigNettyClientConfig封装了broker作为NameServer的客户端的信息messageStoreConfigMessageStoreConfig封装消息存储Store的配置信息

publicBrokerController(finalBrokerConfig brokerConfig,finalNettyServerConfig nettyServerConfig,finalNettyClientConfig nettyClientConfig,finalMessageStoreConfig messageStoreConfig
    ){// BrokerStartup中准备的配置信息this.brokerConfig = brokerConfig;this.nettyServerConfig = nettyServerConfig;this.nettyClientConfig = nettyClientConfig;this.messageStoreConfig = messageStoreConfig;// Consumer消费进度记录管理类,会读取store/config/consumerOffset.json配置文件this.consumerOffsetManager =newConsumerOffsetManager(this);//消息Topic维度的管理查询类, 管理Topic和Topic相关的配置关系,  会读取store/config/topics.jsonthis.topicConfigManager =newTopicConfigManager(this);//Consumer端使用pull的方式向Broker拉取消息请求的处理类this.pullMessageProcessor =newPullMessageProcessor(this);//Consumer使用Push方式的长轮询机制拉取请求时,保存使用,当有消息到达时进行推送处理的类this.pullRequestHoldService =newPullRequestHoldService(this);//有消息到达Broker时的监听器,回调pullRequestHoldService中的notifyMessageArriving()方法this.messageArrivingListener =newNotifyMessageArrivingListener(this.pullRequestHoldService);//消费者id变化监听器this.consumerIdsChangeListener =newDefaultConsumerIdsChangeListener(this);//消费者管理类,并对消费者id变化进行监听this.consumerManager =newConsumerManager(this.consumerIdsChangeListener);//消费者过滤类,按照Topic进行分类,  会读取store/config/consumerFilter.jsonthis.consumerFilterManager =newConsumerFilterManager(this);//生产者管理 按照group进行分类this.producerManager =newProducerManager();//客户端心跳连接处理类this.clientHousekeepingService =newClientHousekeepingService(this);//Console控制台获取Broker信息使用this.broker2Client =newBroker2Client(this);//订阅关系管理类this.subscriptionGroupManager =newSubscriptionGroupManager(this);//Broker对外访问的APIthis.brokerOuterAPI =newBrokerOuterAPI(nettyClientConfig);//FilterServer管理类this.filterServerManager =newFilterServerManager(this);//Broker主从同步进度管理类this.slaveSynchronize =newSlaveSynchronize(this);// 各种线程池的阻塞队列// 发送消息线程池队列this.sendThreadPoolQueue =newLinkedBlockingQueue<Runnable>(this.brokerConfig.getSendThreadPoolQueueCapacity());this.pullThreadPoolQueue =newLinkedBlockingQueue<Runnable>(this.brokerConfig.getPullThreadPoolQueueCapacity());this.replyThreadPoolQueue =newLinkedBlockingQueue<Runnable>(this.brokerConfig.getReplyThreadPoolQueueCapacity());this.queryThreadPoolQueue =newLinkedBlockingQueue<Runnable>(this.brokerConfig.getQueryThreadPoolQueueCapacity());this.clientManagerThreadPoolQueue =newLinkedBlockingQueue<Runnable>(this.brokerConfig.getClientManagerThreadPoolQueueCapacity());this.consumerManagerThreadPoolQueue =newLinkedBlockingQueue<Runnable>(this.brokerConfig.getConsumerManagerThreadPoolQueueCapacity());this.heartbeatThreadPoolQueue =newLinkedBlockingQueue<Runnable>(this.brokerConfig.getHeartbeatThreadPoolQueueCapacity());this.endTransactionThreadPoolQueue =newLinkedBlockingQueue<Runnable>(this.brokerConfig.getEndTransactionPoolQueueCapacity());this.brokerStatsManager =newBrokerStatsManager(this.brokerConfig.getBrokerClusterName());this.setStoreHost(newInetSocketAddress(this.getBrokerConfig().getBrokerIP1(),this.getNettyServerConfig().getListenPort()));this.brokerFastFailure =newBrokerFastFailure(this);this.configuration =newConfiguration(
            log,BrokerPathConfigHelper.getBrokerConfigPath(),this.brokerConfig,this.nettyServerConfig,this.nettyClientConfig,this.messageStoreConfig
        );}

重点对一些核心类进行说明
参数说明

ConsumerOffsetManager

Consumer消费进度记录管理类,会读取store/config/consumerOffset.json配置文件

topicConfigManager

消息Topic维度的管理查询类, 管理Topic和Topic相关的配置关系, 会读取store/config/topics.json

pullMessageProcessor

Consumer端使用pull的方式向Broker拉取消息请求的处理类

pullRequestHoldService

Consumer使用Push方式的长轮询机制拉取请求时,保存使用,当有消息到达时进行推送处理的类

messageArrivingListener

有消息到达Broker时的监听器,回调pullRequestHoldService中的notifyMessageArriving()方法

consumerIdsChangeListener

消费者id变化监听器

consumerManager

消费者管理类,并对消费者id变化进行监听

consumerFilterManager

消费者过滤类,按照Topic进行分类, 会读取store/config/consumerFilter.json

producerManager

生产者管理 按照group进行分类

clientHousekeepingService

客户端心跳连接处理类

broker2Client

Console控制台获取Broker信息使用

subscriptionGroupManager

订阅关系管理类

brokerOuterAPI

Broker对外访问的API

filterServerManager

FilterServer管理类

slaveSynchronize

Broker主从同步进度管理类

3.BrokerController初始化

initialize()
publicbooleaninitialize()throwsCloneNotSupportedException{//加载 topic 相关配置,文件地址为 {user.home}/store/config/topics.jsonboolean result =this.topicConfigManager.load();//加载 不同的Consumer消费的进度情况  文件地址为 {user.home}/store/config/consumerOffset.json
        result = result &&this.consumerOffsetManager.load();//加载 订阅关系  文件地址  {user.home}/store/config/subscriptionGroup.json
        result = result &&this.subscriptionGroupManager.load();//加载 Consumer的过滤信息配置  文件地址  {user.home}/store/config/consumerFilter.json
        result = result &&this.consumerFilterManager.load();//如果加载成功if(result){try{//创建消息存储类messageStorethis.messageStore =newDefaultMessageStore(this.messageStoreConfig,this.brokerStatsManager,this.messageArrivingListener,this.brokerConfig);//使用的是DLegerCommitLog,则创建DLedgerRoleChangeHandlerif(messageStoreConfig.isEnableDLegerCommitLog()){DLedgerRoleChangeHandler roleChangeHandler =newDLedgerRoleChangeHandler(this,(DefaultMessageStore) messageStore);((DLedgerCommitLog)((DefaultMessageStore) messageStore).getCommitLog()).getdLedgerServer().getdLedgerLeaderElector().addRoleChangeHandler(roleChangeHandler);}//broker消息统计类this.brokerStats =newBrokerStats((DefaultMessageStore)this.messageStore);//load pluginMessageStorePluginContext context =newMessageStorePluginContext(messageStoreConfig, brokerStatsManager, messageArrivingListener, brokerConfig);this.messageStore =MessageStoreFactory.build(context,this.messageStore);this.messageStore.getDispatcherList().addFirst(newCommitLogDispatcherCalcBitMap(this.brokerConfig,this.consumerFilterManager));}catch(IOException e){
                result =false;
                log.error("Failed to initialize", e);}}//加载消息的日志文件,包含CommitLog,ConsumeQueue等
        result = result &&this.messageStore.load();//如果加载成功if(result){//开启服务端this.remotingServer =newNettyRemotingServer(this.nettyServerConfig,this.clientHousekeepingService);NettyServerConfig fastConfig =(NettyServerConfig)this.nettyServerConfig.clone();//设置10909的服务端口
            fastConfig.setListenPort(nettyServerConfig.getListenPort()-2);//开启10909的服务端口,这个端口只给生产者使用this.fastRemotingServer =newNettyRemotingServer(fastConfig,this.clientHousekeepingService);//处理消息生产者发送的生成消息请求相关的线程池this.sendMessageExecutor =newBrokerFixedThreadPoolExecutor(this.brokerConfig.getSendMessageThreadPoolNums(),this.brokerConfig.getSendMessageThreadPoolNums(),1000*60,TimeUnit.MILLISECONDS,this.sendThreadPoolQueue,newThreadFactoryImpl("SendMessageThread_"));//处理消费者发出的消费消息请求相关的线程池this.pullMessageExecutor =newBrokerFixedThreadPoolExecutor(this.brokerConfig.getPullMessageThreadPoolNums(),this.brokerConfig.getPullMessageThreadPoolNums(),1000*60,TimeUnit.MILLISECONDS,this.pullThreadPoolQueue,newThreadFactoryImpl("PullMessageThread_"));//处理回复消息api的线程池this.replyMessageExecutor =newBrokerFixedThreadPoolExecutor(this.brokerConfig.getProcessReplyMessageThreadPoolNums(),this.brokerConfig.getProcessReplyMessageThreadPoolNums(),1000*60,TimeUnit.MILLISECONDS,this.replyThreadPoolQueue,newThreadFactoryImpl("ProcessReplyMessageThread_"));//查询线程this.queryMessageExecutor =newBrokerFixedThreadPoolExecutor(this.brokerConfig.getQueryMessageThreadPoolNums(),this.brokerConfig.getQueryMessageThreadPoolNums(),1000*60,TimeUnit.MILLISECONDS,this.queryThreadPoolQueue,newThreadFactoryImpl("QueryMessageThread_"));//省略一些线程池//为客户端注册需要处理API指令事件,以及消息发送和消费的回调方法this.registerProcessor();finallong initialDelay =UtilAll.computeNextMorningTimeMillis()-System.currentTimeMillis();finallong period =1000*60*60*24;//每天执行一次,统计昨天put的message和get的messagethis.scheduledExecutorService.scheduleAtFixedRate(newRunnable(){@Overridepublicvoidrun(){try{BrokerController.this.getBrokerStats().record();}catch(Throwable e){
                        log.error("schedule record error.", e);}}}, initialDelay, period,TimeUnit.MILLISECONDS);// 默认5s执行一次,会把消费这的偏移量存到文件中  ${user.home}/store/config/consumerOffset.json.jsonthis.scheduledExecutorService.scheduleAtFixedRate(newRunnable(){@Overridepublicvoidrun(){try{BrokerController.this.consumerOffsetManager.persist();}catch(Throwable e){
                        log.error("schedule persist consumerOffset error.", e);}}},1000*10,this.brokerConfig.getFlushConsumerOffsetInterval(),TimeUnit.MILLISECONDS);// 默认10s执行一次,会把消费者的消息过滤的信息持久化到文件 ${user.home}/store/config/consumerFilter.jsonthis.scheduledExecutorService.scheduleAtFixedRate(newRunnable(){@Overridepublicvoidrun(){try{BrokerController.this.consumerFilterManager.persist();}catch(Throwable e){
                        log.error("schedule persist consumer filter error.", e);}}},1000*10,1000*10,TimeUnit.MILLISECONDS);// 每3分钟,当消费者消费太慢,会禁用到消费者组this.scheduledExecutorService.scheduleAtFixedRate(newRunnable(){@Overridepublicvoidrun(){try{BrokerController.this.protectBroker();}catch(Throwable e){
                        log.error("protectBroker error.", e);}}},3,3,TimeUnit.MINUTES);//打印当前的Send Queue Size,Pull Queue Size,Query Queue Size,Transaction Queue Sizethis.scheduledExecutorService.scheduleAtFixedRate(newRunnable(){@Overridepublicvoidrun(){try{BrokerController.this.printWaterMark();}catch(Throwable e){
                        log.error("printWaterMark error.", e);}}},10,1,TimeUnit.SECONDS);//每隔一分钟打印一次,dispath的消息偏移量和总的消息偏移量的差值this.scheduledExecutorService.scheduleAtFixedRate(newRunnable(){@Overridepublicvoidrun(){try{
                        log.info("dispatch behind commit log {} bytes",BrokerController.this.getMessageStore().dispatchBehindBytes());}catch(Throwable e){
                        log.error("schedule dispatchBehindBytes error.", e);}}},1000*10,1000*60,TimeUnit.MILLISECONDS);if(this.brokerConfig.getNamesrvAddr()!=null){//更新nameServer地址this.brokerOuterAPI.updateNameServerAddressList(this.brokerConfig.getNamesrvAddr());
                log.info("Set user specified name server address: {}",this.brokerConfig.getNamesrvAddr());}elseif(this.brokerConfig.isFetchNamesrvAddrByAddressServer()){//没有明确指定name-server的地址,且配置了允许从地址服务器获取name-server地址this.scheduledExecutorService.scheduleAtFixedRate(newRunnable(){@Overridepublicvoidrun(){try{// 每隔2分钟从name-server地址服务器拉取最新的配置// 这个是实现name-server动态增减的唯一方法   BrokerController.this.brokerOuterAPI.fetchNameServerAddr();}catch(Throwable e){
                            log.error("ScheduledTask fetchNameServerAddr exception", e);}}},1000*10,1000*60*2,TimeUnit.MILLISECONDS);}if(!messageStoreConfig.isEnableDLegerCommitLog()){if(BrokerRole.SLAVE ==this.messageStoreConfig.getBrokerRole()){if(this.messageStoreConfig.getHaMasterAddress()!=null&&this.messageStoreConfig.getHaMasterAddress().length()>=6){this.messageStore.updateHaMasterAddress(this.messageStoreConfig.getHaMasterAddress());this.updateMasterHAServerAddrPeriodically =false;}else{this.updateMasterHAServerAddrPeriodically =true;}}else{this.scheduledExecutorService.scheduleAtFixedRate(newRunnable(){@Overridepublicvoidrun(){try{//定时打印master与slave的差距BrokerController.this.printMasterAndSlaveDiff();}catch(Throwable e){
                                log.error("schedule printMasterAndSlaveDiff error.", e);}}},1000*10,1000*60,TimeUnit.MILLISECONDS);}}//初始化事务消息相关的服务initialTransaction();//初始化权限管理器initialAcl();//初始化RPC调用的钩子initialRpcHooks();}return result;}

3.1注册消息处理器

registerProcessor

rocketmq中有许多线程执行器,包括sendMessageExecutor(发送消息),pullMessageExecutor(拉取消息),queryMessageExecutor(查询消息),adminBrokerExecutor(默认处理)。这些线程执行器会通过registerProcessor注册到NettyRemotingServer ,每一个RequestCode会有一个对应的执行器,最终会以RequestCode为键放到一个HashMap中,当请求到达nettyServer时会根据RequestCode把请求分发到不同的执行器去处理请求

publicvoidregisterProcessor(){/**
         * SendMessageProcessor
         */SendMessageProcessor sendProcessor =newSendMessageProcessor(this);
        sendProcessor.registerSendMessageHook(sendMessageHookList);
        sendProcessor.registerConsumeMessageHook(consumeMessageHookList);this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor,this.sendMessageExecutor);this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor,this.sendMessageExecutor);this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor,this.sendMessageExecutor);this.fastRemotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendProcessor,this.sendMessageExecutor);//省略其他的处理器      

最终放到processorTable的map中

@OverridepublicvoidregisterProcessor(int requestCode,NettyRequestProcessor processor,ExecutorService executor){ExecutorService executorThis = executor;if(null== executor){
            executorThis =this.publicExecutor;}Pair<NettyRequestProcessor,ExecutorService> pair =newPair<NettyRequestProcessor,ExecutorService>(processor, executorThis);this.processorTable.put(requestCode, pair);}

相关的RequestCode说明一下:
事件名code说明SEND_MESSAGE10生产者发送信息SEND_MESSAGE_V2310生产者发送信息SEND_BATCH_MESSAGE320批量发送消息CONSUMER_SEND_MSG_BACK36消费端消费失败的时候返回的消息PULL_MESSAGE11消费者拉取消息SEND_REPLY_MESSAGE324消费者回包消息,可以用类似RPC调用

3.2初始化事务消息相关的服务

initialTransaction()

服务加载方式是Java的SPI方式。

privatevoidinitialTransaction(){//加载TransactionalMessageService服务,实现类为TransactionalMessageServiceImplthis.transactionalMessageService =ServiceProvider.loadClass(ServiceProvider.TRANSACTION_SERVICE_ID,TransactionalMessageService.class);if(null==this.transactionalMessageService){this.transactionalMessageService =newTransactionalMessageServiceImpl(newTransactionalMessageBridge(this,this.getMessageStore()));
            log.warn("Load default transaction message hook service: {}",TransactionalMessageServiceImpl.class.getSimpleName());}//AbstractTransactionalMessageCheckListener对应的服务类为LogTransactionalMessageCheckListener ,其中实现为空实现this.transactionalMessageCheckListener =ServiceProvider.loadClass(ServiceProvider.TRANSACTION_LISTENER_ID,AbstractTransactionalMessageCheckListener.class);if(null==this.transactionalMessageCheckListener){this.transactionalMessageCheckListener =newDefaultTransactionalMessageCheckListener();
            log.warn("Load default discard message hook service: {}",DefaultTransactionalMessageCheckListener.class.getSimpleName());}//设置对应的brokerController到AbstractTransactionalMessageCheckListener中this.transactionalMessageCheckListener.setBrokerController(this);//创建TransactionalMessageCheckService,服务是周期检查事务的服务,this.transactionalMessageCheckService =newTransactionalMessageCheckService(this);}}

3.3

initialize

总结

initialize

方法相应的逻辑相对来说比较多,稍微总结为已下几步:

1.服务器内的相关日志文件的加载,{user.home}/store/config/ 文件目录下的json配置文件(包含topics,consumerOffset,subscriptionGroup,consumerFilter)。
在这里插入图片描述

2.如果上述文件加载成功,会启动对应的Broker客户端,然后创建一些线程池,在后面注册 API 指令事件后会监听到API的时候会进行处理
3.注册事件到对应的Broker客户端上,然后会记录对应的API事件和对应线程池封装到一个对象中
4.启动一些 定时任务,这些任务比如记录Broker状态,消费进度持久化等任务
5.初始化一些服务,比如事务相关(周期检查事务),消息权限校验初始化和Rpc调用钩子相关服务。对应的服务加载方式是Java的SPI方式。

4.BrokerControler的

start
 controller.start();publicvoidstart()throwsException{if(this.messageStore !=null){// 启动消息存储服务DefaultMessageStore,其会对/store/lock文件加锁,// 以确保在broker运行期间只有一个broker实例操作/store目录this.messageStore.start();}if(this.remotingServer !=null){// 启动Netty服务监听10911端口,对外提供服务(消息生产、消费)this.remotingServer.start();}if(this.fastRemotingServer !=null){// 监听10909端口this.fastRemotingServer.start();}if(this.fileWatchService !=null){// fileWatchService与TLS有关,todo tls解析this.fileWatchService.start();}if(this.brokerOuterAPI !=null){// 启动Netty客户端netty,broker使用其向外发送数据,比如:向NameServer上报心跳、topic信息。this.brokerOuterAPI.start();}if(this.pullRequestHoldService !=null){// 长轮询机制hold住拉取消息请求的服务this.pullRequestHoldService.start();}if(this.clientHousekeepingService !=null){// 每10s检查一遍非活动的连接服务this.clientHousekeepingService.start();}if(this.filterServerManager !=null){this.filterServerManager.start();}if(!messageStoreConfig.isEnableDLegerCommitLog()){// 处理HAstartProcessorByHa(messageStoreConfig.getBrokerRole());// 启动定时任务,定时与slave机器同步数据,同步的内容包括配置,消费位移等handleSlaveSynchronize(messageStoreConfig.getBrokerRole());// 向所有的nameserver发送本机所有的主题数据;// 包括主题名、读队列个数、写队列个数、队列权限、是否有序等this.registerBrokerAll(true,false,true);}this.scheduledExecutorService.scheduleAtFixedRate(newRunnable(){@Overridepublicvoidrun(){try{// 定时向NameServer注册Broker,最小每10s。BrokerController.this.registerBrokerAll(true,false, brokerConfig.isForceRegister());}catch(Throwable e){
                log.error("registerBrokerAll Exception", e);}}},1000*10,Math.max(10000,Math.min(brokerConfig.getRegisterNameServerPeriod(),60000)),TimeUnit.MILLISECONDS);if(this.brokerStatsManager !=null){// Broker信息统计,这个没有具体的实现;所以暂时不用管this.brokerStatsManager.start();}if(this.brokerFastFailure !=null){// Broker对请求队列中的请求进行快速失败,返回`Broker繁忙、请稍后重试`信息this.brokerFastFailure.start();}}

对这几个服务进行说明一下:
服务名类型说明messageStoreDefaultMessageStore处理消息的存储相关的日志,比如CommitLog,ConsumeQueue等remotingServerRemotingServerBroker的服务端,处理消费者和生产者的请求fastRemotingServerRemotingServer只给消息生产者的服务端fileWatchServiceFileWatchService启动监控服务连接时用到的SSL连接文件的服务brokerOuterAPIBrokerOuterAPIRocketMQ控制台跟Broker交互时候的客户端pullRequestHoldServicePullRequestHoldService处理push模式消费,或者延迟消费的服务clientHousekeepingServiceClientHousekeepingService心跳连接用的服务filterServerManagerFilterServerManager消息过滤的服务transactionalMessageCheckServiceTransactionalMessageCheckService定期检查和处理事务消息的服务slaveSynchronizeSlaveSynchronize主从之间topic,消费偏移等信息同步用的


本文转载自: https://blog.csdn.net/u014161463/article/details/122775468
版权归原作者 Swagger尼克杨 所有, 如有侵权,请联系我们删除。

“RocketMQ源码解析-Broker部分之Broker启动过程”的评论:

还没有评论