0


Springboot项目中利用 RabbitMQ 消息队列来实现邮件 100% 被投递

文章目录

Springboot项目中经常用到邮件推送、短信发送等自动推送服务。但是这类服务通常不稳定,当出现网络异常的时候,会导致邮件推送失败。

本篇文章将介绍在springboot项目整合rabbitmq来实现服务的高可靠,保证邮件、短信等服务的100%发送成功。

1,服务架构

在这里插入图片描述
上图为rabbitmq消息生产与消费的流程图。本文主要围绕这个流程图展开,利用 RabbitMQ 消息队列来实现邮件 100% 被投递,内容涵盖了 RabbitMQ 很多知识点,如:

  • 生产者和消费者模型
  • 消息发送确认机制
  • 消费确认机制
  • 消息的重新投递
  • 消费幂等性, 等等

2,实现思路

  1. 创建一台 Linux 服务器,并安装 RabbitMQ(本文示例是在window上安装rmq)
  2. 开放 QQ 邮箱或者其它邮箱授权码,用于发送邮件
  3. 创建邮件发送项目并编写代码
  4. 发送邮件测试
  5. 消息发送失败处理

3、环境准备

3.1 安装配置rabbitmq

首先,安装和配置好rabbitmq,我使用的rabbitmq_server-3.8.14;

在这里插入图片描述
使用cmd 命令 rabbitmq-server 直接启动 rabbitmq 。

在这里插入图片描述

访问rabbitmq,端口号:15672
在这里插入图片描述

3.2 QQ邮箱授权码的获取

获取邮箱授权码的目的,主要是为了通过代码进行发送邮件,例如 QQ 邮箱授权码获取方式,如下图:

在这里插入图片描述
该授权码就是配置文件spring.mail.password需要的密码!

4、项目结构

  • springboot版本:2.1.5.RELEASE
  • RabbitMQ版本:3.8.14
  • SendMailUtil:发送邮件工具类
  • ProduceServiceImpl:生产者,发送消息
  • ConsumerMailService:消费者,消费消息,发送邮件

5、代码实现

5.1、创建项目

在 IDEA 下创建一个名称为smail的 Springboot 项目,pom文件中加入amqp和mail。

在这里插入图片描述
项目pom.xml

<?xml version="1.0" encoding="UTF-8"?><projectxmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>3.3.2</version><relativePath/><!-- lookup parent from repository --></parent><groupId>com.tigerhhzz</groupId><artifactId>springboot-rabbitmq-demo</artifactId><version>0.0.1-SNAPSHOT</version><name>springboot-rabbitmq-demo</name><description>Rabbitmq Demo project for Spring Boot</description><url/><licenses><license/></licenses><developers><developer/></developers><scm><connection/><developerConnection/><tag/><url/></scm><properties><java.version>17</java.version></properties><dependencies><!--spring boot核心--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><!--spring boot 测试--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><!--springmvc web--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!--开发环境调试--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-devtools</artifactId><optional>true</optional></dependency><!--mail 支持--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-mail</artifactId></dependency><!--amqp 支持--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><!-- commons-lang3 --><dependency><groupId>org.apache.commons</groupId><artifactId>commons-lang3</artifactId><version>3.4</version></dependency><!--lombok--><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.16.10</version></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build></project>

5.2 idea项目结构

在这里插入图片描述

5.3 数据库表sql

数据库:springbootrabbitmq_db_msg
表:msg_log

CREATE DATABASE IF NOT EXISTS springbootrabbitmq_db_msg default charset utf8mb4 COLLATE utf8mb4_unicode_ci;

CREATE TABLE `springbootrabbitmq_db_msg`.`springbootrabbitmq_db_msg_log`(`msg_id` varchar(255) NOT NULL DEFAULT '' COMMENT '消息唯一标识',
  `exchange` varchar(100) NOT NULL DEFAULT '' COMMENT '交换机',
  `route_key` varchar(100) NOT NULL DEFAULT '' COMMENT '路由键',
  `queue_name` varchar(100) NOT NULL DEFAULT '' COMMENT '队列名称',
  `msg` text COMMENT '消息体, json格式化',
  `result` varchar(255) DEFAULT NULL COMMENT '处理结果',
  `status` int(11) NOT NULL DEFAULT '0' COMMENT '状态,0:等待消费,1:消费成功,2:消费失败,9:重试失败',
  `try_count` int(11) NOT NULL DEFAULT '0' COMMENT '重试次数',
  `next_try_time` datetime DEFAULT NULL COMMENT '下一次重试时间',
  `create_time` datetime DEFAULT NULL COMMENT '创建时间',
  `update_time` datetime DEFAULT NULL COMMENT '更新时间',
  PRIMARY KEY (`msg_id`))ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='mq消息日志';

5.4、配置rabbitMQ、mail

在application.properties文件中,配置amqp和mail!

spring.application.name=springboot-rabbitmq-demo
server.port=8080

# 添加数据源配置
spring.datasource.url=jdbc:mysql://127.0.0.1:3306/springbootrabbitmq_db_msg
spring.datasource.username=root
spring.datasource.password=123456
spring.datasource.driver-class-name=com.mysql.jdbc.Driver

# 配置mybatis的xml配置文件扫描目录
mybatis.mapper-locations=classpath:mapper/*.xml# 打印SQL语句,需要注射掉这个mybatis属性配置,否则启动报错
mybatis.configuration.log-impl=org.apache.ibatis.logging.stdout.StdOutImpl

# mail# 配置邮件默认编码
spring.mail.default-encoding=UTF-8# 配置邮件发送主机地址
spring.mail.host=smtp.qq.com
# 配置邮件发送服务端口号#spring.mail.port=465# 配置邮件发送服务协议#spring.mail.protocol=smtp# 配置邮件发送者用户名或者账户
[email protected]
spring.mail.password=获取qq邮箱授权码
[email protected]
# 配置smtp相关属性
spring.mail.properties.mail.smtp.auth=true
spring.mail.properties.mail.smtp.starttls.enable=true
spring.mail.properties.mail.smtp.starttls.required=true

#rabbitmq
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.virtual-host=/
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
# 开启confirms回调 P -> Exchange
spring.rabbitmq.publisher-confirms=true
# 开启returnedMessage回调 Exchange -> Queue
spring.rabbitmq.publisher-returns=true
# 设置手动确认(ack) Queue -> C
spring.rabbitmq.listener.simple.acknowledge-mode=manual
#spring.rabbitmq.listener.simple.acknowledge-mode=auto
spring.rabbitmq.listener.simple.prefetch=100

其中,spring.mail.password第四步中获取的授权码,同时username和from要一致!

5.5、RabbitConfig配置类

packagecom.tigerhhzz.smail.config;importorg.slf4j.Logger;importorg.slf4j.LoggerFactory;importorg.springframework.amqp.rabbit.connection.CachingConnectionFactory;importorg.springframework.amqp.rabbit.core.RabbitTemplate;importorg.springframework.amqp.support.converter.Jackson2JsonMessageConverter;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;/**
 * <p>
 * RabbitConfig
 * </p>
 *
 * @author tigerhhzz
 * @since 2024/8/12
 */@ConfigurationpublicclassRabbitConfig{privatestaticfinalLoggerLOGGER=LoggerFactory.getLogger(RabbitConfig.class);@AutowiredprivateCachingConnectionFactory connectionFactory;@BeanpublicRabbitTemplaterabbitTemplate(){RabbitTemplate rabbitTemplate =newRabbitTemplate(connectionFactory);// 设置消息转换器为json格式
        rabbitTemplate.setMessageConverter(newJackson2JsonMessageConverter());// 消息是否成功发送到Exchange
        rabbitTemplate.setConfirmCallback((correlationData, ack, cause)->{if(ack){LOGGER.info("消息发送到Exchange成功,{}", correlationData);}else{LOGGER.error("消息发送到Exchange失败, {}, cause: {}", correlationData, cause);}});// 触发setReturnCallback回调必须设置mandatory=true, 否则Exchange没有找到Queue就会丢弃掉消息, 而不会触发回调
        rabbitTemplate.setMandatory(true);// 消息是否从Exchange路由到Queue, 注意: 这是一个失败回调, 只有消息从Exchange路由到Queue失败才会回调这个方法
        rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey)->{LOGGER.error("消息从Exchange路由到Queue失败: exchange: {}, route: {}, replyCode: {}, replyText: {}, message: {}", exchange, routingKey, replyCode, replyText, message);});return rabbitTemplate;}}

5.6、Mail 邮件实体类

packagecom.tigerhhzz.smail.entity;/**
 * <p>
 * Mail
 * </p>
 *
 * @author tigerhhzz
 * @since 2024/8/12
 */publicclassMail{/**
     * 目标邮箱地址
     */privateStringto;/**
     * 标题不能为空
     */privateString title;/**
     * 正文不能为空
     */privateString content;/**
     * 消息id
     */privateString msgId;publicStringgetTo(){returnto;}publicMailsetTo(Stringto){this.to=to;returnthis;}publicStringgetTitle(){return title;}publicMailsetTitle(String title){this.title = title;returnthis;}publicStringgetContent(){return content;}publicMailsetContent(String content){this.content = content;returnthis;}publicStringgetMsgId(){return msgId;}publicMailsetMsgId(String msgId){this.msgId = msgId;returnthis;}@OverridepublicStringtoString(){return"Mail{"+"to='"+to+'\''+", title='"+ title +'\''+", content='"+ content +'\''+", msgId='"+ msgId +'\''+'}';}}

5.7、SendMailService邮件发送类

packagecom.tigerhhzz.smail.service;importcom.tigerhhzz.smail.entity.Mail;importorg.slf4j.Logger;importorg.slf4j.LoggerFactory;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.beans.factory.annotation.Value;importorg.springframework.mail.MailException;importorg.springframework.mail.SimpleMailMessage;importorg.springframework.mail.javamail.JavaMailSender;importorg.springframework.stereotype.Service;/**
 * <p>
 * SendMailService
 * </p>
 *
 * @author tigerhhzz
 * @since 2024/8/12
 */@ServicepublicclassSendMailService{privatestaticfinalLoggerLOGGER=LoggerFactory.getLogger(SendMailService.class);@Value("${spring.mail.username}")privateString from;@AutowiredprivateJavaMailSender mailSender;/**
     * 发送简单邮件
     *
     * @param mail
     */publicbooleansend(Mail mail){Stringto= mail.getTo();// 目标邮箱String title = mail.getTitle();// 邮件标题String content = mail.getContent();// 邮件正文SimpleMailMessage message =newSimpleMailMessage();
        message.setFrom(from);
        message.setTo(to);
        message.setSubject(title);
        message.setText(content);try{
            mailSender.send(message);LOGGER.info("邮件发送成功");returntrue;}catch(MailException e){LOGGER.error("邮件发送失败, to: {}, title: {}",to, title, e);returnfalse;}}}

5.8、ProduceService生产者类

packagecom.tigerhhzz.smail.service;importcom.tigerhhzz.smail.common.MessageHelper;importcom.tigerhhzz.smail.entity.Mail;importorg.springframework.amqp.rabbit.connection.CorrelationData;importorg.springframework.amqp.rabbit.core.RabbitTemplate;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.stereotype.Service;importjava.util.UUID;/**
 * <p>
 * ProduceService
 * </p>
 *
 * @author tigerhhzz
 * @since 2024/8/12
 */@ServicepublicclassProduceService{@AutowiredprivateRabbitTemplate rabbitTemplate;@AutowiredprivateMsgLogService msgLogService;/**
     * 发送消息
     * @param mail
     * @return
     */publicbooleansendByAck(Mail mail){// 创建uuidString msgId =UUID.randomUUID().toString().replaceAll("-","");
        mail.setMsgId(msgId);// 发送消息到mq服务器中(附带消息ID)CorrelationData correlationData =newCorrelationData(msgId);
        rabbitTemplate.convertAndSend("mail.exchange","route.mail.ack",MessageHelper.objToMsg(mail), correlationData);returntrue;}/**
     * 发送消息
     * @param mail
     * @return
     */publicbooleansendByAuto(Mail mail){String msgId =UUID.randomUUID().toString().replaceAll("-","");
        mail.setMsgId(msgId);// 1.存储要消费的数据
        msgLogService.save("mail.exchange","route.mail.auto","mq.mail.auto", msgId, mail);// 2.发送消息到mq服务器中(附带消息ID)CorrelationData correlationData =newCorrelationData(msgId);
        rabbitTemplate.convertAndSend("mail.exchange","route.mail.auto",MessageHelper.objToMsg(mail), correlationData);returntrue;}}

5.9、ConsumerService 消费者类

packagecom.tigerhhzz.smail.mq;importcom.rabbitmq.client.Channel;importcom.tigerhhzz.smail.common.Constant;importcom.tigerhhzz.smail.common.MessageHelper;importcom.tigerhhzz.smail.entity.Mail;importcom.tigerhhzz.smail.entity.MsgLog;importcom.tigerhhzz.smail.service.MsgLogService;importcom.tigerhhzz.smail.service.SendMailService;importorg.slf4j.Logger;importorg.slf4j.LoggerFactory;importorg.springframework.amqp.core.Message;importorg.springframework.amqp.rabbit.annotation.RabbitListener;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.stereotype.Component;importjava.io.IOException;importjava.util.Objects;/**
 * <p>
 * ConsumerService
 * </p>
 *
 * @author tigerhhzz
 * @since 2024/8/12
 */@ComponentpublicclassConsumerService{privatestaticfinalLoggerLOGGER=LoggerFactory.getLogger(ConsumerService.class);@AutowiredprivateSendMailService sendMailService;@AutowiredprivateMsgLogService msgLogService;/**
     * 监听消息队列,手动确认模式,必须手动调用ack或者nack方法
     * 配置参数:spring.rabbitmq.listener.simple.acknowledge-mode=manual
     * @param message
     * @param channel
     * @throws IOException
     */@RabbitListener(queues ={"mq.mail.ack"})publicvoidconsumeFromAck(Message message,Channel channel)throwsIOException{LOGGER.info("收到消息:{}", message.toString());//将消息转化为对象Mail mail =MessageHelper.msgToObj(message,Mail.class);// 手动确认模式long tag = message.getMessageProperties().getDeliveryTag();boolean success = sendMailService.send(mail);if(success){// 消费成功,消息会被删除
            channel.basicAck(tag,false);}else{// 消费失败,重新返回队列
            channel.basicNack(tag,false,true);}}/**
     * 监听消息队列,自动确认模式,无需调用ack或者nack方法,当程序执行时才删除消息
     * 配置参数:spring.rabbitmq.listener.simple.acknowledge-mode=auto
     * @param message
     */@RabbitListener(queues ={"mq.mail.auto"})publicvoidconsumeFromAuto(Message message){LOGGER.info("收到消息:{}", message.toString());// 获取消息IDMail mail =MessageHelper.msgToObj(message,Mail.class);// 消息幂等性处理,如果已经处理成功,无需重复消费MsgLog queryObj = msgLogService.selectByMsgId(mail.getMsgId());if(Objects.nonNull(queryObj)&&Constant.SUCCESS.equals(queryObj.getStatus())){return;}// 发送邮件boolean success = sendMailService.send(mail);if(success){
            msgLogService.updateStatus(mail.getMsgId(),Constant.SUCCESS,"邮件发送成功");}else{
            msgLogService.updateStatus(mail.getMsgId(),Constant.FAIL,"邮件发送失败");}}}

5.10、TestController 控制层类

packagecom.tigerhhzz.smail.web;importcom.tigerhhzz.smail.entity.Mail;importcom.tigerhhzz.smail.service.ProduceService;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.web.bind.annotation.PostMapping;importorg.springframework.web.bind.annotation.RequestMapping;importorg.springframework.web.bind.annotation.RestController;@RestController@RequestMapping("/test")publicclassTestController{@AutowiredprivateProduceService testService;@PostMapping("send")publicbooleansendMail(Mail mail){return testService.sendByAuto(mail);}}

6、测试服务

启动 SpringBoot 服务之后,用 postman 模拟请求接口。

在这里插入图片描述

查看控制台信息。

在这里插入图片描述
查看邮箱,已经收到一份邮件:
在这里插入图片描述
邮件发送成功!

7、消息发送失败处理

虽然,上面案例可以成功的实现消息的发送,但是上面的流程很脆弱,例如: rabbitMQ 突然蹦了、邮件发送失败了、重启 rabbitMQ 服务器出现消息重复消费,应该怎处理呢?

很显然,我们需要对原有的逻辑进行升级改造,因此我们需要引入数据库来记录消息的发送情况。

7.1、创建消息投递日志表

CREATE TABLE `msg_log`(`msg_id` varchar(255) NOT NULL DEFAULT '' COMMENT '消息唯一标识',
  `msg` text COMMENT '消息体, json格式化',
  `exchange` varchar(255) NOT NULL DEFAULT '' COMMENT '交换机',
  `routing_key` varchar(255) NOT NULL DEFAULT '' COMMENT '路由键',
  `status` int(11) NOT NULL DEFAULT '0' COMMENT '状态: 0投递中 1投递成功 2投递失败 3已消费',
  `try_count` int(11) NOT NULL DEFAULT '0' COMMENT '重试次数',
  `next_try_time` datetime DEFAULT NULL COMMENT '下一次重试时间',
  `create_time` datetime DEFAULT NULL COMMENT '创建时间',
  `update_time` datetime DEFAULT NULL COMMENT '更新时间',
  PRIMARY KEY (`msg_id`),
  UNIQUE KEY `unq_msg_id`(`msg_id`) USING BTREE
)ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='消息投递日志';

7.2、编写 MsgLog 相关服务类

packagecom.tigerhhzz.smail.dao;importcom.tigerhhzz.smail.entity.MsgLog;importorg.apache.ibatis.annotations.Param;importjava.util.List;publicinterfaceMsgLogMapper{/**
     * 插入消息日志
     * @param msgLog
     */voidinsert(MsgLog msgLog);/**
     * 更新消息状态
     * @param msgId
     */voidupdateStatus(@Param("msgId")String msgId,@Param("status")Integer status,@Param("result")String result);/**
     * 查询需要重新投递的消息
     * @return
     */List<MsgLog>selectFailMsg();/**
     * 更新重试此时
     * @param msgLog
     */voidupdateTryCount(MsgLog msgLog);/**
     * 查询消息信息
     * @param msgId
     * @return
     */MsgLogselectByPrimaryKey(String msgId);}
packagecom.tigerhhzz.smail.service;importcom.tigerhhzz.smail.common.Constant;importcom.tigerhhzz.smail.common.JodaTimeUtil;importcom.tigerhhzz.smail.common.JsonUtil;importcom.tigerhhzz.smail.dao.MsgLogMapper;importcom.tigerhhzz.smail.entity.MsgLog;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.stereotype.Service;importorg.springframework.transaction.annotation.Transactional;importjava.util.Date;importjava.util.List;/**
 * <p>
 * MsgLogService
 * </p>
 *
 * @author tigerhhzz
 * @since 2024/8/12
 */@ServicepublicclassMsgLogService{@AutowiredprivateMsgLogMapper msgLogMapper;/**
     * 查询消费失败的消息
     * @return
     */publicList<MsgLog>selectFailMsg(){return msgLogMapper.selectFailMsg();}/**
     * 查询数据信息
     * @param msgId
     * @return
     */publicMsgLogselectByMsgId(String msgId){return msgLogMapper.selectByPrimaryKey(msgId);}/**
     * 保存消息日志
     * @param exchange
     * @param routeKey
     * @param queueName
     * @param msgId
     * @param object
     * @return
     */@Transactionalpublicvoidsave(String exchange,String routeKey,String queueName,String msgId,Object object){MsgLog entity =buildMsgLog(exchange, routeKey, queueName, msgId, object);
        msgLogMapper.insert(entity);}/**
     * 更新状态
     * @param msgId
     * @param status
     * @param result
     */@TransactionalpublicvoidupdateStatus(String msgId,Integer status,String result){
        msgLogMapper.updateStatus(msgId, status, result);}/**
     * 更新下次重试时间
     * @param msgId
     * @param currentTryCount
     */@TransactionalpublicvoidupdateNextTryTime(String msgId,Integer currentTryCount){MsgLog msgLog =newMsgLog();
        msgLog.setMsgId(msgId);
        msgLog.setNextTryTime(JodaTimeUtil.plusMinutes(newDate(), currentTryCount));
        msgLogMapper.updateTryCount(msgLog);}privateMsgLogbuildMsgLog(String exchange,String routeKey,String queueName,String msgId,Object object){MsgLog target =newMsgLog();
        target.setMsgId(msgId);
        target.setExchange(exchange);
        target.setRouteKey(routeKey);
        target.setQueueName(queueName);
        target.setMsg(JsonUtil.objToStr(object));
        target.setStatus(Constant.WAIT);
        target.setTryCount(0);
        target.setNextTryTime(JodaTimeUtil.plusMinutes(newDate(),1));
        target.setCreateTime(newDate());
        target.setUpdateTime(newDate());return target;}}

7.3、改写服务逻辑

在生产服务类中,新增数据写入。
在这里插入图片描述
同时,在RabbitConfig服务配置,当消息发送成功之后,新增更新消息状态逻辑。
在这里插入图片描述

改造消费者ConsumerMailService,每次消费的时候,从数据库中查询,如果消息已经被消费,不用再重复发送数据!

在这里插入图片描述
这样即可保证,如果 rabbitMQ 服务器,即使重启之后重新推送消息,通过数据库判断,也不会重复消费进而发生业务异常!

7.4、利用定数任务对消息投递失败进行补偿

当 rabbitMQ 服务器突然挂掉之后,生成者就无法正常进行投递数据,此时因为消息已经被记录到数据库,因此我们可以利用定数任务查询出没有投递成功的消息,进行补偿投递。
在这里插入图片描述
利用定数任务,对投递失败的消息进行补偿投递,基本可以保证消息 100% 消费成功!

项目代码仓库地址:
https://gitee.com/spring2020/springboot-rabbitmq-demo

参考文档:
https://blog.csdn.net/unique_perfect/article/details/109380996

在这里插入图片描述


人生从来没有真正的绝境。只要一个人的心中还怀着一粒信念的种子,那么总有一天,他就能走出困境,让生命重新开花结果。



本文转载自: https://blog.csdn.net/weixin_43025151/article/details/141124869
版权归原作者 hhzz 所有, 如有侵权,请联系我们删除。

“Springboot项目中利用 RabbitMQ 消息队列来实现邮件 100% 被投递”的评论:

还没有评论