0


RabbitMq异步请求+Redis轮询解决响应时间过长

RabbitMq异步请求+Redis轮询解决响应时间过长

如果还没有学习

Rabbit

的建议去学一下我的另一篇RabbitMq的使用再来看这个实际的使用`

当前问题:处理时间超时导致前端页面响应超时是一个比较常见的问题。这可能由于后端执行任务时间过长、网络延迟、资源不足等原因引起。

解决的思路有:
解决方法步骤缺点优化后端代码检查后端代码,看是否有可以优化的地方。可能存在一些复杂、低效或者重复的操作,通过优化这些代码可以提升后端执行效率。优化的效果不明显,当数据包的处理时间太长的时候处理时间是无法缩短的设置合理的超时时间在前端与后端的通信中,设置合理的超时时间。如果后端处理任务时间较长,适当增加前端请求的超时时间,但也要避免设置过长的超时时间。前端响应时间太久,造成前端页面的假死优化数据库的查询如果后端涉及数据库查询,确保数据库表的索引设置得当,优化查询语句,以提高查询效率。能解决查询的问题,但是数据处理的时间是解决不了的异步处理+轮询查询如果后端执行的任务比较耗时,考虑使用异步处理方式。将一些耗时的任务交给异步任务队列或线程池来处理,从而不阻塞主线程。并且添加一个轮询查询的接口,轮询查询处理的情况。


逻辑流程图

普通处理逻辑

image-20231227223626458
当我们的处理的耗时非常久的时候,前端一直在等待加载,会造成假死的状态,并且有可能会断开连接。
在这里插入图片描述

异步处理+轮询

当异步处理的时候直接将消息丢入到消息队列中,挨个的由消费者进行处理。处理的时候使用

Redis

根据时间戳来记录每一个处理的状态,最后再用一个轮询的接口进行查询。


文章目录

准备工作

导入依赖

<!-- Spring Boot 项目中更方便地集成 AMQP 的一种方式--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><!-- 普通的spring项目的mq客户端--><dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId></dependency><!--开启web客户端,方便进行调用演示--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency><!--        redis用来做轮询查询接口--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId></dependency>

Yaml配置

spring:rabbitmq:addresses: 192.168.1.110
    port:5672username: rabbit
    password: rabbit
    virtual-host: /test
#开启消息回执publisher-confirm-type: correlated
    publisher-returns:true# 设置消息的确认模式,默认的模式为auto自动确认,manual手动确认,所以需要channel去手动确认listener:simple:acknowledge-mode: manual

  redis:#Redis服务器地址host: localhost
    #端口port:6379#使用几号数据库database:0

配置Mq和Redis

Fanout交换机和消息队列

@ConfigurationpublicclassRabbitMqConfig{// 配置交换机@BeanpublicFanoutExchangebuildFanoutExchange(){returnExchangeBuilder.fanoutExchange("fanoutExchange").build();}// 配置消息队列@BeanpublicQueuebuildFanoutQueue(){returnQueueBuilder.durable("fanoutQueue").build();}// 绑定交换机和消息队列@BeanpublicBindingbuildFanoutBinding(){returnBindingBuilder.bind(buildFanoutQueue()).to(buildFanoutExchange());}}

这里我使用的是

fanout

(扇出)型交换机,这时一种广播类型,消息会被广播到所有与此交换机绑定的消息队列中。使用此类型

死信队列

消息队列中的数据,如果迟迟没有消费者来处理,那么就会一直占用消息队列的空间。比如我们模拟一下抢车票的场景,用户下单高铁票之后,会进行抢座,然后再进行付款,但是如果用户下单之后并没有及时的付款,这张票不可能一直让这个用户占用着,因为你不买别人还要买呢,所以会在一段时间后超时,让这张票可以继续被其他人购买。

这时,我们就可以使用死信队列,将那些用户超时未付款的或是用户主动取消的订单,进行进一步的处理,以下类型的消息都会被判定为死信:

  • 消息被拒绝(basic.reject / basic.nack),并且requeue = false
  • 消息TTL过期
  • 队列达到最大长度

在这里插入图片描述

@ConfigurationpublicclassDeadMqConfig{// 死信交换机@BeanpublicExchangedmExchange(){returnExchangeBuilder.fanoutExchange("dm.direct").build();}// 死信队列@BeanpublicQueuedmQueue(){returnQueueBuilder.nonDurable("dm-queue")// 消息队列的名称.build();}@BeanpublicBindingdmBinding(){returnBindingBuilder.bind(dmQueue()).to(dmExchange()).with("dm-routing")// 交换机和消息队列的路由对应关系.noargs();}}

redis配置

配置了Object和String的序列化方式

@ConfigurationpublicclassRedisConfig{//编写自定义redisTemplate@Bean@SuppressWarnings("all")publicRedisTemplate<String,Object>redisTemplate(RedisConnectionFactory redisConnectionFactory)throwsUnknownHostException{RedisTemplate<String,Object> template =newRedisTemplate<>();
        template.setConnectionFactory(redisConnectionFactory);//Json序列化配置Jackson2JsonRedisSerializer jackson2JsonRedisSerializer =newJackson2JsonRedisSerializer(Object.class);ObjectMapper om =newObjectMapper();
        om.setVisibility(PropertyAccessor.ALL,JsonAutoDetect.Visibility.ANY);
        om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
        jackson2JsonRedisSerializer.setObjectMapper(om);//String的序列化StringRedisSerializer stringRedisSerializer =newStringRedisSerializer();//key采用String的序列化方式
        template.setKeySerializer(stringRedisSerializer);//hash的key也采用String的序列化方式
        template.setHashKeySerializer(stringRedisSerializer);//value序列化方式采用jackson
        template.setValueSerializer(jackson2JsonRedisSerializer);//hash的value的序列化方式采用jackson
        template.setHashValueSerializer(jackson2JsonRedisSerializer);

        template.afterPropertiesSet();return template;}}

redis工具类

创建redis工具类,进行快速的进行数据存储和读取,最后用作轮询查询时候的标识处理数据的状态

@ComponentpublicclassRedisUtils{@AutowiredprivateRedisTemplate<String,Object> redisTemplate;/**
     * 普通缓存获取
     * @param key 键
     * @return 值
     */publicObjectget(String key){return key ==null?null: redisTemplate.opsForValue().get(key);}/**
     * 普通缓存放入
     * @param key   键
     * @param value 值
     * @return true成功 false失败
     */publicbooleanset(String key,Object value){try{
            redisTemplate.opsForValue().set(key, value);returntrue;}catch(Exception e){
            e.printStackTrace();returnfalse;}}/**
     * 普通缓存放入并设置时间
     * @param key   键
     * @param value 值
     * @param time  时间(秒) time要大于0 如果time小于等于0 将设置无限期
     * @return true成功 false 失败
     */publicbooleanset(String key,Object value,long time){try{if(time >0){
                redisTemplate.opsForValue().set(key, value, time,TimeUnit.SECONDS);}else{set(key, value);}returntrue;}catch(Exception e){
            e.printStackTrace();returnfalse;}}}

创建消费者

创建消费者(Listener)对消息队列进行监控,监听到消息以后进行消费,数据处理的各个阶段在

Redis

存储处理的状态,方便轮询查询。

注意:Redis需要设置过期时间(一周),防止内存泄漏

@Service@Slf4jpublicclassMqConsumer{@AutowiredRedisUtils redisUtils;/**
     * @param channel 用于确保消息被正确的处理
     * @param timeStamp 使用时间戳标识不同的消息请求,方便后期轮询查询
     * @param tag 每一个消息的独一无二的tag
     */@RabbitListener(queues ="fanoutQueue")publicvoidBuidConsumer(Channel channel,String timeStamp,@Header(AmqpHeaders.DELIVERY_TAG)long tag){try{
            redisUtils.set(timeStamp,"消息开始处理......");
            log.info("开始处理编号为: "+ timeStamp +" 的消息了");dealData(timeStamp);
            channel.basicAck(tag,false);//  确认消息
            log.info("理编号为: "+ timeStamp +" 的消息处理完成");}catch(Exception e){try{
                log.info(e.getMessage(), e);
                channel.basicNack(tag,false,false);// 丢入死信队列}catch(IOException ex){
                ex.printStackTrace();}}}@RabbitListener(queues ="dm-queue")publicvoidbuildDeadConsumer(Channel channel,String timeStamp,@Header(AmqpHeaders.DELIVERY_TAG)long tag){try{
            log.info(timeStamp +"进入处理异常");// 日志记录处理异常进入死信队列的消息
            channel.basicAck(tag,false);
            redisUtils.set(timeStamp,"异常处理完成 ....");// 消息队列直接处理完成}catch(IOException e){
            e.printStackTrace();}}// 耗时处理数据的方法publicvoiddealData(String timeStamp){
        redisUtils.set(timeStamp,"消息正在处理 ....");try{Thread.sleep(10000);}catch(InterruptedException e){
            redisUtils.set(timeStamp,"消息处理发生异常");
            e.printStackTrace();}
        redisUtils.set(timeStamp,"消息处理完成....",7*24*60*60L);// 设置过期时间,防止内存泄漏}}

创建Mvc层

创建服务层和消息回执

服务层直接把处理的消息放入交换机中,并且返回一个时间戳用来作为任务的唯一标识,用作后面的轮询查询的接口的参数。

消息回执:为了确保消息已将被成功的接受并且处理,也保证不会重复的传输数据,一旦消费者成功处理了消息并发送了回执,RabbitMQ 就知道这个消息已经被正确处理。这有助于防止消息被重复处理,确保消费者不会在某些故障情况下多次处理同一条消息。

Service层:

@Service@Slf4jpublicclassUserService{@AutowiredRabbitTemplate rabbitTemplate;@PostConstructpublicvoidregCallback(){// 消息发送成功以后,给予生产者的消息回执,来确保生产者的可靠性
        rabbitTemplate.setConfirmCallback(newRabbitTemplate.ConfirmCallback(){@Overridepublicvoidconfirm(CorrelationData correlationData,boolean ack,String cause){// 如果ack为true代表消息已经收到if(!ack){// 这里可能要进行其他的方式进行存储
                    log.error("MQ队列应答失败,失败原因是:"+ cause.toString());return;}
                log.info("消息已经成功发送 !");}});}publicStringdealData(){String timeStamp =Long.toString(System.currentTimeMillis());// 将消息丢进交换机
        rabbitTemplate.convertAndSend("fanoutExchange","", timeStamp,newCorrelationData(timeStamp));return timeStamp;}}

消息回执

image-20231227221940048
rabbitmq在消息的发送与接收中,会经过上面的流程,这些流程中每一步都有可能导致消息丢失,或者消费失败甚至直接是服务器宕机等,这是我们服务接受不了的,为了保证消息的可靠性,rabbitmq提供了以下几种机制:

  • 生产者确认机制
  • 消息持久化存储
  • 消费者确认机制
  • 失败重试机制

实现ConfirmCallback接口

publicinterfaceConfirmCallback{voidconfirm(CorrelationData correlationData,boolean ack,String cause);}
  • correlationData:与消息关联的数据,通常是一个唯一标识符。在发送消息时,可以通过 CorrelationData 设置关联数据,然后在确认回调中获取。
  • ack:表示消息是否被成功确认。如果为 true,表示消息已成功发送到 Exchange;如果为 false,表示消息发送失败。
  • cause:如果消息发送失败,cause 中包含了失败的原因。

Controller层:

@RestController()@RequestMapping("/user")publicclassUserController{@AutowiredUserService userService;@AutowiredRedisUtils redisUtils;// 耗时久的处理数据接口@GetMapping("/deal")publicStringdealData(){return userService.dealData();}// 轮询查询接口@GetMapping("/status/{timeStamp}")publicStringgetStatus(@PathVariableString timeStamp){return redisUtils.get(timeStamp).toString();}}

创建轮询处理和轮询的接口

调用开始处理的接口然后调用服务层将消息放入交换机,然后给消费者进行处理。并且返回一个时间戳用作的轮询处理的状态的标识。

@RestController("/user")publicclassUserController{@AutowiredUserService userService;@AutowiredRedisUtils redisUtils;@GetMapping("/deal")publicStringdealData(){return userService.dealData();}// 根据时间戳进行轮询查询@GetMapping("/status/{timeStamp)")publicStringgetStatus(@PathVariableString timeStamp){return redisUtils.get(timeStamp).toString();}}

运行结果

目录

下面是我的目录结构,大家可以做一个参考:
在这里插入图片描述

结果

开始数据处理接口,接口马上响应成功,返回一个可以轮询查询的时间戳:

在这里插入图片描述

根据唯一标识时间戳,轮询查看处理过程:

在这里插入图片描述

IDEA控制台界面:连续的访问好几次RabbitMq接口,已经开始排队异步处理
在这里插入图片描述

标签: rabbitmq 分布式 java

本文转载自: https://blog.csdn.net/asdfasaa/article/details/135257605
版权归原作者 叮咚Zz 所有, 如有侵权,请联系我们删除。

“RabbitMq异步请求+Redis轮询解决响应时间过长”的评论:

还没有评论