0


RocketMQ Push消息给消费者 解析——图解、源码级解析

🍊 Java学习:Java从入门到精通总结

🍊 深入浅出RocketMQ设计思想:深入浅出RocketMQ设计思想

🍊 绝对不一样的职场干货:大厂最佳实践经验指南

📆 最近更新:2022年9月9日

🍊 个人简介:通信工程本硕💪、Java程序员🌕。做过科研paper,发过专利,优秀的程序员不应该只是CRUD

🍊 点赞 👍 收藏 ⭐留言 📝 都是我最大的动力!


文章目录

概述

RocketMQ中消费者有两种方式获得消息来消费:Push模式和Pull模式

  1. Push模式:服务端将消息推送到客户端
  2. Pull模式:客户端不断请求服务端,或许消息

实际上这两种模式的底层实现都是采用的Consumer主动拉取消息的方式

在Push方式里,把Consumer轮询过程封装起来了,并注册了MessageListener监听器,当拉取到消息后唤醒监听器来消费,所以感觉消息就像是被服务端推过来的一样

在Pull模式中,Consumer拉取消息的过程需要我们自己编写:

  1. 根据Topic拿到所有的MQ
  2. 拉取MQ中的消息
  3. 记录下一次要拉取消息的offset,以便之后在此位置继续获取消息

RocketMQ保证消息实时性的机制?
这里RocketMQ采用了长连接的方式实现,有点像同步IO的概念,如果服务端没有消息时就会阻塞该请求不返回,直到有数据或超时才返回给Consumer。Consumer处理完消息后再向服务端发送新的请求,重复上面的过程…
在这里插入图片描述

Push消息消费流程

整体的代码执行流程如下:
在这里插入图片描述

开启消息消费

回到

DefaultMQPushConsumerImpl#start

方法中的某一段:

// 注册消费者boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(),this);// 注册失败则抛出异常if(!registerOK){this.serviceState =ServiceState.CREATE_JUST;this.consumeMessageService.shutdown(defaultMQPushConsumer.getAwaitTerminationMillisWhenShutdown());thrownewMQClientException("The consumer group["+this.defaultMQPushConsumer.getConsumerGroup()+"] has been created before, specify another name please."+FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),null);}// 开启消息消费
 mQClientFactory.start();

可以看出,注册完成MQ消费者之后,就调用

mQClientFactory.start();

来开启消费者消费消息,源码如下:

publicvoidstart()throwsMQClientException{synchronized(this){switch(this.serviceState){case CREATE_JUST:this.serviceState =ServiceState.START_FAILED;// 如果注册中心得url未给出,可以通过Http请求从其他地方获取if(null==this.clientConfig.getNamesrvAddr()){this.mQClientAPIImpl.fetchNameServerAddr();}// 启动响应-响应通道this.mQClientAPIImpl.start();// 启动多个定时任务this.startScheduledTask();// 启动pull取消息服务this.pullMessageService.start();// Start rebalance servicethis.rebalanceService.start();// 传入false表示不启动push服务this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
                    log.info("the client factory [{}] start OK",this.clientId);this.serviceState =ServiceState.RUNNING;break;case START_FAILED:thrownewMQClientException("The Factory object["+this.getClientId()+"] has been created before, and failed.",null);default:break;}}}

通过源码可以看出,开启消息消费之后,会根据服务的状态进行消费消息的各项准备:

  1. 如果服务的状态是CREATE_JUST,先会吧服务状态设置为START_FAILED,如果各项初始化任务都正常执行完毕,则会把服务状态设置为RUNNING,表示正在运行
  2. this.mQClientAPIImpl.start();开启请求-响应,底层基于Netty
  3. this.startScheduledTask();启动多个定时任务,如果此时注册中心地址为null,则会没2min获取一次
  4. this.pullMessageService.start();开始拉取消息
  5. this.rebalanceService.start();消费者开启负载均衡消费消息,每20s一次,根据负载均衡策略选择机器

接收消息

通过上面的分析,

this.pullMessageService.start();

开启了接收消息模式,其中

PullMessageService

类继承了

ServiceThread

类,意味着

PullMessageService

使用多线程消费消息。
在这里插入图片描述

PullMessageService

对象调用

start

方法时,会执行多线程定义的

run

方法来拉取消息:

@Overridepublicvoidrun(){
        log.info(this.getServiceName()+" service started");// 服务没有停止则执行循环while(!this.isStopped()){try{// 使用BlockingQueue阻塞队列,获取队列中的请求并执行PullRequest pullRequest =this.pullRequestQueue.take();// 拉取消息this.pullMessage(pullRequest);}catch(InterruptedException ignored){}catch(Exception e){
                log.error("Pull Message Service Run Method exception", e);}}

        log.info(this.getServiceName()+" service end");}

其中用到了

BlockingQueue

阻塞队列来进行消息的拉取,当提交了消息拉取请求后,如果队列里面为空则立刻执行。

可以发现

DefaultMQPushConsumerImpl

调用的还是拉取消息的方法,拉取消息的源码如下:

publicvoidpullMessage(finalPullRequest pullRequest){/*
        ......
         */// 判断队列是否被lock住,如果被锁住了,则延迟一段时间再拉取// 顺序消费的逻辑if(processQueue.isLocked()){if(!pullRequest.isPreviouslyLocked()){long offset =-1L;try{
                    offset =this.rebalanceImpl.computePullFromWhereWithException(pullRequest.getMessageQueue());}catch(Exception e){this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
                    log.error("Failed to compute pull offset, pullResult: {}", pullRequest, e);return;}boolean brokerBusy = offset < pullRequest.getNextOffset();
                log.info("the first time to pull message, so fix offset from broker. pullRequest: {} NewOffset: {} brokerBusy: {}",
                        pullRequest, offset, brokerBusy);if(brokerBusy){
                    log.info("[NOTIFYME]the first time to pull message, but pull request offset larger than broker consume offset. pullRequest: {} NewOffset: {}",
                            pullRequest, offset);}

                pullRequest.setPreviouslyLocked(true);
                pullRequest.setNextOffset(offset);}}else{this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
            log.info("pull message later because not locked in broker, {}", pullRequest);return;}// 获取Topic对应的订阅信息,如果不存在,则延迟拉取消息finalSubscriptionData subscriptionData =this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());if(null== subscriptionData){this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
            log.warn("find the consumer's subscription failed, {}", pullRequest);return;}/*
        ......
         */}

拉取消息的具体处理步骤如下:

  1. 判断ProcessQueue是否被废弃,如果为true则直接返回
  2. 记录最后拉取消息的时间
  3. 判断Consumer是否正在运行,如果返回false则延迟3000ms拉取消息
  4. 判断Consumer是否被锁住,如果返回true则延迟1000ms拉取消息
  5. 判断Consumer持有的消息数量是否超过最大数量1000,如果返回true则说明消费者缓冲区已经满了,延迟50ms拉取消息
  6. 判断消息Offset是否大于2000,如果返回true则延迟50ms拉取消息
  7. 顺序消费消息,使用分布式锁锁定MQ来保证一条一条消费消息。如果MQ不是被第一次锁定,则从上一次消费到的位置开始消费,如果获取锁失败则延迟一段时间再拉取消息
  8. 如果获取到的offset小于nextOffset,说明已经越界,延迟3000ms再进行消费

默认消费方式是从MaxOffset开始往前消费的

  1. 获取Topic对应的订阅消息如果不存在则延迟拉3000ms取消息
  2. 拉取消息使用带有回调的PullCallback,当拉取消息成功时开始消费

具体的消费消息的逻辑是使用向

pullCallback

中传入匿名内部类的方式,一旦拉取到消息之后,就会回调到内部类中的

onSuccess

方法执行具体的逻辑,源码如下所示:

// 消费消息的具体流程PullCallback pullCallback =newPullCallback(){@OverridepublicvoidonSuccess(PullResult pullResult){// 拉取结果不为空if(pullResult !=null){
            pullResult =DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,
                    subscriptionData);switch(pullResult.getPullStatus()){case FOUND:// 设置下次拉取消息的offsetlong prevRequestOffset = pullRequest.getNextOffset();
                    pullRequest.setNextOffset(pullResult.getNextBeginOffset());long pullRT =System.currentTimeMillis()- beginTimestamp;DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullRT(pullRequest.getConsumerGroup(),
                            pullRequest.getMessageQueue().getTopic(), pullRT);long firstMsgOffset =Long.MAX_VALUE;// 如果没有拉取到消息则立刻再拉取消息一次if(pullResult.getMsgFoundList()==null|| pullResult.getMsgFoundList().isEmpty()){DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);}else{
                        firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset();DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(pullRequest.getConsumerGroup(),
                                pullRequest.getMessageQueue().getTopic(), pullResult.getMsgFoundList().size());// 提交拉取到的消息到processQueue中,返回上一批次的消息是否已经消费完了boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());// 在有序模式下,只有dispatchToConsume为true才提交,并发模式不受影响DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
                                pullResult.getMsgFoundList(),
                                processQueue,
                                pullRequest.getMessageQueue(),
                                dispatchToConsume);// 如果处理消息都需要和上次保持一定时间间隔,则稍后再执行if(DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval()>0){DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());}else{DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);}}// 当前offset小于上一次的offset则报错if(pullResult.getNextBeginOffset()< prevRequestOffset
                            || firstMsgOffset < prevRequestOffset){
                        log.warn("[BUG] pull message result maybe data wrong, nextBeginOffset: {} firstMsgOffset: {} prevRequestOffset: {}",
                                pullResult.getNextBeginOffset(),
                                firstMsgOffset,
                                prevRequestOffset);}break;case NO_NEW_MSG:case NO_MATCHED_MSG:// 没有匹配到消息的情况// 设置下次拉取消息的offset
                    pullRequest.setNextOffset(pullResult.getNextBeginOffset());// 持久化消费进度DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);// 提交消费请求DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);break;case OFFSET_ILLEGAL:// offset非法的情况
                    log.warn("the pull request offset illegal, {} {}",
                            pullRequest.toString(), pullResult.toString());// 设置下次拉取消息的offset
                    pullRequest.setNextOffset(pullResult.getNextBeginOffset());// 设置ProcessQueue废弃
                    pullRequest.getProcessQueue().setDropped(true);// 提交延迟处理任务,将ProcessQueue移除DefaultMQPushConsumerImpl.this.executeTaskLater(newRunnable(){@Overridepublicvoidrun(){try{// 更新消费进度,同步消费进度到BrokerDefaultMQPushConsumerImpl.this.offsetStore.updateOffset(pullRequest.getMessageQueue(),
                                        pullRequest.getNextOffset(),false);DefaultMQPushConsumerImpl.this.offsetStore.persist(pullRequest.getMessageQueue());// 将ProcessQueue移除DefaultMQPushConsumerImpl.this.rebalanceImpl.removeProcessQueue(pullRequest.getMessageQueue());

                                log.warn("fix the pull request offset, {}", pullRequest);}catch(Throwable e){
                                log.error("executeTaskLater Exception", e);}}},10000);break;default:break;}}}@OverridepublicvoidonException(Throwable e){if(!pullRequest.getMessageQueue().getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)){
            log.warn("execute the pull request exception", e);}// 出现异常的话就延迟拉取消息DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);}};

根据上面的代码,具体的消费消息的流程:

  1. 使用pullAPIWrapper获取内存中ByteBuffer中的数据,得到拉取消息的结果
  2. 根据拉取消息后结果的状态来判断是否有消息可以被消费 a. FOUND:发现消息 b. NO_NEW_MSG:没有新消息可被拉取 c. NO_MATCHED_MSG:消息不匹配 d. OFFSET_ILLEGAL:offset非法,可能是消息太大

如果是

FOUND

状态,表示可以进行下一步的消费,之后的步骤如下:

  1. 获取消息拉取的offset,设置下一次拉取消息的offset,同时统计消息消费响应时间
  2. 如果没有拉取到消息,马上进行下一次拉取,如果拉取到消息,则把消息提交至ProcessQueue

这里

ProcessQueue

使用了

TreeMap

底层数据结构来提高检索效率,以快速判断消息是否消费完毕


本文转载自: https://blog.csdn.net/HNU_Csee_wjw/article/details/123257874
版权归原作者 小王曾是少年 所有, 如有侵权,请联系我们删除。

“RocketMQ Push消息给消费者 解析&mdash;&mdash;图解、源码级解析”的评论:

还没有评论