大家好,我是工藤学编程 🦉一个正在努力学习的小博主,期待你的关注作业侠系列最新文章😉Java实现聊天程序SpringBoot实战系列🐷【SpringBoot实战系列】RabbitMQ实现消息发送并实现邮箱发送异常监控报警实战环境搭建大集合环境搭建大集合(持续更新)
在本栏中,我们之前已经完成了:
【SpringBoot实战系列】之发送短信验证码
【SpringBoot实战系列】之从Async组件应用实战到ThreadPoolTaskExecutor⾃定义线程池
【SpringBoot实战系列】之图形验证码开发并池化Redis6存储
【SpringBoot实战系列】阿里云OSS接入上传图片实战
【SpringBoot实战系列】Sharding-Jdbc实现分库分表到分布式ID生成器Snowflake自定义wrokId实战
RabbitMQ交换机类型
- 简介 ⽣产者将消息发送到 Exchange,交换器将消息路由到⼀个或者多个队列中,交换机有多个类型,队列和交换机是多对多的 关系。 交换机只负责转发消息,不具备存储消息的能⼒,如果没有队列和exchange绑定,或者没有符合的路由规则,则消息会被丢失 RabbitMQ有四种交换机类型,分别是Direct exchange、Fanout exchange、Topic exchange、Headers exchange,最后的基本不⽤
- 交换机类型
- Direct Exchange 定向 将⼀个队列绑定到交换机上,要求该消息与⼀个特定的路由键完全匹配 例⼦:如果⼀个队列绑定到该交换机上要求路由键“aabb”,则只有被标记为“aabb”的消息才被转发,不会转发aabb.cc,也不会转发gg.aabb,只会转发aabb
- 处理路由健 Fanout Exchange ⼴播 只需要简单的将队列绑定到交换机上,⼀个发送到交换机的消息都会被转发到与该交换机绑定的所有队列上。很像⼦⽹⼴播,每台⼦⽹内的主机都获得了⼀份复制的消息 Fanout交换机转发消息是最快的,⽤于发布订阅,⼴播形式,中⽂是扇形 不处理路由健
- Topic Exchange 通配符 主题交换机是⼀种发布/订阅的模式,结合了直连交换机与扇形交换机的特点 将路由键和某模式进⾏匹配。此时队列需要绑定要⼀个模式上 符号“#”匹配⼀个或多个词,符号 ∗ * ∗匹配不多不少⼀个词 例⼦:因此“abc.#”能够匹配到“abc.def.ghi”,但是“abc.*”只会匹配到“abc.def”。
RabbitMQ配置开发实战
@Configuration@DatapublicclassRabbitMQConfig{/**
* 交换机
*/privateString shortLinkEventExchange ="short_link.event.exchange";/**
* 创建交换机 Topic类型
* ⼀般⼀个微服务⼀个交换机
*
* @return
*/@BeanpublicExchangeshortLinkEventExchange(){returnnewTopicExchange(shortLinkEventExchange,true,false);//return newFanoutExchange(shortLinkEventExchange,true,false);}/**
* 新增短链 队列
*/privateString shortLinkAddLinkQueue ="short_link.add.link.queue";/**
* 新增短链映射 队列
*/privateString shortLinkAddMappingQueue ="short_link.add.mapping.queue";/**
* 新增短链具体的routingKey,【发送消息使⽤】
*/privateString shortLinkAddRoutingKey ="short_link.add.link.mapping.routing.key";/**
* topic类型的binding key,⽤于绑定队列和交换机,是⽤
* 于 link 消费者
*/privateString shortLinkAddLinkBindingKey ="short_link.add.link.*.routing.key";/**
* topic类型的binding key,⽤于绑定队列和交换机,是⽤
* 于 mapping 消费者
*/privateString shortLinkAddMappingBindingKey ="short_link.add.*.mapping.routing.key";/**
* 新增短链api队列和交换机的绑定关系建⽴
*/@BeanpublicBindingshortLinkAddApiBinding(){returnnewBinding(shortLinkAddLinkQueue,Binding.DestinationType.QUEUE, shortLinkEventExchange, shortLinkAddLinkBindingKey,null);}/**
* 新增短链mapping队列和交换机的绑定关系建⽴
*/@BeanpublicBindingshortLinkAddMappingBinding(){returnnewBinding(shortLinkAddMappingQueue,Binding.DestinationType.QUEUE, shortLinkEventExchange, shortLinkAddMappingBindingKey,null);}/**
* 新增短链api 普通队列,⽤于被监听
*/@BeanpublicQueueshortLinkAddLinkQueue(){returnnewQueue(shortLinkAddLinkQueue,true,false,false);}/**
* 新增短链mapping 普通队列,⽤于被监听
*/@BeanpublicQueueshortLinkAddMappingQueue(){returnnewQueue(shortLinkAddMappingQueue,true,false,false);}}
你要发送的消息,用
一个类封装起来即可
对应controller,service开发
@PostMapping("/add")publicJsonDatacreateShortLink(@RequestBodyShortLinkAddRequest shortLinkAddRequest){JsonData jsonData = shortLinkService.createShortLink(shortLinkAddRequest);return jsonData;}
对应service,注入配置好的RabbitConfig以及rabbitTemplate即可
@AutowiredprivateRabbitTemplate rabbitTemplate;@AutowiredprivateRabbitMQConfig rabbitMQConfig;
发送代码
@OverridepublicJsonDatacreateShortLink(ShortLinkAddRequest shortLinkAddRequest){/**
* 使用lombok建造者模式构建你要发送的消息,然后调用rabbitTemplate.convertAndSend方法,配置对应的交换机,路由key与消息即可
*/Long account_no =LoginInterceptor.threadLocal.get().getAccountNo();EventMessage eventMessage =EventMessage.builder().accountNo(account_no).messageId(IDUtil.geneSnowFlakeID().toString()).content(JsonUtil.obj2Json(shortLinkAddRequest)).eventMessageType(EventMessageType.SHORT_LINK_ADD.name()).build();
rabbitTemplate.convertAndSend(rabbitMQConfig.getShortLinkEventExchange(),rabbitMQConfig.getShortLinkAddRoutingKey(),eventMessage);returnJsonData.buildSuccess();}
消费者编写
@RabbitListener(queuesToDeclare ={@Queue("short_link.add.link.queue")})@Slf4j@ComponentpublicclassShortLinkAddLinkMQListener{@RabbitHandlerpublicvoidshortLinkHandler(EventMessage eventMessage,Message message,Channel channel)throwsException{
log.info("监听到消息 ShortLinkAddLinkMQListener:message消息内容:{}", message);try{int i=1/0;//TODO 处理业务}catch(Exception e){// 处理业务失败,还要进⾏其他操作,⽐如记录失败原因
log.error("消费失败{}", eventMessage);thrownewException(BizCodeEnum.MQ_CONSUME_EXCEPTION.getMessage());}
log.info("消费成功{}", eventMessage);//手动确认消息消费成功// channel.basicAck(msgTag, false);}}
application中配置rabbitMQ,部署可见环境搭建大集合(持续更新)
##----------rabbit配置--------------
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672#需要⼿⼯创建虚拟主机
spring.rabbitmq.virtual-host=dev
spring.rabbitmq.username=${admin}
spring.rabbitmq.password=${password}#消息确认⽅式,manual(⼿动ack) 和auto(⾃动ack)
spring.rabbitmq.listener.simple.acknowledgemode=auto
#开启重试,消费者代码不能添加try catch捕获不往外抛异常
spring.rabbitmq.listener.simple.retry.enabled=true
#最⼤重试次数
spring.rabbitmq.listener.simple.retry.max-attempts=4# 重试消息的时间间隔,5秒
spring.rabbitmq.listener.simple.retry.initial-interval=5000
将配置中host,port,username,password改为你自己的即可,并且记得去RabbitMQ可视化控制台创建一个名为dev的host,具体操作如下:
注意:
加了@bean配置交换机和queue,启动项⽬却没⾃动化创建队列
RabbitMQ懒加载模式, 需要配置消费者监听才会创建,
@RabbitListener(queues =“short_link.add.link.queue”)
另外种⽅式(若Mq中⽆相应名称的队列,也会⾃动创建Queue)
@RabbitListener(queuesToDeclare = {@Queue(“short_link.add.link.queue”) })
因为我们消费者代码逻辑中有1/0,用于模拟业务过程出错,这样即实战了消息发送也实现了异常监控
异常监控队列配置
@Configuration@DatapublicclassRabbitMQErrorConfig{privateString shortLinkErrorExchange ="short_link.error.exchange";privateString shortLinkErrorQueue ="short_link.error.queue";privateString shortLinkErrorRoutingKey ="short_link.error.routing.key";@AutowiredprivateRabbitTemplate rabbitTemplate;/**
* 异常交换机
*
* @return
*/@BeanpublicTopicExchangeerrorTopicExchange(){returnnewTopicExchange(shortLinkErrorExchange,true,false);}/**
* 异常队列
*
* @return
*/@BeanpublicQueueerrorQueue(){returnnewQueue(shortLinkErrorQueue,true);}/**
* 队列与交换机进⾏绑定
*
* @return
*/@BeanpublicBindingBindingErrorQueueAndExchange(Queue errorQueue,TopicExchange errorTopicExchange){returnBindingBuilder.bind(errorQueue).to(errorTopicExchange).with(shortLinkErrorRoutingKey);}/**
* 配置 RepublishMessageRecoverer
* ⽤途:消息重试⼀定次数后,⽤特定的routingKey转发到指
* 定的交换机中,⽅便后续排查和告警
* <p>
* 顶层是 MessageRecoverer接⼝,多个实现类
*
* @return
*/@BeanpublicMessageRecoverermessageRecoverer(){returnnewRepublishMessageRecoverer(rabbitTemplate, shortLinkErrorExchange, shortLinkErrorRoutingKey);}}
对应消费者代码如下,实现邮箱发送,监控报警
@RabbitListener(queuesToDeclare ={@Queue("short_link.error.queue")})@Slf4j@ComponentpublicclassShortLinkErrorMQListener{publicstaticfinalString SUBJECT ="短链监控告警";publicstaticfinalString CONTENT ="用户%s短链创建%s,消息消费出现异常";@AutowiredprivateErrorNotifyComponent errorNotifyComponent;@RabbitHandlerpublicvoidshortLinkHandler(EventMessage eventMessage,Message message,Channel channel)throwsException{
log.info("监听到消息 ShortLinkAddLinkMQListener:message消息内容:{}", message);try{
errorNotifyComponent.sendMail("[email protected]",SUBJECT,String.format(CONTENT,eventMessage.getAccountNo(),eventMessage.getContent()));
log.info("发送成功");//TODO 处理业务}catch(Exception e){// 处理业务失败,还要进⾏其他操作,⽐如记录失败原因
log.error("消费失败{}", eventMessage);thrownewException(BizCodeEnum.MQ_CONSUME_EXCEPTION.getMessage());}
log.info("消费成功{}", eventMessage);//确认消息消费成功// channel.basicAck(msgTag, false);}}
ErrorNotifyComponent实现邮箱发送代码及配置,发送端使⽤⽹易邮箱https://mail.126.com/
具体如何获得授权码,请见博客网易邮箱获取授权码
application中添加配置如下
spring.mail.host=smtp.126.com
spring.mail.username=${你注册的网易邮箱账号}
spring.mail.password=${得到的授权码}
spring.mail.from=${你注册的网易邮箱账号}
spring.mail.properties.mail.smtp.starttls.enable=true
spring.mail.properties.mail.smtp.starttls.required=true
spring.mail.properties.mail.smtp.ssl.enable=true
spring.mail.default-encoding=utf-8
邮箱组件代码
@Component@Slf4jpublicclassErrorNotifyComponent{@AutowiredprivateJavaMailSender mailSender;@Value("${spring.mail.from}")privateString from;@AsyncpublicvoidsendMail(Stringto,String subject,String content){SimpleMailMessage message =newSimpleMailMessage();
message.setFrom(from);
message.setTo(to);
message.setSubject(subject);
message.setText(content);
mailSender.send(message);
log.info("邮件发送成功:{}",message.toString());}}
使用postman测试后,发现重试之后报了错,但是进入了异常队列并成功发送邮件
版权归原作者 工藤学编程 所有, 如有侵权,请联系我们删除。