

RocketMQ & Kafka Retry Queues

To decouple services and handle part of the business logic asynchronously, our system adopts a message-driven approach. With message queues, services communicate through events, which reduces direct dependencies and improves the system's responsiveness and reliability.

Why do we need to consider consumption retries?

In business development, exception handling and retry mechanisms are key to system stability and data consistency. For example, Spring Retry provides flexible retry policies that make method calls more robust, and when calling remote services with OpenFeign, its integrated Ribbon component can likewise retry failed invocations.

The same applies to message-queue consumption. When an exception occurs while processing a message, a sound retry policy helps ensure that the business logic eventually executes correctly and that data stays complete.

How do RocketMQ and Kafka retry?

RocketMQ (version 4.9.x)

Retry queue

If a Consumer fails to consume a message, RocketMQ redelivers it to the Consumer after a retry interval. If the message still has not been consumed successfully after the maximum number of retries, it is delivered to the dead-letter queue.

Message retry only applies to the clustering consumption mode. Broadcasting mode provides no retry: a failed message is not retried, and consumption simply moves on to the next message.

  • Maximum retry count: the maximum number of times a message may be redelivered after a consumption failure.
consumer.setMaxReconsumeTimes(10);
  • Retry interval: the interval before a failed message is delivered to the Consumer again; it only takes effect for orderly consumption.
consumer.setSuspendCurrentQueueTimeMillis(5000);

Orderly and concurrent consumption retry differently. After a failure, orderly consumption first retries locally on the client up to the maximum retry count, so that a failed message is not skipped in favor of the next one, which would break the ordering. Concurrent consumption instead sends the failed message back to the server and waits for it to be redelivered; in the meantime, later messages in the queue are consumed normally.

A concurrently consumed message that fails is not redelivered to its original topic but to a special topic named %RETRY%ConsumerGroupName. In clustering mode, each ConsumerGroup doing concurrent consumption gets one such topic and subscribes to it. The parameters of the two modes differ as follows:

Consumption type | Retry interval | Maximum retry count
Orderly consumption | Configurable via SuspendCurrentQueueTimeMillis. | Configurable via MaxReconsumeTimes, with no upper limit; if unset, it defaults to Integer.MAX_VALUE.
Concurrent consumption | Steps up with the retry count, ranging from 1 second to 2 hours; not configurable. | Configurable via MaxReconsumeTimes, with no upper limit; defaults to 16, and the default is recommended.
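To make the concurrent-consumption ladder concrete, here is a small self-contained sketch. The class and method names are my own illustration, not RocketMQ client API; it assumes the default 18-level messageDelayLevel table ("1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h") and the delayLevel = 3 + reconsumeTimes rule that appears later in the source walkthrough.

```java
import java.time.Duration;

// Sketch: map a concurrent-consumption retry attempt to its backoff,
// assuming the default messageDelayLevel table. Retry #1 (reconsumeTimes = 0)
// uses level 3 (10s) and retry #16 uses level 18 (2h).
public class RetryDelays {
    private static final long[] LEVEL_SECONDS = {
        1, 5, 10, 30, 60, 120, 180, 240, 300, 360,
        420, 480, 540, 600, 1200, 1800, 3600, 7200
    };

    // delayLevel is 1-based; out-of-range values are clamped here for simplicity
    public static Duration delayForLevel(int delayLevel) {
        int idx = Math.min(Math.max(delayLevel, 1), LEVEL_SECONDS.length) - 1;
        return Duration.ofSeconds(LEVEL_SECONDS[idx]);
    }

    // reconsumeTimes = how many times the message has already failed
    public static Duration delayForRetry(int reconsumeTimes) {
        return delayForLevel(3 + reconsumeTimes);
    }

    public static void main(String[] args) {
        for (int n = 0; n < 16; n++) {
            System.out.printf("retry #%d -> %s%n", n + 1, delayForRetry(n));
        }
    }
}
```

Running it prints the 10s-to-2h ladder that the table above describes for concurrent consumption.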

Dead-letter queue

When a message first fails to be consumed, RocketMQ retries it automatically. If it still fails after the maximum number of retries, the consumer evidently cannot process it under normal conditions. The message is then not discarded immediately but sent to a special queue belonging to that consumer. Such messages are called dead-letter messages (Dead-Letter Messages), and the queue holding them is the dead-letter queue (Dead-Letter Queue), the single queue under the dead-letter topic. When dead-letter messages are produced, the dead-letter topic for the ConsumerGroup is named %DLQ%ConsumerGroupName, and messages in it are never consumed again. They can be inspected with the RocketMQ Admin tool or the RocketMQ Dashboard.
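Both special topics are plain string prefixes on the consumer-group name. A minimal sketch mirroring what MixAll.getRetryTopic / MixAll.getDLQTopic produce (the prefixes are as stated above; the class here is my own illustration, not the MixAll source):

```java
// Derive the per-group system topics: %RETRY%<group> and %DLQ%<group>.
public class GroupTopics {
    public static final String RETRY_PREFIX = "%RETRY%";
    public static final String DLQ_PREFIX = "%DLQ%";

    public static String retryTopic(String consumerGroup) {
        return RETRY_PREFIX + consumerGroup;
    }

    public static String dlqTopic(String consumerGroup) {
        return DLQ_PREFIX + consumerGroup;
    }

    public static void main(String[] args) {
        System.out.println(retryTopic("test")); // %RETRY%test
        System.out.println(dlqTopic("test"));   // %DLQ%test
    }
}
```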

Consumer example

Spring Boot integration via Maven

@Component
@RocketMQMessageListener(topic = "topicTest", consumerGroup = "test",
        selectorExpression = "tagTest", consumeThreadNumber = 10)
@RequiredArgsConstructor
public static class TestListener implements RocketMQListener<String> {
    public void onMessage(String event) {
        log.info("consume event: {}", event);
        // handle the business logic
    }
}
RocketMQ Dashboard

Looking at the retry-related topics in the dashboard, you can see a topic named %RETRY%test, i.e. the retry topic is %RETRY% + consumerGroup.

Source code walkthrough

Concurrent consumption goes through an implementation of the MessageListener interface; below is the spring-rocketmq implementation. Business code only needs to implement RocketMQListener, as shown in the consumer example above.

org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer.DefaultMessageListenerConcurrently
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
     for (MessageExt messageExt : msgs) {
         log.debug("received msg: {}", messageExt);
         try {
             long now = System.currentTimeMillis();
             handleMessage(messageExt);
             long costTime = System.currentTimeMillis() - now;
             log.debug("consume {} cost: {} ms", messageExt.getMsgId(), costTime);
         } catch (Exception e) {
             log.warn("consume message failed. messageId:{}, topic:{}, reconsumeTimes:{}", messageExt.getMsgId(), messageExt.getTopic(), messageExt.getReconsumeTimes(), e);
             context.setDelayLevelWhenNextConsume(delayLevelWhenNextConsume);
             return ConsumeConcurrentlyStatus.RECONSUME_LATER;
         }
     }

     return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
 }
private void handleMessage(MessageExt messageExt)  {
     if (rocketMQListener != null) {
         rocketMQListener.onMessage(doConvertMessage(messageExt));
     }
}

The consumption status returned by the business listener decides what happens next.

org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService#processConsumeResult
// set ackIndex according to the status
switch (status) {
    case CONSUME_SUCCESS:
        if (ackIndex >= consumeRequest.getMsgs().size()) {
            ackIndex = consumeRequest.getMsgs().size() - 1;
        }
        int ok = ackIndex + 1;
        int failed = consumeRequest.getMsgs().size() - ok;
        this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), ok);
        this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), failed);
        break;
    case RECONSUME_LATER:
        ackIndex = -1;
        this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(),
            consumeRequest.getMsgs().size());
        break;
    default:
        break;
}

switch (this.defaultMQPushConsumer.getMessageModel()) {
    case BROADCASTING:
        for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
            MessageExt msg = consumeRequest.getMsgs().get(i);
            // broadcasting mode only logs the failure
            log.warn("BROADCASTING, the message consume failed, drop it, {}", msg.toString());
        }
        break;
    case CLUSTERING:
        // in clustering mode, if any message fails, every message of this batch is sent back to the broker
        // consumeRequest.getMsgs().size() is bounded by ConsumeMessageBatchMaxSize, default 1
        List<MessageExt> msgBackFailed = new ArrayList<MessageExt>(consumeRequest.getMsgs().size());
        // ackIndex = -1, so i starts at 0 and iterates over all messages
        for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
            MessageExt msg = consumeRequest.getMsgs().get(i);
            boolean result = this.sendMessageBack(msg, context);
            if (!result) {
                msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
                msgBackFailed.add(msg);
            }
        }
        if (!msgBackFailed.isEmpty()) {
            consumeRequest.getMsgs().removeAll(msgBackFailed);
            this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue());
        }
        break;
    default:
        break;
}
sendMessageBack sends the message back to the broker:
public boolean sendMessageBack(final MessageExt msg, final ConsumeConcurrentlyContext context) {
    int delayLevel = context.getDelayLevelWhenNextConsume();
    // Wrap topic with namespace before sending back message.
    msg.setTopic(this.defaultMQPushConsumer.withNamespace(msg.getTopic()));
    try {
        this.defaultMQPushConsumerImpl.sendMessageBack(msg, delayLevel, context.getMessageQueue().getBrokerName());
        return true;
    } catch (Exception e) {
        log.error("sendMessageBack exception, group: " + this.consumerGroup + " msg: " + msg, e);
    }

    return false;
}

org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#sendMessageBack

public void sendMessageBack(MessageExt msg, int delayLevel, final String brokerName) {
    try {
        String brokerAddr = (null != brokerName) ? this.mQClientFactory.findBrokerAddressInPublish(brokerName)
            : RemotingHelper.parseSocketAddressAddr(msg.getStoreHost());
        this.mQClientFactory.getMQClientAPIImpl().consumerSendMessageBack(brokerAddr, brokerName, msg,
            this.defaultMQPushConsumer.getConsumerGroup(), delayLevel, 5000, getMaxReconsumeTimes());
    } catch (Exception e) {
        log.error("sendMessageBack Exception, " + this.defaultMQPushConsumer.getConsumerGroup(), e);

        Message newMsg = new Message(MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup()), msg.getBody());
        String originMsgId = MessageAccessor.getOriginMessageId(msg);
        MessageAccessor.setOriginMessageId(newMsg, UtilAll.isBlank(originMsgId) ? msg.getMsgId() : originMsgId);

        newMsg.setFlag(msg.getFlag());
        MessageAccessor.setProperties(newMsg, msg.getProperties());
        MessageAccessor.putProperty(newMsg, MessageConst.PROPERTY_RETRY_TOPIC, msg.getTopic());
        MessageAccessor.setReconsumeTime(newMsg, String.valueOf(msg.getReconsumeTimes() + 1));
        MessageAccessor.setMaxReconsumeTimes(newMsg, String.valueOf(getMaxReconsumeTimes()));
        MessageAccessor.clearProperty(newMsg, MessageConst.PROPERTY_TRANSACTION_PREPARED);
        newMsg.setDelayTimeLevel(3 + msg.getReconsumeTimes());

        this.mQClientFactory.getDefaultMQProducer().send(newMsg);
    } finally {
        msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQPushConsumer.getNamespace()));
    }
}

org.apache.rocketmq.client.impl.MQClientAPIImpl#consumerSendMessageBack

ConsumerSendMsgBackRequestHeader requestHeader = new ConsumerSendMsgBackRequestHeader();
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CONSUMER_SEND_MSG_BACK, requestHeader);

requestHeader.setGroup(consumerGroup);
requestHeader.setOriginTopic(msg.getTopic());
requestHeader.setOffset(msg.getCommitLogOffset());
requestHeader.setDelayLevel(delayLevel);
requestHeader.setOriginMsgId(msg.getMsgId());
requestHeader.setMaxReconsumeTimes(maxConsumeRetryTimes);
requestHeader.setBname(brokerName);

RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
    request, timeoutMillis);
assert response != null;
switch (response.getCode()) {
    case ResponseCode.SUCCESS: {
        return;
    }
    default:
        break;
}

In 4.9.x the request ultimately reaches the broker via Netty:
org.apache.rocketmq.remoting.netty.NettyRemotingClient#invokeSync

In 5.x, gRPC replaces the remoting transport, and clients go through a proxy instead of connecting to the broker directly.

Broker-side scheduled handling
private CompletableFuture<RemotingCommand> asyncConsumerSendMsgBack(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
    final RemotingCommand response = RemotingCommand.createResponseCommand(null);
    final ConsumerSendMsgBackRequestHeader requestHeader =
          (ConsumerSendMsgBackRequestHeader)request.decodeCommandCustomHeader(ConsumerSendMsgBackRequestHeader.class);
    ……
    // read/write permission checks, etc.
    if (!PermName.isWriteable(this.brokerController.getBrokerConfig().getBrokerPermission())) {
        response.setCode(ResponseCode.NO_PERMISSION);
        response.setRemark("the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1() + "] sending message is forbidden");
        return CompletableFuture.completedFuture(response);
    }
    if (subscriptionGroupConfig.getRetryQueueNums() <= 0) {
        response.setCode(ResponseCode.SUCCESS);
        response.setRemark(null);
        return CompletableFuture.completedFuture(response);
    }

    // the retry topic
    String newTopic = MixAll.getRetryTopic(requestHeader.getGroup());
    int queueIdInt = ThreadLocalRandom.current().nextInt(99999999) % subscriptionGroupConfig.getRetryQueueNums();
    int topicSysFlag = 0;
    if (requestHeader.isUnitMode()) {
        topicSysFlag = TopicSysFlag.buildSysFlag(false, true);
    }

    // create the retry topic and set its read/write permissions
    TopicConfig topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(
        newTopic,
        subscriptionGroupConfig.getRetryQueueNums(),
        PermName.PERM_WRITE | PermName.PERM_READ, topicSysFlag);
    if (null == topicConfig) {
        response.setCode(ResponseCode.SYSTEM_ERROR);
        response.setRemark("topic[" + newTopic + "] not exist");
        return CompletableFuture.completedFuture(response);
    }

    if (!PermName.isWriteable(topicConfig.getPerm())) {
        response.setCode(ResponseCode.NO_PERMISSION);
        response.setRemark(String.format("the topic[%s] sending message is forbidden", newTopic));
        return CompletableFuture.completedFuture(response);
    }
    MessageExt msgExt = this.brokerController.getMessageStore().lookMessageByOffset(requestHeader.getOffset());
    if (null == msgExt) {
        response.setCode(ResponseCode.SYSTEM_ERROR);
        response.setRemark("look message by offset failed, " + requestHeader.getOffset());
        return CompletableFuture.completedFuture(response);
    }

    final String retryTopic = msgExt.getProperty(MessageConst.PROPERTY_RETRY_TOPIC);
    if (null == retryTopic) {
        MessageAccessor.putProperty(msgExt, MessageConst.PROPERTY_RETRY_TOPIC, msgExt.getTopic());
    }
    msgExt.setWaitStoreMsgOK(false);

    int delayLevel = requestHeader.getDelayLevel();

    int maxReconsumeTimes = subscriptionGroupConfig.getRetryMaxTimes();
    if (request.getVersion() >= MQVersion.Version.V3_4_9.ordinal()) {
        Integer times = requestHeader.getMaxReconsumeTimes();
        if (times != null) {
            maxReconsumeTimes = times;
        }
    }

    // decide by retry count whether to route to the dead-letter queue or the retry queue
    if (msgExt.getReconsumeTimes() >= maxReconsumeTimes
        || delayLevel < 0) {
        newTopic = MixAll.getDLQTopic(requestHeader.getGroup());
        queueIdInt = ThreadLocalRandom.current().nextInt(99999999) % DLQ_NUMS_PER_GROUP;

        topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic,
                DLQ_NUMS_PER_GROUP,
                PermName.PERM_WRITE | PermName.PERM_READ, 0);

        if (null == topicConfig) {
            response.setCode(ResponseCode.SYSTEM_ERROR);
            response.setRemark("topic[" + newTopic + "] not exist");
            return CompletableFuture.completedFuture(response);
        }
        msgExt.setDelayTimeLevel(0);
    } else {
        if (0 == delayLevel) {
            delayLevel = 3 + msgExt.getReconsumeTimes();
        }
        msgExt.setDelayTimeLevel(delayLevel);
    }

    MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
    msgInner.setTopic(newTopic);
    msgInner.setBody(msgExt.getBody());
    msgInner.setFlag(msgExt.getFlag());
    MessageAccessor.setProperties(msgInner, msgExt.getProperties());
    msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties()));
    msgInner.setTagsCode(MessageExtBrokerInner.tagsString2tagsCode(null, msgExt.getTags()));

    msgInner.setQueueId(queueIdInt);
    msgInner.setSysFlag(msgExt.getSysFlag());
    msgInner.setBornTimestamp(msgExt.getBornTimestamp());
    msgInner.setBornHost(msgExt.getBornHost());
    msgInner.setStoreHost(msgExt.getStoreHost());
    msgInner.setReconsumeTimes(msgExt.getReconsumeTimes() + 1);

    String originMsgId = MessageAccessor.getOriginMessageId(msgExt);
    MessageAccessor.setOriginMessageId(msgInner, UtilAll.isBlank(originMsgId) ? msgExt.getMsgId() : originMsgId);
    msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties()));

    // persist the message
    CompletableFuture<PutMessageResult> putMessageResult = this.brokerController.getMessageStore().asyncPutMessage(msgInner);
    return putMessageResult.thenApply(r -> {
        if (r != null) {
            switch (r.getPutMessageStatus()) {
                case PUT_OK:
                    String backTopic = msgExt.getTopic();
                    String correctTopic = msgExt.getProperty(MessageConst.PROPERTY_RETRY_TOPIC);
                    if (correctTopic != null) {
                        backTopic = correctTopic;
                    }
                    if (TopicValidator.RMQ_SYS_SCHEDULE_TOPIC.equals(msgInner.getTopic())) {
                        this.brokerController.getBrokerStatsManager().incTopicPutNums(msgInner.getTopic());
                        this.brokerController.getBrokerStatsManager().incTopicPutSize(msgInner.getTopic(), r.getAppendMessageResult().getWroteBytes());
                        this.brokerController.getBrokerStatsManager().incQueuePutNums(msgInner.getTopic(), msgInner.getQueueId());
                        this.brokerController.getBrokerStatsManager().incQueuePutSize(msgInner.getTopic(), msgInner.getQueueId(), r.getAppendMessageResult().getWroteBytes());
                    }
                    this.brokerController.getBrokerStatsManager().incSendBackNums(requestHeader.getGroup(), backTopic);
                    response.setCode(ResponseCode.SUCCESS);
                    response.setRemark(null);
                    return response;
                default:
                    break;
            }
            response.setCode(ResponseCode.SYSTEM_ERROR);
            response.setRemark(r.getPutMessageStatus().name());
            return response;
        }
        response.setCode(ResponseCode.SYSTEM_ERROR);
        response.setRemark("putMessageResult is null");
        return response;
    });
}
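The routing decision in asyncConsumerSendMsgBack boils down to a couple of pure functions. Here is a runnable sketch of that logic (the class and method names are my own, not broker API; the %RETRY%/%DLQ% prefixes are those stated earlier): once the retry budget is exhausted, or delayLevel < 0, the message goes to the dead-letter topic; otherwise it goes back to the retry topic, and a delayLevel of 0 is replaced by 3 + reconsumeTimes.

```java
// Pure-Java sketch of the broker's send-back routing decision.
public class SendBackRouting {
    public static String targetTopic(String group, int reconsumeTimes,
                                     int maxReconsumeTimes, int delayLevel) {
        if (reconsumeTimes >= maxReconsumeTimes || delayLevel < 0) {
            return "%DLQ%" + group;   // dead-lettered, never redelivered
        }
        return "%RETRY%" + group;     // scheduled for another retry
    }

    public static int effectiveDelayLevel(int reconsumeTimes, int delayLevel) {
        // delayLevel == 0 means "let the broker choose": the level grows with each retry
        return delayLevel == 0 ? 3 + reconsumeTimes : delayLevel;
    }

    public static void main(String[] args) {
        System.out.println(targetTopic("test", 2, 16, 0));   // still retrying
        System.out.println(targetTopic("test", 16, 16, 0));  // dead-lettered
    }
}
```

Note how the client's consumer.setMaxReconsumeTimes value ends up in the request header and overrides the broker-side default for clients at version 3.4.9 or later, as the excerpt above shows.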

org.apache.rocketmq.store.MessageStore#asyncPutMessage
org.apache.rocketmq.store.DefaultMessageStore#asyncPutMessage
org.apache.rocketmq.store.CommitLog#asyncPutMessage

public CompletableFuture<PutMessageResult> asyncPutMessage(final MessageExtBrokerInner msg) {
    // Set the storage time
    msg.setStoreTimestamp(System.currentTimeMillis());
    // Set the message body BODY CRC (consider the most appropriate setting
    // on the client)
    msg.setBodyCRC(UtilAll.crc32(msg.getBody()));
    // Back to Results
    AppendMessageResult result = null;

    StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();

    String topic = msg.getTopic();
//        int queueId = msg.getQueueId();
    final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
    if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
            || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
        // Delay Delivery
        if (msg.getDelayTimeLevel() > 0) {
            if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
                msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
            }
            // persisted into SCHEDULE_TOPIC_XXXX; the queueId is derived from the delayLevel
            // org.apache.rocketmq.store.config.MessageStoreConfig#messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
            topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;
            int queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());

            // Back up the real topic and queueId into the message properties
            MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
            MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
            msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));

            msg.setTopic(topic);
            msg.setQueueId(queueId);
        }
    }

    InetSocketAddress bornSocketAddress = (InetSocketAddress) msg.getBornHost();
    if (bornSocketAddress.getAddress() instanceof Inet6Address) {
        msg.setBornHostV6Flag();
    }

    InetSocketAddress storeSocketAddress = (InetSocketAddress) msg.getStoreHost();
    if (storeSocketAddress.getAddress() instanceof Inet6Address) {
        msg.setStoreHostAddressV6Flag();
    }

    PutMessageThreadLocal putMessageThreadLocal = this.putMessageThreadLocal.get();
    updateMaxMessageSize(putMessageThreadLocal);
    if (!multiDispatch.isMultiDispatchMsg(msg)) {
        PutMessageResult encodeResult = putMessageThreadLocal.getEncoder().encode(msg);
        if (encodeResult != null) {
            return CompletableFuture.completedFuture(encodeResult);
        }
        msg.setEncodedBuff(putMessageThreadLocal.getEncoder().getEncoderBuffer());
    }
    PutMessageContext putMessageContext = new PutMessageContext(generateKey(putMessageThreadLocal.getKeyBuilder(), msg));

    long elapsedTimeInLock = 0;
    MappedFile unlockMappedFile = null;

    putMessageLock.lock(); //spin or ReentrantLock ,depending on store config
    try {
        MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
        long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();
        this.beginTimeInLock = beginLockTimestamp;

        // Here settings are stored timestamp, in order to ensure an orderly
        // global
        msg.setStoreTimestamp(beginLockTimestamp);

        if (null == mappedFile || mappedFile.isFull()) {
            mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise
        }
        if (null == mappedFile) {
            log.error("create mapped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
            return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null));
        }
        // append the message to the commit log (persistence)
        result = mappedFile.appendMessage(msg, this.appendMessageCallback, putMessageContext);
        switch (result.getStatus()) {
            case PUT_OK:
                break;
            case END_OF_FILE:
                unlockMappedFile = mappedFile;
                // Create a new file, re-write the message
                mappedFile = this.mappedFileQueue.getLastMappedFile(0);
                if (null == mappedFile) {
                    // XXX: warn and notify me
                    log.error("create mapped file2 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
                    return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result));
                }
                result = mappedFile.appendMessage(msg, this.appendMessageCallback, putMessageContext);
                break;
            case MESSAGE_SIZE_EXCEEDED:
            case PROPERTIES_SIZE_EXCEEDED:
                return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result));
            case UNKNOWN_ERROR:
                return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result));
            default:
                return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result));
        }

        elapsedTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;
    } finally {
        beginTimeInLock = 0;
        putMessageLock.unlock();
    }

    if (null != unlockMappedFile && this.defaultMessageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {
        this.defaultMessageStore.unlockMappedFile(unlockMappedFile);
    }
    ……
}

Consuming the delayed messages from the commit log

org.apache.rocketmq.store.schedule.ScheduleMessageService.DeliverDelayedMessageTimerTask#executeOnTimeup

public void executeOnTimeup() {
    // create the consume queue if it does not exist yet
    ConsumeQueue cq = ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(
        TopicValidator.RMQ_SYS_SCHEDULE_TOPIC, delayLevel2QueueId(delayLevel));
    if (cq == null) {
        this.scheduleNextTimerTask(this.offset, DELAY_FOR_A_WHILE);
        return;
    }

    SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset);
    if (bufferCQ == null) {
        long resetOffset;
        if ((resetOffset = cq.getMinOffsetInQueue()) > this.offset) {
            log.error("schedule CQ offset invalid. offset={}, cqMinOffset={}, queueId={}", this.offset, resetOffset, cq.getQueueId());
        } else if ((resetOffset = cq.getMaxOffsetInQueue()) < this.offset) {
            log.error("schedule CQ offset invalid. offset={}, cqMaxOffset={}, queueId={}", this.offset, resetOffset, cq.getQueueId());
        } else {
            resetOffset = this.offset;
        }
        this.scheduleNextTimerTask(resetOffset, DELAY_FOR_A_WHILE);
        return;
    }

    long nextOffset = this.offset;
    try {
        int i = 0;
        ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
        for (; i < bufferCQ.getSize() && isStarted(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
            long offsetPy = bufferCQ.getByteBuffer().getLong();
            int sizePy = bufferCQ.getByteBuffer().getInt();
            long tagsCode = bufferCQ.getByteBuffer().getLong();

            if (cq.isExtAddr(tagsCode)) {
                if (cq.getExt(tagsCode, cqExtUnit)) {
                    tagsCode = cqExtUnit.getTagsCode();
                } else {
                    // can't find ext content, so recompute the tags code
                    log.error("[BUG] can't find consume queue extend file content!addr={}, offsetPy={}, sizePy={}",
                        tagsCode, offsetPy, sizePy);
                    long msgStoreTime = defaultMessageStore.getCommitLog().pickupStoreTimestamp(offsetPy, sizePy);
                    tagsCode = computeDeliverTimestamp(delayLevel, msgStoreTime);
                }
            }

            long now = System.currentTimeMillis();
            long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode);
            nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);

            long countdown = deliverTimestamp - now;
            if (countdown > 0) {
                this.scheduleNextTimerTask(nextOffset, DELAY_FOR_A_WHILE);
                return;
            }

            // look up the message by its physical offset
            MessageExt msgExt = ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(offsetPy, sizePy);
            if (msgExt == null) {
                continue;
            }

            MessageExtBrokerInner msgInner = ScheduleMessageService.this.messageTimeup(msgExt);
            if (TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC.equals(msgInner.getTopic())) {
                log.error("[BUG] the real topic of schedule msg is {}, discard the msg. msg={}",
                    msgInner.getTopic(), msgInner);
                continue;
            }

            // deliver the message to the retry queue
            boolean deliverSuc;
            if (ScheduleMessageService.this.enableAsyncDeliver) {
                deliverSuc = this.asyncDeliver(msgInner, msgExt.getMsgId(), nextOffset, offsetPy, sizePy);
            } else {
                deliverSuc = this.syncDeliver(msgInner, msgExt.getMsgId(), nextOffset, offsetPy, sizePy);
            }
            if (!deliverSuc) {
                this.scheduleNextTimerTask(nextOffset, DELAY_FOR_A_WHILE);
                return;
            }
        }

        nextOffset = this.offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
    } catch (Exception e) {
        log.error("ScheduleMessageService, messageTimeup execute error, offset = {}", nextOffset, e);
    } finally {
        bufferCQ.release();
    }

    this.scheduleNextTimerTask(nextOffset, DELAY_FOR_A_WHILE);
}

private boolean syncDeliver(MessageExtBrokerInner msgInner, String msgId, long offset, long offsetPy, int sizePy) {
    PutResultProcess resultProcess = deliverMessage(msgInner, msgId, offset, offsetPy, sizePy, false);
    PutMessageResult result = resultProcess.get();
    boolean sendStatus = result != null && result.getPutMessageStatus() == PutMessageStatus.PUT_OK;
    if (sendStatus) {
        ScheduleMessageService.this.updateOffset(this.delayLevel, resultProcess.getNextOffset());
    }
    return sendStatus;
}

// the message store persists the message asynchronously: commit log -> mapped file
private PutResultProcess deliverMessage(MessageExtBrokerInner msgInner, String msgId, long offset,
    long offsetPy, int sizePy, boolean autoResend) {
    CompletableFuture<PutMessageResult> future =
        ScheduleMessageService.this.writeMessageStore.asyncPutMessage(msgInner);
    return new PutResultProcess()
        .setTopic(msgInner.getTopic())
        .setDelayLevel(this.delayLevel)
        .setOffset(offset)
        .setPhysicOffset(offsetPy)
        .setPhysicSize(sizePy)
        .setMsgId(msgId)
        .setAutoResend(autoResend)
        .setFuture(future)
        .thenProcess();
}
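The delayLevel2QueueId call used above is a simple off-by-one mapping: each of the 18 delay levels owns one queue of SCHEDULE_TOPIC_XXXX. A tiny sketch (class name is hypothetical; the mapping matches what ScheduleMessageService.delayLevel2QueueId does):

```java
// delayLevel <-> queueId mapping inside SCHEDULE_TOPIC_XXXX:
// delay levels are 1-based, queue ids are 0-based.
public class ScheduleQueueMapping {
    public static int delayLevel2QueueId(int delayLevel) {
        return delayLevel - 1;
    }

    public static int queueId2DelayLevel(int queueId) {
        return queueId + 1;
    }

    public static void main(String[] args) {
        // level 3, the first level used for concurrent retries, maps to queue 2
        System.out.println(delayLevel2QueueId(3));
    }
}
```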
Subscribing to the retry queue

After the specified delay elapses and the message has been delivered to the retry topic, consumers can consume it again.

org.apache.rocketmq.client.consumer.DefaultMQPushConsumer#start
org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#start
org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#copySubscription

switch (this.defaultMQPushConsumer.getMessageModel()) {
    case BROADCASTING:
        break;
    // only clustering mode subscribes to the retry queue
    case CLUSTERING:
        final String retryTopic = MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup());
        SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(retryTopic, SubscriptionData.SUB_ALL);
        this.rebalanceImpl.getSubscriptionInner().put(retryTopic, subscriptionData);
        break;
    default:
        break;
}
Pulling the messages and restoring the original topic:
public void resetRetryAndNamespace(final List<MessageExt> msgs, String consumerGroup) {
    final String groupTopic = MixAll.getRetryTopic(consumerGroup);
    for (MessageExt msg : msgs) {
        String retryTopic = msg.getProperty(MessageConst.PROPERTY_RETRY_TOPIC);
        if (retryTopic != null && groupTopic.equals(msg.getTopic())) {
            msg.setTopic(retryTopic);
        }
        if (StringUtils.isNotEmpty(this.defaultMQPushConsumer.getNamespace())) {
            msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQPushConsumer.getNamespace()));
        }
    }
}

The business logic is then executed again in
org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently#consumeMessage

Summary

The main retry flow:
1. The consumer fails to consume a message and sends it back to the broker.
2. The broker first stores the returned message in the scheduled (delay) queue.
3. Based on the retry count, after the corresponding delay the message is redelivered to the retry topic.
4. The consumer pulls messages from the retryTopic of its consumerGroup.
5. After pulling a retry-topic message, the consumer restores its original topic, and the messageListener implementation consumes it.
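Assuming consumption always fails, the five steps above can be collapsed into a toy simulation (delays are ignored and broker-side scheduling is treated as immediate redelivery; class and method names are hypothetical): one original delivery, maxReconsumeTimes retry deliveries, and a final landing in the DLQ.

```java
import java.util.ArrayList;
import java.util.List;

// Toy simulation of the retry flow: record which topic each delivery arrives on.
public class RetryFlowSim {
    public static List<String> deliver(String group, int maxReconsumeTimes) {
        List<String> deliveries = new ArrayList<>();
        int reconsumeTimes = 0;
        String topic = "topicTest";
        while (true) {
            deliveries.add(topic);
            if (topic.startsWith("%DLQ%")) {
                return deliveries; // dead-letter messages are not consumed again
            }
            // consumption "fails" every time, so the message is sent back
            if (reconsumeTimes >= maxReconsumeTimes) {
                topic = "%DLQ%" + group;
            } else {
                reconsumeTimes++;
                topic = "%RETRY%" + group;
            }
        }
    }

    public static void main(String[] args) {
        // 1 original + 16 retries + 1 DLQ delivery = 18 deliveries
        System.out.println(deliver("test", 16).size());
    }
}
```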

Q&A

1. Why is the message not sent back to its original queue?
Because multiple consumer groups may have consumed the same message while only one of them failed. Sending it back to the original queue would make the other consumer groups consume it again.

2. Where can the number of retry queues, i.e. the number of consumerGroups, have an impact?
a) When the broker registers with the nameserver, it registers topic information, and every consumerGroup adds a retry topic;
b) every concurrently consuming consumer starts a thread pool with the configured number of core threads;
c) subscription-relationship consistency;
d) client-side rebalancing.

Kafka retries

Kafka's native support

Kafka itself does not support retry, delay, or dead-letter queues.
spring-kafka wraps a retry-queue mechanism that creates a retry topic for every topic; if the number of Kafka topics grows too large, throughput suffers.

Implementing custom retry-queue logic

1. Implement KafkaListenerErrorHandler and override handleError: on a consumption failure, add a retryCount to the message headers, record the failing group id, and use retryCount to decide whether to send the message to the retry topic or the dead topic.

@Override
public Object handleError(Message<?> message, ListenerExecutionFailedException exception, Consumer<?, ?> consumer) {
    log.warn("Message consumption failed; it will be retried or dead-lettered", exception);
    ConsumerRecord record = (ConsumerRecord) message.getPayload();

    // read the custom header and, if present, the number of retries so far
    Header retryHeader = record.headers().lastHeader("__retry_dead_head__");
    Integer retryTimes = 0;
    if (retryHeader != null && retryHeader.value() != null) {
        retryTimes = (Integer) JsonUtils.toData(stringDeserializer.deserialize(null, retryHeader.value()), Map.class).get(RETRY_KEY);
    }

    Headers headers = record.headers();
    String groupId = consumer.groupMetadata().groupId();
    Map<String, Object> custData = new HashMap<>();
    // fill the custom headers, e.g. the original topic, groupId, etc.
    assembleCustData(custData);

    if (retryTimes >= maxRetryCount) {
        message2Dead(record, custData, headers);
    } else {
        // not over the retry limit yet, so send it to the retry queue again
        message2Retry(record, retryTimes, custData, headers);
    }

    // commit the offset
    consumer.commitSync(Collections.singletonMap(new TopicPartition(record.topic(), record.partition()),
        new OffsetAndMetadata(record.offset() + 1)));
    return null;
}

private void message2Retry(ConsumerRecord record, Integer retryTimes, Map<String, Object> custData, Headers headers) {
    custData.put(RETRY_KEY, retryTimes + 1);
    try {
        // a single shared retry topic is used here
        kafkaTemplate.send(new ProducerRecord<>(RETRY_TOPIC, null, null, record.key(), record.value(),
            buildNewHeaders(headers, custData))).get();
    } catch (Exception ex) {
        throw new BusinessException("Failed to send to the retry topic", ex);
    }
}

1.1 Register the retry error handler for the consumer

@Bean
public KafkaListenerErrorHandler kafkaListenerErrorHandler() {
    Map<String, Object> produceConfig = kafkaProperties.buildProducerProperties();
    produceConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    ProducerFactory producerFactory = new DefaultKafkaProducerFactory(produceConfig);
    KafkaTemplate kafkaTemplate = new KafkaTemplate(producerFactory);
    return new RetryListenerErrorHandler(kafkaTemplate, appName);
}

2. Listen on RETRY_TOPIC

@KafkaListener(topics = {RETRY_TOPIC})
public void retryDeal(ConsumerRecord<byte[], byte[]> record, Acknowledgment ack)
        throws ExecutionException, InterruptedException, UnsupportedEncodingException {
    Header customerHead = record.headers().lastHeader(RetryListenerErrorHandler.CUSTOMER_HEAD_NAME);
    Long timestamp = record.timestamp();
    if (System.currentTimeMillis() < timestamp + RETRY_DELAY_DURATION.toMillis()) {
        try {
            Thread.sleep((timestamp + RETRY_DELAY_DURATION.toMillis()) - System.currentTimeMillis());
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    Map<String, Object> custData = JsonUtils.toData(new StringDeserializer().deserialize(null, customerHead.value()), Map.class);
    String topic = (String) custData.get(RetryListenerErrorHandler.CONSUMER_TOPIC);
    Integer partition = (Integer) custData.get(RetryListenerErrorHandler.PARTITION_KEY);
    // once the delay has elapsed, send the message back to its original topic and partition
    kafkaTemplate.send(new ProducerRecord<>(topic, partition, null, record.key(), record.value(), record.headers())).get();
    ack.acknowledge();
}
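The wait computation in the listener above can be factored into a pure helper for testing (a hypothetical extraction of mine, not part of spring-kafka): given the record's produce timestamp and the fixed retry delay, how long must the listener still sleep before re-publishing?

```java
import java.time.Duration;

// How long to wait before a retry-topic record becomes due for redelivery.
public class RetryWait {
    public static long remainingMillis(long recordTimestamp, Duration retryDelay, long nowMillis) {
        long dueAt = recordTimestamp + retryDelay.toMillis();
        return Math.max(0L, dueAt - nowMillis); // already due -> no wait
    }

    public static void main(String[] args) {
        System.out.println(remainingMillis(1_000L, Duration.ofSeconds(5), 3_000L)); // 3000
    }
}
```

This poll-and-sleep design trades consumer-thread time for delay semantics Kafka lacks natively; the thread blocks, so the retry topic's partition count bounds retry throughput.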

3. Implement org.apache.kafka.clients.consumer.ConsumerInterceptor and override onConsume

@Override
public ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> records) {
    KafkaUtils.getConsumerGroupId();
    Map<TopicPartition, List<ConsumerRecord<K, V>>> resultRecordMap = new HashMap<>(records.count());
    records.partitions().forEach(p -> {
        List<ConsumerRecord<K, V>> recordList = records.records(p);
        // apply a custom filter (e.g. on groupId) to decide whether each record should be consumed
        List<ConsumerRecord<K, V>> filteredRecords = recordList.stream().filter(this::predict).collect(Collectors.toList());
        if (!filteredRecords.isEmpty()) {
            resultRecordMap.put(p, filteredRecords);
        }
    });
    return new ConsumerRecords<>(resultRecordMap);
}

References

https://rocketmq.apache.org/zh/docs/4.x/consumer/02push
https://rocketmq.apache.org/zh/docs/featureBehavior/10consumerretrypolicy

Tags: rocketmq kafka

This article is reposted from: https://blog.csdn.net/grant167/article/details/141305415
Copyright belongs to the original author, grant167. In case of infringement, please contact us for removal.
