为什么需要动态创建队列与绑定交换机?我在写项目的时候遇到这么个问题,我数据库中存在一个字段messageType指定为消息类型,消息类型存在三种,一种是通知类,一种是验证码类,一种是活动类。并且对应的,要将消息进行不同渠道的分发,还存在一个channelType,而他又存在QQ邮箱,手机短信、服务号三种不同的渠道。假如说我每增加一个渠道类型,我就必须再一次手动创建一个队列,那可太烦人了,并且还得新增一个新的监听器。而对应的,每个渠道商给的接口调用频率不同,如果我采用一个队列发送所有的渠道消息,这样一来,一旦某个服务被限流阻塞,那我核心服务就使用不了了。因此还需要对不同的渠道创建不同的队列进行消息隔离。这又面临一个问题,我怎么去动态的创建我自己的队列,不需要我每次都去手动新增队列、绑定队列?下面我说说自己的解决方案。当然,RabbitMQ的交换机的类型不同,对应的队列绑定方式也不太一样.
先复习复习,这些声明创建绑定方式
RabbitMQ中,声明队列以及绑定队列的几种方式
使用RabbitManager可视化图形界面创建、绑定队列
在queues中创建好队列
在exchanges中创建好交换机
在queues中,找到自己创建的队列,点进去,绑定上路由和交换机
或者
在exchanges中,找到自己创建的交换机,点进去,绑定上路由和交换机
反正无论你从哪进去,只要能将queue绑到交换机上,那就没问题。注意一些参数的选择。如果不理解可以看看我之前的文章。
写一个Config类,在Config类中声明队列绑定交换机
示例如下
/**
* 将队列与交换机绑定
*/@ConfigurationpublicclassRabbitmqConfig{//声明sms交换机 @Bean(RabbitmqConstant.SMS_EXCHANGE_NAME)publicExchangeSMS_EXCHANGE(){//durable(true) 持久化,mq重启之后交换机还在 returnExchangeBuilder.topicExchange(RabbitmqConstant.SMS_EXCHANGE_NAME).durable(true).build();}//声明sms队列 @Bean(RabbitmqConstant.SMS_QUEUE_NAME)publicQueueSMS_QUEUE(){returnnewQueue(RabbitmqConstant.SMS_QUEUE_NAME);}//sms队列绑定sms交换机,指定routingKey @BeanpublicBindingBINDING_ROUTING_KEY_SMS(@Qualifier(RabbitmqConstant.SMS_QUEUE_NAME)Queue queue,@Qualifier(RabbitmqConstant.SMS_EXCHANGE_NAME)Exchange exchange){returnBindingBuilder.bind(queue).to(exchange).with(RabbitmqConstant.SMS_ROUTING_KET_FOR_REGISTER).noargs();}}
直接在@RabbitListener中声明并且绑定队列
@RabbitListener(bindings =@QueueBinding(
value =@Queue(value ="${spring.rabbitmq.queues}", durable ="true"),
exchange =@Exchange(value ="${rabbitmq.exchange.name}", type =ExchangeTypes.TOPIC),
key ="${rabbitmq.routing.key}"))publicvoidonMessage(Message message){//收到消息后处理...}
RabbitAdmin的使用
RabbitAdmin 是 Spring AMQP 框架中的一个重要组件,用于简化和管理 RabbitMQ 的配置和操作。它提供了一种方便的方式来声明队列、交换机和绑定等 RabbitMQ 的对象,并使用这些对象进行消息的发送和接收。
RabbitAdmin 可以自动创建和绑定队列、交换机和绑定,这样可以在应用程序启动时自动配置 RabbitMQ 的相关对象,而不需要手动去创建和配置这些对象。
使用 RabbitAdmin,你可以通过编程的方式在应用程序中进行 RabbitMQ 的配置和操作,而不需要手动通过管理界面或命令行工具去创建和管理 RabbitMQ 的对象。
以下是 RabbitAdmin 的一些常见用法:
- 声明队列:使用
declareQueue()
方法来声明队列,可以设置队列的名称、是否持久化、是否独占等属性。 - 声明交换机:使用
declareExchange()
方法来声明交换机,可以设置交换机的类型、是否持久化等属性。 - 声明绑定:使用
declareBinding()
方法来声明绑定,将交换机和队列通过指定的路由键绑定在一起。 - 发送消息:使用
convertAndSend()
方法来向指定的交换机发送消息。 - 接收消息:使用
receiveAndConvert()
方法来接收消息,并将消息转换为指定的 Java 对象。
一个问题
我的一个项目,对不同的业务的发送队列不一样。按照前文所说,我需要手写9个@RabbitListener,我真的会吐。下面看看我是怎么解决的。
Spring SpEL
Spring SpEL(Spring Expression Language,Spring 表达式语言)是 Spring 框架中提供的一种强大的表达式语言。它在功能上比 EL(Expression Language)更为强大,并且可以在更多的场景中使用。
Spring SpEL 可以在 Spring 应用程序的配置文件中使用,用于定义和处理字符串模板、计算表达式和访问对象属性等。 SpEL 提供了一套丰富的语法和功能,使得在 Spring 配置文件或 Spring Boot 的注解中可以进行更灵活和动态的编程。如果兴趣可以去看看这篇文章的详解使用。
深入掌握Spring SpEL及其实战应用
我这里就说一些我会用的
。SpEl可以使用在注解上,可以动态的写一个表达式,并且解析。表达式就类似于#{1 + 1 }、
#{beanName.get()}方法调用,或者是读取配置文件#{‘${rabbimq.name}’}。表达式可以调用SpringIOC容器
中的Bean的方法,下面我就用到了。
RabbitAdmin 配合 SpringEl表达式 和 @RabbitListener 以及Spring多例Bean,动态创建队列并且声明队列监听Bean
第一步,配合SpringBean的生命周期中,@PostConsturct注解,这个注解会在Bean属性赋值之后调用。因此,我拿他来用RabbitAdmin初始化我的队列
工具类,负责从数据库中获取队列的一些拼接段,并且将其拼接好,放在集合中。
@Component("queueNameMappingUtils")publicclassQueueNameMappingUtils{/**
* 下标(用于迭代groupIds位置)
*/privatestaticInteger index =0;privatestaticList<String> queueNames =getAllQueueNames();/**
* 获取所有队列名称
*/publicstaticList<String>getAllQueueNames(){List<String> queueNames =newArrayList<>();ChannelType[] channelTypes =ChannelType.values();MessageType[] messageTypes =MessageType.values();//将他们组合起来 for(ChannelType channelType : channelTypes){for(MessageType messageType : messageTypes){
groupIds.add(channelType.getCodeEn()+"."+ messageType.getCodeEn());}}return queueNames;}/**
* 一个一个获取
*/publicStringget(){return queueNames.get(index++);}}
@Service@Slf4jpublicclassRabbitMqReceiverInit{/**
* 上下文容器,用来获取实例
*/@AutowiredprivateApplicationContext applicationContext;/**
* 编程的方式绑定队列
*/@AutowiredprivateRabbitAdmin rabbitAdmin;@Value("${message.push.rabbitmq.exchange.name}")privateString exchangeName;@Value("${message.pusg.rabbitmq.queue.name.prefix}")privateString queuePrefix;@Value("${message.push.rabbitmq.routing.key.prefix}")privateString routingKeyPrefix;/**
* 获取得到所有的groupId
*/privatestaticList<String> queueNames =QueueNameMappingUtils.getAllQueueNames();/**
* 创建出对象,这个是在bean属性注入之后搞定的
*/@PostConstructpublicvoidinit(){for(int i =0; i < queueNames.size(); i++){
applicationContext.getBean(RabbitMqReceiver.class);}//申明交换机
rabbitAdmin.declareExchange(newTopicExchange(exchangeName,true,false));//开始生产队列 for(String s : queueNames){//队列名称 String queueName = queuePrefix +"."+ s;//路由名称 String routingKey = routingKeyPrefix +"."+ s;//申明队列
rabbitAdmin.declareQueue(newQueue(queueName,true));//绑定上交换机
rabbitAdmin.declareBinding(newBinding(queueName,Binding.DestinationType.QUEUE, exchangeName, routingKey,null));}}@BeanpublicRabbitAdminrabbitAdmin(ConnectionFactory connectionFactory){returnnewRabbitAdmin(connectionFactory);}}
第二,利用Bean的方法,以及下标偏移,不断获取下一个组装好的队列名称,并且通过SpringSpEl不断访问,最终成功创建多个监听器,并且监听的是不同的队列
@Component@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)publicclassRabbitMqReceiver{@RabbitListener(queues ="#{'${message.push.rabbitmq.queue.name.prefix}'+'.'+ queueNameMappingUtils.get()}")publicvoidonMessage(Message message){//处理消息...}}
版权归原作者 啊vvvv 所有, 如有侵权,请联系我们删除。