👏👏👏
哈喽!大家好,我是【学无止境小奇】,一位热爱分享各种技术的博主!😍😍😍
⭐【学无止境小奇】的创作宗旨:每一条命令都亲自执行过,每一行代码都实际运行过,每一种方法都真实实践过,每一篇文章都良心制作过。✊✊✊
⭐【学无止境小奇】的博客中所有涉及命令、代码的地方,除了提供图片供大家参考,另外会在图片下方提供一份纯文本格式的命令或者代码方便大家粘贴复制直接执行命令或者运行代码。🤝🤝🤝
⭐如果你对技术有着浓厚的兴趣,欢迎关注【学无止境小奇】,欢迎大家和我一起交流。😘😘😘
❤️❤️❤️感谢各位朋友接下来的阅读❤️❤️❤️
文章目录
一、JAVA使用RabbitMQ解决生产端消息投递可靠性,消费端幂等性问题
1、生产端消息投递可靠性
1.1、消息落库
思路:
1.将消息落库:
我们发送一个消息没办法知道我们发的消息消费端是否接收到,假如消费端没有接收到那么我们需要触发补偿机制来重新发送一个消息,这个时候我们为了解决这个问题就需要将消息落库,每次将准备发送的消息存入到数据库中,并设置一个状态为待发送。
等消费端接收到消息并给我们反馈后,我们将数据库中的消息状态改为已完成。
消息库
发送消息之前先将消息落库
如果消息发送成功则将数据库状态改为发送完成,如果没有成功则将重试次数+1,我们一般重试3次还是失败就会将状态改为发送失败。
package com.xiaoqi.server.config;/**
* @ProjectName: yeb
* @Package: com.xiaoqi.server.config
* @ClassName: RabbitMQConfig
* @Author: LiShiQi
* @Description: ${description}
* @Date: 2022/2/24 16:16
* @Version: 1.0
*/
import com.baomidou.mybatisplus.core.conditions.update.UpdateWrapper;
import com.xiaoqi.server.pojo.MailConstants;
import com.xiaoqi.server.pojo.MailLog;
import com.xiaoqi.server.service.IMailLogService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/**
* @Description
* @Author LiShiQi
* @Date 2022/2/24 16:16
* @Version 1.0
*/
@Configuration
publicclassRabbitMQConfig{privatestatic final Logger LOGGER = LoggerFactory.getLogger(RabbitMQConfig.class);
@Autowired
privateCachingConnectionFactory cachingConnectionFactory;
@Autowired
privateIMailLogService mailLogService;
@Bean
publicRabbitTemplaterabbitTemplate(){RabbitTemplate rabbitTemplate =newRabbitTemplate(cachingConnectionFactory);/**
* 消息确认回调,确认消息是否到达broker
* data:消息唯一标识
* ack:确认结果
* cause:失败原因
*/
rabbitTemplate.setConfirmCallback((data,ack,cause)->{String msgId = data.getId();if(ack){
LOGGER.info("{}=============>消息发送成功",msgId);
mailLogService.update(newUpdateWrapper<MailLog>().set("status",1).eq("msgId",msgId));}else{
LOGGER.error("{}=============>消息发送失败",msgId);}});/**
* 消息失败回调,比如router不到queue时回调
* msg:消息主题
* repCode:响应码
* repText:相应描述
* exchange;交换机
* routingkey:路由键
*/
rabbitTemplate.setReturnCallback((msg,repCode,repText,exchange,routingkey)->{
LOGGER.error("{}=============>消息发送queue时失败",msg.getBody());});return rabbitTemplate;}
@Bean
publicQueuequeue(){returnnewQueue(MailConstants.MAIL_QUEUE_NAME);}
@Bean
publicDirectExchangedirectExchange(){returnnewDirectExchange(MailConstants.MAIL_EXCHANGE_NAME);}
@Bean
publicBindingbinding(){return BindingBuilder.bind(queue()).to(directExchange()).with(MailConstants.MAIL_ROUTING_KEY_NAME);}}
1.2、定时任务
前面我们消息已经落库了,这个时候我们就弄一个定时任务去扫描我们的消息表中,把状态为待发送的消息任务重新发送一次,如果还失败则重试次数字段+1,等重试次数到达3次,不再重试。
package com.xiaoqi.server.task;/**
* @ProjectName: yeb
* @Package: com.xiaoqi.server.task
* @ClassName: MailTask
* @Author: LiShiQi
* @Description: ${description}
* @Date: 2022/2/24 18:28
* @Version: 1.0
*/
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.core.conditions.update.UpdateWrapper;
import com.xiaoqi.server.pojo.Employee;
import com.xiaoqi.server.pojo.MailConstants;
import com.xiaoqi.server.pojo.MailLog;
import com.xiaoqi.server.service.IEmployeeService;
import com.xiaoqi.server.service.IMailLogService;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import java.time.LocalDateTime;
import java.util.List;/**
* @Description
* @Author LiShiQi
* @Date 2022/2/24 18:28
* @Version 1.0
*/
@Component
publicclassMailTask{
@Autowired
privateIMailLogService mailLogService;
@Autowired
privateIEmployeeService employeeService;
@Autowired
privateRabbitTemplate rabbitTemplate;
@Scheduled(cron ="0/10 * * * * ?")publicvoidmailTask(){List<MailLog> list = mailLogService.list(newQueryWrapper<MailLog>().eq("status",0).lt("tryTime", LocalDateTime.now()));
list.forEach(mailLog ->{//如果重试次数超过3次,更新状态为投递失败,不再重试if(3< mailLog.getCount()){
mailLogService.update(newUpdateWrapper<MailLog>().set("status",2).eq("msgId", mailLog.getMsgId()));}
mailLogService.update(newUpdateWrapper<MailLog>().set("count",mailLog.getCount()+1).set("updateTime", LocalDateTime.now()).set("tryTime", LocalDateTime.now().plusMinutes(MailConstants.MSG_TIMEOUT)).eq("msgId",mailLog.getMsgId()));Employee emp = employeeService.getEmployee(mailLog.getEid()).get(0);//发送消息
rabbitTemplate.convertAndSend(MailConstants.MAIL_EXCHANGE_NAME,MailConstants.MAIL_ROUTING_KEY_NAME,
emp,newCorrelationData(mailLog.getMsgId()));});}}
2、消费端幂等性问题
首先幂等性问题就是一次和多次结果是一样的,也就是说有可能一个消息因为某些原因(例如在定时任务扫描数据库的时候扫描到状态为待发送,但是这个时候其实已经正在发送,这个时候定时任务又发送了一次)生产端可能给消费端发送了两次消息,这个时候我们消费端只需要消费一次就可以了,因为如果是电商业务,不可能下一笔订单扣两笔钱吧,所以这里我们用redis来实现。
2.1、redis解决
大概思路: 我们消费每一个消息的时候将这个消息的消息id放入redis中,如果接收到的消息id在redis中,证明我们已经消费过了就不在进行消费了。
package com.xiaoqi.mail;/**
* @ProjectName: yeb
* @Package: com.xiaoqi.mail
* @ClassName: MailReceiver
* @Author: LiShiQi
* @Description: ${description}
* @Date: 2022/2/24 12:43
* @Version: 1.0
*/
import com.rabbitmq.client.Channel;
import com.xiaoqi.server.pojo.Employee;
import com.xiaoqi.server.pojo.MailConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.mail.MailProperties;
import org.springframework.data.redis.core.HashOperations;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.mail.javamail.JavaMailSender;
import org.springframework.mail.javamail.MimeMessageHelper;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.stereotype.Component;
import org.thymeleaf.TemplateEngine;
import org.thymeleaf.context.Context;
import javax.mail.internet.MimeMessage;
import java.io.IOException;
import java.util.Date;/**
* @Description
* @Author LiShiQi
* @Date 2022/2/24 12:43
* @Version 1.0
*/
@Component
publicclassMailReceiver{privatestatic final Logger LOGGER = LoggerFactory.getLogger(MailReceiver.class);//邮件发送
@Autowired
privateJavaMailSender javaMailSender;//邮件配置
@Autowired
privateMailProperties mailProperties;//引擎
@Autowired
privateTemplateEngine templateEngine;
@Autowired
privateRedisTemplate redisTemplate;//监听
@RabbitListener(queues = MailConstants.MAIL_QUEUE_NAME)publicvoidhandler(Message message,Channel channel){Employee employee =(Employee)message.getPayload();MessageHeaders headers = message.getHeaders();//消息序号long tag =(long) headers.get(AmqpHeaders.DELIVERY_TAG);String msgId =(String) headers.get("spring_returned_message_correlation");HashOperations hashOperations = redisTemplate.opsForHash();try{if(hashOperations.entries("mail_log").containsKey(msgId)){
LOGGER.error("消息已经被消费===============>{}",msgId);/**
* 手动确认消息
* tag:消息序号
* multiple:是否确认多条
*/
channel.basicAck(tag,false);return;}//创建消息MimeMessage msg = javaMailSender.createMimeMessage();MimeMessageHelper helper =newMimeMessageHelper(msg);//发件人
helper.setFrom(mailProperties.getUsername());//收件人
helper.setTo(employee.getEmail());//主题
helper.setSubject("入职欢迎邮件");//发送日期
helper.setSentDate(newDate());//邮件内容Context context =newContext();
context.setVariable("name",employee.getName());
context.setVariable("posName",employee.getPosition().getName());
context.setVariable("joblevelName",employee.getJoblevel().getName());
context.setVariable("departmentName",employee.getDepartment().getName());String mail = templateEngine.process("mail", context);//参数为true就是html
helper.setText(mail,true);//发送邮件
javaMailSender.send(msg);
LOGGER.info("邮件发送成功");//将消息id存入redis
hashOperations.put("mail_log",msgId,"OK");//手动确认消息
channel.basicAck(tag,false);}catch(Exception e){/**
* 手动确认消息
* tag:消息序号
* multiple:是否确认多条
* requeue:是否退回到队列
*/try{
channel.basicNack(tag,false,true);}catch(IOException e1){
LOGGER.error("邮件发送失败=========>{}",e.getMessage());}
LOGGER.error("邮件发送失败=========>{}",e.getMessage());}}}
3、总结
以上就是解决MQ消息队列的可靠性问题,因为在引入消息队列解决某些问题的同时我们随之而来了一些其他问题,这个时候我们就要考虑怎么解决这些其他问题,以上的解决方案只是众多方案中的其中一种,还有其他方案也可以解决这些问题。
版权归原作者 学无止境小奇 所有, 如有侵权,请联系我们删除。