🍊 Java学习:Java从入门到精通总结
🍊 深入浅出RocketMQ设计思想:深入浅出RocketMQ设计思想
🍊 绝对不一样的职场干货:大厂最佳实践经验指南
📆 最近更新:2022年10月24日
🍊 个人简介:通信工程本硕💪、Java程序员🌕。做过科研paper,发过专利,优秀的程序员不应该只是CRUD
🍊 点赞 👍 收藏 ⭐留言 📝 都是我最大的动力!
文章目录
异常消息处理
使用
Consumer时
会注册
MessageListener
,消费消息的接口会返回处理状态:
ConsumeConcurrentlyStatus.CONSUME_SUCCESS
:消费成功ConsumeConcurrentlyStatus.REConsume_LATER
:等待一段时间后再消费
MessageListener
在
CnsumeMessageConcurrentlyService
中被调用的,上面两个状态会映射到
CMResult
定义的枚举值:
CMResult.CR_SUCCESS
:消费成功CMResult.CR_LATER
:等待一段时间后再消费CMResult.CR_ROLLBACK
:事务回滚CMResult.CR_COMMIT
:事务提交CMResult.CR_THROW_EXCEPTION
:消费异常CMResult.CR_RETURN_NULL
:消费结果为null
针对
CMResult.CR_LATER
状态的处理策略为:将该消息发挥Broker,继续等待后续消息。发送回的消息会设置重试的
Topic
,命名规则为:“%RETRY%” + Consumer组名,消息原本的
Topic
会暂存到消息体中,并且会额外设置
delayLevel
及
reconsumeTimes
消息消费的结果会在
CnsumeMessageConcurrentlyService.processConsumeResult
中处理
publicvoidprocessConsumeResult(finalConsumeConcurrentlyStatus status,finalConsumeConcurrentlyContext context,finalConsumeRequest consumeRequest
){int ackIndex = context.getAckIndex();// 消息为空,直接返回if(consumeRequest.getMsgs().isEmpty())return;// 计算从consumerequest.msg[0]到consumerequest.msgs[ackIndex]的消息消费成功的数量switch(status){case CONSUME_SUCCESS:if(ackIndex >= consumeRequest.getMsgs().size()){
ackIndex = consumeRequest.getMsgs().size()-1;}// 统计成功/失败数量int ok = ackIndex +1;int failed = consumeRequest.getMsgs().size()- ok;this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), ok);this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), failed);break;case RECONSUME_LATER:
ackIndex =-1;// 统计失败数量this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(),
consumeRequest.getMsgs().size());break;default:break;}// 处理消息失败的消息switch(this.defaultMQPushConsumer.getMessageModel()){// 如果是广播模式,无论是否消费失败,都不回发消息给Broker,只打印Logcase BROADCASTING:for(int i = ackIndex +1; i < consumeRequest.getMsgs().size(); i++){MessageExt msg = consumeRequest.getMsgs().get(i);
log.warn("BROADCASTING, the message consume failed, drop it, {}", msg.toString());}break;// 发回失败消息到Brokercase CLUSTERING:List<MessageExt> msgBackFailed =newArrayList<MessageExt>(consumeRequest.getMsgs().size());for(int i = ackIndex +1; i < consumeRequest.getMsgs().size(); i++){MessageExt msg = consumeRequest.getMsgs().get(i);// 回发给Broker的的具体方法boolean result =this.sendMessageBack(msg, context);if(!result){// 重复消费次数 + 1
msg.setReconsumeTimes(msg.getReconsumeTimes()+1);
msgBackFailed.add(msg);}}// 如果回发给Broker也失败的话,则提交延迟消费请求(稍后在客户端重新消费)if(!msgBackFailed.isEmpty()){
consumeRequest.getMsgs().removeAll(msgBackFailed);this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue());}break;default:break;}// 移除消费成功消息,并返回消费的最新进度long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());// 更新最新消费进度,进度更新只能增长 if(offset >=0&&!consumeRequest.getProcessQueue().isDropped()){this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset,true);}}
Consumer
消费的时候可以设置
consumeMessageBatchMaxSize
来控制传入
MessageListener
的消息数量。RocketMQ认为只要有一条消息消费失败,这一批消息都会发回给Broker,所以设置
consumeMessageBatchMaxSize
这个值时应当注意避免出现消息重复消费的问题。
Broker处理流程
Broker端对应的处理逻辑在
SendMessageProcessor.consumerSendMsgBack
里,对于Consumer发送失败返的消息,Broker会将其放入重试
Topic
中
/**
* 消费者将消息发回给Broker,可以指定多久后重新消费该消息
*
* @param ctx
* @param request
* @return
* @throws RemotingCommandException
*/privateCompletableFuture<RemotingCommand>asyncConsumerSendMsgBack(ChannelHandlerContext ctx,RemotingCommand request)throwsRemotingCommandException{// 初始化响应finalRemotingCommand response =RemotingCommand.createResponseCommand(null);finalConsumerSendMsgBackRequestHeader requestHeader =(ConsumerSendMsgBackRequestHeader)request.decodeCommandCustomHeader(ConsumerSendMsgBackRequestHeader.class);String namespace =NamespaceUtil.getNamespaceFromResource(requestHeader.getGroup());if(this.hasConsumeMessageHook()&&!UtilAll.isBlank(requestHeader.getOriginMsgId())){ConsumeMessageContext context =buildConsumeMessageContext(namespace, requestHeader, request);this.executeConsumeMessageHookAfter(context);}SubscriptionGroupConfig subscriptionGroupConfig =this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(requestHeader.getGroup());if(null== subscriptionGroupConfig){
response.setCode(ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST);
response.setRemark("subscription group not exist, "+ requestHeader.getGroup()+" "+FAQUrl.suggestTodo(FAQUrl.SUBSCRIPTION_GROUP_NOT_EXIST));returnCompletableFuture.completedFuture(response);}// 检查Broker是否有写入权限if(!PermName.isWriteable(this.brokerController.getBrokerConfig().getBrokerPermission())){
response.setCode(ResponseCode.NO_PERMISSION);
response.setRemark("the broker["+this.brokerController.getBrokerConfig().getBrokerIP1()+"] sending message is forbidden");returnCompletableFuture.completedFuture(response);}// 检查重试队列个数是否大于0if(subscriptionGroupConfig.getRetryQueueNums()<=0){
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);returnCompletableFuture.completedFuture(response);}// 计算retry Topic = "%RETRY% + consumeGroup"String newTopic =MixAll.getRetryTopic(requestHeader.getGroup());// 计算队列编号int queueIdInt =Math.abs(this.random.nextInt()%99999999)% subscriptionGroupConfig.getRetryQueueNums();int topicSysFlag =0;if(requestHeader.isUnitMode()){
topicSysFlag =TopicSysFlag.buildSysFlag(false,true);}// 获取topicConfig,如果获取不到,则在response里进行相应设置TopicConfig topicConfig =this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(
newTopic,
subscriptionGroupConfig.getRetryQueueNums(),PermName.PERM_WRITE |PermName.PERM_READ, topicSysFlag);if(null== topicConfig){
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("topic["+ newTopic +"] not exist");returnCompletableFuture.completedFuture(response);}// 不允许写入if(!PermName.isWriteable(topicConfig.getPerm())){
response.setCode(ResponseCode.NO_PERMISSION);
response.setRemark(String.format("the topic[%s] sending message is forbidden", newTopic));returnCompletableFuture.completedFuture(response);}// 根据消息的commitLog Offset查询实际的MessageExt(消费失败的实际消息)MessageExt msgExt =this.brokerController.getMessageStore().lookMessageByOffset(requestHeader.getOffset());if(null== msgExt){
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("look message by offset failed, "+ requestHeader.getOffset());returnCompletableFuture.completedFuture(response);}// 设置 PROPERTY_RETRY_TOPIC = 原始消息的Topic,msgInner通过setProperties()方法将原始消息的Properties拷贝过去finalString retryTopic = msgExt.getProperty(MessageConst.PROPERTY_RETRY_TOPIC);if(null== retryTopic){MessageAccessor.putProperty(msgExt,MessageConst.PROPERTY_RETRY_TOPIC, msgExt.getTopic());}// 设置消息不等待存储完成
msgExt.setWaitStoreMsgOK(false);int delayLevel = requestHeader.getDelayLevel();int maxReconsumeTimes = subscriptionGroupConfig.getRetryMaxTimes();// 3.4.9版本之后可以支持自定义消息的最大消费次数,若未指定,默认为16if(request.getVersion()>=MQVersion.Version.V3_4_9.ordinal()){Integer times = requestHeader.getMaxReconsumeTimes();if(times !=null){
maxReconsumeTimes = times;}}// 如果超过最大消费次数或delayLevel < 0,则加入私信队列if(msgExt.getReconsumeTimes()>= maxReconsumeTimes
|| delayLevel <0){
newTopic =MixAll.getDLQTopic(requestHeader.getGroup());
queueIdInt =Math.abs(this.random.nextInt()%99999999)% DLQ_NUMS_PER_GROUP;
topicConfig =this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic,
DLQ_NUMS_PER_GROUP,PermName.PERM_WRITE,0);if(null== topicConfig){
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("topic["+ newTopic +"] not exist");returnCompletableFuture.completedFuture(response);}}else{if(0== delayLevel){// 设置延迟级别为重试消费次数 + 3
delayLevel =3+ msgExt.getReconsumeTimes();}
msgExt.setDelayTimeLevel(delayLevel);}// 创建MessageExtBrokerInner,除了Topic、QueueId不同外,其他的都是拷贝原始消息的数据MessageExtBrokerInner msgInner =newMessageExtBrokerInner();
msgInner.setTopic(newTopic);
msgInner.setBody(msgExt.getBody());
msgInner.setFlag(msgExt.getFlag());MessageAccessor.setProperties(msgInner, msgExt.getProperties());
msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties()));
msgInner.setTagsCode(MessageExtBrokerInner.tagsString2tagsCode(null, msgExt.getTags()));
msgInner.setQueueId(queueIdInt);
msgInner.setSysFlag(msgExt.getSysFlag());
msgInner.setBornTimestamp(msgExt.getBornTimestamp());
msgInner.setBornHost(msgExt.getBornHost());
msgInner.setStoreHost(msgExt.getStoreHost());
msgInner.setReconsumeTimes(msgExt.getReconsumeTimes()+1);String originMsgId =MessageAccessor.getOriginMessageId(msgExt);MessageAccessor.setOriginMessageId(msgInner,UtilAll.isBlank(originMsgId)? msgExt.getMsgId(): originMsgId);
msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties()));// 发送消息CompletableFuture<PutMessageResult> putMessageResult =this.brokerController.getMessageStore().asyncPutMessage(msgInner);return putMessageResult.thenApply((r)->{if(r !=null){switch(r.getPutMessageStatus()){case PUT_OK:String backTopic = msgExt.getTopic();String correctTopic = msgExt.getProperty(MessageConst.PROPERTY_RETRY_TOPIC);if(correctTopic !=null){
backTopic = correctTopic;}this.brokerController.getBrokerStatsManager().incSendBackNums(requestHeader.getGroup(), backTopic);
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);return response;default:break;}
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark(r.getPutMessageStatus().name());return response;}
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("putMessageResult is null");return response;});}
重试消息的重新投递逻辑与延迟消息一致,等待
DelayLevel
对应的延时之后,Broker会尝试重新进行消息投递。
关于延迟级别的的配置在
MessageStoreConfig.messageDelay
里,默认配置如下:
this.messageDelayLevel ="1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
可以主动调整每一个延迟级别对应的时间,但仍然有一些缺陷:
- 时间精度不够细,最小粒度是1s
- 延迟级别的个数是固定的,无法调整
死信队列
RocketMQ里的消息不能无限次重复消费,当重试次数超过所有延迟级别的个数之后,消息就会进入到死信队列里,死信的Topic命名规则为:"%DLQ% " + Consumer组名
// 如果超过最大消费次数或delayLevel < 0,则加入私信队列if(msgExt.getReconsumeTimes()>= maxReconsumeTimes
|| delayLevel <0){
newTopic =MixAll.getDLQTopic(requestHeader.getGroup());
queueIdInt =Math.abs(this.random.nextInt()%99999999)% DLQ_NUMS_PER_GROUP;
topicConfig =this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic,
DLQ_NUMS_PER_GROUP,PermName.PERM_WRITE,0);
进入到死信队列后的消息就不会再被投递了,可以通过接口来查询当前RocketMQ中的死信队列的消息。
版权归原作者 小王曾是少年 所有, 如有侵权,请联系我们删除。