0


SpringBoot下RabbitMQ的实战应用:动态创建和动态监控队列、死信、备份交换机

一、应用场景

  • 业务中心根据业务需求向特定用户发送消息;发送前不确定由哪个用户接收

在这里插入图片描述

  • 特定用户接收特定消息;用户可以退出,再切换别的用户登录,用户登录后只接收与自已对应的消息

在这里插入图片描述

二、总体要求

项目要足够稳健,消息不能丢失

  • 交换机、队列、消息持久化
  • 队列有容量限制;如:3000
  • 消息发送后需要确认(非自动确认)
  • 未发送成功的消息,由缓存保存,定时重发
  • 交换机收到消息,但无法投递时,转发至备份交换机,再广播至对应队列
  • 费时操作采用异步方式

三、架构图

在这里插入图片描述

四、安装RabbitMQ

参考如下三篇文章

  • 【RabbitMQ】RabbitMQ入门及安装
  • 【RabbitMQ】Docker中安装RabbitMQ
  • 【图文详解】RabbitMQ集群搭建、镜像队列、负载均衡HAProxy、故障转移Keepalived

五、搭建SpringBoot项目

  • java1.8
  • spring-boot 2.6.7

1、依赖

<?xml version="1.0" encoding="UTF-8"?><projectxmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.6.7</version></parent><modelVersion>4.0.0</modelVersion><groupId>com.tuwer</groupId><artifactId>mq</artifactId><version>1.0-SNAPSHOT</version><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target></properties><dependencies><!-- amqp-client --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!-- amqp-client Java原生依赖 --><!--        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>5.14.2</version>
        </dependency>--><!-- hutool-all --><dependency><groupId>cn.hutool</groupId><artifactId>hutool-all</artifactId><version>5.8.2</version></dependency><!-- jackson --><dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-databind</artifactId><version>2.13.3</version></dependency><dependency><groupId>com.fasterxml.jackson.datatype</groupId><artifactId>jackson-datatype-jsr310</artifactId><version>2.13.3</version></dependency><dependency><groupId>jakarta.json</groupId><artifactId>jakarta.json-api</artifactId><version>2.0.1</version></dependency><!-- fastjson --><dependency><groupId>com.alibaba.fastjson2</groupId><artifactId>fastjson2</artifactId><version>2.0.3</version></dependency><!-- lombok --><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.22</version></dependency><!-- 工具类 --><dependency><groupId>commons-lang</groupId><artifactId>commons-lang</artifactId><version>2.6</version></dependency><!-- 测试 --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.amqp</groupId><artifactId>spring-rabbit-test</artifactId><scope>test</scope></dependency></dependencies></project>

2、application.yml

spring:rabbitmq:host: 192.168.3.174
    port:5672username: admin
    password: admin
    virtual-host: /
    # 交换机接收确认publisher-confirm-type: correlated
    # 交换机回退消息#publisher-returns: true

2、启动类

@EnableAsync

开启异步操作

packagecom.tuwer;importorg.springframework.boot.SpringApplication;importorg.springframework.boot.autoconfigure.SpringBootApplication;importorg.springframework.scheduling.annotation.EnableAsync;/**
 * @author 土味儿
 * Date 2023/1/4
 * @version 1.0
 */@EnableAsync@SpringBootApplicationpublicclassMqApp{publicstaticvoidmain(String[] args){SpringApplication.run(MqApp.class, args);}}

3、基础类

3.1、常量类

packagecom.tuwer.constant;/**
 * <p>系统常量类</p>
 *
 * @author 土味儿
 * Date 2023/1/4
 * @version 1.0
 */publicclassConstants{/**
     * 队列容量、通道预取值
     * 队列容量应根据项目需要,设置合适的值;
     * 本案例中为了测试方便设为5
     */publicstaticfinalintQUEUE_CAPACITY=5;publicstaticfinalintPRE_FETCH_SIZE=10;/**
     * 交换机
     */publicstaticfinalStringNORMAL_EXCHANGE="normal_exchange";publicstaticfinalStringBACKUP_EXCHANGE="backup_exchange";/**
     * 队列
     */publicstaticfinalStringBACKUP_QUEUE="backup_queue";}

3.2、雪花算法工具类

获取Long型id:

SnowflakeUtil.getInstance().nextId()
packagecom.tuwer.util;importlombok.extern.slf4j.Slf4j;importjava.text.MessageFormat;/**
 * <p>雪花算法工具类</p>
 *
 * @author 土味儿
 * Date 2022/6/2
 * @version 1.0
 */@Slf4j@SuppressWarnings("all")publicclassSnowflakeUtil{// ==============================Fields===========================================/**
     * 开始时间戳 (2000-01-01 00:00:00)
     */privatestaticfinallongTWEPOCH=946656000000L;/**
     * 机器id所占的位数 5
     */privatestaticfinallongWORKER_ID_BITS=5L;/**
     * 数据标识id所占的位数 5
     */privatestaticfinallongDATA_CENTER_ID_BITS=5L;/**
     * 支持的最大机器id,结果是 31
     */privatestaticfinallongMAX_WORKER_ID=~(-1L<<WORKER_ID_BITS);/**
     * 支持的最大数据标识id,结果是 31
     */privatestaticfinallongMAX_DATA_CENTER_ID=~(-1L<<DATA_CENTER_ID_BITS);/**
     * 序列在id中占的位数
     */privatestaticfinallongSEQUENCE_BITS=12L;/**
     * 机器ID向左移12位
     */privatestaticfinallongWORKER_ID_SHIFT=SEQUENCE_BITS;/**
     * 数据标识id向左移17位(12+5)
     */privatestaticfinallongDATA_CENTER_ID_SHIFT=SEQUENCE_BITS+WORKER_ID_BITS;/**
     * 时间戳向左移22位(5+5+12)
     */privatestaticfinallongTIMESTAMP_LEFT_SHIFT=SEQUENCE_BITS+WORKER_ID_BITS+DATA_CENTER_ID_BITS;/**
     * 生成序列的掩码,这里为4095 (0b111111111111=0xfff=4095)
     */privatestaticfinallongSEQUENCE_MASK=~(-1L<<SEQUENCE_BITS);/**
     * 步长 1024
     */privatestaticfinallongSTEP_SIZE=1024;/**
     * unsigned int max value
     */privatestaticfinallongUINT_MAX_VALUE=0xffffffffL;/**
     * 工作机器ID(0~31)
     */privatelong workerId;/**
     * 工作机器ID 计数器
     */privatelong workerIdFlags =0L;/**
     * 数据中心ID(0~31)
     */privatelong dataCenterId;/**
     * 数据中心ID 计数器
     */privatelong dataCenterIdFlags =0L;/**
     * 毫秒内序列(0~4095)
     */privatelong sequence =0L;/**
     * 毫秒内序列基数[0|1024|2048|3072]
     */privatelong basicSequence =0L;/**
     * 上次生成ID的时间戳
     */privatelong lastTimestamp =-1L;/**
     * 工作模式
     */privatefinalWorkMode workMode;publicenumWorkMode{NON_SHARED,RATE_1024,RATE_4096;}//==============================单例模式(静态内部类)=====================================privatestaticclassInnerClass{privatestaticfinalSnowflakeUtilINNER_DEMO=newSnowflakeUtil();}publicstaticSnowflakeUtilgetInstance(){returnInnerClass.INNER_DEMO;}//==============================Constructors=====================================publicSnowflakeUtil(){this(0,0,WorkMode.RATE_4096);}/**
     * 构造函数
     *
     * @param workerId     工作ID (0~31)
     * @param dataCenterId 数据中心ID (0~31)
     */publicSnowflakeUtil(long workerId,long dataCenterId){this(workerId, dataCenterId,WorkMode.RATE_4096);}/**
     * 构造函数
     *
     * @param workerId     工作ID (0~31)
     * @param dataCenterId 数据中心ID (0~31)
     * @param workMode     工作模式
     */publicSnowflakeUtil(long workerId,long dataCenterId,WorkMode workMode){this.workMode = workMode;if(workerId >MAX_WORKER_ID|| workerId <0){thrownewIllegalArgumentException(MessageFormat.format("worker Id can't be greater than {0} or less than 0",MAX_WORKER_ID));}if(dataCenterId >MAX_DATA_CENTER_ID|| dataCenterId <0){thrownewIllegalArgumentException(MessageFormat.format("datacenter Id can't be greater than {0} or less than 0",MAX_DATA_CENTER_ID));}this.workerId = workerId;this.workerIdFlags =setSpecifiedBitTo1(this.workerIdFlags,this.workerId);this.dataCenterId = dataCenterId;this.dataCenterIdFlags =setSpecifiedBitTo1(this.dataCenterIdFlags,this.dataCenterId);}// ==============================Methods==========================================/**
     * 获取机器id
     *
     * @return 所属机器的id
     */publiclonggetWorkerId(){return workerId;}/**
     * 获取数据中心id
     *
     * @return 所属数据中心id
     */publiclonggetDataCenterId(){return dataCenterId;}/**
     * 获得下一个ID (该方法是线程安全的)
     *
     * @return SnowflakeId
     */publicsynchronizedlongnextId(){long timestamp =timeGen();//如果当前时间小于上一次ID生成的时间戳,说明系统时钟回退过这个时候应当抛出异常if(timestamp <this.lastTimestamp){if(timestamp >TWEPOCH){if(WorkMode.NON_SHARED==this.workMode){nonSharedClockBackwards(timestamp);}elseif(WorkMode.RATE_1024==this.workMode){rate1024ClockBackwards(timestamp);}else{thrownewRuntimeException(MessageFormat.format("Clock moved backwards. Refusing to generate id for {0} milliseconds", lastTimestamp - timestamp));}}else{thrownewRuntimeException(MessageFormat.format("Clock moved backwards. Refusing to generate id for {0} milliseconds", lastTimestamp - timestamp));}}//如果是同一时间生成的,则进行毫秒内序列if(this.lastTimestamp == timestamp){this.sequence =(this.sequence +1)&SEQUENCE_MASK;//毫秒内序列溢出if(this.sequence ==0){//阻塞到下一个毫秒,获得新的时间戳
                timestamp =tilNextMillis(this.lastTimestamp);}}//时间戳改变,毫秒内序列重置else{this.sequence =this.basicSequence;}//上次生成ID的时间戳this.lastTimestamp = timestamp;//移位并通过或运算拼到一起组成64位的IDreturn((timestamp -TWEPOCH)<<TIMESTAMP_LEFT_SHIFT)|(this.dataCenterId <<DATA_CENTER_ID_SHIFT)|(this.workerId <<WORKER_ID_SHIFT)|this.sequence;}/**
     * 阻塞到下一个毫秒,直到获得新的时间戳
     *
     * @param lastTimestamp 上次生成ID的时间戳
     * @return 当前时间戳
     */protectedlongtilNextMillis(long lastTimestamp){long timestamp0;do{
            timestamp0 =timeGen();}while(timestamp0 <= lastTimestamp);return timestamp0;}/**
     * 返回以毫秒为单位的当前时间
     *
     * @return 当前时间(毫秒)
     */protectedlongtimeGen(){returnSystem.currentTimeMillis();}/**
     * 尝试解决时钟回拨<br>【* 仅用于 单机生成不对外 的情况 *】
     *
     * @param timestamp 当前时间戳
     * @return void
     */privatevoidnonSharedClockBackwards(long timestamp){if(this.dataCenterIdFlags >=UINT_MAX_VALUE&&this.workerIdFlags >=UINT_MAX_VALUE){thrownewRuntimeException(MessageFormat.format("Clock moved backwards. Refusing to generate id for {0} milliseconds", lastTimestamp - timestamp));}else{//如果仅用于生成不重复的数值,尝试变更 dataCenterId 或 workerId 修复时钟回拨问题

            log.warn("Clock moved backwards. Refusing to generate id for {} milliseconds", lastTimestamp - timestamp);//先尝试变更 dataCenterId,当 dataCenterId 轮询一遍之后,尝试变更 workerId 并重置 dataCenterIdif(this.dataCenterIdFlags >=UINT_MAX_VALUE){if(++this.workerId >MAX_WORKER_ID){this.workerId =0L;}this.workerIdFlags =setSpecifiedBitTo1(this.workerIdFlags,this.workerId);// 重置 dataCenterId 和 dataCenterIdFlagsthis.dataCenterIdFlags =this.dataCenterId =0L;}else{if(++this.dataCenterId >MAX_DATA_CENTER_ID){this.dataCenterId =0L;}}this.dataCenterIdFlags =setSpecifiedBitTo1(this.dataCenterIdFlags,this.dataCenterId);this.lastTimestamp =-1L;
            log.warn("Try to fix the clock moved backwards. timestamp : {}, worker Id : {}, datacenter Id : {}", timestamp, workerId, dataCenterId);}}/**
     * 尝试解决时钟回拨<br>【* 仅用于每毫秒生成量 不大于 1024 的情况 *】
     *
     * @param timestamp 当前时间戳
     * @return void
     */privatevoidrate1024ClockBackwards(long timestamp){if(this.basicSequence >(SEQUENCE_MASK-STEP_SIZE)){thrownewRuntimeException(MessageFormat.format("Clock moved backwards. Refusing to generate id for {0} milliseconds", lastTimestamp - timestamp));}else{
            log.warn("Clock moved backwards. Refusing to generate id for {} milliseconds", lastTimestamp - timestamp);this.basicSequence +=STEP_SIZE;this.lastTimestamp =-1L;
            log.warn("Try to fix the clock moved backwards. timestamp : {}, basicSequence : {}", timestamp, basicSequence);}}/**
     * Set the specified bit to 1
     *
     * @param value raw long value
     * @param index bit index (From 0~31)
     * @return long value
     */privatelongsetSpecifiedBitTo1(long value,long index){return value |=(1L<< index);}/**
     * Set the specified bit to 0
     *
     * @param value raw long value
     * @param index bit index (From 0~31)
     * @return long value
     */privatelongsetSpecifiedBitTo0(long value,long index){return value &=~(1L<< index);}/**
     * Get the specified bit
     *
     * @param value raw long value
     * @param index bit index(From 0-31)
     * @return 0 or 1
     */privateintgetSpecifiedBit(long value,long index){return(value &(1L<< index))==0?0:1;}}

3.3、缓存模型类

缓存操作不是本文的重点,用模型类代替;实际部署时可换作redis

packagecom.tuwer.cache;importjava.util.Set;importjava.util.concurrent.ConcurrentNavigableMap;importjava.util.concurrent.ConcurrentSkipListMap;/**
 * 模拟缓存
 *
 * @author 土味儿
 * Date 2023/1/3
 * @version 1.0
 */publicclassCacheModel{/**
     * 并发Map
     */privatestaticConcurrentSkipListMap<Long,String> cache =newConcurrentSkipListMap<>();/**
     * 存入缓存
     *
     * @param key
     * @param value
     */publicstaticvoidput(Long key,String value){
        cache.put(key, value);System.out.println("存入缓存;【key】"+ key +"【value】"+ value);print();}/**
     * 获取value
     *
     * @param key
     * @return
     */publicstaticStringget(Long key){String v = cache.get(key);System.out.println("从缓存中获取;【key】"+ key +"【value】"+ v);return v;}/**
     * 删除key
     *
     * @param key
     */publicstaticvoiddel(Long key){String v = cache.remove(key);System.out.println("从缓存中删除;【key】"+ key +"【value】"+ v);print();}/**
     * 删除小于等于key的多个值
     *
     * @param key
     */publicstaticvoiddelMany1(Long key){Set<Long> keys = cache.keySet();int n =0;for(Long k : keys){if(k <= key){
                cache.remove(k);
                n++;}}System.out.println("从缓存中删除小于等于;【"+ key +"】的多个值;共有 "+ n +" 个");print();}/**
     * 删除小于等于key的多个值
     * ConcurrentNavigableMap
     * @param key
     */publicstaticvoiddelMany(Long key){// 得到批量确认信息Map(只能得到小于key的值)ConcurrentNavigableMap<Long,String> confirmMap = cache.headMap(key);System.out.println("从缓存中删除小于等于;【"+ key +"】的多个值;共有 "+(confirmMap.size()+1)+" 个");// 清空已经确认的
        confirmMap.clear();// 单独再删除等于key的值
        cache.remove(key);print();}publicstaticvoidprint(){System.out.println("当前缓存大小:"+ cache.size());Set<Long> keys = cache.keySet();for(Long key : keys){System.out.print(key);System.out.print(" | ");//System.out.println(cache.get(key));}System.out.println();}}

4、配置类 MqConfig.java

packagecom.tuwer.config;importcom.tuwer.constant.Constants;importorg.springframework.amqp.core.*;importorg.springframework.amqp.rabbit.connection.CachingConnectionFactory;importorg.springframework.amqp.rabbit.core.RabbitAdmin;importorg.springframework.amqp.rabbit.core.RabbitTemplate;importorg.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;importorg.springframework.beans.factory.annotation.Qualifier;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;importjavax.annotation.PostConstruct;importjavax.annotation.Resource;/**
 * <p>配置类</p>
 *
 * @author 土味儿
 * Date 2023/1/5
 * @version 1.0
 */@ConfigurationpublicclassMqConfig{@ResourceprivateCachingConnectionFactory connectionFactory;@ResourceprivateRabbitTemplate rabbitTemplate;@PostConstructpublicvoidinit(){System.out.println("初始化...");// 删除普通空队列(排除掉备份队列),减轻RabbitMQ的压力}/**
     * 获取普通交换机
     *
     * @return
     */@Bean("normalExchange")publicDirectExchangegetNormalExchange(){returnExchangeBuilder.directExchange(Constants.NORMAL_EXCHANGE)// 持久化.durable(true)// 备份(候补)交换机.withArgument("alternate-exchange",Constants.BACKUP_EXCHANGE).build();}/**
     * 备份交换机
     * 类型:fanout
     *
     * @return
     */@Bean("backupExchange")publicFanoutExchangegetBackupExchange(){returnnewFanoutExchange(Constants.BACKUP_EXCHANGE);}/**
     * 获取备份队列
     *
     * @return
     */@Bean("backupQueue")publicQueuegetBackupQueue(){returnQueueBuilder.durable(Constants.BACKUP_QUEUE).build();}/**
     * 备份队列 绑定 备份交换机
     *
     * @param queue
     * @param exchange
     * @return
     */@BeanpublicBindingbackupQueueBindBackupExchange(@Qualifier("backupQueue")Queue queue,@Qualifier("backupExchange")FanoutExchange exchange
    ){returnBindingBuilder.bind(queue).to(exchange);}/**
     * 用于动态创建队列、交换机,并绑定
     * @return
     */@BeanpublicRabbitAdminrabbitAdmin(){//return new RabbitAdmin(connectionFactory);returnnewRabbitAdmin(rabbitTemplate);}/**
     * 用于设置动态监听
     * @return
     */@BeanpublicSimpleMessageListenerContainersimpleMessageListenerContainer(){SimpleMessageListenerContainer container =newSimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);

        container.setConcurrentConsumers(10);
        container.setMaxConcurrentConsumers(50);// 手动确认
        container.setAcknowledgeMode(AcknowledgeMode.MANUAL);// 预取值
        container.setPrefetchCount(Constants.PRE_FETCH_SIZE);// 创建队列(在用户登录后创建)//createNormalQueueAndBind.create(username);//listenerContainer.setQueueNames("q_" + username);//listenerContainer.setMessageListener(myAckReceiver);return container;}}

5、普通队列动态创建类

packagecom.tuwer.service;importcom.tuwer.constant.Constants;importlombok.extern.slf4j.Slf4j;importorg.springframework.amqp.core.*;importorg.springframework.amqp.rabbit.core.RabbitAdmin;importorg.springframework.stereotype.Component;importjavax.annotation.Resource;importjava.util.Locale;importjava.util.Objects;importjava.util.Properties;/**
 * <p>动态创建队列</p>
 *
 * @author 土味儿
 * Date 2023/1/7
 * @version 1.0
 */@Component@Slf4jpublicclassCreateNormalQueueAndBind{@ResourceprivateDirectExchange normalExchange;@ResourceprivateRabbitAdmin rabbitAdmin;/**
     * 动态创建普通队列,并绑定至普通交换机
     *
     * @param routingKey
     */publicvoidcreate(String routingKey){String key = routingKey.toLowerCase(Locale.ROOT);String queueName ="q_"+ key;// 创建普通队列Queue queue =null;// 查询队列属性,为null时,表示队列不存在Properties queueProperties = rabbitAdmin.getQueueProperties(queueName);// 队列不存在时,创建if(Objects.isNull(queueProperties)){
            log.info("队列【{}】不存在,创建中...", queueName);
            queue =QueueBuilder.durable(queueName).maxLength(Constants.QUEUE_CAPACITY).build();
            rabbitAdmin.declareQueue(queue);}// 绑定至普通交换机if(Objects.nonNull(queue)){Binding binding =BindingBuilder.bind(queue).to(normalExchange).with(key);
            rabbitAdmin.declareBinding(binding);}}}

经过测试,如果队列满了的时候,再向队列发送消息时,最老的消息被丢弃,且不会启用备份交换机;为了防止信息丢失,加入

死信交换机

死信队列

,当前队列满了的时候,最老的信息进入死信交换机,再转至死信队列

  • 原代码
// 原
queue =QueueBuilder.durable(queueName).maxLength(Constants.QUEUE_CAPACITY).build();
  • 新代码
// 新
queue =QueueBuilder.durable(queueName)// 设置队列长度.maxLength(Constants.QUEUE_CAPACITY)// 设置死信交换机.deadLetterExchange(Constants.BACKUP_EXCHANGE)// 设置死信RoutingKey(死信队列).deadLetterRoutingKey(Constants.BACKUP_QUEUE)// 改变溢出规则:当队列溢出时,拒绝接收新消息//.overflow(QueueBuilder.Overflow.rejectPublish).build();

6、发布确认回调类

packagecom.tuwer.service;importcom.tuwer.cache.CacheModel;importlombok.extern.slf4j.Slf4j;importorg.springframework.amqp.rabbit.connection.CorrelationData;importorg.springframework.amqp.rabbit.core.RabbitTemplate;importorg.springframework.stereotype.Component;importjavax.annotation.PostConstruct;importjavax.annotation.Resource;/**
 * <p>发布确认回调类</p>
 *
 * @author 土味儿
 * Date 2023/1/5
 * @version 1.0
 */@Slf4j@ComponentpublicclassMyConfirmCallbackimplementsRabbitTemplate.ConfirmCallback{@ResourceprivateRabbitTemplate rabbitTemplate;/**
     * 把当前类注入到 RabbitTemplate
     *
     * @PostConstruct 表示在执行当前类的构造时运行
     * 因为 ConfirmCallback接口是 RabbitTemplate的内部类
     */@PostConstructpublicvoidinit(){
        rabbitTemplate.setConfirmCallback(this);}/**
     * 交换机确认回调方法
     *
     * @param correlationData 回调消息
     * @param ack             交换机是否确认收到了消息:true:收到了;false:没有收到
     * @param cause           没有收到消息的原因
     */@Overridepublicvoidconfirm(CorrelationData correlationData,boolean ack,String cause){// 消息IDString id = correlationData !=null? correlationData.getId():"";if(ack){// 收到
            log.info("交换机收到了消息!编号:{}", id);// 从缓存中删除CacheModel.del(Long.parseLong(id));}else{// 未收到
            log.info("交换机没有收到编号为:{} 的消息!原因:{}", id, cause);}}}

从缓存中删除的优化操作

  • 采用Redis后发现的问题:部分信息确认后没有从Redis中删除成功;- 分析原因:确认回调方法是异步执行,可能在发送时,信息还没有存入缓存中,回调方法就已经执行, 这时删除操作将会失败,信息会一直留在缓存中;- 解决方法:休眠几秒,待信息存入缓存后,重新删除

在这里插入图片描述

  • 代码优化
@Override@SneakyThrowspublicvoidconfirm(CorrelationData correlationData,boolean ack,String cause){// 消息IDString id = correlationData !=null? correlationData.getId():"";if(ack){// 收到
            log.info("交换机收到了消息!编号:{}", id);// 从缓存中删除RedisUtil<String> redisUtil =newRedisUtil<>(redisTemplate);Long n = redisUtil.rkey.del(id);if(n >0){
                log.info("从缓存中删除消息!编号:{}", id);}else{/*
                 * 确认回调方法是异步执行,可能在发送时,信息还没有存入缓存中,回调方法就已经执行,
                 * 这时删除操作将会失败,信息会一直留在缓存中;
                 * 解决方法:休眠几秒,待信息存入缓存后,重新删除
                 */TimeUnit.SECONDS.sleep(3);
                n = redisUtil.rkey.del(id);if(n >0){
                    log.info("从缓存中首次删除消息失败,再次尝试删除成功!编号:{}", id);}}}else{// 未收到
            log.info("交换机没有收到编号为:{} 的消息!原因:{}", id, cause);}}

7、生产者服务类

发送消息采用异步方式

@Async
packagecom.tuwer.service;importcom.tuwer.cache.CacheModel;importcom.tuwer.constant.Constants;importcom.tuwer.util.SnowflakeUtil;importlombok.extern.slf4j.Slf4j;importorg.springframework.amqp.rabbit.connection.CorrelationData;importorg.springframework.amqp.rabbit.core.RabbitTemplate;importorg.springframework.scheduling.annotation.Async;importorg.springframework.stereotype.Component;importjavax.annotation.Resource;importjava.util.*;/**
 * <p>生产者</p>
 *
 * @author 土味儿
 * Date 2023/1/4
 * @version 1.0
 */@Component@Slf4jpublicclassProducer{@ResourceprivateRabbitTemplate rabbitTemplate;@ResourceprivateCreateNormalQueueAndBind createNormalQueueAndBind;/**
     * 异步发送消息
     *
     * @param msg
     */@AsyncpublicvoidsendMsg(String msg,String routingKey){String key = routingKey.toLowerCase(Locale.ROOT);// 创建普通队列,并绑定至普通交换机
        createNormalQueueAndBind.create(key);long id =SnowflakeUtil.getInstance().nextId();CorrelationData correlationData =newCorrelationData(String.valueOf(id));
        rabbitTemplate.convertAndSend(Constants.NORMAL_EXCHANGE,
                key,
                msg,
                correlationData);// 存入缓存CacheModel.put(id, msg);

        log.info("消息:【{}】已发送!编号:{}", msg, correlationData.getId());}}

8、消费者服务类

8.1、备份队列消费者

备份队列固定不变;config中配置,系统启动后自动创建

packagecom.tuwer.service;importcom.tuwer.config.MqConfig;importcom.tuwer.constant.Constants;importlombok.extern.slf4j.Slf4j;importorg.springframework.amqp.core.Message;importorg.springframework.amqp.rabbit.annotation.RabbitListener;importorg.springframework.stereotype.Component;/**
 * <p>备份消费者</p>
 *
 * @author 土味儿
 * Date 2023/1/5
 * @version 1.0
 */@Slf4j@ComponentpublicclassBackupConsumer{/**
     * 监听备份队列消息
     * @param message
     */@RabbitListener(queues =Constants.BACKUP_QUEUE)publicvoidreceiveMsg(Message message){String msg =newString(message.getBody());
        log.info("接收到备份队列的消息:【{}】", msg);}}

8.2、普通队列消费者

  • 队列不确定;用户登录后,动态切换要监控的队列;
  • 如果用户退出后,也要更新监控列表;省略
packagecom.tuwer.service;importlombok.extern.slf4j.Slf4j;importorg.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;importorg.springframework.stereotype.Component;importjavax.annotation.Resource;/**
 * <p>普通消费者</p>
 *
 * @author 土味儿
 * Date 2023/1/5
 * @version 1.0
 */@Slf4j@ComponentpublicclassNormalConsumer{@ResourceprivateSimpleMessageListenerContainer listenerContainer;@ResourceprivateMyAckReceiver myAckReceiver;@ResourceprivateCreateNormalQueueAndBind createNormalQueueAndBind;/**
     * 监听普通队列消息
     * 动态切换要监控的队列   
     *   
     * @param username
     *///@RabbitListener(queues = "队列名称")publicvoidreceiveMsg(String username){// 创建队列
        createNormalQueueAndBind.create(username);// 设置要监听的队列(用set,不是add)
        listenerContainer.setQueueNames("q_"+ username);// 设置消息接收器
        listenerContainer.setMessageListener(myAckReceiver);// 当前监听的队列列表String[] queueNames = listenerContainer.getQueueNames();System.out.println("当前监听的队列:");for(String queueName : queueNames){System.out.println(queueName);}System.out.println("----------");}}
  • 消息接收器
packagecom.tuwer.service;importcom.rabbitmq.client.Channel;importlombok.extern.slf4j.Slf4j;importorg.springframework.amqp.core.Message;importorg.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;importorg.springframework.stereotype.Component;/**
 * <p>消息接收器</p>
 *
 * @author 土味儿
 * Date 2023/1/7
 * @version 1.0
 */@Slf4j@ComponentpublicclassMyAckReceiverimplementsChannelAwareMessageListener{@OverridepublicvoidonMessage(Message message,Channel channel)throwsException{long deliveryTag = message.getMessageProperties().getDeliveryTag();try{String consumerQueue = message.getMessageProperties().getConsumerQueue();String msg =newString(message.getBody());
            log.info("MyAckReceiver的【{}】队列,收到的消息:【{}】", consumerQueue, msg);
            channel.basicAck(deliveryTag,true);}catch(Exception e){
            channel.basicReject(deliveryTag,false);
            e.printStackTrace();}}}

9、Controller接口类

9.1、发送消息

http://localhost:8080/sendMsg/用户名/消息
packagecom.tuwer.controller;importcom.tuwer.service.Producer;importlombok.extern.slf4j.Slf4j;importorg.springframework.web.bind.annotation.GetMapping;importorg.springframework.web.bind.annotation.PathVariable;importorg.springframework.web.bind.annotation.RestController;importjavax.annotation.Resource;/**
 * <p>发送消息</p>
 *
 * @author 土味儿
 * Date 2023/1/5
 * @version 1.0
 */@Slf4j@RestControllerpublicclassProducerController{@ResourceprivateProducer producer;/**
     * 发送消息
     *
     * @param msg      消息内容
     * @param username 接收消息的用户
     * @return
     */@GetMapping("/sendMsg/{username}/{msg}")publicStringsendMsg(@PathVariable("msg")String msg,@PathVariable("username")String username
    ){
        producer.sendMsg(msg, username);return"OK!";}}

9.2、模拟用户登录

http://localhost:8080/login/用户名
packagecom.tuwer.controller;importcom.tuwer.service.NormalConsumer;importlombok.extern.slf4j.Slf4j;importorg.springframework.web.bind.annotation.GetMapping;importorg.springframework.web.bind.annotation.PathVariable;importorg.springframework.web.bind.annotation.RequestMapping;importorg.springframework.web.bind.annotation.RestController;importjavax.annotation.Resource;/**
 * <p>模拟登录</p>
 *
 * @author 土味儿
 * Date 2023/1/7
 * @version 1.0
 */@Slf4j@RestController@RequestMapping("/login")publicclassLoginController{@ResourceprivateNormalConsumer consumer;/**
     * 登录
     *
     * @param username
     * @return
     */@GetMapping("/{username}")publicStringlogin(@PathVariable("username")String username
    ){
        log.info("用户 {} 已登录", username);// 接收该用户的队列消息
        consumer.receiveMsg(username);return username;}}

10、定时重发

缓存中未得到确认的消息,由定时器重新发送;

在这里插入图片描述

信息重发调度器(这是系统优化后增加的类)

  • 增加了信息实体类 MqMsg
  • 增加了异步发送多条信息方法 sendMsg(List<MqMsg> listOfMqMsg)
packagecom.tuwer.service;importcom.alibaba.fastjson2.JSON;importcom.tuwer.pojo.po.MqMsg;importcom.tuwer.util.RedisUtil;importlombok.extern.slf4j.Slf4j;importorg.springframework.data.redis.core.RedisTemplate;importorg.springframework.scheduling.annotation.Scheduled;importorg.springframework.stereotype.Component;importjavax.annotation.Resource;importjava.time.Instant;importjava.time.ZoneId;importjava.util.HashMap;importjava.util.Map;importjava.util.Set;/**
 * <p>调度器</p>
 *
 * @author 土味儿
 * Date 2023/1/13
 * @version 1.0
 */@Slf4j@ComponentpublicclassScheduler{@ResourceprivateRedisTemplate redisTemplate;@ResourceprivateProducer producer;/**
     * 缓存信息重发(每30分钟检查一次)
     */@Scheduled(cron ="0 0/30 * * * ?")publicvoidresendForCache(){
        log.info("开始执行缓存信息重发...");// 1、从缓存中获取全部信息(key集合)RedisUtil<String> redisUtil =newRedisUtil<>(redisTemplate);Set<String> keys = redisUtil.rkey.keys("*");int s1 = keys.size();if(s1 <1){
            log.info("缓存中没有信息!");return;}
        log.info("从缓存中获取到 {} 条信息!", s1);// 2、遍历分析信息String msgJson;MqMsg mqMsg;List<MqMsg> list =newArrayList<>();// 需要重发的时间差(秒)long n =10*60;// 当前时刻距离原点时刻的秒值long nowSecond =Instant.now().getEpochSecond();

        log.info("开始检查...");for(String key : keys){// 得到当前key的值
            msgJson = redisUtil.rstring.get(key);// 解析成MqMsg对象
            mqMsg =JSON.parseObject(msgJson,MqMsg.class);// 当前信息距离原点时刻的秒值long msgSecond = mqMsg.getTime().atZone(ZoneId.systemDefault()).toInstant().getEpochSecond();// 判断是否已达到重发的要求if((nowSecond - msgSecond)> n){/*
                 * 需要重发
                 * -------------------
                 * 把信息封装进list中
                 * list中元素是:MqMsgVO对象
                 */
                list.add(mqMsgVO);
                log.info("信息:{} 已过去 {} 秒,封装进集合中!", key, n);}}// 3、重发;调用信息发送方法int s2 = list.size();if(s2 >0){
            log.info("共有 {} 条信息需要重发!", s2);
            producer.sendMsg(list);
            log.info("重发指令已发出!");return;}
        log.info("没有需要重发的信息!");}}
  • 信息实体类
packagecom.tuwer.pojo.po;importcom.fasterxml.jackson.annotation.JsonFormat;importcom.fasterxml.jackson.databind.annotation.JsonDeserialize;importcom.fasterxml.jackson.databind.annotation.JsonSerialize;importcom.fasterxml.jackson.datatype.jsr310.deser.LocalDateTimeDeserializer;importcom.fasterxml.jackson.datatype.jsr310.ser.LocalDateTimeSerializer;importcom.tuwer.util.SnowflakeUtil;importlombok.Data;importlombok.NoArgsConstructor;importjava.time.LocalDateTime;/**
 * <p>消息队列消息实体类</p>
 *
 * @author 土味儿
 * Date 2023/1/9
 * @version 1.0
 */@Data@NoArgsConstructorpublicclassMqMsg{privateLong id;privateString from;privateStringto;privateint type;privateString title;privateString content;@JsonDeserialize(using =LocalDateTimeDeserializer.class)@JsonSerialize(using =LocalDateTimeSerializer.class)@JsonFormat(pattern ="yyyy-MM-dd HH:mm:ss", timezone ="GMT+8")privateLocalDateTime time;privateboolean read;publicMqMsg(String from,Stringto,int type,String title,String content){// 雪花算法生成idthis.id =SnowflakeUtil.getInstance().nextId();this.from = from;this.to=to;this.type = type;this.title = title;this.content = content;// 当前时间this.time =LocalDateTime.now();this.read =false;}}
  • 异步发送多条信息
/**
 * <p>消息生产者</p>
 *
 * @author 土味儿
 * Date 2023/1/4
 * @version 1.0
 */@Component@Slf4jpublicclassProducer{@ResourceprivateRabbitTemplate rabbitTemplate;@ResourceprivateCreateNormalQueueAndBind createNormalQueueAndBind;@ResourceprivateRedisTemplate redisTemplate;/**
     * 异步发送单条消息
     *
     * @param msgJson    消息实体的json字符串
     * @param routingKey
     */@AsyncpublicvoidsendMsg(String msgJson,String routingKey){// 略}/**
     * 异步发送多条消息
     *
     * @param listOfMqMsg list集合;元素为:MqMsg
     */@AsyncpublicvoidsendMsg(List<MqMsg> listOfMqMsg){if(CollUtil.isEmpty(listOfMqMsg)){
            log.info("空集合!没有可发送的消息!");return;}long id;Stringto, msgJson;// redis工具类RedisUtil<String> redisUtil =newRedisUtil<>(redisTemplate);for(MqMsg msg : listOfMqMsg){// id
            id = msg.getId();// 接收者 toto= msg.getTo();// json
            msgJson =JSON.toJSONString(msg);// 创建普通队列,并绑定至普通交换机
            createNormalQueueAndBind.create(to);// 发布确认回调序号CorrelationData correlationData =newCorrelationData(String.valueOf(id));
            rabbitTemplate.convertAndSend(MqConstants.NORMAL_EXCHANGE,to,
                    msgJson,
                    correlationData);// 存入缓存if(redisUtil.rstring.add(correlationData.getId(), msgJson)){
                log.info("消息:{} 已存入缓存!", correlationData.getId());}}

        log.info("{} 条消息已发送!", listOfMqMsg.size());}}
  • FastJson2 中 List< MqMsg > 的序列化与反序列化
// 序列化List<MqMsg> list =newArrayList<>();
list.add(newMqMsg(...));
list.add(newMqMsg(...));
list.add(newMqMsg(...));String listJson =JSON.toJSONString(list)// 反序列化List<MqMsg> list =JSON.parseArray(listJson,MqMsg.class);

11、项目结构图

在这里插入图片描述

六、测试

1、启动后自动创建交换机、队列

  • 启动前

在这里插入图片描述

在这里插入图片描述

  • 启动后

在这里插入图片描述

在这里插入图片描述

2、向用户

user1

发送消息

http://localhost:8080/sendMsg/user1/测试1

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

3、登录

user1

接收消息

http://localhost:8080/login/user1

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

4、向

user2

user3

分别发消息

当前登录用户是

user1

,向用户

user2

user3

发送消息时,

user1

是接收不到的,消息会存储在相应队列中

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

5、切换用户

user3

user1

切换到

user3

,同时监控的队列由

q_user1

动态切换到

q_user3

,队列

q_user3

中的消息将被消费掉

在这里插入图片描述
在这里插入图片描述

在这里插入图片描述

6、再次向

user1

发消息

当前登录用户是

user3

,这时再向上一个登录用户

user1

发消息,消息应该不会被消费,存在

q_user1

队列中

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

7、备份交换机接收消息

  • 本案例中发送消息前,先动态创建队列,一般不会出现信息无法路由的情况,也就不会因为无法路由而启动备份交换机(除非一些极端的情况)
  • 当普通队列满的时候,再向其发送消息,最老的信息变为死信,进入死信交换机;本案例中把备份交换机当成死信交换机,备份队列当成死信队列。
  • user2连发10条信息,且user2不登录,队列q_user2容量为5,溢出后最老的信息进入备份交换机(发送前先删除原来的q_user2队列)

在这里插入图片描述

在这里插入图片描述

  • 启动测试

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

  • 改变溢出规则;默认是丢弃最老(开头)的消息

在这里插入图片描述

在创建队列时加上

overflow(QueueBuilder.Overflow.rejectPublish)

,改为拒绝接收新消息,此时生产者会收到拒绝接收的消息提示,缓存中的消息将不会被删除,而是定时重发;备份交换机也不会启用,拒绝的信息不会进入备份队列,不符合设计要求,所以使用默认的溢出规则。

七、不足

  • 每个用户对应一个普通队列;当用户过多时,相应的队列也会很多,并且队列是持久化的,会占用较多的系统资源;解决思路:定时删除空队列

本文转载自: https://blog.csdn.net/tu_wer/article/details/128593526
版权归原作者 土味儿~ 所有, 如有侵权,请联系我们删除。

“SpringBoot下RabbitMQ的实战应用:动态创建和动态监控队列、死信、备份交换机”的评论:

还没有评论