0


RocketMQ 消费者拉取消息(Pull) 解析——图解、源码级解析

🍊 Java学习:Java从入门到精通总结

🍊 深入浅出RocketMQ设计思想:深入浅出RocketMQ设计思想

🍊 绝对不一样的职场干货:大厂最佳实践经验指南

📆 最近更新:2022年9月30日

🍊 个人简介:通信工程本硕💪、Java程序员🌕。做过科研paper,发过专利,优秀的程序员不应该只是CRUD

🍊 点赞 👍 收藏 ⭐留言 📝 都是我最大的动力!


文章目录

消费者拉取消息(Pull)示例

消费者使用Pull方式拉取消息的流程和Push消息的流程基本类似,包括创建消费者对象、设置组名、启动消费者消费。代码如下:

packagecom.wjw;importorg.apache.rocketmq.client.consumer.DefaultMQPullConsumer;importorg.apache.rocketmq.client.consumer.PullResult;importorg.apache.rocketmq.common.message.MessageQueue;importjava.util.HashMap;importjava.util.Map;importjava.util.Set;publicclassPullConsumer{// 存储队列offsetprivatestaticfinalMap<MessageQueue,Long> OFFSET_TABLE =newHashMap<>();publicstaticvoidmain(String[] args)throwsException{DefaultMQPullConsumer consumer =newDefaultMQPullConsumer("group A");// 启动消费者
        consumer.start();Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("Target Topic");for(MessageQueue mq : mqs){System.out.println("Consume message from "+ mq);// 拉取消息PullResult pullResult = consumer.pullBlockIfNotFound(mq,null, OFFSET_TABLE.get(mq),32);System.out.println("pullResult : "+ pullResult);// 设置该MQ的offset
            OFFSET_TABLE.put(mq, pullResult.getNextBeginOffset());}
        consumer.shutdown();}}

将上面的流程概括一下:

  1. 创建Pull模式的消费者对象
  2. 启动消费者消费
  3. 调用fetchSubscribeMessageQueues方法,根据Topic名称查询对应的MQ,主动拉取消息
  4. 循环遍历MQ,对于遍历到的每个MQ,取出一条消息

fetchSubscribeMessageQueues

获取所有MQ的方法源码如下,该方法位于

org/apache/rocketmq/client/impl/MQAdminImpl.java

中:

publicSet<MessageQueue>fetchSubscribeMessageQueues(String topic)throwsMQClientException{try{// 从注册中心获取路由信息TopicRouteData topicRouteData =this.mQClientFactory.getMQClientAPIImpl().getTopicRouteInfoFromNameServer(topic, timeoutMillis);// 如果路由信息不为空则获取路由信息中的队列集合if(topicRouteData !=null){Set<MessageQueue> mqList =MQClientInstance.topicRouteData2TopicSubscribeInfo(topic, topicRouteData);if(!mqList.isEmpty()){return mqList;}else{thrownewMQClientException("Can not find Message Queue for this topic, "+ topic +" Namesrv return empty",null);}}}catch(Exception e){thrownewMQClientException("Can not find Message Queue for this topic, "+ topic +FAQUrl.suggestTodo(FAQUrl.MQLIST_NOT_EXIST),
                e);}thrownewMQClientException("Unknow why, Can not find Message Queue for this topic, "+ topic,null);}

上述代码首先从注册中心中获取

TopicRouteData

,其中存储了路由信息:

在这里插入图片描述

  • orderTopicConf:顺序消息配置

它的格式为:BrokerName1:QueueId1;BrokerName2:QueueId2;…BrokerNameN:QueueIdN;

  • queueDatas:队列数据数组
  • brokerAddr:Broker数据数组
  • filterServerTable:Broker地址和Filter Server之间的映射

如果拿到的

TopicRouteData

不为空,则提取

TopicRouteData

内的QueueData生成MQ,这个MQ就是当前订阅的

Topic

下的。如果队列集合不为空,就会直接返回。

拉取消息的核心代码

拉取消息的核心方法是

pullSyncImpl

,在这个方法里实现了消息的拉取

privatePullResultpullSyncImpl(MessageQueue mq,SubscriptionData subscriptionData,long offset,int maxNums,boolean block,long timeout)throwsMQClientException,RemotingException,MQBrokerException,InterruptedException{this.isRunning();if(null== mq){thrownewMQClientException("mq is null",null);}if(offset <0){thrownewMQClientException("offset < 0",null);}if(maxNums <=0){thrownewMQClientException("maxNums <= 0",null);}this.subscriptionAutomatically(mq.getTopic());int sysFlag =PullSysFlag.buildSysFlag(false, block,true,false);long timeoutMillis = block ?this.defaultMQPullConsumer.getConsumerTimeoutMillisWhenSuspend(): timeout;boolean isTagType =ExpressionType.isTagType(subscriptionData.getExpressionType());// 拉取消息PullResult pullResult =this.pullAPIWrapper.pullKernelImpl(
            mq,
            subscriptionData.getSubString(),
            subscriptionData.getExpressionType(),
            isTagType ?0L: subscriptionData.getSubVersion(),
            offset,
            maxNums,
            sysFlag,0,this.defaultMQPullConsumer.getBrokerSuspendMaxTimeMillis(),
            timeoutMillis,CommunicationMode.SYNC,null);// 对消息数据进行处理this.pullAPIWrapper.processPullResult(mq, pullResult, subscriptionData);// 如果namespace不是空的,则重置没有命名空间的Topic。this.resetTopic(pullResult.getMsgFoundList());// 把消息数据设置到上下文对象ConsumeMessageContext里if(!this.consumeMessageHookList.isEmpty()){ConsumeMessageContext consumeMessageContext =null;
            consumeMessageContext =newConsumeMessageContext();
            consumeMessageContext.setNamespace(defaultMQPullConsumer.getNamespace());
            consumeMessageContext.setConsumerGroup(this.groupName());
            consumeMessageContext.setMq(mq);
            consumeMessageContext.setMsgList(pullResult.getMsgFoundList());
            consumeMessageContext.setSuccess(false);this.executeHookBefore(consumeMessageContext);
            consumeMessageContext.setStatus(ConsumeConcurrentlyStatus.CONSUME_SUCCESS.toString());
            consumeMessageContext.setSuccess(true);this.executeHookAfter(consumeMessageContext);}return pullResult;}

本文转载自: https://blog.csdn.net/HNU_Csee_wjw/article/details/127128743
版权归原作者 小王曾是少年 所有, 如有侵权,请联系我们删除。

“RocketMQ 消费者拉取消息(Pull) 解析&mdash;&mdash;图解、源码级解析”的评论:

还没有评论