🍊 Java学习:Java从入门到精通总结
🍊 深入浅出RocketMQ设计思想:深入浅出RocketMQ设计思想
🍊 绝对不一样的职场干货:大厂最佳实践经验指南
📆 最近更新:2022年9月30日
🍊 个人简介:通信工程本硕💪、Java程序员🌕。做过科研paper,发过专利,优秀的程序员不应该只是CRUD
🍊 点赞 👍 收藏 ⭐留言 📝 都是我最大的动力!
文章目录
消费者拉取消息(Pull)示例
消费者使用Pull方式拉取消息的流程和Push消息的流程基本类似,包括创建消费者对象、设置组名、启动消费者消费。代码如下:
packagecom.wjw;importorg.apache.rocketmq.client.consumer.DefaultMQPullConsumer;importorg.apache.rocketmq.client.consumer.PullResult;importorg.apache.rocketmq.common.message.MessageQueue;importjava.util.HashMap;importjava.util.Map;importjava.util.Set;publicclassPullConsumer{// 存储队列offsetprivatestaticfinalMap<MessageQueue,Long> OFFSET_TABLE =newHashMap<>();publicstaticvoidmain(String[] args)throwsException{DefaultMQPullConsumer consumer =newDefaultMQPullConsumer("group A");// 启动消费者
consumer.start();Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("Target Topic");for(MessageQueue mq : mqs){System.out.println("Consume message from "+ mq);// 拉取消息PullResult pullResult = consumer.pullBlockIfNotFound(mq,null, OFFSET_TABLE.get(mq),32);System.out.println("pullResult : "+ pullResult);// 设置该MQ的offset
OFFSET_TABLE.put(mq, pullResult.getNextBeginOffset());}
consumer.shutdown();}}
将上面的流程概括一下:
- 创建Pull模式的消费者对象
- 启动消费者消费
- 调用
fetchSubscribeMessageQueues
方法,根据Topic名称查询对应的MQ,主动拉取消息 - 循环遍历MQ,对于遍历到的每个MQ,取出一条消息
fetchSubscribeMessageQueues
获取所有MQ的方法源码如下,该方法位于
org/apache/rocketmq/client/impl/MQAdminImpl.java
中:
publicSet<MessageQueue>fetchSubscribeMessageQueues(String topic)throwsMQClientException{try{// 从注册中心获取路由信息TopicRouteData topicRouteData =this.mQClientFactory.getMQClientAPIImpl().getTopicRouteInfoFromNameServer(topic, timeoutMillis);// 如果路由信息不为空则获取路由信息中的队列集合if(topicRouteData !=null){Set<MessageQueue> mqList =MQClientInstance.topicRouteData2TopicSubscribeInfo(topic, topicRouteData);if(!mqList.isEmpty()){return mqList;}else{thrownewMQClientException("Can not find Message Queue for this topic, "+ topic +" Namesrv return empty",null);}}}catch(Exception e){thrownewMQClientException("Can not find Message Queue for this topic, "+ topic +FAQUrl.suggestTodo(FAQUrl.MQLIST_NOT_EXIST),
e);}thrownewMQClientException("Unknow why, Can not find Message Queue for this topic, "+ topic,null);}
上述代码首先从注册中心中获取
TopicRouteData
,其中存储了路由信息:
orderTopicConf
:顺序消息配置
它的格式为:BrokerName1:QueueId1;BrokerName2:QueueId2;…BrokerNameN:QueueIdN;
queueDatas
:队列数据数组brokerAddr
:Broker数据数组filterServerTable
:Broker地址和Filter Server之间的映射
如果拿到的
TopicRouteData
不为空,则提取
TopicRouteData
内的QueueData生成MQ,这个MQ就是当前订阅的
Topic
下的。如果队列集合不为空,就会直接返回。
拉取消息的核心代码
拉取消息的核心方法是
pullSyncImpl
,在这个方法里实现了消息的拉取
privatePullResultpullSyncImpl(MessageQueue mq,SubscriptionData subscriptionData,long offset,int maxNums,boolean block,long timeout)throwsMQClientException,RemotingException,MQBrokerException,InterruptedException{this.isRunning();if(null== mq){thrownewMQClientException("mq is null",null);}if(offset <0){thrownewMQClientException("offset < 0",null);}if(maxNums <=0){thrownewMQClientException("maxNums <= 0",null);}this.subscriptionAutomatically(mq.getTopic());int sysFlag =PullSysFlag.buildSysFlag(false, block,true,false);long timeoutMillis = block ?this.defaultMQPullConsumer.getConsumerTimeoutMillisWhenSuspend(): timeout;boolean isTagType =ExpressionType.isTagType(subscriptionData.getExpressionType());// 拉取消息PullResult pullResult =this.pullAPIWrapper.pullKernelImpl(
mq,
subscriptionData.getSubString(),
subscriptionData.getExpressionType(),
isTagType ?0L: subscriptionData.getSubVersion(),
offset,
maxNums,
sysFlag,0,this.defaultMQPullConsumer.getBrokerSuspendMaxTimeMillis(),
timeoutMillis,CommunicationMode.SYNC,null);// 对消息数据进行处理this.pullAPIWrapper.processPullResult(mq, pullResult, subscriptionData);// 如果namespace不是空的,则重置没有命名空间的Topic。this.resetTopic(pullResult.getMsgFoundList());// 把消息数据设置到上下文对象ConsumeMessageContext里if(!this.consumeMessageHookList.isEmpty()){ConsumeMessageContext consumeMessageContext =null;
consumeMessageContext =newConsumeMessageContext();
consumeMessageContext.setNamespace(defaultMQPullConsumer.getNamespace());
consumeMessageContext.setConsumerGroup(this.groupName());
consumeMessageContext.setMq(mq);
consumeMessageContext.setMsgList(pullResult.getMsgFoundList());
consumeMessageContext.setSuccess(false);this.executeHookBefore(consumeMessageContext);
consumeMessageContext.setStatus(ConsumeConcurrentlyStatus.CONSUME_SUCCESS.toString());
consumeMessageContext.setSuccess(true);this.executeHookAfter(consumeMessageContext);}return pullResult;}
版权归原作者 小王曾是少年 所有, 如有侵权,请联系我们删除。