0


基于RabbitMq的削峰实例

**  RabbitMq在我们日常开发中不可或缺,作为主流消息中间件,可以用于项目中的应用解耦、流量削峰、异步处理(非主流任务交由队列下发处理)等,本文着重介绍运用于项目中流量峰值时,依据服务器的消费能力进行削峰,最大限度保障服务器不宕机。**


前期准备:安装rabbitMq、新建一个springboot项目

  略…

第一步:pom文件中导入amqp依赖

<!--rabbitmq--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>

第二步:yml中配置

此处生产者与消费者放到一个项目中,可以依据项目的需求调整生产者和消费者进行拆分。
除了通用配置外,还有两点说明:
  1.生产者消息确认配置项,确认消息发送到交换机和队列
  2.消费者配置手动确认配置项,默认消息是自动确认的,正常业务都需要手动确认(不手动确认,消息一直在)
生产者与消费者的详细完整配置如下:

spring:#配置rabbitMQrabbitmq:host: 192.168.144.133  #rabbitmq服务地址port:5672username: admin
    password:123456#消费者 手动确认配置项listener:type: simple
      simple:acknowledge-mode: MANUAL #消息确认方式 MANUAL手动确认 NONE不确认 AUTO自动确认retry:enabled:true#开启重试max-attempts:3#最大重试次数initial-interval: 5000ms #重试间隔时间#生产者 消息确认配置项#确认消息已发送到交换机(Exchange)publisher-confirm-type: correlated
    #确认消息已发送到队列(Queue)publisher-returns:true

第三步:生产者配置

**  配置主题型交换机、队列(交换机是消息队列传输的载体),并且将队列和交换机绑定,并且设置绑定路由键**

/**
 * 主题型交换机、队列配置
 */@ConfigurationpublicclassTopicRabbitConfig{@BeanpublicQueuegeoQueue(){returnnewQueue("geo.inout.queue");}@BeanpublicTopicExchangegeoExchange(){returnnewTopicExchange("geo-exchange");}/**
     * 将队列和交换机绑定,而且绑定的键值为geo.key.inout
     * 这样只要是消息携带的路由键是geo.key.inout,才会分发到该队列
     */@BeanBindingbindingGeoExchange(){returnBindingBuilder.bind(geoQueue()).to(geoExchange()).with("geo.key.inout");}}

**  消息确认回调函数配置(确认消息正常发送到RabbitMq上)**

/**
 * 消息确认回调函数配置
 */@ConfigurationpublicclassRabbitConfig{@BeanpublicRabbitTemplatecreateRabbitTemplate(ConnectionFactory connectionFactory){RabbitTemplate rabbitTemplate =newRabbitTemplate();
        rabbitTemplate.setConnectionFactory(connectionFactory);//设置开启Mandatory,才能触发回调函数,无论消息推送结果怎么样都强制调用回调函数
        rabbitTemplate.setMandatory(true);

        rabbitTemplate.setConfirmCallback(newRabbitTemplate.ConfirmCallback(){@Overridepublicvoidconfirm(CorrelationData correlationData,boolean ack,String cause){
                log.info("ConfirmCallback:     "+"相关数据:"+correlationData);
                log.info("ConfirmCallback:     "+"确认情况:"+ack);
                log.info("ConfirmCallback:     "+"原因:"+cause);}});

        rabbitTemplate.setReturnsCallback(newRabbitTemplate.ReturnsCallback(){@OverridepublicvoidreturnedMessage(ReturnedMessage returnedMessage){
                log.info("ReturnCallback:     "+"消息:"+returnedMessage.getMessage());
                log.info("ReturnCallback:     "+"回应码:"+returnedMessage.getReplyCode());
                log.info("ReturnCallback:     "+"回应信息:"+returnedMessage.getReplyText());
                log.info("ReturnCallback:     "+"交换机:"+returnedMessage.getExchange());
                log.info("ReturnCallback:     "+"路由键:"+returnedMessage.getRoutingKey());}});return rabbitTemplate;}}

**  发送实体JSON序列化配置(防止消息乱码)**

/**
 * 发送实体JSON序列化配置
 */@ConfigurationpublicclassRabbitProviderConfigimplementsInitializingBean{@AutowiredprivateRabbitTemplate rabbitTemplate;@OverridepublicvoidafterPropertiesSet()throwsException{//使用JSON序列化
        rabbitTemplate.setMessageConverter(newJackson2JsonMessageConverter());}}

第四步:消费者配置

消费者消息接收监听类,说明:
  1.调用channel.basicAck()方法为消费端执行消息手动确认,即消息被消费
  2.因为RabbitMq要保持有序性,只有前面消费完了,后面才能消费,有可能出现消息消费慢的问题(接口反应处理慢)。这就需要在消费端开启多线程监听队列,
具体设置concurrency为开启多线程监听队列,concurrency = “5-8”:表示开启5个线程监听队列,最大为8个线程

/**
 * 主题型-消息接收监听类
 */@Slf4j@ComponentpublicclassGeoMQReceiver{/**
     * 默认是单线程监听队列,消息消费会慢
     * 设置concurrency为开启多线程监听队列,concurrency = "5-8":表示开启5个线程监听队列,最大为8个线程
     */@RabbitHandler@RabbitListener(queues ="geo.inout.queue", concurrency ="5-8")publicvoidprocess(Map<String,Object> map,Message message,Channel channel)throwsIOException{try{// TODO 你的业务处理// 手动确认消息// 第二个参数,手动确认可以被批处理,当该参数为 true 时,则可以一次性确认 delivery_tag 小于等于传入值的所有消息
            channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);}catch(Exception e){// 确认失败 将消息重新放回队列,让别人消费// 第二个参数,true会重新放回队列,所以需要自己根据业务逻辑判断什么时候使用拒绝
            channel.basicReject(message.getMessageProperties().getDeliveryTag(),true);}}}

接收实体JSON序列化配置(防止消息乱码)

/**
 * 接收实体JSON序列化配置
 */@ConfigurationpublicclassRabbitMQConfig{@BeanpublicMessageConverterjsonMessageConverter(ObjectMapper objectMapper){returnnewJackson2JsonMessageConverter(objectMapper);}}

第五步:编写测试进行测试

**  提供两个方法进行模拟,方法一为正常调用接口,方法二为通过消息中间件调用接口**

@RestController@RequestMapping("/geo")publicclassGeoController{@AutowiredprivateRabbitTemplate rabbitTemplate;/**
     * 正常调用接口
     */@PostMapping("/test1")publicvoidtest1(@RequestBodyMap<String,Object> params){business(params);}/**
     * 通过消息中间件调用接口
     */@PostMapping("/test2")publicvoidtest2(@RequestBodyMap<String,Object> params){
        rabbitTemplate.convertAndSend("geo-exchange","geo.key.inout", params);}/**
     * 业务处理方法
     */publicvoidbusiness(Map<String,Object> params){//你的实际业务代码块try{String id =(String)params.get("id");
            log.info("获取到的参数:"+id);Thread.sleep(200);}catch(InterruptedException e){
            e.printStackTrace();}}}

**  消息接收监听类中处理业务方法**

/**
 * 主题型-消息接收监听类
 */@Slf4j@ComponentpublicclassGeoMQReceiver{@AutowiredprivateGeoController geoController;/**
     * 默认是单线程监听队列,消息消费会慢
     * 设置concurrency为开启多线程监听队列,concurrency = "5-8":表示开启5个线程监听队列,最大为8个线程
     */@RabbitHandler@RabbitListener(queues ="geo.nb.inout.queue", concurrency ="5-8")publicvoidprocess(Map<String,Object> map,Message message,Channel channel)throwsIOException{try{
            geoController.business(map);// 手动确认消息// 第二个参数,手动确认可以被批处理,当该参数为 true 时,则可以一次性确认 delivery_tag 小于等于传入值的所有消息
            channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);}catch(Exception e){// 确认失败 将消息重新放回队列,让别人消费// 第二个参数,true会重新放回队列,所以需要自己根据业务逻辑判断什么时候使用拒绝
            channel.basicReject(message.getMessageProperties().getDeliveryTag(),true);}}}

第六步:使用JMeter进行压力测试

  略…


**  总结:当项目并发压力上来的时候,可以通过增加服务器性能、服务集群、数据缓存批量处理、数据库分库等多种方案进行提升服务并发性能,基于消息中间件削峰只是其中的一个技术方案,可以依据自己的项目需求选择合理的并发削峰方案。建议做成配置性选择走常规路由或消息中间件,后期只需要根据业务数据请求量在配置中心改配置就可以实现了,保障服务不会宕机。**


本文转载自: https://blog.csdn.net/weixin_50989469/article/details/126783986
版权归原作者 Jon Young 所有, 如有侵权,请联系我们删除。

“基于RabbitMq的削峰实例”的评论:

还没有评论