springboot实现rabbitmq动态创建交换机,队列以及交换机、队列绑定
1. 数据库准备
droptableifexists mq_config;/*==============================================================*//* Table: mq_config *//*==============================================================*/createtable mq_config
(
mq_id varchar(200)notnullcomment'交换机id',
exchange_type char(1)comment'交换机类型(0:DIRECT直连交换机; 1:TOPIC主题交换机; 2:FANOUT扇形交换机; 3:HEADERS头交换机)',
exchange_name varchar(200)comment'交换机名称',
queue_name varchar(200)comment'队列名称',
binding varchar(200)comment'绑定关系',
delay_type char(1)comment'是否死信队列(0:是;1:否)',
version bigint(20)default0comment'乐观锁',
del_flag char(1)default'0'comment'删除标志(0:存在; 1:删除)',statuschar(1)default'0'comment'记录状态(0:在用; 1:停用)',
create_by varchar(64)comment'创建人',
create_time timestamp(0)defaultCURRENT_TIMESTAMPcomment'创建时间',
update_by varchar(64)comment'修改人',
update_time datetime(0)defaultCURRENT_TIMESTAMPONUPDATECURRENT_TIMESTAMPcomment'修改时间',
remark varchar(500)comment'备注',primarykey(mq_id));altertable mq_config comment'mq配置表';
2. 依赖引入
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-jdbc</artifactId></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><scope>runtime</scope></dependency><!-- 连接池 --><dependency><groupId>com.alibaba</groupId><artifactId>druid-spring-boot-starter</artifactId><version>1.1.10</version></dependency><!-- swagger依赖 --><dependency><groupId>com.github.xiaoymin</groupId><artifactId>knife4j-micro-spring-boot-starter</artifactId><version>3.0.3</version></dependency><dependency><groupId>com.github.xiaoymin</groupId><artifactId>knife4j-spring-boot-starter</artifactId><version>3.0.3</version></dependency><!-- mybatis-plus --><dependency><groupId>com.baomidou</groupId><artifactId>mybatis-plus-boot-starter</artifactId><version>3.5.2</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>
3. yml配置
# 应用名称server:port:8080spring:datasource:# 配置druid数据库连接池druid:#配置当前数据源类型type: com.alibaba.druid.pool.DruidDataSource
# 配置MySQL的驱动程序类driver-class-name: com.mysql.cj.jdbc.Driver
url: jdbc:mysql://localhost:3306/schedule?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=GMT%2B8username: root
password: root
# 配置监控统计拦截的filters,stat是sql监控,wall是防火墙(如果不添加则监控无效),添加log4j需要引入jar包filters: stat,wall
# 连接池最大活跃连接数max-active:100# 连接池初始化连接数量initial-size:1# 配置获取连接等待超时的时间max-wait:60000# 连接池最小空闲数min-idle:1# 指定空闲连接检查、废弃连接清理、空闲连接池大小调整之间的操作时间间隔time-between-eviction-runs-millis:60000# 指定一个空闲连接最少空闲多久后可被清除min-evictable-idle-time-millis:300000# 连接是否有效的查询语句validation-query: select 'x'
test-while-idle:truetest-on-borrow:falsetest-on-return:false#打开 PSCache,并且指定每个连接上 PSCache 的大小pool-prepared-statements:truemax-open-prepared-statements:50max-pool-prepared-statement-per-connection-size:20# 配置 DruidStatFilterweb-stat-filter:enabled:true#\u662F\u5426\u542F\u7528StatFilter\u9ED8\u8BA4\u503Ctrue# 排除一些不必要的url,比如.js,/jslib/等exclusions:"*.js,*.gif,*.jpg,*.png,*.css,*.ico,/druid/*"# 过滤规则url-pattern: /*
# 配置 DruidStatViewServletstat-view-servlet:#手动重置监控数据enabled:trueurl-pattern: /druid/*
# IP白名单,没有配置或者为空,则允许所有访问allow:#IP黑名单,若白名单也存在,则优先使用deny:# 配置druid登录用户名、密码login-username: admin
login-password: admin
# HTML 中 Reset All 按钮reset-enable:truerabbitmq:host: 10.168.1.200
port:5672virtual-host: test
username: admin
password: admin
mybatis-plus:configuration:log-impl: org.apache.ibatis.logging.stdout.StdOutImpl
4. 创建mq操作类
@Data@Builder@NoArgsConstructor@AllArgsConstructorpublicclassMqVoimplementsSerializable{privatestaticfinallong serialVersionUID =-3630888028848412302L;/**
* 交换机类型(0:DIRECT直连交换机; 1:TOPIC主题交换机; 2:FANOUT扇形交换机; 3:HEADERS头交换机)
*/@ApiModelProperty(value ="交换机类型(0:DIRECT直连交换机; 1:TOPIC主题交换机; 2:FANOUT扇形交换机; 3:HEADERS头交换机)")privateString exchangeType;/**
* 交换机名称
*/@ApiModelProperty(value ="交换机名称")privateString exchangeName;/**
* 队列名称
*/@ApiModelProperty(value ="队列名称")privateString queueName;/**
* 绑定关系
*/@ApiModelProperty(value ="绑定关系")privateString binding;/**
* 是否死信队列(0:是;1:否)
*/@ApiModelProperty(value ="是否死信队列(0:是;1:否)")privateString delayType;/**
* 操作类型(0:新增; 1:删除)
*/@ApiModelProperty(value ="操作类型(0:新增; 1:删除)")privateint type;}
5. mq操作方法
@Slf4j@ComponentpublicclassMqUtils{/**
* 获取工厂配置类
*/@ResourceprivateConnectionFactory connectionFactory;/**
* 新增消息对列
*/publicvoidmqOperate(MqVo mqVo){//交换机类型String exchangeType = mqVo.getExchangeType();
log.info("exchangeType -> {}", exchangeType);//队列名称String queueName = mqVo.getQueueName();
log.info("queueName -> {}", queueName);//交换机名称String exchangeName = mqVo.getExchangeName();
log.info("exchangeName -> {}", exchangeName);//绑定关系String binding = mqVo.getBinding();
log.info("binding -> {}", binding);//是否死信队列(0:是;1:否)String delayType = mqVo.getDelayType();
log.info("delayType -> {}", delayType);//操作类型int status = mqVo.getType();RabbitAdmin rabbitAdmin =newRabbitAdmin(connectionFactory);if(status ==0){//新增队列
rabbitAdmin.declareQueue(newQueue(queueName));//新增交换机
rabbitAdmin.declareExchange(getExchange(exchangeType, exchangeName, delayType));//新增绑定关系
rabbitAdmin.declareBinding(newBinding(queueName,Binding.DestinationType.QUEUE, exchangeName, binding,null));}else{//删除队列
rabbitAdmin.deleteQueue(queueName);//删除交换机
rabbitAdmin.deleteExchange(exchangeName);//删除绑定关系
rabbitAdmin.removeBinding(newBinding(queueName,Binding.DestinationType.QUEUE, exchangeName, binding,null));}close();}/**
* 交换机生成方法
* @param exchangeType 交换机类型
* @param exchangeName 交换机名称
* @param delayType 是否死信队列(0:是;1:否)
* @return Exchange
*/privateExchangegetExchange(String exchangeType,String exchangeName,String delayType){Exchange exchange =newDirectExchange(exchangeName);String zero ="0";switch(exchangeType){case"0":if(zero.equals(delayType)){Map<String,Object> map =newHashMap<>(1);
map.put("x-delayed-type","direct");
exchange =newCustomExchange(exchangeName,"x-delayed-message",true,false, map);}else{
exchange =newDirectExchange(exchangeName);}break;case"1":if(zero.equals(delayType)){//待补充}else{
exchange =newTopicExchange(exchangeName);}break;case"2":if(zero.equals(delayType)){//待补充}else{
exchange =newFanoutExchange(exchangeName);}break;case"3":if(zero.equals(delayType)){//待补充}else{
exchange =newHeadersExchange(exchangeName);}break;default:break;}return exchange;}/**
* 关闭连接
*/publicvoidclose(){try(Connection connection = connectionFactory.createConnection()){try(Channel channel = connection.createChannel(true)){com.rabbitmq.client.Connection connection1 = channel.getConnection();
connection1.close();}}catch(Exception e){
e.printStackTrace();}}}
6. service接口
@Slf4j@Service("mqConfigService")publicclassMqConfigServiceImplextendsServiceImpl<MqConfigDao,MqConfig>implementsMqConfigService{@ResourceprivateMqUtils mqUtils;@Overridepublicbooleanadd(MqConfig mqConfig){//写入数据库boolean save =save(mqConfig);//判断是否写入成功if(save){//判断是否启用消息队列status(mqConfig);}return save;}@Overridepublicbooleanupdate(MqConfig mqConfig){//查询数据库现存信息MqConfig byId =getById(mqConfig.getMqId());//修改数据boolean b =updateById(mqConfig);//判断是否修改成功if(b){//删除消息队列addOrDelMq(byId,1);//判断是否启用消息队列status(mqConfig);}return b;}/**
* 状态位为开启时,消息队列创建方法封装
* @param mqConfig
*/privatevoidstatus(MqConfig mqConfig){if("0".equals(mqConfig.getStatus())){//创建消息队列addOrDelMq(mqConfig,0);}}/**
* 新增队列方法封装
* @param mqConfig
*/publicvoidaddOrDelMq(MqConfig mqConfig,int type){
mqUtils.mqOperate(MqVo.builder().exchangeType(mqConfig.getExchangeType()).queueName(mqConfig.getExchangeName()).exchangeName(mqConfig.getExchangeName()).binding(mqConfig.getBinding()).delayType(mqConfig.getDelayType()).type(type).build());}/**
* 删除消息队列
* @param mqIds
* @return
*/@Overridepublicbooleandelete(String[] mqIds){Boolean b =false;for(String mqId : mqIds){//查询消息队列信息MqConfig mqConfig =getById(mqId);//删除消息队列
b =removeById(mqId);//判断消息队列是否删除成功if(b){//删除消息队列addOrDelMq(mqConfig,1);}}return b;}}
7.controller接口
@RestController@RequestMapping("mqConfig")publicclassMqConfigController{/**
* 服务对象
*/@ResourceprivateMqConfigService mqConfigService;/**
* 新增mq配置
*
* @param mqConfig 实体
* @return 新增是否成功
*/@ApiOperation(value ="新增mq配置")@PostMapping(value ="add", produces ="application/json;charset=utf-8")publicApiResult<Boolean>add(MqConfig mqConfig){returnApiResult.ok("添加成功", mqConfigService.add(mqConfig));}/**
* 修改mq配置
*
* @param mqConfig 实体
* @return 修改是否成功
*/@ApiOperation(value ="修改mq配置")@PutMapping(value ="update", produces ="application/json;charset=utf-8")publicApiResult<Boolean>update(MqConfig mqConfig){returnApiResult.ok("修改成功", mqConfigService.update(mqConfig));}/**
* 删除mq配置
*
* @param mqIds 主键
* @return 删除是否成功
*/@ApiOperation(value ="删除mq配置")@DeleteMapping(value ="deleteById", produces ="application/json;charset=utf-8")publicApiResult<Boolean>deleteById(String[] mqIds){returnApiResult.ok("删除成功", mqConfigService.delete(mqIds));}}
8.gitee地址
本文转载自: https://blog.csdn.net/weixin_42552016/article/details/129836257
版权归原作者 触不可及,而又念念不忘 所有, 如有侵权,请联系我们删除。
版权归原作者 触不可及,而又念念不忘 所有, 如有侵权,请联系我们删除。