PushConsumer消费模式-广播模式
广播消费: 当使用广播消费模式时, 消息队列 RocketMQ 会将每条消息推送给集群内所有注册过的客户端, 保证消息至少被每台机器消费一次。
相比于集群模式,广播模式的特点为: 每个消费者都会消费所订阅的Topic + Tag下的所有queue中的所有消息。
适用场景&注意事项:
- 广播消费模式下不支持顺序消息。
- 广播消费模式下不支持重置消费位点。
- 每条消息都需要被相同逻辑的多台机器处理。
- 广播模式下, 消息队列 RocketMQ 保证每条消息至少被每台客户端消费一次, 但是并不会对消费失败的消息进行失败重投, 因此业务方需要关注消费失败的情况。
- 广播模式下, 客户端每一次重启都会从最新消息消费。 客户端在被停止期间发送至服务端的消息将会被自动跳过, 请谨慎选择。
- 广播模式下, 每条消息都会被大量的客户端重复处理, 因此推荐尽可能使用集群模式。
- 目前仅 Java 客户端支持广播模式。
- 广播模式下服务端不维护消费进度, 所以消息队列 RocketMQ 控制台不支持消息堆积查询、 消息堆积报警和订阅关系查询功能。
- 消费进度在客户端维护, 出现消息重复消费的概率稍大于集群模式。
设置成广播模式相关代码如下:
//设置消费模式为广播模式
consumer.setMessageModel(MessageModel.BROADCASTING);
至少一次设计理念
在集群模式下,RocketMQ 可以保证Topic + Tag下的消息可以肯定会被整个集群至少消费一次。
在广播模式下,RocketMQ 可以保证至少被每台机器消费一次。
类似于数据库的事务操作,消费者未消费完成不返回ack给RocketMQ。官方对于至少一次的解释如下:
官方地址:https://github.com/apache/rocketmq/blob/master/docs/cn/features.md
消费过程幂等
RocketMQ无法避免消息重复(Exactly-Once),所以如果业务对消费重复非常敏感,务必要在业务层面进行去重处理。可以借助关系数据库进行去重。首先需要确定消息的唯一键,可以是msgId,也可以是消息内容中的唯一标识字段,例如订单Id等。在消费之前判断唯一键是否在关系数据库中存在。如果不存在则插入,并消费,否则跳过。(实际过程要考虑原子性问题,判断是否存在可以尝试插入,如果报主键冲突,则插入失败,直接跳过)
msgId一定是全局唯一标识符,但是实际使用中,可能会存在相同的消息有两个不同msgId的情况(消费者主动重发、因客户端重投机制导致的重复等),这种情况就需要使业务字段进行重复消费。
消息存储核心-偏移量Offset
Offset指某个topic下的一条消息在某个MessageQueue里的位置,通过Offset可以进行定位到这条消息。Offset是消息消费进度的核心。
- message queue 是无限长的数组,一条消息进来下标就会加1,下标就是 offset,消息在某个 MessageQueue 里的位置,通过 offset 的值可以定位到这条消息,或者指示 Consumer 从这条消息开始向后处理。
- message queue 中的 maxOffset 表示消息的最大 offset,maxOffset 并不是最新的那条消息的 offset,而是最新消息的 offset+1,minOffset 则是现存在的最小 offset。
- fileReserveTime=48 默认消息存储48小时后,消费会被物理地从磁盘删除,message queue 的 minOffset 也就对应增长。所以比 minOffset 还要小的那些消息已经不在 broker上了,就无法被消费。
Offset的存储实现分为远程文件类型和本地文件类型两种方式。
集群模式-RemoteBrokerOffsetStore解析
默认集群模式clustering,采用远程文件存储Offset。
本质上因为多消费模式,每个Consumer消费所订阅主题的一部分
这种情况需要broker控制offset的值,使用RemoteBrokerOffsetStore。
广播模式-LocalFileOffsetStore解析
- 广播模式下,由于每个Consumer都会收到消息且消费
- 各个Consumer之间没有任何干扰,独立线程消费
- 所以使用LocalFileOffsetStore,也就是把Offset存储到本地
RocketMQ消费者拉取模式-PullConsumer使用
Pull方式主要做了三件事:
- 获取Message Queue并遍历
- 维护OffsetStore
- 根据不同的消息状态做不同的处理
代码案例如下:
DefaultMQPullConsumer拉取:
packagecom.zjq.rocketmq.consumer.pull;importjava.util.HashMap;importjava.util.List;importjava.util.Map;importjava.util.Set;importorg.apache.rocketmq.client.consumer.DefaultMQPullConsumer;importorg.apache.rocketmq.client.consumer.PullResult;importorg.apache.rocketmq.client.exception.MQClientException;importorg.apache.rocketmq.common.message.MessageExt;importorg.apache.rocketmq.common.message.MessageQueue;importcom.zjq.rocketmq.constants.Const;publicclassPullConsumer{//Map<key, value> key为指定的队列,value为这个队列拉取数据的最后位置//实际中可以放在redis里面或者持久化记录消费的位置privatestaticfinalMap<MessageQueue,Long> offseTable =newHashMap<MessageQueue,Long>();publicstaticvoidmain(String[] args)throwsMQClientException{String group_name ="test_pull_consumer_name";DefaultMQPullConsumer consumer =newDefaultMQPullConsumer(group_name);
consumer.setNamesrvAddr(Const.NAMESRV_ADDR_MASTER_SLAVE);
consumer.start();System.err.println("consumer start");// 从TopicTest这个主题去获取所有的队列(默认会有4个队列)Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("test_pull_topic");// 遍历每一个队列,进行拉取数据for(MessageQueue mq : mqs){System.out.println("Consume from the queue: "+ mq);
SINGLE_MQ:while(true){try{// 从queue中获取数据,从什么位置开始拉取数据 单次对多拉取32条记录PullResult pullResult = consumer.pullBlockIfNotFound(mq,null,getMessageQueueOffset(mq),32);System.out.println(pullResult);System.out.println(pullResult.getPullStatus());System.out.println();putMessageQueueOffset(mq, pullResult.getNextBeginOffset());switch(pullResult.getPullStatus()){case FOUND:List<MessageExt> list = pullResult.getMsgFoundList();for(MessageExt msg : list){System.out.println(newString(msg.getBody()));}break;case NO_MATCHED_MSG:break;case NO_NEW_MSG:System.out.println("没有新的数据啦...");break SINGLE_MQ;case OFFSET_ILLEGAL:break;default:break;}}catch(Exception e){
e.printStackTrace();}}}
consumer.shutdown();}privatestaticvoidputMessageQueueOffset(MessageQueue mq,long offset){
offseTable.put(mq, offset);}privatestaticlonggetMessageQueueOffset(MessageQueue mq){Long offset = offseTable.get(mq);if(offset !=null)return offset;return0;}}
MQPullConsumerScheduleService定时拉取:
packagecom.zjq.rocketmq.consumer.pull;importjava.util.List;importorg.apache.rocketmq.client.consumer.MQPullConsumer;importorg.apache.rocketmq.client.consumer.MQPullConsumerScheduleService;importorg.apache.rocketmq.client.consumer.PullResult;importorg.apache.rocketmq.client.consumer.PullTaskCallback;importorg.apache.rocketmq.client.consumer.PullTaskContext;importorg.apache.rocketmq.client.exception.MQClientException;importorg.apache.rocketmq.common.message.MessageExt;importorg.apache.rocketmq.common.message.MessageQueue;importorg.apache.rocketmq.common.protocol.heartbeat.MessageModel;importcom.zjq.rocketmq.constants.Const;publicclassPullScheduleService{publicstaticvoidmain(String[] args)throwsMQClientException{String group_name ="test_pull_consumer_name";finalMQPullConsumerScheduleService scheduleService =newMQPullConsumerScheduleService(group_name);
scheduleService.getDefaultMQPullConsumer().setNamesrvAddr(Const.NAMESRV_ADDR_MASTER_SLAVE);
scheduleService.setMessageModel(MessageModel.CLUSTERING);
scheduleService.registerPullTaskCallback("test_pull_topic",newPullTaskCallback(){@OverridepublicvoiddoPullTask(MessageQueue mq,PullTaskContext context){MQPullConsumer consumer = context.getPullConsumer();System.err.println("-------------- queueId: "+ mq.getQueueId()+"-------------");try{// 获取从哪里拉取long offset = consumer.fetchConsumeOffset(mq,false);if(offset <0)
offset =0;PullResult pullResult = consumer.pull(mq,"*", offset,32);switch(pullResult.getPullStatus()){case FOUND:List<MessageExt> list = pullResult.getMsgFoundList();for(MessageExt msg : list){//消费数据...System.out.println(newString(msg.getBody()));}break;case NO_MATCHED_MSG:break;case NO_NEW_MSG:case OFFSET_ILLEGAL:break;default:break;}
consumer.updateConsumeOffset(mq, pullResult.getNextBeginOffset());// 设置再过3000ms后重新拉取
context.setPullNextDelayTimeMillis(3000);}catch(Exception e){
e.printStackTrace();}}});
scheduleService.start();}}
参考:
https://github.com/apache/rocketmq/blob/master/docs/cn/best_practice.md
本文内容到此结束了,
如有收获欢迎点赞👍收藏💖关注✔️,您的鼓励是我最大的动力。
如有错误❌疑问💬欢迎各位大佬指出。
主页:共饮一杯无的博客汇总👨💻保持热爱,奔赴下一场山海。🏃🏃🏃
版权归原作者 共饮一杯无 所有, 如有侵权,请联系我们删除。