0


RocketMQ 之消息消费手动提交 ACK 实战【案例分享】

前言:

上一篇我们分享了 RocketMQ 消息重试的一些基本原理,本篇我们基于 Spring Boot 整合 RocketMQ 来分享一下 RocketMQ 消息基于手动提交的案例,在分享手动进行消息 ACK 中也会分享消息重试的使用。

RocketMQ 系列文章传送门

RocketMQ 的介绍及核心概念讲解

Spring Boot 整合 RocketMQ 之普通消息

Spring Boot 整合 RocketMQ 之定时/延时消息

Spring Boot 整合 RocketMQ 之顺序消息

Spring Boot 整合 RocketMQ 之事务消息

RocketMQ 之消息重试机制

同步消息手动提交 ACK 案例分享

同步消息 Producer 消息发送代码案例

同步消息发送的代码前面分享过,感兴趣的朋友也可以通过上面的系列文章链接去查看,这里还是简单的分享一下同步消息 Producer 消息发送代码,具体如下:

packagecom.order.service.rocketmq.producer;importlombok.extern.slf4j.Slf4j;importorg.apache.rocketmq.spring.core.RocketMQTemplate;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.messaging.Message;importorg.springframework.messaging.support.MessageBuilder;importorg.springframework.stereotype.Component;importjava.util.ArrayList;importjava.util.List;/**
 * @ClassName: OneWayMessageProducer
 * @Author: zhangyong
 * @Date: 2024/9/27 17:27
 * @Description: 同步消息发送者
 */@Slf4j@ComponentpublicclassSyncMessageProducer{@AutowiredprivateRocketMQTemplate rocketMqTemplate;/**
     * @param message:
     * @date 2024/10/10 17:47
     * @description 同步消息发送
     */publicvoidsendSyncMessage(String message){
        rocketMqTemplate.syncSend("sync-topic",MessageBuilder.withPayload(message).build());}}

同步消息手动 ACK Consumer 端代码案例分享

RocketMQ 消息手动 ACK 就不能再使用 @RocketMQMessageListener 注解 + 实现 RocketMQListener 接口的方式来实现了,RocketMQListener 的源码如下:

packageorg.apache.rocketmq.spring.core;publicinterfaceRocketMQListener<T>{voidonMessage(T var1);}

我们可以看到 RocketMQListener 中只提供了一个 onMessage 方法,且返回值为 void,不接受返回值,因此没办法进行手动 ACK。

这里我们使用 DefaultMQPushConsumer 来实现消息的手动 ACK,DefaultMQPushConsumer 实现了 MQPushConsumer 接口,具体实现代码如下:

packagecom.order.service.rocketmq.consumer;importlombok.extern.slf4j.Slf4j;importorg.apache.commons.lang3.ObjectUtils;importorg.apache.rocketmq.client.consumer.DefaultMQPushConsumer;importorg.apache.rocketmq.client.consumer.listener.*;importorg.apache.rocketmq.client.exception.MQClientException;importorg.apache.rocketmq.common.message.MessageExt;importorg.springframework.stereotype.Component;importjavax.annotation.PostConstruct;importjava.text.SimpleDateFormat;importjava.util.Date;importjava.util.HashMap;importjava.util.List;importjava.util.Map;/**
 * @ClassName: ManualCommitSyncCounsumer
 * @Author: Author
 * @Date: 2024/10/16 16:23
 * @Description:
 */@Slf4j@ComponentpublicclassManualCommitSyncCounsumer{/**
     * @date 2024/10/16 17:19
     * @description 同步消息消费成功 手动提交
     */@PostConstructpublicvoidonSyncMessage()throwsMQClientException{//消费者DefaultMQPushConsumer consumer =newDefaultMQPushConsumer("sync-group");//设置最大消息重试次数//consumer.setMaxReconsumeTimes(2);//RocketMQ 地址可以 可以用配置文件
        consumer.setNamesrvAddr("xxx-xxx-rocketmq.xxx.com:19876");//订阅一个或多个topic,并指定tag过滤条件,这里指定 * 表示接收所有 tag 的消息
        consumer.subscribe("sync-topic","*");
        consumer.registerMessageListener(newMessageListenerConcurrently(){//存储消息id 和消费次数的关系finalMap<String,Integer> map =newHashMap<>();@OverridepublicConsumeConcurrentlyStatusconsumeMessage(List<MessageExt> list,ConsumeConcurrentlyContext consumeConcurrentlyContext){SimpleDateFormat sdf =newSimpleDateFormat("yyyy-MM-dd HH:mm:ss");String dateStr = sdf.format(newDate());//消息处理MessageExt messageExt = list.get(0);String message =newString(messageExt.getBody());String msgId = messageExt.getMsgId();//获取消息消费次数Integer count = map.get(msgId);if(count ==null){
                    count =0;}//次数+1
                count = count +1;//覆盖map
                map.put(msgId, count);if(count >2){
                    log.info("当前时间:{},当前消息id:{},是第【{}】次消费,直接返回消费消费成功,消息内容:{}", dateStr, msgId, count, message);//消息消费成功后移除
                    map.remove(msgId);returnConsumeConcurrentlyStatus.CONSUME_SUCCESS;}
                log.info("当前时间:{},当前消息id:{},是第【{}】次消费,消息内容:{}", dateStr, msgId, count, message);//模拟除 0 异常//int a = 1 / 0;
                log.info("当前时间:{},当前消息id:{},是第【{}】次消费,消息消费成功,消息内容:{}", dateStr, msgId, count, message);returnConsumeConcurrentlyStatus.CONSUME_SUCCESS;//return null;}});
        consumer.start();}}

RocketMQ 同步消费端手动 ACK 结果验证:

正常消费返回 ConsumeConcurrentlyStatus.CONSUME_SUCCESS

2024-10-1910:23:07.575  INFO 10820---[MessageThread_1]c.o.s.r.c.ManualCommitSyncCounsumer: 当前时间:2024-10-1910:23:07,当前消息id:7F0000012A4418B4AAC25EECF4F50000,是第【1】次消费,消息内容:小明同学你妈喊你回家吃饭了
2024-10-1910:23:07.575  INFO 10820---[MessageThread_1]c.o.s.r.c.ManualCommitSyncCounsumer: 当前时间:2024-10-1910:23:07,当前消息id:7F0000012A4418B4AAC25EECF4F50000,是第【1】次消费,消息消费成功,消息内容:小明同学你妈喊你回家吃饭了

没有模拟除 0 异常,正常消费返回,一次就消费成功,结果符合预期。

模拟除 0 异常

2024-10-19 10:19:59.026  INFO 34052 ---[MessageThread_2] c.o.s.r.c.ManualCommitSyncCounsumer      : 当前时间:2024-10-19 10:19:59,当前消息id:7F000001850418B4AAC25EEA14AF0003,是第【1】次消费,消息内容:小明同学你妈喊你回家吃饭了
2024-10-19 10:20:09.029  INFO 34052 ---[MessageThread_3] c.o.s.r.c.ManualCommitSyncCounsumer      : 当前时间:2024-10-19 10:20:09,当前消息id:7F000001850418B4AAC25EEA14AF0003,是第【2】次消费,消息内容:小明同学你妈喊你回家吃饭了
2024-10-19 10:20:39.032  INFO 34052 ---[MessageThread_4] c.o.s.r.c.ManualCommitSyncCounsumer      : 当前时间:2024-10-19 10:20:39,当前消息id:7F000001850418B4AAC25EEA14AF0003,是第【3】次消费,直接返回消费消费成功,消息内容:小明同学你妈喊你回家吃饭了

可以看到消息在不断重试消息,分别是 2024-10-19 10:19:59、2024-10-19 10:20:09、2024-10-19 10:20:39 触发了消费,时间间隔也是 10秒、30秒,结果符合预期。

正常消费但是返回 NULL

2024-10-19 10:25:47.338  INFO 27256 ---[MessageThread_1] c.o.s.r.c.ManualCommitSyncCounsumer      : 当前时间:2024-10-19 10:25:47,当前消息id:7F0000016A7818B4AAC25EEF65120000,是第【1】次消费,消息内容:小明同学你妈喊你回家吃饭了
2024-10-19 10:25:47.338  INFO 27256 ---[MessageThread_1] c.o.s.r.c.ManualCommitSyncCounsumer      : 当前时间:2024-10-19 10:25:47,当前消息id:7F0000016A7818B4AAC25EEF65120000,是第【1】次消费,消息消费成功,消息内容:小明同学你妈喊你回家吃饭了
2024-10-19 10:25:57.344  INFO 27256 ---[MessageThread_2] c.o.s.r.c.ManualCommitSyncCounsumer      : 当前时间:2024-10-19 10:25:57,当前消息id:7F0000016A7818B4AAC25EEF65120000,是第【2】次消费,消息内容:小明同学你妈喊你回家吃饭了
2024-10-19 10:25:57.344  INFO 27256 ---[MessageThread_2] c.o.s.r.c.ManualCommitSyncCounsumer      : 当前时间:2024-10-19 10:25:57,当前消息id:7F0000016A7818B4AAC25EEF65120000,是第【2】次消费,消息消费成功,消息内容:小明同学你妈喊你回家吃饭了
2024-10-19 10:26:27.347  INFO 27256 ---[MessageThread_3] c.o.s.r.c.ManualCommitSyncCounsumer      : 当前时间:2024-10-19 10:26:27,当前消息id:7F0000016A7818B4AAC25EEF65120000,是第【3】次消费,直接返回消费消费成功,消息内容:小明同学你妈喊你回家吃饭了

可以看到返回 NULL 和模拟除 0 异常是一样的效果,消息在不断重试消息,分别是 2024-10-19 10:19:59、2024-10-19 10:20:09、2024-10-19 10:20:39 触发了消费,时间间隔也是 10秒、30秒,结果符合预期。

上面的案例我们没有控制消息消费重试次数,我们可以设置一个消息消费重试次数,代码如下:

//设置最大消息重试次数
consumer.setMaxReconsumeTimes(2);

使用 @RocketMQMessageListener 注解 + 实现 RocketMQListener 接口的方式消费消息的时候消费失败也会自动进行重试,因此我们一定要控制好重试次数,可以 @RocketMQMessageListener 注解的 maxReconsumeTimes 来控制重试次数(高版本的 starter 才有该属性)。

不管使用哪种方式进行消费,对于达到重试次数还是消费失败的消息一般有两种处理方式,分別是:

  • 直接返回消息消费成功,使用本地库记录消息进行重新推送或者人工介入处理。
  • 不进行处理,让消息进入死信队列,然后去监听死信队列进行处理。

顺序消息 Producer 消息发送代码案例

顺序消息发送的代码前面分享过,感兴趣的朋友也可以通过上面的系列文章链接去查看,这里还是简单的分享一下顺序消息 Producer 消息发送代码,具体如下:

packagecom.order.service.rocketmq.producer;importlombok.extern.slf4j.Slf4j;importorg.apache.rocketmq.client.producer.SendCallback;importorg.apache.rocketmq.client.producer.SendResult;importorg.apache.rocketmq.spring.core.RocketMQTemplate;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.messaging.support.MessageBuilder;importorg.springframework.stereotype.Component;/**
 * @ClassName: OneWayMessageProducer
 * @Author: Author
 * @Date: 2024/9/27 17:27
 * @Description: 顺序消息发送者
 */@Slf4j@ComponentpublicclassOrderlyMessageProducer{@AutowiredprivateRocketMQTemplate rocketMqTemplate;/**
     * @date 2024/10/11 15:45
     * @description 同步顺序消息
     */publicvoidsyncSendOrderly(){//hashKey 用来计算决定消息发送到哪个队列  一般是订单 ID 等信息  这里我们模拟订单 ID 发送
        rocketMqTemplate.syncSendOrderly("orderly-topic",MessageBuilder.withPayload("同步顺序消息,订单编号:666666 创建").build(),"666666");
        rocketMqTemplate.syncSendOrderly("orderly-topic",MessageBuilder.withPayload("同步顺序消息,订单编号:666666 支付").build(),"666666");
        rocketMqTemplate.syncSendOrderly("orderly-topic",MessageBuilder.withPayload("同步顺序消息,订单编号:666666 确认收货").build(),"666666");
        rocketMqTemplate.syncSendOrderly("orderly-topic",MessageBuilder.withPayload("同步顺序消息,订单编号:888888 创建").build(),"888888");
        rocketMqTemplate.syncSendOrderly("orderly-topic",MessageBuilder.withPayload("同步顺序消息,订单编号:888888 支付").build(),"888888");
        rocketMqTemplate.syncSendOrderly("orderly-topic",MessageBuilder.withPayload("同步顺序消息,订单编号:888888 确认收货").build(),"888888");}}

顺序消息 Consumer 消息消费代码案例

同步消息手动 ACK 我们是注册了一个 MessageListenerConcurrently 消息监听器,顺序消息的手动 ACK 我们需要注册一个 MessageListenerOrderly 的消息监听器,具体代码如下:

packagecom.order.service.rocketmq.consumer;importlombok.extern.slf4j.Slf4j;importorg.apache.rocketmq.client.consumer.DefaultMQPushConsumer;importorg.apache.rocketmq.client.consumer.listener.*;importorg.apache.rocketmq.client.exception.MQClientException;importorg.apache.rocketmq.common.message.MessageExt;importorg.apache.rocketmq.spring.annotation.ConsumeMode;importorg.apache.rocketmq.spring.annotation.RocketMQMessageListener;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.stereotype.Component;importjavax.annotation.PostConstruct;importjava.text.SimpleDateFormat;importjava.util.Date;importjava.util.HashMap;importjava.util.List;importjava.util.Map;/**
 * @ClassName: ManualCommitOrderlyMessageConsumer
 * @Author: Author
 * @Date: 2024/10/10 17:35
 * @Description: 顺序消息消费
 */@Slf4j@ComponentpublicclassManualCommitOrderlyMessageConsumer{/**
     * @date 2024/10/16 17:19
     * @description 顺序消息消费成功 手动提交
     */@PostConstructpublicvoidonOrderlyMessage()throwsMQClientException{//消费者DefaultMQPushConsumer consumer =newDefaultMQPushConsumer("orderly-group");//设置最大消息重试次数//consumer.setMaxReconsumeTimes(2);//RocketMQ 地址可以 可以用配置文件
        consumer.setNamesrvAddr("dev-ztn-rocketmq.eminxing.com:19876");//订阅一个或多个topic,并指定tag过滤条件,这里指定 * 表示接收所有 tag 的消息
        consumer.subscribe("orderly-topic","*");
        consumer.registerMessageListener(newMessageListenerOrderly(){//存储消息id 和消费次数的关系finalMap<String,Integer> map =newHashMap<>();@OverridepublicConsumeOrderlyStatusconsumeMessage(List<MessageExt> list,ConsumeOrderlyContext consumeOrderlyContext){SimpleDateFormat sdf =newSimpleDateFormat("yyyy-MM-dd HH:mm:ss");String dateStr = sdf.format(newDate());//消息处理MessageExt messageExt = list.get(0);String message =newString(messageExt.getBody());String msgId = messageExt.getMsgId();//获取消息消费次数Integer count = map.get(msgId);if(count ==null){
                    count =0;}//次数+1
                count = count +1;//覆盖map
                map.put(msgId, count);if(count >2){
                    log.info("当前时间:{},当前消息id:{},是第【{}】次消费,直接返回顺序消费消费成功,消息内容:{}", dateStr, msgId, count, message);//消息消费成功后移除
                    map.remove(msgId);returnConsumeOrderlyStatus.SUCCESS;}
                log.info("当前时间:{},当前消息id:{},是第【{}】次消费,顺序消息内容:{}", dateStr, msgId, count, message);//模拟除 0 异常//int a = 1 / 0;
                log.info("当前时间:{},当前消息id:{},是第【{}】次消费,顺序消息消费成功,消息内容:{}", dateStr, msgId, count, message);returnConsumeOrderlyStatus.SUCCESS;//return null;}});
        consumer.start();}}

RocketMQ 顺序消费端手动 ACK 结果验证:

正常消费返回 ConsumeOrderlyStatus.SUCCESS

2024-10-19 13:46:44.293  INFO 29348 ---[orderly-group_1] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 13:46:44,当前消息id:7F00000172A418B4AAC25FA75EBC0000,是第【1】次消费,顺序消息内容:同步顺序消息,订单编号:666666 创建
2024-10-19 13:46:44.294  INFO 29348 ---[orderly-group_1] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 13:46:44,当前消息id:7F00000172A418B4AAC25FA75EBC0000,是第【1】次消费,顺序消息消费成功,消息内容:同步顺序消息,订单编号:666666 创建
2024-10-19 13:46:44.295  INFO 29348 ---[orderly-group_2] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 13:46:44,当前消息id:7F00000172A418B4AAC25FA75EC00001,是第【1】次消费,顺序消息内容:同步顺序消息,订单编号:666666 支付
2024-10-19 13:46:44.295  INFO 29348 ---[orderly-group_2] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 13:46:44,当前消息id:7F00000172A418B4AAC25FA75EC00001,是第【1】次消费,顺序消息消费成功,消息内容:同步顺序消息,订单编号:666666 支付
2024-10-19 13:46:44.295  INFO 29348 ---[orderly-group_2] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 13:46:44,当前消息id:7F00000172A418B4AAC25FA75EC30002,是第【1】次消费,顺序消息内容:同步顺序消息,订单编号:666666 确认收货
2024-10-19 13:46:44.295  INFO 29348 ---[orderly-group_2] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 13:46:44,当前消息id:7F00000172A418B4AAC25FA75EC30002,是第【1】次消费,顺序消息消费成功,消息内容:同步顺序消息,订单编号:666666 确认收货
2024-10-19 13:46:44.298  INFO 29348 ---[orderly-group_3] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 13:46:44,当前消息id:7F00000172A418B4AAC25FA75EC50003,是第【1】次消费,顺序消息内容:同步顺序消息,订单编号:888888 创建
2024-10-19 13:46:44.298  INFO 29348 ---[orderly-group_3] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 13:46:44,当前消息id:7F00000172A418B4AAC25FA75EC50003,是第【1】次消费,顺序消息消费成功,消息内容:同步顺序消息,订单编号:888888 创建
2024-10-19 13:46:44.300  INFO 29348 ---[orderly-group_4] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 13:46:44,当前消息id:7F00000172A418B4AAC25FA75EC70004,是第【1】次消费,顺序消息内容:同步顺序消息,订单编号:888888 支付
2024-10-19 13:46:44.300  INFO 29348 ---[orderly-group_4] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 13:46:44,当前消息id:7F00000172A418B4AAC25FA75EC70004,是第【1】次消费,顺序消息消费成功,消息内容:同步顺序消息,订单编号:888888 支付
2024-10-19 13:46:44.302  INFO 29348 ---[orderly-group_5] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 13:46:44,当前消息id:7F00000172A418B4AAC25FA75EC90005,是第【1】次消费,顺序消息内容:同步顺序消息,订单编号:888888 确认收货
2024-10-19 13:46:44.303  INFO 29348 ---[orderly-group_5] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 13:46:44,当前消息id:7F00000172A418B4AAC25FA75EC90005,是第【1】次消费,顺序消息消费成功,消息内容:同步顺序消息,订单编号:888888 确认收货

可以看到是按发送顺序消费的,结果符合预期。

模拟除 0 异常

2024-10-19 13:50:45.587  INFO 27604 ---[orderly-group_4] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 13:50:45,当前消息id:7F0000016BD418B4AAC25FAB0D4F0000,是第【1】次消费,顺序消息内容:同步顺序消息,订单编号:666666 创建
2024-10-19 13:50:46.588  INFO 27604 ---[orderly-group_5] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 13:50:46,当前消息id:7F0000016BD418B4AAC25FAB0D4F0000,是第【2】次消费,顺序消息内容:同步顺序消息,订单编号:666666 创建
2024-10-19 13:50:47.592  INFO 27604 ---[orderly-group_6] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 13:50:47,当前消息id:7F0000016BD418B4AAC25FAB0D4F0000,是第【3】次消费,直接返回顺序消费消费成功,消息内容:同步顺序消息,订单编号:666666 创建
2024-10-19 13:50:47.592  INFO 27604 ---[orderly-group_6] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 13:50:47,当前消息id:7F0000016BD418B4AAC25FAB0D540001,是第【1】次消费,顺序消息内容:同步顺序消息,订单编号:666666 支付
2024-10-19 13:50:48.593  INFO 27604 ---[orderly-group_7] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 13:50:48,当前消息id:7F0000016BD418B4AAC25FAB0D540001,是第【2】次消费,顺序消息内容:同步顺序消息,订单编号:666666 支付
2024-10-19 13:50:49.595  INFO 27604 ---[orderly-group_8] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 13:50:49,当前消息id:7F0000016BD418B4AAC25FAB0D540001,是第【3】次消费,直接返回顺序消费消费成功,消息内容:同步顺序消息,订单编号:666666 支付
2024-10-19 13:50:49.595  INFO 27604 ---[orderly-group_8] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 13:50:49,当前消息id:7F0000016BD418B4AAC25FAB0D550002,是第【1】次消费,顺序消息内容:同步顺序消息,订单编号:666666 确认收货
2024-10-19 13:50:50.610  INFO 27604 ---[orderly-group_9] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 13:50:50,当前消息id:7F0000016BD418B4AAC25FAB0D550002,是第【2】次消费,顺序消息内容:同步顺序消息,订单编号:666666 确认收货
2024-10-19 13:50:51.611  INFO 27604 ---[rderly-group_10] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 13:50:51,当前消息id:7F0000016BD418B4AAC25FAB0D550002,是第【3】次消费,直接返回顺序消费消费成功,消息内容:同步顺序消息,订单编号:666666 确认收货
2024-10-19 13:50:51.611  INFO 27604 ---[rderly-group_10] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 13:50:51,当前消息id:7F0000016BD418B4AAC25FAB0D580003,是第【1】次消费,顺序消息内容:同步顺序消息,订单编号:888888 创建
2024-10-19 13:50:52.617  INFO 27604 ---[rderly-group_11] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 13:50:52,当前消息id:7F0000016BD418B4AAC25FAB0D580003,是第【2】次消费,顺序消息内容:同步顺序消息,订单编号:888888 创建
2024-10-19 13:50:53.618  INFO 27604 ---[rderly-group_12] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 13:50:53,当前消息id:7F0000016BD418B4AAC25FAB0D580003,是第【3】次消费,直接返回顺序消费消费成功,消息内容:同步顺序消息,订单编号:888888 创建
2024-10-19 13:50:53.618  INFO 27604 ---[rderly-group_12] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 13:50:53,当前消息id:7F0000016BD418B4AAC25FAB0D590004,是第【1】次消费,顺序消息内容:同步顺序消息,订单编号:888888 支付
2024-10-19 13:50:54.620  INFO 27604 ---[rderly-group_13] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 13:50:54,当前消息id:7F0000016BD418B4AAC25FAB0D590004,是第【2】次消费,顺序消息内容:同步顺序消息,订单编号:888888 支付
2024-10-19 13:50:55.627  INFO 27604 ---[rderly-group_14] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 13:50:55,当前消息id:7F0000016BD418B4AAC25FAB0D590004,是第【3】次消费,直接返回顺序消费消费成功,消息内容:同步顺序消息,订单编号:888888 支付
2024-10-19 13:50:55.627  INFO 27604 ---[rderly-group_14] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 13:50:55,当前消息id:7F0000016BD418B4AAC25FAB0D5B0005,是第【1】次消费,顺序消息内容:同步顺序消息,订单编号:888888 确认收货
2024-10-19 13:50:56.629  INFO 27604 ---[rderly-group_15] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 13:50:56,当前消息id:7F0000016BD418B4AAC25FAB0D5B0005,是第【2】次消费,顺序消息内容:同步顺序消息,订单编号:888888 确认收货
2024-10-19 13:50:57.631  INFO 27604 ---[rderly-group_16] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 13:50:57,当前消息id:7F0000016BD418B4AAC25FAB0D5B0005,是第【3】次消费,直接返回顺序消费消费成功,消息内容:同步顺序消息,订单编号:888888 确认收货

可以看到是按发送顺序消费的,且每条消息都是经过第三次消费后才返回成功的,每次消费具体上一次消费的时间间隔是 1 秒,结果符合预期。

这里需要注意一点的时候上一条消息没有消费成功时,后面一条消息永远不会被消费,这个从我们的日志中也能够体现出来,因此我们在使用顺序消息的时候一定要注意消息的重试次数,消息重试次数我们可以通过自己的业务来判断消费几次,也可以使用 RocketMQ 来实现,代码如下:

//设置最大消息重试次数
consumer.setMaxReconsumeTimes(2);

模拟返回 NULL

2024-10-19 14:00:51.484  INFO 22728 ---[rderly-group_14] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 14:00:51,当前消息id:7F00000158C818B4AAC25FB44C170000,是第【1】次消费,顺序消息内容:同步顺序消息,订单编号:666666 创建
2024-10-19 14:00:52.485  INFO 22728 ---[rderly-group_15] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 14:00:52,当前消息id:7F00000158C818B4AAC25FB44C170000,是第【2】次消费,顺序消息内容:同步顺序消息,订单编号:666666 创建
2024-10-19 14:00:53.492  INFO 22728 ---[rderly-group_16] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 14:00:53,当前消息id:7F00000158C818B4AAC25FB44C170000,是第【3】次消费,直接返回顺序消费消费成功,消息内容:同步顺序消息,订单编号:666666 创建
2024-10-19 14:00:53.492  INFO 22728 ---[rderly-group_16] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 14:00:53,当前消息id:7F00000158C818B4AAC25FB44C1B0001,是第【1】次消费,顺序消息内容:同步顺序消息,订单编号:666666 支付
2024-10-19 14:00:54.492  INFO 22728 ---[rderly-group_17] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 14:00:54,当前消息id:7F00000158C818B4AAC25FB44C1B0001,是第【2】次消费,顺序消息内容:同步顺序消息,订单编号:666666 支付
2024-10-19 14:00:55.494  INFO 22728 ---[rderly-group_18] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 14:00:55,当前消息id:7F00000158C818B4AAC25FB44C1B0001,是第【3】次消费,直接返回顺序消费消费成功,消息内容:同步顺序消息,订单编号:666666 支付
2024-10-19 14:00:55.494  INFO 22728 ---[rderly-group_18] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 14:00:55,当前消息id:7F00000158C818B4AAC25FB44C1D0002,是第【1】次消费,顺序消息内容:同步顺序消息,订单编号:666666 确认收货
2024-10-19 14:00:56.495  INFO 22728 ---[rderly-group_19] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 14:00:56,当前消息id:7F00000158C818B4AAC25FB44C1D0002,是第【2】次消费,顺序消息内容:同步顺序消息,订单编号:666666 确认收货
2024-10-19 14:00:57.497  INFO 22728 ---[rderly-group_20] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 14:00:57,当前消息id:7F00000158C818B4AAC25FB44C1D0002,是第【3】次消费,直接返回顺序消费消费成功,消息内容:同步顺序消息,订单编号:666666 确认收货
2024-10-19 14:00:57.497  INFO 22728 ---[rderly-group_20] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 14:00:57,当前消息id:7F00000158C818B4AAC25FB44C1F0003,是第【1】次消费,顺序消息内容:同步顺序消息,订单编号:888888 创建
2024-10-19 14:00:58.510  INFO 22728 ---[orderly-group_1] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 14:00:58,当前消息id:7F00000158C818B4AAC25FB44C1F0003,是第【2】次消费,顺序消息内容:同步顺序消息,订单编号:888888 创建
2024-10-19 14:00:59.520  INFO 22728 ---[orderly-group_2] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 14:00:59,当前消息id:7F00000158C818B4AAC25FB44C1F0003,是第【3】次消费,直接返回顺序消费消费成功,消息内容:同步顺序消息,订单编号:888888 创建
2024-10-19 14:00:59.520  INFO 22728 ---[orderly-group_2] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 14:00:59,当前消息id:7F00000158C818B4AAC25FB44C200004,是第【1】次消费,顺序消息内容:同步顺序消息,订单编号:888888 支付
2024-10-19 14:01:00.521  INFO 22728 ---[orderly-group_3] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 14:01:00,当前消息id:7F00000158C818B4AAC25FB44C200004,是第【2】次消费,顺序消息内容:同步顺序消息,订单编号:888888 支付
2024-10-19 14:01:01.523  INFO 22728 ---[orderly-group_4] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 14:01:01,当前消息id:7F00000158C818B4AAC25FB44C200004,是第【3】次消费,直接返回顺序消费消费成功,消息内容:同步顺序消息,订单编号:888888 支付
2024-10-19 14:01:01.523  INFO 22728 ---[orderly-group_4] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 14:01:01,当前消息id:7F00000158C818B4AAC25FB44C220005,是第【1】次消费,顺序消息内容:同步顺序消息,订单编号:888888 确认收货
2024-10-19 14:01:02.525  INFO 22728 ---[orderly-group_5] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 14:01:02,当前消息id:7F00000158C818B4AAC25FB44C220005,是第【2】次消费,顺序消息内容:同步顺序消息,订单编号:888888 确认收货
2024-10-19 14:01:03.527  INFO 22728 ---[orderly-group_6] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 14:01:03,当前消息id:7F00000158C818B4AAC25FB44C220005,是第【3】次消费,直接返回顺序消费消费成功,消息内容:同步顺序消息,订单编号:888888 确认收货

可以看到是按发送顺序消费的,且每条消息都是经过第三次消费后才返回成功的,每次消费具体上一次消费的时间间隔是 1 秒,和模拟除 0异常 一样的结果,结果符合预期。

总结:RocketMQ 消息重试次数可以通过 @RocketMQMessageListener 注解的 maxReconsumeTimes 属性来设置,也可以通过编码来设置,不管采用何种方式来设置,我们都要在业务编码中做好处理,过多重视造成的性能问题,以及没有合理处理消费失败的消息造成的消息丢失问题,对于顺序消息我们更要慎重对待,顺序消息回到导致消息阻塞。

如有不正确的地方欢迎各位指出纠正。


本文转载自: https://blog.csdn.net/weixin_42118323/article/details/141649165
版权归原作者 码农爱java 所有, 如有侵权,请联系我们删除。

“RocketMQ 之消息消费手动提交 ACK 实战【案例分享】”的评论:

还没有评论