文章目录
Springboot项目中经常用到邮件推送、短信发送等自动推送服务。但是这类服务通常不稳定,当出现网络异常的时候,会导致邮件推送失败。
本篇文章将介绍在springboot项目整合rabbitmq来实现服务的高可靠,保证邮件、短信等服务的100%发送成功。
1,服务架构
上图为rabbitmq消息生产与消费的流程图。本文主要围绕这个流程图展开,利用 RabbitMQ 消息队列来实现邮件 100% 被投递,内容涵盖了 RabbitMQ 很多知识点,如:
- 生产者和消费者模型
- 消息发送确认机制
- 消费确认机制
- 消息的重新投递
- 消费幂等性, 等等
2,实现思路
- 创建一台 Linux 服务器,并安装 RabbitMQ(本文示例是在window上安装rmq)
- 开放 QQ 邮箱或者其它邮箱授权码,用于发送邮件
- 创建邮件发送项目并编写代码
- 发送邮件测试
- 消息发送失败处理
3、环境准备
3.1 安装配置rabbitmq
首先,安装和配置好rabbitmq,我使用的rabbitmq_server-3.8.14;
使用cmd 命令 rabbitmq-server 直接启动 rabbitmq 。
访问rabbitmq,端口号:15672
3.2 QQ邮箱授权码的获取
获取邮箱授权码的目的,主要是为了通过代码进行发送邮件,例如 QQ 邮箱授权码获取方式,如下图:
该授权码就是配置文件spring.mail.password需要的密码!
4、项目结构
- springboot版本:2.1.5.RELEASE
- RabbitMQ版本:3.8.14
- SendMailUtil:发送邮件工具类
- ProduceServiceImpl:生产者,发送消息
- ConsumerMailService:消费者,消费消息,发送邮件
5、代码实现
5.1、创建项目
在 IDEA 下创建一个名称为smail的 Springboot 项目,pom文件中加入amqp和mail。
项目pom.xml
<?xml version="1.0" encoding="UTF-8"?><projectxmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>3.3.2</version><relativePath/><!-- lookup parent from repository --></parent><groupId>com.tigerhhzz</groupId><artifactId>springboot-rabbitmq-demo</artifactId><version>0.0.1-SNAPSHOT</version><name>springboot-rabbitmq-demo</name><description>Rabbitmq Demo project for Spring Boot</description><url/><licenses><license/></licenses><developers><developer/></developers><scm><connection/><developerConnection/><tag/><url/></scm><properties><java.version>17</java.version></properties><dependencies><!--spring boot核心--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><!--spring boot 测试--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><!--springmvc web--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!--开发环境调试--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-devtools</artifactId><optional>true</optional></dependency><!--mail 支持--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-mail</artifactId></dependency><!--amqp 支持--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><!-- commons-lang3 --><dependency><groupId>org.apache.commons</groupId><artifactId>commons-lang3</artifactId><version>3.4</version></dependency><!--lombok--><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.16.10</version></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build></project>
5.2 idea项目结构
5.3 数据库表sql
数据库:springbootrabbitmq_db_msg
表:msg_log
CREATE DATABASE IF NOT EXISTS springbootrabbitmq_db_msg default charset utf8mb4 COLLATE utf8mb4_unicode_ci;
CREATE TABLE `springbootrabbitmq_db_msg`.`springbootrabbitmq_db_msg_log`(`msg_id` varchar(255) NOT NULL DEFAULT '' COMMENT '消息唯一标识',
`exchange` varchar(100) NOT NULL DEFAULT '' COMMENT '交换机',
`route_key` varchar(100) NOT NULL DEFAULT '' COMMENT '路由键',
`queue_name` varchar(100) NOT NULL DEFAULT '' COMMENT '队列名称',
`msg` text COMMENT '消息体, json格式化',
`result` varchar(255) DEFAULT NULL COMMENT '处理结果',
`status` int(11) NOT NULL DEFAULT '0' COMMENT '状态,0:等待消费,1:消费成功,2:消费失败,9:重试失败',
`try_count` int(11) NOT NULL DEFAULT '0' COMMENT '重试次数',
`next_try_time` datetime DEFAULT NULL COMMENT '下一次重试时间',
`create_time` datetime DEFAULT NULL COMMENT '创建时间',
`update_time` datetime DEFAULT NULL COMMENT '更新时间',
PRIMARY KEY (`msg_id`))ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='mq消息日志';
5.4、配置rabbitMQ、mail
在application.properties文件中,配置amqp和mail!
spring.application.name=springboot-rabbitmq-demo
server.port=8080
# 添加数据源配置
spring.datasource.url=jdbc:mysql://127.0.0.1:3306/springbootrabbitmq_db_msg
spring.datasource.username=root
spring.datasource.password=123456
spring.datasource.driver-class-name=com.mysql.jdbc.Driver
# 配置mybatis的xml配置文件扫描目录
mybatis.mapper-locations=classpath:mapper/*.xml# 打印SQL语句,需要注射掉这个mybatis属性配置,否则启动报错
mybatis.configuration.log-impl=org.apache.ibatis.logging.stdout.StdOutImpl
# mail# 配置邮件默认编码
spring.mail.default-encoding=UTF-8# 配置邮件发送主机地址
spring.mail.host=smtp.qq.com
# 配置邮件发送服务端口号#spring.mail.port=465# 配置邮件发送服务协议#spring.mail.protocol=smtp# 配置邮件发送者用户名或者账户
[email protected]
spring.mail.password=获取qq邮箱授权码
[email protected]
# 配置smtp相关属性
spring.mail.properties.mail.smtp.auth=true
spring.mail.properties.mail.smtp.starttls.enable=true
spring.mail.properties.mail.smtp.starttls.required=true
#rabbitmq
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.virtual-host=/
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
# 开启confirms回调 P -> Exchange
spring.rabbitmq.publisher-confirms=true
# 开启returnedMessage回调 Exchange -> Queue
spring.rabbitmq.publisher-returns=true
# 设置手动确认(ack) Queue -> C
spring.rabbitmq.listener.simple.acknowledge-mode=manual
#spring.rabbitmq.listener.simple.acknowledge-mode=auto
spring.rabbitmq.listener.simple.prefetch=100
其中,spring.mail.password第四步中获取的授权码,同时username和from要一致!
5.5、RabbitConfig配置类
packagecom.tigerhhzz.smail.config;importorg.slf4j.Logger;importorg.slf4j.LoggerFactory;importorg.springframework.amqp.rabbit.connection.CachingConnectionFactory;importorg.springframework.amqp.rabbit.core.RabbitTemplate;importorg.springframework.amqp.support.converter.Jackson2JsonMessageConverter;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;/**
* <p>
* RabbitConfig
* </p>
*
* @author tigerhhzz
* @since 2024/8/12
*/@ConfigurationpublicclassRabbitConfig{privatestaticfinalLoggerLOGGER=LoggerFactory.getLogger(RabbitConfig.class);@AutowiredprivateCachingConnectionFactory connectionFactory;@BeanpublicRabbitTemplaterabbitTemplate(){RabbitTemplate rabbitTemplate =newRabbitTemplate(connectionFactory);// 设置消息转换器为json格式
rabbitTemplate.setMessageConverter(newJackson2JsonMessageConverter());// 消息是否成功发送到Exchange
rabbitTemplate.setConfirmCallback((correlationData, ack, cause)->{if(ack){LOGGER.info("消息发送到Exchange成功,{}", correlationData);}else{LOGGER.error("消息发送到Exchange失败, {}, cause: {}", correlationData, cause);}});// 触发setReturnCallback回调必须设置mandatory=true, 否则Exchange没有找到Queue就会丢弃掉消息, 而不会触发回调
rabbitTemplate.setMandatory(true);// 消息是否从Exchange路由到Queue, 注意: 这是一个失败回调, 只有消息从Exchange路由到Queue失败才会回调这个方法
rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey)->{LOGGER.error("消息从Exchange路由到Queue失败: exchange: {}, route: {}, replyCode: {}, replyText: {}, message: {}", exchange, routingKey, replyCode, replyText, message);});return rabbitTemplate;}}
5.6、Mail 邮件实体类
packagecom.tigerhhzz.smail.entity;/**
* <p>
* Mail
* </p>
*
* @author tigerhhzz
* @since 2024/8/12
*/publicclassMail{/**
* 目标邮箱地址
*/privateStringto;/**
* 标题不能为空
*/privateString title;/**
* 正文不能为空
*/privateString content;/**
* 消息id
*/privateString msgId;publicStringgetTo(){returnto;}publicMailsetTo(Stringto){this.to=to;returnthis;}publicStringgetTitle(){return title;}publicMailsetTitle(String title){this.title = title;returnthis;}publicStringgetContent(){return content;}publicMailsetContent(String content){this.content = content;returnthis;}publicStringgetMsgId(){return msgId;}publicMailsetMsgId(String msgId){this.msgId = msgId;returnthis;}@OverridepublicStringtoString(){return"Mail{"+"to='"+to+'\''+", title='"+ title +'\''+", content='"+ content +'\''+", msgId='"+ msgId +'\''+'}';}}
5.7、SendMailService邮件发送类
packagecom.tigerhhzz.smail.service;importcom.tigerhhzz.smail.entity.Mail;importorg.slf4j.Logger;importorg.slf4j.LoggerFactory;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.beans.factory.annotation.Value;importorg.springframework.mail.MailException;importorg.springframework.mail.SimpleMailMessage;importorg.springframework.mail.javamail.JavaMailSender;importorg.springframework.stereotype.Service;/**
* <p>
* SendMailService
* </p>
*
* @author tigerhhzz
* @since 2024/8/12
*/@ServicepublicclassSendMailService{privatestaticfinalLoggerLOGGER=LoggerFactory.getLogger(SendMailService.class);@Value("${spring.mail.username}")privateString from;@AutowiredprivateJavaMailSender mailSender;/**
* 发送简单邮件
*
* @param mail
*/publicbooleansend(Mail mail){Stringto= mail.getTo();// 目标邮箱String title = mail.getTitle();// 邮件标题String content = mail.getContent();// 邮件正文SimpleMailMessage message =newSimpleMailMessage();
message.setFrom(from);
message.setTo(to);
message.setSubject(title);
message.setText(content);try{
mailSender.send(message);LOGGER.info("邮件发送成功");returntrue;}catch(MailException e){LOGGER.error("邮件发送失败, to: {}, title: {}",to, title, e);returnfalse;}}}
5.8、ProduceService生产者类
packagecom.tigerhhzz.smail.service;importcom.tigerhhzz.smail.common.MessageHelper;importcom.tigerhhzz.smail.entity.Mail;importorg.springframework.amqp.rabbit.connection.CorrelationData;importorg.springframework.amqp.rabbit.core.RabbitTemplate;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.stereotype.Service;importjava.util.UUID;/**
* <p>
* ProduceService
* </p>
*
* @author tigerhhzz
* @since 2024/8/12
*/@ServicepublicclassProduceService{@AutowiredprivateRabbitTemplate rabbitTemplate;@AutowiredprivateMsgLogService msgLogService;/**
* 发送消息
* @param mail
* @return
*/publicbooleansendByAck(Mail mail){// 创建uuidString msgId =UUID.randomUUID().toString().replaceAll("-","");
mail.setMsgId(msgId);// 发送消息到mq服务器中(附带消息ID)CorrelationData correlationData =newCorrelationData(msgId);
rabbitTemplate.convertAndSend("mail.exchange","route.mail.ack",MessageHelper.objToMsg(mail), correlationData);returntrue;}/**
* 发送消息
* @param mail
* @return
*/publicbooleansendByAuto(Mail mail){String msgId =UUID.randomUUID().toString().replaceAll("-","");
mail.setMsgId(msgId);// 1.存储要消费的数据
msgLogService.save("mail.exchange","route.mail.auto","mq.mail.auto", msgId, mail);// 2.发送消息到mq服务器中(附带消息ID)CorrelationData correlationData =newCorrelationData(msgId);
rabbitTemplate.convertAndSend("mail.exchange","route.mail.auto",MessageHelper.objToMsg(mail), correlationData);returntrue;}}
5.9、ConsumerService 消费者类
packagecom.tigerhhzz.smail.mq;importcom.rabbitmq.client.Channel;importcom.tigerhhzz.smail.common.Constant;importcom.tigerhhzz.smail.common.MessageHelper;importcom.tigerhhzz.smail.entity.Mail;importcom.tigerhhzz.smail.entity.MsgLog;importcom.tigerhhzz.smail.service.MsgLogService;importcom.tigerhhzz.smail.service.SendMailService;importorg.slf4j.Logger;importorg.slf4j.LoggerFactory;importorg.springframework.amqp.core.Message;importorg.springframework.amqp.rabbit.annotation.RabbitListener;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.stereotype.Component;importjava.io.IOException;importjava.util.Objects;/**
* <p>
* ConsumerService
* </p>
*
* @author tigerhhzz
* @since 2024/8/12
*/@ComponentpublicclassConsumerService{privatestaticfinalLoggerLOGGER=LoggerFactory.getLogger(ConsumerService.class);@AutowiredprivateSendMailService sendMailService;@AutowiredprivateMsgLogService msgLogService;/**
* 监听消息队列,手动确认模式,必须手动调用ack或者nack方法
* 配置参数:spring.rabbitmq.listener.simple.acknowledge-mode=manual
* @param message
* @param channel
* @throws IOException
*/@RabbitListener(queues ={"mq.mail.ack"})publicvoidconsumeFromAck(Message message,Channel channel)throwsIOException{LOGGER.info("收到消息:{}", message.toString());//将消息转化为对象Mail mail =MessageHelper.msgToObj(message,Mail.class);// 手动确认模式long tag = message.getMessageProperties().getDeliveryTag();boolean success = sendMailService.send(mail);if(success){// 消费成功,消息会被删除
channel.basicAck(tag,false);}else{// 消费失败,重新返回队列
channel.basicNack(tag,false,true);}}/**
* 监听消息队列,自动确认模式,无需调用ack或者nack方法,当程序执行时才删除消息
* 配置参数:spring.rabbitmq.listener.simple.acknowledge-mode=auto
* @param message
*/@RabbitListener(queues ={"mq.mail.auto"})publicvoidconsumeFromAuto(Message message){LOGGER.info("收到消息:{}", message.toString());// 获取消息IDMail mail =MessageHelper.msgToObj(message,Mail.class);// 消息幂等性处理,如果已经处理成功,无需重复消费MsgLog queryObj = msgLogService.selectByMsgId(mail.getMsgId());if(Objects.nonNull(queryObj)&&Constant.SUCCESS.equals(queryObj.getStatus())){return;}// 发送邮件boolean success = sendMailService.send(mail);if(success){
msgLogService.updateStatus(mail.getMsgId(),Constant.SUCCESS,"邮件发送成功");}else{
msgLogService.updateStatus(mail.getMsgId(),Constant.FAIL,"邮件发送失败");}}}
5.10、TestController 控制层类
packagecom.tigerhhzz.smail.web;importcom.tigerhhzz.smail.entity.Mail;importcom.tigerhhzz.smail.service.ProduceService;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.web.bind.annotation.PostMapping;importorg.springframework.web.bind.annotation.RequestMapping;importorg.springframework.web.bind.annotation.RestController;@RestController@RequestMapping("/test")publicclassTestController{@AutowiredprivateProduceService testService;@PostMapping("send")publicbooleansendMail(Mail mail){return testService.sendByAuto(mail);}}
6、测试服务
启动 SpringBoot 服务之后,用 postman 模拟请求接口。
查看控制台信息。
查看邮箱,已经收到一份邮件:
邮件发送成功!
7、消息发送失败处理
虽然,上面案例可以成功的实现消息的发送,但是上面的流程很脆弱,例如: rabbitMQ 突然蹦了、邮件发送失败了、重启 rabbitMQ 服务器出现消息重复消费,应该怎处理呢?
很显然,我们需要对原有的逻辑进行升级改造,因此我们需要引入数据库来记录消息的发送情况。
7.1、创建消息投递日志表
CREATE TABLE `msg_log`(`msg_id` varchar(255) NOT NULL DEFAULT '' COMMENT '消息唯一标识',
`msg` text COMMENT '消息体, json格式化',
`exchange` varchar(255) NOT NULL DEFAULT '' COMMENT '交换机',
`routing_key` varchar(255) NOT NULL DEFAULT '' COMMENT '路由键',
`status` int(11) NOT NULL DEFAULT '0' COMMENT '状态: 0投递中 1投递成功 2投递失败 3已消费',
`try_count` int(11) NOT NULL DEFAULT '0' COMMENT '重试次数',
`next_try_time` datetime DEFAULT NULL COMMENT '下一次重试时间',
`create_time` datetime DEFAULT NULL COMMENT '创建时间',
`update_time` datetime DEFAULT NULL COMMENT '更新时间',
PRIMARY KEY (`msg_id`),
UNIQUE KEY `unq_msg_id`(`msg_id`) USING BTREE
)ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='消息投递日志';
7.2、编写 MsgLog 相关服务类
packagecom.tigerhhzz.smail.dao;importcom.tigerhhzz.smail.entity.MsgLog;importorg.apache.ibatis.annotations.Param;importjava.util.List;publicinterfaceMsgLogMapper{/**
* 插入消息日志
* @param msgLog
*/voidinsert(MsgLog msgLog);/**
* 更新消息状态
* @param msgId
*/voidupdateStatus(@Param("msgId")String msgId,@Param("status")Integer status,@Param("result")String result);/**
* 查询需要重新投递的消息
* @return
*/List<MsgLog>selectFailMsg();/**
* 更新重试此时
* @param msgLog
*/voidupdateTryCount(MsgLog msgLog);/**
* 查询消息信息
* @param msgId
* @return
*/MsgLogselectByPrimaryKey(String msgId);}
packagecom.tigerhhzz.smail.service;importcom.tigerhhzz.smail.common.Constant;importcom.tigerhhzz.smail.common.JodaTimeUtil;importcom.tigerhhzz.smail.common.JsonUtil;importcom.tigerhhzz.smail.dao.MsgLogMapper;importcom.tigerhhzz.smail.entity.MsgLog;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.stereotype.Service;importorg.springframework.transaction.annotation.Transactional;importjava.util.Date;importjava.util.List;/**
* <p>
* MsgLogService
* </p>
*
* @author tigerhhzz
* @since 2024/8/12
*/@ServicepublicclassMsgLogService{@AutowiredprivateMsgLogMapper msgLogMapper;/**
* 查询消费失败的消息
* @return
*/publicList<MsgLog>selectFailMsg(){return msgLogMapper.selectFailMsg();}/**
* 查询数据信息
* @param msgId
* @return
*/publicMsgLogselectByMsgId(String msgId){return msgLogMapper.selectByPrimaryKey(msgId);}/**
* 保存消息日志
* @param exchange
* @param routeKey
* @param queueName
* @param msgId
* @param object
* @return
*/@Transactionalpublicvoidsave(String exchange,String routeKey,String queueName,String msgId,Object object){MsgLog entity =buildMsgLog(exchange, routeKey, queueName, msgId, object);
msgLogMapper.insert(entity);}/**
* 更新状态
* @param msgId
* @param status
* @param result
*/@TransactionalpublicvoidupdateStatus(String msgId,Integer status,String result){
msgLogMapper.updateStatus(msgId, status, result);}/**
* 更新下次重试时间
* @param msgId
* @param currentTryCount
*/@TransactionalpublicvoidupdateNextTryTime(String msgId,Integer currentTryCount){MsgLog msgLog =newMsgLog();
msgLog.setMsgId(msgId);
msgLog.setNextTryTime(JodaTimeUtil.plusMinutes(newDate(), currentTryCount));
msgLogMapper.updateTryCount(msgLog);}privateMsgLogbuildMsgLog(String exchange,String routeKey,String queueName,String msgId,Object object){MsgLog target =newMsgLog();
target.setMsgId(msgId);
target.setExchange(exchange);
target.setRouteKey(routeKey);
target.setQueueName(queueName);
target.setMsg(JsonUtil.objToStr(object));
target.setStatus(Constant.WAIT);
target.setTryCount(0);
target.setNextTryTime(JodaTimeUtil.plusMinutes(newDate(),1));
target.setCreateTime(newDate());
target.setUpdateTime(newDate());return target;}}
7.3、改写服务逻辑
在生产服务类中,新增数据写入。
同时,在RabbitConfig服务配置,当消息发送成功之后,新增更新消息状态逻辑。
改造消费者ConsumerMailService,每次消费的时候,从数据库中查询,如果消息已经被消费,不用再重复发送数据!
这样即可保证,如果 rabbitMQ 服务器,即使重启之后重新推送消息,通过数据库判断,也不会重复消费进而发生业务异常!
7.4、利用定数任务对消息投递失败进行补偿
当 rabbitMQ 服务器突然挂掉之后,生成者就无法正常进行投递数据,此时因为消息已经被记录到数据库,因此我们可以利用定数任务查询出没有投递成功的消息,进行补偿投递。
利用定数任务,对投递失败的消息进行补偿投递,基本可以保证消息 100% 消费成功!
项目代码仓库地址:
https://gitee.com/spring2020/springboot-rabbitmq-demo
参考文档:
https://blog.csdn.net/unique_perfect/article/details/109380996
人生从来没有真正的绝境。只要一个人的心中还怀着一粒信念的种子,那么总有一天,他就能走出困境,让生命重新开花结果。
版权归原作者 hhzz 所有, 如有侵权,请联系我们删除。