一 : 在使用 RabbitMQ 作为消息队列时,保证消息被成功发送和消费是一个非常重要的问题。以下是一些关键点和最佳实践,以确保消息的可靠传输和处理。*
配置方式:
保证消息被成功发送
确认模式(ConfirmMode):
生产者可以启用确认模式,确保消息成功到达交换机。
使用 channel.confirmSelect() 启用确认模式。
使用 channel.waitForConfirms() 或 channel.addConfirmListener() 来处理确认消息。
事务模式(TransactionMode):
生产者可以使用事务模式,确保消息成功到达队列。
使用 channel.txSelect() 开启事务,channel.txCommit() 提交事务,channel.txRollback() 回滚事务。
处理发送失败:
实现重试机制,可以在发送失败时重试。
使用死信交换机(DeadLetterExchange,DLX)来存储处理失败的消息。
保证消息被成功消费
手动确认(ManualAcknowledgment):
消费者应该使用手动确认模式,确保消息被成功处理后再确认。
使用 channel.basicConsume(queue,false, consumer) 开启手动确认模式。
在消息处理成功后,调用 channel.basicAck(deliveryTag,false) 确认消息。
处理消费失败:
实现消费失败的重试机制。
使用死信交换机(DLX)来存储处理失败的消息。
幂等性:
确保消费者处理消息的幂等性,避免重复消费导致的问题。
二:通过记录消息到数据库中,采用定时任务轮询方式:
1 这是一个 Spring 组件,用于构建和发布返利消息事件。
//topic 字段从配置文件中获取,表示消息队列的 topic。//buildEventMessage 方法用于构建 EventMessage 对象,包含随机生成的 ID、当前时间戳和数据。//topic 方法返回消息队列的 topic。//RebateMessage 是一个内部类,定义了返利消息的结构@ComponentpublicclassSendRebateMessageEventextendsBaseEvent<SendRebateMessageEvent.RebateMessage>{@Value("${spring.rabbitmq.topic.send_rebate}")privateString topic;@OverridepublicEventMessage<RebateMessage>buildEventMessage(RebateMessage data){returnEventMessage.<SendRebateMessageEvent.RebateMessage>builder().id(RandomStringUtils.randomNumeric(11)).timestamp(newDate()).data(data).build();}@OverridepublicStringtopic(){return topic;}@Data@Builder@AllArgsConstructor@NoArgsConstructorpublicstaticclassRebateMessage{privateString userId;privateString rebateDesc;privateString rebateType;privateString rebateConfig;privateString bizId;}}@DatapublicabstractclassBaseEvent<T>{publicabstractEventMessage<T>buildEventMessage(T data);publicabstractStringtopic();@Data@Builder@AllArgsConstructor@NoArgsConstructorpublicstaticclassEventMessage<T>{privateString id;privateDate timestamp;privateT data;}}
2 生产者示例
//topic 字段从配置文件中获取,表示消息队列的 topic。//listener 方法使用 @RabbitListener 注解监听指定队列的消息。//消息到达后,解析消息内容,根据 rebateType 字段的不同,调用相应的服务方法处理消息。//异常处理机制确保了消息处理的健壮性。@ComponentpublicclassRebateMessageCustomer{@Value("${spring.rabbitmq.topic.send_rebate}")privateString topic;@ResourceprivateIRaffleActivityAccountQuotaService raffleActivityAccountQuotaService;@ResourceprivateICreditAdjustService creditAdjustService;@RabbitListener(queuesToDeclare =@Queue(value ="${spring.rabbitmq.topic.send_rebate}"))publicvoidlistener(String message){try{
log.info("监听用户行为返利消息 topic: {} message: {}", topic, message);BaseEvent.EventMessage<SendRebateMessageEvent.RebateMessage> eventMessage =JSON.parseObject(message,newTypeReference<BaseEvent.EventMessage<SendRebateMessageEvent.RebateMessage>>(){}.getType());SendRebateMessageEvent.RebateMessage rebateMessage = eventMessage.getData();switch(rebateMessage.getRebateType()){case"sku":SkuRechargeEntity skuRechargeEntity =newSkuRechargeEntity();
skuRechargeEntity.setUserId(rebateMessage.getUserId());
skuRechargeEntity.setSku(Long.valueOf(rebateMessage.getRebateConfig()));
skuRechargeEntity.setOutBusinessNo(rebateMessage.getBizId());
skuRechargeEntity.setOrderTradeType(OrderTradeTypeVO.rebate_no_pay_trade);
raffleActivityAccountQuotaService.createOrder(skuRechargeEntity);break;case"integral":TradeEntity tradeEntity =newTradeEntity();
tradeEntity.setUserId(rebateMessage.getUserId());
tradeEntity.setTradeName(TradeNameVO.REBATE);
tradeEntity.setTradeType(TradeTypeVO.FORWARD);
tradeEntity.setAmount(newBigDecimal(rebateMessage.getRebateConfig()));
tradeEntity.setOutBusinessNo(rebateMessage.getBizId());
creditAdjustService.createOrder(tradeEntity);break;}}catch(AppException e){if(ResponseCode.INDEX_DUP.getCode().equals(e.getCode())){
log.warn("监听用户行为返利消息,消费重复 topic: {} message: {}", topic, message, e);return;}throw e;}catch(Exception e){
log.error("监听用户行为返利消息,消费失败 topic: {} message: {}", topic, message, e);throw e;}}}
3 消费者示例
//这个方法用于保存用户返利记录,并在事务中插入用户行为返利订单和任务对象。//在事务外,同步发送 MQ 消息。//发送消息时,调用 eventPublisher.publish 方法发布消息到指定的 topic。publicvoidsaveUserRebateRecord(String userId,List<BehaviorRebateAggregate> behaviorRebateAggregates){try{
dbRouter.doRouter(userId);
transactionTemplate.execute(status ->{try{for(BehaviorRebateAggregate behaviorRebateAggregate : behaviorRebateAggregates){BehaviorRebateOrderEntity behaviorRebateOrderEntity = behaviorRebateAggregate.getBehaviorRebateOrderEntity();UserBehaviorRebateOrder userBehaviorRebateOrder =newUserBehaviorRebateOrder();
userBehaviorRebateOrder.setUserId(behaviorRebateOrderEntity.getUserId());
userBehaviorRebateOrder.setOrderId(behaviorRebateOrderEntity.getOrderId());
userBehaviorRebateOrder.setBehaviorType(behaviorRebateOrderEntity.getBehaviorType());
userBehaviorRebateOrder.setRebateDesc(behaviorRebateOrderEntity.getRebateDesc());
userBehaviorRebateOrder.setRebateType(behaviorRebateOrderEntity.getRebateType());
userBehaviorRebateOrder.setRebateConfig(behaviorRebateOrderEntity.getRebateConfig());
userBehaviorRebateOrder.setOutBusinessNo(behaviorRebateOrderEntity.getOutBusinessNo());
userBehaviorRebateOrder.setBizId(behaviorRebateOrderEntity.getBizId());
userBehaviorRebateOrderDao.insert(userBehaviorRebateOrder);TaskEntity taskEntity = behaviorRebateAggregate.getTaskEntity();Task task =newTask();
task.setUserId(taskEntity.getUserId());
task.setTopic(taskEntity.getTopic());
task.setMessageId(taskEntity.getMessageId());
task.setMessage(JSON.toJSONString(taskEntity.getMessage()));
task.setState(taskEntity.getState().getCode());
taskDao.insert(task);}return1;}catch(DuplicateKeyException e){
status.setRollbackOnly();
log.error("写入返利记录,唯一索引冲突 userId: {}", userId, e);thrownewAppException(ResponseCode.INDEX_DUP.getCode(),ResponseCode.INDEX_DUP.getInfo());}});}finally{
dbRouter.clear();}for(BehaviorRebateAggregate behaviorRebateAggregate : behaviorRebateAggregates){TaskEntity taskEntity = behaviorRebateAggregate.getTaskEntity();Task task =newTask();
task.setUserId(taskEntity.getUserId());
task.setMessageId(taskEntity.getMessageId());try{
eventPublisher.publish(taskEntity.getTopic(), taskEntity.getMessage());
taskDao.updateTaskSendMessageCompleted(task);}catch(Exception e){
log.error("写入返利记录,发送MQ消息失败 userId: {} topic: {}", userId, task.getTopic());
taskDao.updateTaskSendMessageFail(task);}}}publicList<String>createOrder(BehaviorEntity behaviorEntity){// 1. 查询返利配置List<DailyBehaviorRebateVO> dailyBehaviorRebateVOS = behaviorRebateRepository.queryDailyBehaviorRebateConfig(behaviorEntity.getBehaviorTypeVO());if(null== dailyBehaviorRebateVOS || dailyBehaviorRebateVOS.isEmpty())returnnewArrayList<>();// 2. 构建聚合对象List<String> orderIds =newArrayList<>();List<BehaviorRebateAggregate> behaviorRebateAggregates =newArrayList<>();for(DailyBehaviorRebateVO dailyBehaviorRebateVO : dailyBehaviorRebateVOS){// 拼装业务ID;用户ID_返利类型_外部透彻业务IDString bizId = behaviorEntity.getUserId()+Constants.UNDERLINE+ dailyBehaviorRebateVO.getRebateType()+Constants.UNDERLINE+ behaviorEntity.getOutBusinessNo();BehaviorRebateOrderEntity behaviorRebateOrderEntity =BehaviorRebateOrderEntity.builder().userId(behaviorEntity.getUserId()).orderId(RandomStringUtils.randomNumeric(12)).behaviorType(dailyBehaviorRebateVO.getBehaviorType()).rebateDesc(dailyBehaviorRebateVO.getRebateDesc()).rebateType(dailyBehaviorRebateVO.getRebateType()).rebateConfig(dailyBehaviorRebateVO.getRebateConfig()).outBusinessNo(behaviorEntity.getOutBusinessNo()).bizId(bizId).build();
orderIds.add(behaviorRebateOrderEntity.getOrderId());// MQ 消息对象SendRebateMessageEvent.RebateMessage rebateMessage =SendRebateMessageEvent.RebateMessage.builder().userId(behaviorEntity.getUserId()).rebateType(dailyBehaviorRebateVO.getRebateType()).rebateConfig(dailyBehaviorRebateVO.getRebateConfig()).bizId(bizId).build();// 构建事件消息BaseEvent.EventMessage<SendRebateMessageEvent.RebateMessage> rebateMessageEventMessage = sendRebateMessageEvent.buildEventMessage(rebateMessage);// 组装任务对象TaskEntity taskEntity =newTaskEntity();
taskEntity.setUserId(behaviorEntity.getUserId());
taskEntity.setTopic(sendRebateMessageEvent.topic());
taskEntity.setMessageId(rebateMessageEventMessage.getId());
taskEntity.setMessage(rebateMessageEventMessage);
taskEntity.setState(TaskStateVO.create);BehaviorRebateAggregate behaviorRebateAggregate =BehaviorRebateAggregate.builder().userId(behaviorEntity.getUserId()).behaviorRebateOrderEntity(behaviorRebateOrderEntity).taskEntity(taskEntity).build();
behaviorRebateAggregates.add(behaviorRebateAggregate);}// 3. 存储聚合对象数据
behaviorRebateRepository.saveUserRebateRecord(behaviorEntity.getUserId(), behaviorRebateAggregates);// 4. 返回订单ID集合return orderIds;}@Data@Builder@AllArgsConstructor@NoArgsConstructorpublicclassTaskEntity{/** 活动ID */privateString userId;/** 消息主题 */privateString topic;/** 消息编号 */privateString messageId;/** 消息主体 */privateBaseEvent.EventMessage<SendRebateMessageEvent.RebateMessage> message;/** 任务状态;create-创建、completed-完成、fail-失败 */privateTaskStateVO state;}@DatapublicclassTask{/** 自增ID */privateString id;/** 活动ID */privateString userId;/** 消息主题 */privateString topic;/** 消息编号 */privateString messageId;/** 消息主体 */privateString message;/** 任务状态;create-创建、completed-完成、fail-失败 */privateString state;/** 创建时间 */privateDate createTime;/** 更新时间 */privateDate updateTime;}
4 定时任务示例
// @Scheduled(cron = "0/5 * * * * ?")publicvoidexec_db01(){try{// 设置库表
dbRouter.setDBKey(1);
dbRouter.setTBKey(0);// 查询未发送的任务List<TaskEntity> taskEntities = taskService.queryNoSendMessageTaskList();if(taskEntities.isEmpty())return;// 发送MQ消息for(TaskEntity taskEntity : taskEntities){try{
taskService.sendMessage(taskEntity);
taskService.updateTaskSendMessageCompleted(taskEntity.getUserId(), taskEntity.getMessageId());}catch(Exception e){
log.error("定时任务,发送MQ消息失败 userId: {} topic: {}", taskEntity.getUserId(), taskEntity.getTopic());
taskService.updateTaskSendMessageFail(taskEntity.getUserId(), taskEntity.getMessageId());}}}catch(Exception e){
log.error("定时任务,扫描MQ任务表发送消息失败。", e);}finally{
dbRouter.clear();}}@DatapublicclassTaskEntity{/** 活动ID */privateString userId;/** 消息主题 */privateString topic;/** 消息编号 */privateString messageId;/** 消息主体 */privateString message;}
三: 串联流程
生产消息:
saveUserRebateRecord 方法在事务中插入用户行为返利订单和任
务对象。
在事务外,调用 eventPublisher.publish 方法发布消息到指定的
topic。
消息构建和发布:
SendRebateMessageEvent 类构建 EventMessage 对象,包含随机
生成的 ID、当前时间戳和数据,并返回消息队列的 topic。
消费消息:
RebateMessageCustomer 类监听指定队列的消息,解析消息内容,
根据 rebateType 字段的不同,调用相应的服务方法处理消息。
定时任务补偿:
SendMessageTaskJob 类定时扫描数据库中的任务表,发送未发送的
消息到 MQ 队列,并更新任务状态。如果发送失败,记录错误日志并
更新任务状态为发送失败。
四:配置文件
spring:
rabbitmq:
addresses:****
port:***
username:**
password:**
listener:
simple:
prefetch:1 # 每次投递n个消息,消费完在投递n个
topic:
send_rebate: send_rebate
五: 消费失败
消息发送:
生产者在发送消息时,会将消息的相关信息(如消息内容、发送状态等)记录到 task 表中。
如果消息发送成功,则更新 task 表中的状态为“已发送”。
如果消息发送失败,则更新 task 表中的状态为“发送失败”。
定时任务会扫描 task 表,查找状态为“发送失败”的消息,并重试发送。
消息消费:
消费者在处理消息时,也会将消息的相关信息(如消息内容、处理状态等)记录到 task 表中。
如果消息处理成功,则更新 task 表中的状态为“已处理”。
如果消息处理失败,则更新 task 表中的状态为“处理失败”。
定时任务会扫描 task 表,查找状态为“处理失败”的消息,并重试处理。
通过这种方式,可以确保即使消息在发送或消费过程中出现失败,也能够通过重试机制最终成功发送或处理。
示例:
importorg.springframework.amqp.rabbit.core.RabbitTemplate;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.stereotype.Service;importorg.springframework.transaction.annotation.Transactional;@ServicepublicclassMessageService{@AutowiredprivateRabbitTemplate rabbitTemplate;@AutowiredprivateTaskRepository taskRepository;@TransactionalpublicvoidsendMessage(String message){try{
rabbitTemplate.convertAndSend("send_rebate", message);TaskEntity task =newTaskEntity();
task.setMessage(message);
task.setStatus("SENT");
taskRepository.save(task);}catch(Exception e){TaskEntity task =newTaskEntity();
task.setMessage(message);
task.setStatus("SEND_FAILED");
taskRepository.save(task);}}}
importorg.springframework.amqp.rabbit.annotation.RabbitListener;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.stereotype.Component;@ComponentpublicclassMessageConsumer{@AutowiredprivateTaskRepository taskRepository;@RabbitListener(queues ="${spring.rabbitmq.topic.send_rebate}")publicvoidhandleMessage(String message){try{// 处理消息的逻辑System.out.println("Processing message: "+ message);// 假设处理成功TaskEntity task =newTaskEntity();
task.setMessage(message);
task.setStatus("PROCESSED");
taskRepository.save(task);}catch(Exception e){// 处理失败的逻辑TaskEntity task =newTaskEntity();
task.setMessage(message);
task.setStatus("PROCESS_FAILED");
taskRepository.save(task);}}}
importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.scheduling.annotation.Scheduled;importorg.springframework.stereotype.Component;importjava.util.List;@ComponentpublicclassRetryTaskJob{@AutowiredprivateTaskRepository taskRepository;@AutowiredprivateMessageService messageService;@AutowiredprivateMessageConsumer messageConsumer;@Scheduled(fixedRate =60000)// 每分钟执行一次publicvoidretryFailedMessages(){// 重试发送失败的消息List<TaskEntity> sendFailedTasks = taskRepository.findByStatus("SEND_FAILED");for(TaskEntity task : sendFailedTasks){try{
messageService.sendMessage(task.getMessage());
task.setStatus("SENT");
taskRepository.save(task);}catch(Exception e){// 记录日志或进行其他处理}}// 重试处理失败的消息List<TaskEntity> processFailedTasks = taskRepository.findByStatus("PROCESS_FAILED");for(TaskEntity task : processFailedTasks){try{
messageConsumer.handleMessage(task.getMessage());
task.setStatus("PROCESSED");
taskRepository.save(task);}catch(Exception e){// 记录日志或进行其他处理}}}}
六: 防止重复消费
保证消息消费的幂等性是确保消息系统可靠性的重要一环。幂等性意
味着无论消息被处理一次还是多次,结果都是相同的。以下是一些常见的策略来保证消息消费的幂等性:
唯一标识符:为每条消息生成一个唯一的标识符(如 UUID),并在
处理消息时检查该标识符是否已经被处理过。如果已经处理过,则忽略该消息。
状态检查:在处理消息之前,检查系统的状态,确保该消息对应的操
作尚未执行。例如,如果消息是要更新某个资源,可以先检查该资源的状态,确保更新操作尚未执行。
数据库唯一约束:在数据库中为消息处理结果创建唯一约束,确保相
同的消息不会被重复处理。
幂等API设计:设计幂等的API,确保相同的请求多次执行不会产生不
同的结果。例如,使用“PUT”方法更新资源,而不是“POST”方法。
使用 Redis 来实现消息消费的幂等性是一个非常有效的方法。Redis 是一个高性能的内存数据库,适合用于存储临时状态信息。
1 消费者处理消息并记录到 Redis
importorg.springframework.amqp.rabbit.annotation.RabbitListener;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.data.redis.core.StringRedisTemplate;importorg.springframework.stereotype.Component;importjava.util.concurrent.TimeUnit;@ComponentpublicclassMessageConsumer{@AutowiredprivateStringRedisTemplate redisTemplate;@RabbitListener(queues ="${spring.rabbitmq.topic.send_rebate}")publicvoidhandleMessage(String message){// 假设消息中包含唯一标识符String messageId =extractMessageId(message);// 检查消息是否已经处理过或正在处理if(redisTemplate.hasKey(messageId)){System.out.println("Message already processed or being processed: "+ messageId);return;}// 将消息ID存入Redis,设置过期时间
redisTemplate.opsForValue().set(messageId,"PROCESSING",60,TimeUnit.SECONDS);try{// 处理消息的逻辑System.out.println("Processing message: "+ message);// 假设处理成功
redisTemplate.opsForValue().set(messageId,"PROCESSED",60,TimeUnit.SECONDS);}catch(Exception e){// 处理失败的逻辑
redisTemplate.delete(messageId);// 删除Redis中的记录,以便可以重试}}privateStringextractMessageId(String message){// 假设消息中包含唯一标识符,例如 JSON 格式中的 "id" 字段// 这里只是一个示例,实际实现可能需要解析消息内容return message.substring(0,36);// 假设 UUID 长度为 36}}
spring:
redis:
host: localhost
port:6379
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId></dependency>
通过上述方法和配置,您可以确保消息消费的幂等性。消费者在处理消息之前会检查消息的唯一标识符是否已经存在于 Redis 中,如果存在,则忽略该消息,从而避免重复处理。同时,通过设置过期时间,可以确保在处理过程中出现异常时,Redis 中的记录会被删除,从而允许消息重试。
版权归原作者 @淡 定 所有, 如有侵权,请联系我们删除。