** RabbitMq在我们日常开发中不可或缺,作为主流消息中间件,可以用于项目中的应用解耦、流量削峰、异步处理(非主流任务交由队列下发处理)等,本文着重介绍运用于项目中流量峰值时,依据服务器的消费能力进行削峰,最大限度保障服务器不宕机。**
前期准备:安装rabbitMq、新建一个springboot项目
略…
第一步:pom文件中导入amqp依赖
<!--rabbitmq--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>
第二步:yml中配置
此处生产者与消费者放到一个项目中,可以依据项目的需求调整生产者和消费者进行拆分。
除了通用配置外,还有两点说明:
1.生产者消息确认配置项,确认消息发送到交换机和队列
2.消费者配置手动确认配置项,默认消息是自动确认的,正常业务都需要手动确认(不手动确认,消息一直在)
生产者与消费者的详细完整配置如下:
spring:#配置rabbitMQrabbitmq:host: 192.168.144.133 #rabbitmq服务地址port:5672username: admin
password:123456#消费者 手动确认配置项listener:type: simple
simple:acknowledge-mode: MANUAL #消息确认方式 MANUAL手动确认 NONE不确认 AUTO自动确认retry:enabled:true#开启重试max-attempts:3#最大重试次数initial-interval: 5000ms #重试间隔时间#生产者 消息确认配置项#确认消息已发送到交换机(Exchange)publisher-confirm-type: correlated
#确认消息已发送到队列(Queue)publisher-returns:true
第三步:生产者配置
** 配置主题型交换机、队列(交换机是消息队列传输的载体),并且将队列和交换机绑定,并且设置绑定路由键**
/**
* 主题型交换机、队列配置
*/@ConfigurationpublicclassTopicRabbitConfig{@BeanpublicQueuegeoQueue(){returnnewQueue("geo.inout.queue");}@BeanpublicTopicExchangegeoExchange(){returnnewTopicExchange("geo-exchange");}/**
* 将队列和交换机绑定,而且绑定的键值为geo.key.inout
* 这样只要是消息携带的路由键是geo.key.inout,才会分发到该队列
*/@BeanBindingbindingGeoExchange(){returnBindingBuilder.bind(geoQueue()).to(geoExchange()).with("geo.key.inout");}}
** 消息确认回调函数配置(确认消息正常发送到RabbitMq上)**
/**
* 消息确认回调函数配置
*/@ConfigurationpublicclassRabbitConfig{@BeanpublicRabbitTemplatecreateRabbitTemplate(ConnectionFactory connectionFactory){RabbitTemplate rabbitTemplate =newRabbitTemplate();
rabbitTemplate.setConnectionFactory(connectionFactory);//设置开启Mandatory,才能触发回调函数,无论消息推送结果怎么样都强制调用回调函数
rabbitTemplate.setMandatory(true);
rabbitTemplate.setConfirmCallback(newRabbitTemplate.ConfirmCallback(){@Overridepublicvoidconfirm(CorrelationData correlationData,boolean ack,String cause){
log.info("ConfirmCallback: "+"相关数据:"+correlationData);
log.info("ConfirmCallback: "+"确认情况:"+ack);
log.info("ConfirmCallback: "+"原因:"+cause);}});
rabbitTemplate.setReturnsCallback(newRabbitTemplate.ReturnsCallback(){@OverridepublicvoidreturnedMessage(ReturnedMessage returnedMessage){
log.info("ReturnCallback: "+"消息:"+returnedMessage.getMessage());
log.info("ReturnCallback: "+"回应码:"+returnedMessage.getReplyCode());
log.info("ReturnCallback: "+"回应信息:"+returnedMessage.getReplyText());
log.info("ReturnCallback: "+"交换机:"+returnedMessage.getExchange());
log.info("ReturnCallback: "+"路由键:"+returnedMessage.getRoutingKey());}});return rabbitTemplate;}}
** 发送实体JSON序列化配置(防止消息乱码)**
/**
* 发送实体JSON序列化配置
*/@ConfigurationpublicclassRabbitProviderConfigimplementsInitializingBean{@AutowiredprivateRabbitTemplate rabbitTemplate;@OverridepublicvoidafterPropertiesSet()throwsException{//使用JSON序列化
rabbitTemplate.setMessageConverter(newJackson2JsonMessageConverter());}}
第四步:消费者配置
消费者消息接收监听类,说明:
1.调用channel.basicAck()方法为消费端执行消息手动确认,即消息被消费
2.因为RabbitMq要保持有序性,只有前面消费完了,后面才能消费,有可能出现消息消费慢的问题(接口反应处理慢)。这就需要在消费端开启多线程监听队列,
具体设置concurrency为开启多线程监听队列,concurrency = “5-8”:表示开启5个线程监听队列,最大为8个线程
/**
* 主题型-消息接收监听类
*/@Slf4j@ComponentpublicclassGeoMQReceiver{/**
* 默认是单线程监听队列,消息消费会慢
* 设置concurrency为开启多线程监听队列,concurrency = "5-8":表示开启5个线程监听队列,最大为8个线程
*/@RabbitHandler@RabbitListener(queues ="geo.inout.queue", concurrency ="5-8")publicvoidprocess(Map<String,Object> map,Message message,Channel channel)throwsIOException{try{// TODO 你的业务处理// 手动确认消息// 第二个参数,手动确认可以被批处理,当该参数为 true 时,则可以一次性确认 delivery_tag 小于等于传入值的所有消息
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);}catch(Exception e){// 确认失败 将消息重新放回队列,让别人消费// 第二个参数,true会重新放回队列,所以需要自己根据业务逻辑判断什么时候使用拒绝
channel.basicReject(message.getMessageProperties().getDeliveryTag(),true);}}}
接收实体JSON序列化配置(防止消息乱码)
/**
* 接收实体JSON序列化配置
*/@ConfigurationpublicclassRabbitMQConfig{@BeanpublicMessageConverterjsonMessageConverter(ObjectMapper objectMapper){returnnewJackson2JsonMessageConverter(objectMapper);}}
第五步:编写测试进行测试
** 提供两个方法进行模拟,方法一为正常调用接口,方法二为通过消息中间件调用接口**
@RestController@RequestMapping("/geo")publicclassGeoController{@AutowiredprivateRabbitTemplate rabbitTemplate;/**
* 正常调用接口
*/@PostMapping("/test1")publicvoidtest1(@RequestBodyMap<String,Object> params){business(params);}/**
* 通过消息中间件调用接口
*/@PostMapping("/test2")publicvoidtest2(@RequestBodyMap<String,Object> params){
rabbitTemplate.convertAndSend("geo-exchange","geo.key.inout", params);}/**
* 业务处理方法
*/publicvoidbusiness(Map<String,Object> params){//你的实际业务代码块try{String id =(String)params.get("id");
log.info("获取到的参数:"+id);Thread.sleep(200);}catch(InterruptedException e){
e.printStackTrace();}}}
** 消息接收监听类中处理业务方法**
/**
* 主题型-消息接收监听类
*/@Slf4j@ComponentpublicclassGeoMQReceiver{@AutowiredprivateGeoController geoController;/**
* 默认是单线程监听队列,消息消费会慢
* 设置concurrency为开启多线程监听队列,concurrency = "5-8":表示开启5个线程监听队列,最大为8个线程
*/@RabbitHandler@RabbitListener(queues ="geo.nb.inout.queue", concurrency ="5-8")publicvoidprocess(Map<String,Object> map,Message message,Channel channel)throwsIOException{try{
geoController.business(map);// 手动确认消息// 第二个参数,手动确认可以被批处理,当该参数为 true 时,则可以一次性确认 delivery_tag 小于等于传入值的所有消息
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);}catch(Exception e){// 确认失败 将消息重新放回队列,让别人消费// 第二个参数,true会重新放回队列,所以需要自己根据业务逻辑判断什么时候使用拒绝
channel.basicReject(message.getMessageProperties().getDeliveryTag(),true);}}}
第六步:使用JMeter进行压力测试
略…
** 总结:当项目并发压力上来的时候,可以通过增加服务器性能、服务集群、数据缓存批量处理、数据库分库等多种方案进行提升服务并发性能,基于消息中间件削峰只是其中的一个技术方案,可以依据自己的项目需求选择合理的并发削峰方案。建议做成配置性选择走常规路由或消息中间件,后期只需要根据业务数据请求量在配置中心改配置就可以实现了,保障服务不会宕机。**
版权归原作者 Jon Young 所有, 如有侵权,请联系我们删除。