0


springboot实现rabbitmq动态创建交换机,队列以及交换机、队列绑定

springboot实现rabbitmq动态创建交换机,队列以及交换机、队列绑定

1. 数据库准备

droptableifexists mq_config;/*==============================================================*//* Table: mq_config                                             *//*==============================================================*/createtable mq_config
(
   mq_id                varchar(200)notnullcomment'交换机id',
   exchange_type        char(1)comment'交换机类型(0:DIRECT直连交换机; 1:TOPIC主题交换机; 2:FANOUT扇形交换机; 3:HEADERS头交换机)',
   exchange_name        varchar(200)comment'交换机名称',
   queue_name           varchar(200)comment'队列名称',
   binding              varchar(200)comment'绑定关系',
   delay_type           char(1)comment'是否死信队列(0:是;1:否)',
   version              bigint(20)default0comment'乐观锁',
   del_flag             char(1)default'0'comment'删除标志(0:存在; 1:删除)',statuschar(1)default'0'comment'记录状态(0:在用; 1:停用)',
   create_by            varchar(64)comment'创建人',
   create_time          timestamp(0)defaultCURRENT_TIMESTAMPcomment'创建时间',
   update_by            varchar(64)comment'修改人',
   update_time          datetime(0)defaultCURRENT_TIMESTAMPONUPDATECURRENT_TIMESTAMPcomment'修改时间',
   remark               varchar(500)comment'备注',primarykey(mq_id));altertable mq_config comment'mq配置表';

2. 依赖引入

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-jdbc</artifactId></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><scope>runtime</scope></dependency><!-- 连接池 --><dependency><groupId>com.alibaba</groupId><artifactId>druid-spring-boot-starter</artifactId><version>1.1.10</version></dependency><!-- swagger依赖 --><dependency><groupId>com.github.xiaoymin</groupId><artifactId>knife4j-micro-spring-boot-starter</artifactId><version>3.0.3</version></dependency><dependency><groupId>com.github.xiaoymin</groupId><artifactId>knife4j-spring-boot-starter</artifactId><version>3.0.3</version></dependency><!-- mybatis-plus --><dependency><groupId>com.baomidou</groupId><artifactId>mybatis-plus-boot-starter</artifactId><version>3.5.2</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>

3. yml配置

# 应用名称server:port:8080spring:datasource:# 配置druid数据库连接池druid:#配置当前数据源类型type: com.alibaba.druid.pool.DruidDataSource
      # 配置MySQL的驱动程序类driver-class-name: com.mysql.cj.jdbc.Driver
      url: jdbc:mysql://localhost:3306/schedule?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=GMT%2B8username: root
      password: root
      # 配置监控统计拦截的filters,stat是sql监控,wall是防火墙(如果不添加则监控无效),添加log4j需要引入jar包filters: stat,wall
      # 连接池最大活跃连接数max-active:100# 连接池初始化连接数量initial-size:1# 配置获取连接等待超时的时间max-wait:60000# 连接池最小空闲数min-idle:1# 指定空闲连接检查、废弃连接清理、空闲连接池大小调整之间的操作时间间隔time-between-eviction-runs-millis:60000# 指定一个空闲连接最少空闲多久后可被清除min-evictable-idle-time-millis:300000# 连接是否有效的查询语句validation-query: select 'x'
      test-while-idle:truetest-on-borrow:falsetest-on-return:false#打开 PSCache,并且指定每个连接上 PSCache 的大小pool-prepared-statements:truemax-open-prepared-statements:50max-pool-prepared-statement-per-connection-size:20# 配置 DruidStatFilterweb-stat-filter:enabled:true#\u662F\u5426\u542F\u7528StatFilter\u9ED8\u8BA4\u503Ctrue# 排除一些不必要的url,比如.js,/jslib/等exclusions:"*.js,*.gif,*.jpg,*.png,*.css,*.ico,/druid/*"# 过滤规则url-pattern: /*
      # 配置 DruidStatViewServletstat-view-servlet:#手动重置监控数据enabled:trueurl-pattern: /druid/*
        # IP白名单,没有配置或者为空,则允许所有访问allow:#IP黑名单,若白名单也存在,则优先使用deny:# 配置druid登录用户名、密码login-username: admin
        login-password: admin
        # HTML 中 Reset All 按钮reset-enable:truerabbitmq:host: 10.168.1.200
    port:5672virtual-host: test
    username: admin
    password: admin

mybatis-plus:configuration:log-impl: org.apache.ibatis.logging.stdout.StdOutImpl

4. 创建mq操作类

@Data@Builder@NoArgsConstructor@AllArgsConstructorpublicclassMqVoimplementsSerializable{privatestaticfinallong serialVersionUID =-3630888028848412302L;/**
     * 交换机类型(0:DIRECT直连交换机; 1:TOPIC主题交换机; 2:FANOUT扇形交换机; 3:HEADERS头交换机)
     */@ApiModelProperty(value ="交换机类型(0:DIRECT直连交换机; 1:TOPIC主题交换机; 2:FANOUT扇形交换机; 3:HEADERS头交换机)")privateString exchangeType;/**
     * 交换机名称
     */@ApiModelProperty(value ="交换机名称")privateString exchangeName;/**
     * 队列名称
     */@ApiModelProperty(value ="队列名称")privateString queueName;/**
     * 绑定关系
     */@ApiModelProperty(value ="绑定关系")privateString binding;/**
     * 是否死信队列(0:是;1:否)
     */@ApiModelProperty(value ="是否死信队列(0:是;1:否)")privateString delayType;/**
     * 操作类型(0:新增; 1:删除)
     */@ApiModelProperty(value ="操作类型(0:新增; 1:删除)")privateint type;}

5. mq操作方法

@Slf4j@ComponentpublicclassMqUtils{/**
     * 获取工厂配置类
     */@ResourceprivateConnectionFactory connectionFactory;/**
     * 新增消息对列
     */publicvoidmqOperate(MqVo mqVo){//交换机类型String exchangeType = mqVo.getExchangeType();
        log.info("exchangeType -> {}", exchangeType);//队列名称String queueName = mqVo.getQueueName();
        log.info("queueName -> {}", queueName);//交换机名称String exchangeName = mqVo.getExchangeName();
        log.info("exchangeName -> {}", exchangeName);//绑定关系String binding = mqVo.getBinding();
        log.info("binding -> {}", binding);//是否死信队列(0:是;1:否)String delayType = mqVo.getDelayType();
        log.info("delayType -> {}", delayType);//操作类型int status = mqVo.getType();RabbitAdmin rabbitAdmin =newRabbitAdmin(connectionFactory);if(status ==0){//新增队列
            rabbitAdmin.declareQueue(newQueue(queueName));//新增交换机
            rabbitAdmin.declareExchange(getExchange(exchangeType, exchangeName, delayType));//新增绑定关系
            rabbitAdmin.declareBinding(newBinding(queueName,Binding.DestinationType.QUEUE, exchangeName, binding,null));}else{//删除队列
            rabbitAdmin.deleteQueue(queueName);//删除交换机
            rabbitAdmin.deleteExchange(exchangeName);//删除绑定关系
            rabbitAdmin.removeBinding(newBinding(queueName,Binding.DestinationType.QUEUE, exchangeName, binding,null));}close();}/**
     * 交换机生成方法
     * @param exchangeType 交换机类型
     * @param exchangeName 交换机名称
     * @param delayType 是否死信队列(0:是;1:否)
     * @return Exchange
     */privateExchangegetExchange(String exchangeType,String exchangeName,String delayType){Exchange exchange =newDirectExchange(exchangeName);String zero ="0";switch(exchangeType){case"0":if(zero.equals(delayType)){Map<String,Object> map =newHashMap<>(1);
                    map.put("x-delayed-type","direct");
                    exchange =newCustomExchange(exchangeName,"x-delayed-message",true,false, map);}else{
                    exchange =newDirectExchange(exchangeName);}break;case"1":if(zero.equals(delayType)){//待补充}else{
                    exchange =newTopicExchange(exchangeName);}break;case"2":if(zero.equals(delayType)){//待补充}else{
                    exchange =newFanoutExchange(exchangeName);}break;case"3":if(zero.equals(delayType)){//待补充}else{
                    exchange =newHeadersExchange(exchangeName);}break;default:break;}return exchange;}/**
     * 关闭连接
     */publicvoidclose(){try(Connection connection = connectionFactory.createConnection()){try(Channel channel = connection.createChannel(true)){com.rabbitmq.client.Connection connection1 = channel.getConnection();
                connection1.close();}}catch(Exception e){
            e.printStackTrace();}}}

6. service接口

@Slf4j@Service("mqConfigService")publicclassMqConfigServiceImplextendsServiceImpl<MqConfigDao,MqConfig>implementsMqConfigService{@ResourceprivateMqUtils mqUtils;@Overridepublicbooleanadd(MqConfig mqConfig){//写入数据库boolean save =save(mqConfig);//判断是否写入成功if(save){//判断是否启用消息队列status(mqConfig);}return save;}@Overridepublicbooleanupdate(MqConfig mqConfig){//查询数据库现存信息MqConfig byId =getById(mqConfig.getMqId());//修改数据boolean b =updateById(mqConfig);//判断是否修改成功if(b){//删除消息队列addOrDelMq(byId,1);//判断是否启用消息队列status(mqConfig);}return b;}/**
     * 状态位为开启时,消息队列创建方法封装
     * @param mqConfig
     */privatevoidstatus(MqConfig mqConfig){if("0".equals(mqConfig.getStatus())){//创建消息队列addOrDelMq(mqConfig,0);}}/**
     * 新增队列方法封装
     * @param mqConfig
     */publicvoidaddOrDelMq(MqConfig mqConfig,int type){
        mqUtils.mqOperate(MqVo.builder().exchangeType(mqConfig.getExchangeType()).queueName(mqConfig.getExchangeName()).exchangeName(mqConfig.getExchangeName()).binding(mqConfig.getBinding()).delayType(mqConfig.getDelayType()).type(type).build());}/**
     * 删除消息队列
     * @param mqIds
     * @return
     */@Overridepublicbooleandelete(String[] mqIds){Boolean b =false;for(String mqId : mqIds){//查询消息队列信息MqConfig mqConfig =getById(mqId);//删除消息队列
            b =removeById(mqId);//判断消息队列是否删除成功if(b){//删除消息队列addOrDelMq(mqConfig,1);}}return b;}}

7.controller接口

@RestController@RequestMapping("mqConfig")publicclassMqConfigController{/**
     * 服务对象
     */@ResourceprivateMqConfigService mqConfigService;/**
     * 新增mq配置
     *
     * @param mqConfig 实体
     * @return 新增是否成功
     */@ApiOperation(value ="新增mq配置")@PostMapping(value ="add", produces ="application/json;charset=utf-8")publicApiResult<Boolean>add(MqConfig mqConfig){returnApiResult.ok("添加成功", mqConfigService.add(mqConfig));}/**
     * 修改mq配置
     *
     * @param mqConfig 实体
     * @return 修改是否成功
     */@ApiOperation(value ="修改mq配置")@PutMapping(value ="update", produces ="application/json;charset=utf-8")publicApiResult<Boolean>update(MqConfig mqConfig){returnApiResult.ok("修改成功", mqConfigService.update(mqConfig));}/**
     * 删除mq配置
     *
     * @param mqIds 主键
     * @return 删除是否成功
     */@ApiOperation(value ="删除mq配置")@DeleteMapping(value ="deleteById", produces ="application/json;charset=utf-8")publicApiResult<Boolean>deleteById(String[] mqIds){returnApiResult.ok("删除成功", mqConfigService.delete(mqIds));}}

8.gitee地址


本文转载自: https://blog.csdn.net/weixin_42552016/article/details/129836257
版权归原作者 触不可及,而又念念不忘 所有, 如有侵权,请联系我们删除。

“springboot实现rabbitmq动态创建交换机,队列以及交换机、队列绑定”的评论:

还没有评论