0


RocketMQ 之消息消费手动提交 ACK 实战【案例分享】

前言:

上一篇我们分享了 RocketMQ 消息重试的一些基本原理,本篇我们基于 Spring Boot 整合 RocketMQ 来分享一下 RocketMQ 消息基于手动提交的案例,在分享手动进行消息 ACK 中也会分享消息重试的使用。

RocketMQ 系列文章传送门

RocketMQ 的介绍及核心概念讲解

Spring Boot 整合 RocketMQ 之普通消息

Spring Boot 整合 RocketMQ 之定时/延时消息

Spring Boot 整合 RocketMQ 之顺序消息

Spring Boot 整合 RocketMQ 之事务消息

RocketMQ 之消息重试机制

同步消息手动提交 ACK 案例分享

同步消息 Producer 消息发送代码案例

同步消息发送的代码前面分享过,感兴趣的朋友也可以通过上面的系列文章链接去查看,这里还是简单的分享一下同步消息 Producer 消息发送代码,具体如下:

  1. packagecom.order.service.rocketmq.producer;importlombok.extern.slf4j.Slf4j;importorg.apache.rocketmq.spring.core.RocketMQTemplate;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.messaging.Message;importorg.springframework.messaging.support.MessageBuilder;importorg.springframework.stereotype.Component;importjava.util.ArrayList;importjava.util.List;/**
  2. * @ClassName: OneWayMessageProducer
  3. * @Author: zhangyong
  4. * @Date: 2024/9/27 17:27
  5. * @Description: 同步消息发送者
  6. */@Slf4j@ComponentpublicclassSyncMessageProducer{@AutowiredprivateRocketMQTemplate rocketMqTemplate;/**
  7. * @param message:
  8. * @date 2024/10/10 17:47
  9. * @description 同步消息发送
  10. */publicvoidsendSyncMessage(String message){
  11. rocketMqTemplate.syncSend("sync-topic",MessageBuilder.withPayload(message).build());}}

同步消息手动 ACK Consumer 端代码案例分享

RocketMQ 消息手动 ACK 就不能再使用 @RocketMQMessageListener 注解 + 实现 RocketMQListener 接口的方式来实现了,RocketMQListener 的源码如下:

  1. packageorg.apache.rocketmq.spring.core;publicinterfaceRocketMQListener<T>{voidonMessage(T var1);}

我们可以看到 RocketMQListener 中只提供了一个 onMessage 方法,且返回值为 void,不接受返回值,因此没办法进行手动 ACK。

这里我们使用 DefaultMQPushConsumer 来实现消息的手动 ACK,DefaultMQPushConsumer 实现了 MQPushConsumer 接口,具体实现代码如下:

  1. packagecom.order.service.rocketmq.consumer;importlombok.extern.slf4j.Slf4j;importorg.apache.commons.lang3.ObjectUtils;importorg.apache.rocketmq.client.consumer.DefaultMQPushConsumer;importorg.apache.rocketmq.client.consumer.listener.*;importorg.apache.rocketmq.client.exception.MQClientException;importorg.apache.rocketmq.common.message.MessageExt;importorg.springframework.stereotype.Component;importjavax.annotation.PostConstruct;importjava.text.SimpleDateFormat;importjava.util.Date;importjava.util.HashMap;importjava.util.List;importjava.util.Map;/**
  2. * @ClassName: ManualCommitSyncCounsumer
  3. * @Author: Author
  4. * @Date: 2024/10/16 16:23
  5. * @Description:
  6. */@Slf4j@ComponentpublicclassManualCommitSyncCounsumer{/**
  7. * @date 2024/10/16 17:19
  8. * @description 同步消息消费成功 手动提交
  9. */@PostConstructpublicvoidonSyncMessage()throwsMQClientException{//消费者DefaultMQPushConsumer consumer =newDefaultMQPushConsumer("sync-group");//设置最大消息重试次数//consumer.setMaxReconsumeTimes(2);//RocketMQ 地址可以 可以用配置文件
  10. consumer.setNamesrvAddr("xxx-xxx-rocketmq.xxx.com:19876");//订阅一个或多个topic,并指定tag过滤条件,这里指定 * 表示接收所有 tag 的消息
  11. consumer.subscribe("sync-topic","*");
  12. consumer.registerMessageListener(newMessageListenerConcurrently(){//存储消息id 和消费次数的关系finalMap<String,Integer> map =newHashMap<>();@OverridepublicConsumeConcurrentlyStatusconsumeMessage(List<MessageExt> list,ConsumeConcurrentlyContext consumeConcurrentlyContext){SimpleDateFormat sdf =newSimpleDateFormat("yyyy-MM-dd HH:mm:ss");String dateStr = sdf.format(newDate());//消息处理MessageExt messageExt = list.get(0);String message =newString(messageExt.getBody());String msgId = messageExt.getMsgId();//获取消息消费次数Integer count = map.get(msgId);if(count ==null){
  13. count =0;}//次数+1
  14. count = count +1;//覆盖map
  15. map.put(msgId, count);if(count >2){
  16. log.info("当前时间:{},当前消息id:{},是第【{}】次消费,直接返回消费消费成功,消息内容:{}", dateStr, msgId, count, message);//消息消费成功后移除
  17. map.remove(msgId);returnConsumeConcurrentlyStatus.CONSUME_SUCCESS;}
  18. log.info("当前时间:{},当前消息id:{},是第【{}】次消费,消息内容:{}", dateStr, msgId, count, message);//模拟除 0 异常//int a = 1 / 0;
  19. log.info("当前时间:{},当前消息id:{},是第【{}】次消费,消息消费成功,消息内容:{}", dateStr, msgId, count, message);returnConsumeConcurrentlyStatus.CONSUME_SUCCESS;//return null;}});
  20. consumer.start();}}

RocketMQ 同步消费端手动 ACK 结果验证:

正常消费返回 ConsumeConcurrentlyStatus.CONSUME_SUCCESS

  1. 2024-10-1910:23:07.575 INFO 10820---[MessageThread_1]c.o.s.r.c.ManualCommitSyncCounsumer: 当前时间:2024-10-1910:23:07,当前消息id:7F0000012A4418B4AAC25EECF4F50000,是第【1】次消费,消息内容:小明同学你妈喊你回家吃饭了
  2. 2024-10-1910:23:07.575 INFO 10820---[MessageThread_1]c.o.s.r.c.ManualCommitSyncCounsumer: 当前时间:2024-10-1910:23:07,当前消息id:7F0000012A4418B4AAC25EECF4F50000,是第【1】次消费,消息消费成功,消息内容:小明同学你妈喊你回家吃饭了

没有模拟除 0 异常,正常消费返回,一次就消费成功,结果符合预期。

模拟除 0 异常

  1. 2024-10-19 10:19:59.026 INFO 34052 ---[MessageThread_2] c.o.s.r.c.ManualCommitSyncCounsumer : 当前时间:2024-10-19 10:19:59,当前消息id:7F000001850418B4AAC25EEA14AF0003,是第【1】次消费,消息内容:小明同学你妈喊你回家吃饭了
  2. 2024-10-19 10:20:09.029 INFO 34052 ---[MessageThread_3] c.o.s.r.c.ManualCommitSyncCounsumer : 当前时间:2024-10-19 10:20:09,当前消息id:7F000001850418B4AAC25EEA14AF0003,是第【2】次消费,消息内容:小明同学你妈喊你回家吃饭了
  3. 2024-10-19 10:20:39.032 INFO 34052 ---[MessageThread_4] c.o.s.r.c.ManualCommitSyncCounsumer : 当前时间:2024-10-19 10:20:39,当前消息id:7F000001850418B4AAC25EEA14AF0003,是第【3】次消费,直接返回消费消费成功,消息内容:小明同学你妈喊你回家吃饭了

可以看到消息在不断重试消息,分别是 2024-10-19 10:19:59、2024-10-19 10:20:09、2024-10-19 10:20:39 触发了消费,时间间隔也是 10秒、30秒,结果符合预期。

正常消费但是返回 NULL

  1. 2024-10-19 10:25:47.338 INFO 27256 ---[MessageThread_1] c.o.s.r.c.ManualCommitSyncCounsumer : 当前时间:2024-10-19 10:25:47,当前消息id:7F0000016A7818B4AAC25EEF65120000,是第【1】次消费,消息内容:小明同学你妈喊你回家吃饭了
  2. 2024-10-19 10:25:47.338 INFO 27256 ---[MessageThread_1] c.o.s.r.c.ManualCommitSyncCounsumer : 当前时间:2024-10-19 10:25:47,当前消息id:7F0000016A7818B4AAC25EEF65120000,是第【1】次消费,消息消费成功,消息内容:小明同学你妈喊你回家吃饭了
  3. 2024-10-19 10:25:57.344 INFO 27256 ---[MessageThread_2] c.o.s.r.c.ManualCommitSyncCounsumer : 当前时间:2024-10-19 10:25:57,当前消息id:7F0000016A7818B4AAC25EEF65120000,是第【2】次消费,消息内容:小明同学你妈喊你回家吃饭了
  4. 2024-10-19 10:25:57.344 INFO 27256 ---[MessageThread_2] c.o.s.r.c.ManualCommitSyncCounsumer : 当前时间:2024-10-19 10:25:57,当前消息id:7F0000016A7818B4AAC25EEF65120000,是第【2】次消费,消息消费成功,消息内容:小明同学你妈喊你回家吃饭了
  5. 2024-10-19 10:26:27.347 INFO 27256 ---[MessageThread_3] c.o.s.r.c.ManualCommitSyncCounsumer : 当前时间:2024-10-19 10:26:27,当前消息id:7F0000016A7818B4AAC25EEF65120000,是第【3】次消费,直接返回消费消费成功,消息内容:小明同学你妈喊你回家吃饭了

可以看到返回 NULL 和模拟除 0 异常是一样的效果,消息在不断重试消息,分别是 2024-10-19 10:19:59、2024-10-19 10:20:09、2024-10-19 10:20:39 触发了消费,时间间隔也是 10秒、30秒,结果符合预期。

上面的案例我们没有控制消息消费重试次数,我们可以设置一个消息消费重试次数,代码如下:

  1. //设置最大消息重试次数
  2. consumer.setMaxReconsumeTimes(2);

使用 @RocketMQMessageListener 注解 + 实现 RocketMQListener 接口的方式消费消息的时候消费失败也会自动进行重试,因此我们一定要控制好重试次数,可以 @RocketMQMessageListener 注解的 maxReconsumeTimes 来控制重试次数(高版本的 starter 才有该属性)。

不管使用哪种方式进行消费,对于达到重试次数还是消费失败的消息一般有两种处理方式,分別是:

  • 直接返回消息消费成功,使用本地库记录消息进行重新推送或者人工介入处理。
  • 不进行处理,让消息进入死信队列,然后去监听死信队列进行处理。

顺序消息 Producer 消息发送代码案例

顺序消息发送的代码前面分享过,感兴趣的朋友也可以通过上面的系列文章链接去查看,这里还是简单的分享一下顺序消息 Producer 消息发送代码,具体如下:

  1. packagecom.order.service.rocketmq.producer;importlombok.extern.slf4j.Slf4j;importorg.apache.rocketmq.client.producer.SendCallback;importorg.apache.rocketmq.client.producer.SendResult;importorg.apache.rocketmq.spring.core.RocketMQTemplate;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.messaging.support.MessageBuilder;importorg.springframework.stereotype.Component;/**
  2. * @ClassName: OneWayMessageProducer
  3. * @Author: Author
  4. * @Date: 2024/9/27 17:27
  5. * @Description: 顺序消息发送者
  6. */@Slf4j@ComponentpublicclassOrderlyMessageProducer{@AutowiredprivateRocketMQTemplate rocketMqTemplate;/**
  7. * @date 2024/10/11 15:45
  8. * @description 同步顺序消息
  9. */publicvoidsyncSendOrderly(){//hashKey 用来计算决定消息发送到哪个队列 一般是订单 ID 等信息 这里我们模拟订单 ID 发送
  10. rocketMqTemplate.syncSendOrderly("orderly-topic",MessageBuilder.withPayload("同步顺序消息,订单编号:666666 创建").build(),"666666");
  11. rocketMqTemplate.syncSendOrderly("orderly-topic",MessageBuilder.withPayload("同步顺序消息,订单编号:666666 支付").build(),"666666");
  12. rocketMqTemplate.syncSendOrderly("orderly-topic",MessageBuilder.withPayload("同步顺序消息,订单编号:666666 确认收货").build(),"666666");
  13. rocketMqTemplate.syncSendOrderly("orderly-topic",MessageBuilder.withPayload("同步顺序消息,订单编号:888888 创建").build(),"888888");
  14. rocketMqTemplate.syncSendOrderly("orderly-topic",MessageBuilder.withPayload("同步顺序消息,订单编号:888888 支付").build(),"888888");
  15. rocketMqTemplate.syncSendOrderly("orderly-topic",MessageBuilder.withPayload("同步顺序消息,订单编号:888888 确认收货").build(),"888888");}}

顺序消息 Consumer 消息消费代码案例

同步消息手动 ACK 我们是注册了一个 MessageListenerConcurrently 消息监听器,顺序消息的手动 ACK 我们需要注册一个 MessageListenerOrderly 的消息监听器,具体代码如下:

  1. packagecom.order.service.rocketmq.consumer;importlombok.extern.slf4j.Slf4j;importorg.apache.rocketmq.client.consumer.DefaultMQPushConsumer;importorg.apache.rocketmq.client.consumer.listener.*;importorg.apache.rocketmq.client.exception.MQClientException;importorg.apache.rocketmq.common.message.MessageExt;importorg.apache.rocketmq.spring.annotation.ConsumeMode;importorg.apache.rocketmq.spring.annotation.RocketMQMessageListener;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.stereotype.Component;importjavax.annotation.PostConstruct;importjava.text.SimpleDateFormat;importjava.util.Date;importjava.util.HashMap;importjava.util.List;importjava.util.Map;/**
  2. * @ClassName: ManualCommitOrderlyMessageConsumer
  3. * @Author: Author
  4. * @Date: 2024/10/10 17:35
  5. * @Description: 顺序消息消费
  6. */@Slf4j@ComponentpublicclassManualCommitOrderlyMessageConsumer{/**
  7. * @date 2024/10/16 17:19
  8. * @description 顺序消息消费成功 手动提交
  9. */@PostConstructpublicvoidonOrderlyMessage()throwsMQClientException{//消费者DefaultMQPushConsumer consumer =newDefaultMQPushConsumer("orderly-group");//设置最大消息重试次数//consumer.setMaxReconsumeTimes(2);//RocketMQ 地址可以 可以用配置文件
  10. consumer.setNamesrvAddr("dev-ztn-rocketmq.eminxing.com:19876");//订阅一个或多个topic,并指定tag过滤条件,这里指定 * 表示接收所有 tag 的消息
  11. consumer.subscribe("orderly-topic","*");
  12. consumer.registerMessageListener(newMessageListenerOrderly(){//存储消息id 和消费次数的关系finalMap<String,Integer> map =newHashMap<>();@OverridepublicConsumeOrderlyStatusconsumeMessage(List<MessageExt> list,ConsumeOrderlyContext consumeOrderlyContext){SimpleDateFormat sdf =newSimpleDateFormat("yyyy-MM-dd HH:mm:ss");String dateStr = sdf.format(newDate());//消息处理MessageExt messageExt = list.get(0);String message =newString(messageExt.getBody());String msgId = messageExt.getMsgId();//获取消息消费次数Integer count = map.get(msgId);if(count ==null){
  13. count =0;}//次数+1
  14. count = count +1;//覆盖map
  15. map.put(msgId, count);if(count >2){
  16. log.info("当前时间:{},当前消息id:{},是第【{}】次消费,直接返回顺序消费消费成功,消息内容:{}", dateStr, msgId, count, message);//消息消费成功后移除
  17. map.remove(msgId);returnConsumeOrderlyStatus.SUCCESS;}
  18. log.info("当前时间:{},当前消息id:{},是第【{}】次消费,顺序消息内容:{}", dateStr, msgId, count, message);//模拟除 0 异常//int a = 1 / 0;
  19. log.info("当前时间:{},当前消息id:{},是第【{}】次消费,顺序消息消费成功,消息内容:{}", dateStr, msgId, count, message);returnConsumeOrderlyStatus.SUCCESS;//return null;}});
  20. consumer.start();}}

RocketMQ 顺序消费端手动 ACK 结果验证:

正常消费返回 ConsumeOrderlyStatus.SUCCESS

  1. 2024-10-19 13:46:44.293 INFO 29348 ---[orderly-group_1] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 13:46:44,当前消息id:7F00000172A418B4AAC25FA75EBC0000,是第【1】次消费,顺序消息内容:同步顺序消息,订单编号:666666 创建
  2. 2024-10-19 13:46:44.294 INFO 29348 ---[orderly-group_1] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 13:46:44,当前消息id:7F00000172A418B4AAC25FA75EBC0000,是第【1】次消费,顺序消息消费成功,消息内容:同步顺序消息,订单编号:666666 创建
  3. 2024-10-19 13:46:44.295 INFO 29348 ---[orderly-group_2] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 13:46:44,当前消息id:7F00000172A418B4AAC25FA75EC00001,是第【1】次消费,顺序消息内容:同步顺序消息,订单编号:666666 支付
  4. 2024-10-19 13:46:44.295 INFO 29348 ---[orderly-group_2] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 13:46:44,当前消息id:7F00000172A418B4AAC25FA75EC00001,是第【1】次消费,顺序消息消费成功,消息内容:同步顺序消息,订单编号:666666 支付
  5. 2024-10-19 13:46:44.295 INFO 29348 ---[orderly-group_2] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 13:46:44,当前消息id:7F00000172A418B4AAC25FA75EC30002,是第【1】次消费,顺序消息内容:同步顺序消息,订单编号:666666 确认收货
  6. 2024-10-19 13:46:44.295 INFO 29348 ---[orderly-group_2] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 13:46:44,当前消息id:7F00000172A418B4AAC25FA75EC30002,是第【1】次消费,顺序消息消费成功,消息内容:同步顺序消息,订单编号:666666 确认收货
  7. 2024-10-19 13:46:44.298 INFO 29348 ---[orderly-group_3] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 13:46:44,当前消息id:7F00000172A418B4AAC25FA75EC50003,是第【1】次消费,顺序消息内容:同步顺序消息,订单编号:888888 创建
  8. 2024-10-19 13:46:44.298 INFO 29348 ---[orderly-group_3] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 13:46:44,当前消息id:7F00000172A418B4AAC25FA75EC50003,是第【1】次消费,顺序消息消费成功,消息内容:同步顺序消息,订单编号:888888 创建
  9. 2024-10-19 13:46:44.300 INFO 29348 ---[orderly-group_4] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 13:46:44,当前消息id:7F00000172A418B4AAC25FA75EC70004,是第【1】次消费,顺序消息内容:同步顺序消息,订单编号:888888 支付
  10. 2024-10-19 13:46:44.300 INFO 29348 ---[orderly-group_4] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 13:46:44,当前消息id:7F00000172A418B4AAC25FA75EC70004,是第【1】次消费,顺序消息消费成功,消息内容:同步顺序消息,订单编号:888888 支付
  11. 2024-10-19 13:46:44.302 INFO 29348 ---[orderly-group_5] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 13:46:44,当前消息id:7F00000172A418B4AAC25FA75EC90005,是第【1】次消费,顺序消息内容:同步顺序消息,订单编号:888888 确认收货
  12. 2024-10-19 13:46:44.303 INFO 29348 ---[orderly-group_5] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 13:46:44,当前消息id:7F00000172A418B4AAC25FA75EC90005,是第【1】次消费,顺序消息消费成功,消息内容:同步顺序消息,订单编号:888888 确认收货

可以看到是按发送顺序消费的,结果符合预期。

模拟除 0 异常

  1. 2024-10-19 13:50:45.587 INFO 27604 ---[orderly-group_4] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 13:50:45,当前消息id:7F0000016BD418B4AAC25FAB0D4F0000,是第【1】次消费,顺序消息内容:同步顺序消息,订单编号:666666 创建
  2. 2024-10-19 13:50:46.588 INFO 27604 ---[orderly-group_5] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 13:50:46,当前消息id:7F0000016BD418B4AAC25FAB0D4F0000,是第【2】次消费,顺序消息内容:同步顺序消息,订单编号:666666 创建
  3. 2024-10-19 13:50:47.592 INFO 27604 ---[orderly-group_6] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 13:50:47,当前消息id:7F0000016BD418B4AAC25FAB0D4F0000,是第【3】次消费,直接返回顺序消费消费成功,消息内容:同步顺序消息,订单编号:666666 创建
  4. 2024-10-19 13:50:47.592 INFO 27604 ---[orderly-group_6] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 13:50:47,当前消息id:7F0000016BD418B4AAC25FAB0D540001,是第【1】次消费,顺序消息内容:同步顺序消息,订单编号:666666 支付
  5. 2024-10-19 13:50:48.593 INFO 27604 ---[orderly-group_7] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 13:50:48,当前消息id:7F0000016BD418B4AAC25FAB0D540001,是第【2】次消费,顺序消息内容:同步顺序消息,订单编号:666666 支付
  6. 2024-10-19 13:50:49.595 INFO 27604 ---[orderly-group_8] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 13:50:49,当前消息id:7F0000016BD418B4AAC25FAB0D540001,是第【3】次消费,直接返回顺序消费消费成功,消息内容:同步顺序消息,订单编号:666666 支付
  7. 2024-10-19 13:50:49.595 INFO 27604 ---[orderly-group_8] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 13:50:49,当前消息id:7F0000016BD418B4AAC25FAB0D550002,是第【1】次消费,顺序消息内容:同步顺序消息,订单编号:666666 确认收货
  8. 2024-10-19 13:50:50.610 INFO 27604 ---[orderly-group_9] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 13:50:50,当前消息id:7F0000016BD418B4AAC25FAB0D550002,是第【2】次消费,顺序消息内容:同步顺序消息,订单编号:666666 确认收货
  9. 2024-10-19 13:50:51.611 INFO 27604 ---[rderly-group_10] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 13:50:51,当前消息id:7F0000016BD418B4AAC25FAB0D550002,是第【3】次消费,直接返回顺序消费消费成功,消息内容:同步顺序消息,订单编号:666666 确认收货
  10. 2024-10-19 13:50:51.611 INFO 27604 ---[rderly-group_10] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 13:50:51,当前消息id:7F0000016BD418B4AAC25FAB0D580003,是第【1】次消费,顺序消息内容:同步顺序消息,订单编号:888888 创建
  11. 2024-10-19 13:50:52.617 INFO 27604 ---[rderly-group_11] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 13:50:52,当前消息id:7F0000016BD418B4AAC25FAB0D580003,是第【2】次消费,顺序消息内容:同步顺序消息,订单编号:888888 创建
  12. 2024-10-19 13:50:53.618 INFO 27604 ---[rderly-group_12] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 13:50:53,当前消息id:7F0000016BD418B4AAC25FAB0D580003,是第【3】次消费,直接返回顺序消费消费成功,消息内容:同步顺序消息,订单编号:888888 创建
  13. 2024-10-19 13:50:53.618 INFO 27604 ---[rderly-group_12] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 13:50:53,当前消息id:7F0000016BD418B4AAC25FAB0D590004,是第【1】次消费,顺序消息内容:同步顺序消息,订单编号:888888 支付
  14. 2024-10-19 13:50:54.620 INFO 27604 ---[rderly-group_13] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 13:50:54,当前消息id:7F0000016BD418B4AAC25FAB0D590004,是第【2】次消费,顺序消息内容:同步顺序消息,订单编号:888888 支付
  15. 2024-10-19 13:50:55.627 INFO 27604 ---[rderly-group_14] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 13:50:55,当前消息id:7F0000016BD418B4AAC25FAB0D590004,是第【3】次消费,直接返回顺序消费消费成功,消息内容:同步顺序消息,订单编号:888888 支付
  16. 2024-10-19 13:50:55.627 INFO 27604 ---[rderly-group_14] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 13:50:55,当前消息id:7F0000016BD418B4AAC25FAB0D5B0005,是第【1】次消费,顺序消息内容:同步顺序消息,订单编号:888888 确认收货
  17. 2024-10-19 13:50:56.629 INFO 27604 ---[rderly-group_15] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 13:50:56,当前消息id:7F0000016BD418B4AAC25FAB0D5B0005,是第【2】次消费,顺序消息内容:同步顺序消息,订单编号:888888 确认收货
  18. 2024-10-19 13:50:57.631 INFO 27604 ---[rderly-group_16] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 13:50:57,当前消息id:7F0000016BD418B4AAC25FAB0D5B0005,是第【3】次消费,直接返回顺序消费消费成功,消息内容:同步顺序消息,订单编号:888888 确认收货

可以看到是按发送顺序消费的,且每条消息都是经过第三次消费后才返回成功的,每次消费具体上一次消费的时间间隔是 1 秒,结果符合预期。

这里需要注意一点的时候上一条消息没有消费成功时,后面一条消息永远不会被消费,这个从我们的日志中也能够体现出来,因此我们在使用顺序消息的时候一定要注意消息的重试次数,消息重试次数我们可以通过自己的业务来判断消费几次,也可以使用 RocketMQ 来实现,代码如下:

  1. //设置最大消息重试次数
  2. consumer.setMaxReconsumeTimes(2);

模拟返回 NULL

  1. 2024-10-19 14:00:51.484 INFO 22728 ---[rderly-group_14] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 14:00:51,当前消息id:7F00000158C818B4AAC25FB44C170000,是第【1】次消费,顺序消息内容:同步顺序消息,订单编号:666666 创建
  2. 2024-10-19 14:00:52.485 INFO 22728 ---[rderly-group_15] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 14:00:52,当前消息id:7F00000158C818B4AAC25FB44C170000,是第【2】次消费,顺序消息内容:同步顺序消息,订单编号:666666 创建
  3. 2024-10-19 14:00:53.492 INFO 22728 ---[rderly-group_16] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 14:00:53,当前消息id:7F00000158C818B4AAC25FB44C170000,是第【3】次消费,直接返回顺序消费消费成功,消息内容:同步顺序消息,订单编号:666666 创建
  4. 2024-10-19 14:00:53.492 INFO 22728 ---[rderly-group_16] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 14:00:53,当前消息id:7F00000158C818B4AAC25FB44C1B0001,是第【1】次消费,顺序消息内容:同步顺序消息,订单编号:666666 支付
  5. 2024-10-19 14:00:54.492 INFO 22728 ---[rderly-group_17] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 14:00:54,当前消息id:7F00000158C818B4AAC25FB44C1B0001,是第【2】次消费,顺序消息内容:同步顺序消息,订单编号:666666 支付
  6. 2024-10-19 14:00:55.494 INFO 22728 ---[rderly-group_18] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 14:00:55,当前消息id:7F00000158C818B4AAC25FB44C1B0001,是第【3】次消费,直接返回顺序消费消费成功,消息内容:同步顺序消息,订单编号:666666 支付
  7. 2024-10-19 14:00:55.494 INFO 22728 ---[rderly-group_18] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 14:00:55,当前消息id:7F00000158C818B4AAC25FB44C1D0002,是第【1】次消费,顺序消息内容:同步顺序消息,订单编号:666666 确认收货
  8. 2024-10-19 14:00:56.495 INFO 22728 ---[rderly-group_19] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 14:00:56,当前消息id:7F00000158C818B4AAC25FB44C1D0002,是第【2】次消费,顺序消息内容:同步顺序消息,订单编号:666666 确认收货
  9. 2024-10-19 14:00:57.497 INFO 22728 ---[rderly-group_20] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 14:00:57,当前消息id:7F00000158C818B4AAC25FB44C1D0002,是第【3】次消费,直接返回顺序消费消费成功,消息内容:同步顺序消息,订单编号:666666 确认收货
  10. 2024-10-19 14:00:57.497 INFO 22728 ---[rderly-group_20] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 14:00:57,当前消息id:7F00000158C818B4AAC25FB44C1F0003,是第【1】次消费,顺序消息内容:同步顺序消息,订单编号:888888 创建
  11. 2024-10-19 14:00:58.510 INFO 22728 ---[orderly-group_1] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 14:00:58,当前消息id:7F00000158C818B4AAC25FB44C1F0003,是第【2】次消费,顺序消息内容:同步顺序消息,订单编号:888888 创建
  12. 2024-10-19 14:00:59.520 INFO 22728 ---[orderly-group_2] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 14:00:59,当前消息id:7F00000158C818B4AAC25FB44C1F0003,是第【3】次消费,直接返回顺序消费消费成功,消息内容:同步顺序消息,订单编号:888888 创建
  13. 2024-10-19 14:00:59.520 INFO 22728 ---[orderly-group_2] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 14:00:59,当前消息id:7F00000158C818B4AAC25FB44C200004,是第【1】次消费,顺序消息内容:同步顺序消息,订单编号:888888 支付
  14. 2024-10-19 14:01:00.521 INFO 22728 ---[orderly-group_3] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 14:01:00,当前消息id:7F00000158C818B4AAC25FB44C200004,是第【2】次消费,顺序消息内容:同步顺序消息,订单编号:888888 支付
  15. 2024-10-19 14:01:01.523 INFO 22728 ---[orderly-group_4] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 14:01:01,当前消息id:7F00000158C818B4AAC25FB44C200004,是第【3】次消费,直接返回顺序消费消费成功,消息内容:同步顺序消息,订单编号:888888 支付
  16. 2024-10-19 14:01:01.523 INFO 22728 ---[orderly-group_4] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 14:01:01,当前消息id:7F00000158C818B4AAC25FB44C220005,是第【1】次消费,顺序消息内容:同步顺序消息,订单编号:888888 确认收货
  17. 2024-10-19 14:01:02.525 INFO 22728 ---[orderly-group_5] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 14:01:02,当前消息id:7F00000158C818B4AAC25FB44C220005,是第【2】次消费,顺序消息内容:同步顺序消息,订单编号:888888 确认收货
  18. 2024-10-19 14:01:03.527 INFO 22728 ---[orderly-group_6] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 14:01:03,当前消息id:7F00000158C818B4AAC25FB44C220005,是第【3】次消费,直接返回顺序消费消费成功,消息内容:同步顺序消息,订单编号:888888 确认收货

可以看到是按发送顺序消费的,且每条消息都是经过第三次消费后才返回成功的,每次消费具体上一次消费的时间间隔是 1 秒,和模拟除 0异常 一样的结果,结果符合预期。

总结:RocketMQ 消息重试次数可以通过 @RocketMQMessageListener 注解的 maxReconsumeTimes 属性来设置,也可以通过编码来设置,不管采用何种方式来设置,我们都要在业务编码中做好处理,过多重视造成的性能问题,以及没有合理处理消费失败的消息造成的消息丢失问题,对于顺序消息我们更要慎重对待,顺序消息回到导致消息阻塞。

如有不正确的地方欢迎各位指出纠正。


本文转载自: https://blog.csdn.net/weixin_42118323/article/details/141649165
版权归原作者 码农爱java 所有, 如有侵权,请联系我们删除。

“RocketMQ 之消息消费手动提交 ACK 实战【案例分享】”的评论:

还没有评论