springboot-rabbitmq: dynamically configuring listener containers


1. Following the MQ startup configuration through the source code

1.1 The configuration entry point

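1.1.1 The spring.factories file of the Spring Boot auto-configuration module lists the RabbitMQ auto-configuration class, org.springframework.boot.autoconfigure.amqp.RabbitAutoConfiguration.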

1.1.2 Open RabbitAutoConfiguration and you will see that it imports the configuration class RabbitAnnotationDrivenConfiguration.

@Configuration
@ConditionalOnClass({ RabbitTemplate.class, Channel.class })
@EnableConfigurationProperties(RabbitProperties.class)
@Import(RabbitAnnotationDrivenConfiguration.class)
public class RabbitAutoConfiguration {
}

1.1.3 Open RabbitAnnotationDrivenConfiguration and scroll to the bottom. It applies the @EnableRabbit annotation, and that annotation in turn imports the configuration class RabbitBootstrapConfiguration.

@EnableRabbit
@ConditionalOnMissingBean(name = RabbitListenerConfigUtils.RABBIT_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME)
protected static class EnableRabbitConfiguration {
}

// ---------------------------------------------

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(RabbitBootstrapConfiguration.class)
public @interface EnableRabbit {
}

1.1.4 RabbitBootstrapConfiguration defines two beans. One of them, RabbitListenerEndpointRegistry, is the class that implements registration of listener containers.

@Configuration
public class RabbitBootstrapConfiguration {

    @Bean(name = RabbitListenerConfigUtils.RABBIT_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME)
    @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
    public RabbitListenerAnnotationBeanPostProcessor rabbitListenerAnnotationProcessor() {
        return new RabbitListenerAnnotationBeanPostProcessor();
    }

    @Bean(name = RabbitListenerConfigUtils.RABBIT_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME)
    public RabbitListenerEndpointRegistry defaultRabbitListenerEndpointRegistry() {
        return new RabbitListenerEndpointRegistry();
    }
}

1.1.5 RabbitListenerEndpointRegistry provides getListenerContainerIds, which returns the ids of all registered containers, and registerListenerContainer, which registers a new listener container.

/**
 * Create a message listener container for the given {@link RabbitListenerEndpoint}.
 * <p>This create the necessary infrastructure to honor that endpoint
 * with regards to its configuration.
 * @param endpoint the endpoint to add
 * @param factory the listener factory to use
 * @see #registerListenerContainer(RabbitListenerEndpoint, RabbitListenerContainerFactory, boolean)
 */
public void registerListenerContainer(RabbitListenerEndpoint endpoint, RabbitListenerContainerFactory<?> factory) {
    registerListenerContainer(endpoint, factory, false);
}
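To make the role of these two methods concrete, here is a minimal plain-Java sketch of a registry that stores containers by endpoint id. It is a toy model, not Spring AMQP code: ToyEndpointRegistry and its parameter types are hypothetical names invented for this example. The three checks (non-null factory, non-empty id, no duplicate id) reflect my reading of the Spring AMQP source and should be treated as an assumption for your version.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

// Toy model of a listener-container registry (hypothetical, not Spring AMQP code).
final class ToyEndpointRegistry {

    // endpoint id -> queue name that the "container" would listen on
    private final Map<String, String> containers = new LinkedHashMap<>();

    // Registers a container under the endpoint id, rejecting invalid input.
    void registerListenerContainer(String endpointId, String queueName, Object factory) {
        if (factory == null) {
            throw new IllegalArgumentException("Factory must not be null");
        }
        if (endpointId == null || endpointId.trim().isEmpty()) {
            throw new IllegalArgumentException("Endpoint id must not be empty");
        }
        if (containers.containsKey(endpointId)) {
            throw new IllegalStateException(
                    "Another endpoint is already registered with id '" + endpointId + "'");
        }
        containers.put(endpointId, queueName);
    }

    // Returns a read-only view of the ids of all registered containers.
    Set<String> getListenerContainerIds() {
        return Collections.unmodifiableSet(containers.keySet());
    }
}
```

The point of the sketch is that the endpoint id is the key: every dynamically registered endpoint needs its own id, and that id is what you later use to find the container again.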

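1.1.6 Registration of the listener containers is triggered in RabbitListenerEndpointRegistrar, in the afterPropertiesSet hook method that is called once bean initialization has completed; it registers all listener containers. P.S. RabbitListenerEndpointRegistrar is a field of RabbitListenerAnnotationBeanPostProcessor, and RabbitListenerAnnotationBeanPostProcessor is the RabbitMQ listener annotation post-processor created in 1.1.4.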

@Override
public void afterPropertiesSet() {
    registerAllEndpoints();
}

// ---------------------------------------

public class RabbitListenerAnnotationBeanPostProcessor
        implements BeanPostProcessor, Ordered, BeanFactoryAware, BeanClassLoaderAware, EnvironmentAware,
        SmartInitializingSingleton {

    /**
     * The bean name of the default {@link org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory}.
     */
    public static final String DEFAULT_RABBIT_LISTENER_CONTAINER_FACTORY_BEAN_NAME = "rabbitListenerContainerFactory";

    private final RabbitListenerEndpointRegistrar registrar = new RabbitListenerEndpointRegistrar();
}

2. Creating a custom message handler and listener container

2.1 Inject RabbitListenerEndpointRegistry into a custom bean

@Component
public class RabbitmqCustomerConfiguration {

    @Resource
    RabbitListenerEndpointRegistry rabbitListenerEndpointRegistry;
}

2.2 Create a MyMessageListener class that implements the MessageListener interface

@Component
public class MyMessageListener implements MessageListener {

    @Override
    public void onMessage(Message message) {
        System.out.println(new String(message.getBody()));
    }
}

2.3 Create a SimpleRabbitListenerEndpoint and set the name of the queue to listen on and the custom message handler

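@Component
public class RabbitmqCustomerConfiguration {

    @Resource
    RabbitListenerEndpointRegistry rabbitListenerEndpointRegistry;

    @Autowired
    MyMessageListener myMessageListener;

    // The default listener container factory bean, looked up by its bean name.
    @Resource(name = "rabbitListenerContainerFactory")
    RabbitListenerContainerFactory<?> rabbitListenerContainerFactory;

    public void registryCustomerContain() {
        SimpleRabbitListenerEndpoint simpleRabbitListenerEndpoint = new SimpleRabbitListenerEndpoint();
        // The registry needs a non-empty, unique endpoint id; this value is only an example.
        simpleRabbitListenerEndpoint.setId("testQueueListener");
        simpleRabbitListenerEndpoint.setMessageListener(myMessageListener);
        simpleRabbitListenerEndpoint.setQueueNames("testQueue");
        // The third argument controls whether the listener container is started immediately.
        rabbitListenerEndpointRegistry.registerListenerContainer(
                simpleRabbitListenerEndpoint, rabbitListenerContainerFactory, true);
    }
}

P.S. The second argument has to be a real factory. As far as I can tell from the Spring AMQP source, RabbitListenerEndpointRegistry.registerListenerContainer rejects a null factory and an endpoint without an id, so passing null here fails instead of falling back to a default.
The fallback to the default factory lives on a different path, in RabbitListenerEndpointRegistrar, which is what @RabbitListener endpoints go through. When no factory is given there, the registrar checks whether a factory is set and, as a last resort, looks one up by containerFactoryBeanName: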

private RabbitListenerContainerFactory<?> resolveContainerFactory(AmqpListenerEndpointDescriptor descriptor) {
    if (descriptor.containerFactory != null) { // *
        return descriptor.containerFactory;
    }
    else if (this.containerFactory != null) { // *
        return this.containerFactory;
    }
    else if (this.containerFactoryBeanName != null) {
        Assert.state(this.beanFactory != null, "BeanFactory must be set to obtain container factory by bean name");
        this.containerFactory = this.beanFactory.getBean(
                this.containerFactoryBeanName, RabbitListenerContainerFactory.class);
        return this.containerFactory; // Consider changing this if live change of the factory is required
    }
    else {
        throw new IllegalStateException("Could not resolve the " +
                RabbitListenerContainerFactory.class.getSimpleName() + " to use for [" +
                descriptor.endpoint + "] no factory was given and no default is set.");
    }
}
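The order of the checks in resolveContainerFactory is easier to see in isolation. The following is a minimal plain-Java sketch of the same precedence, not Spring code: FactoryResolver, its fields, and the Map that stands in for the BeanFactory are hypothetical names introduced only for illustration, and factories are represented by plain strings.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of the factory lookup precedence (hypothetical, not Spring AMQP code).
final class FactoryResolver {

    private final Map<String, String> beans;      // stands in for the BeanFactory
    private final String defaultFactoryBeanName;  // stands in for containerFactoryBeanName
    private String defaultFactory;                // stands in for this.containerFactory

    FactoryResolver(Map<String, String> beans, String defaultFactory, String defaultFactoryBeanName) {
        this.beans = new HashMap<>(beans);
        this.defaultFactory = defaultFactory;
        this.defaultFactoryBeanName = defaultFactoryBeanName;
    }

    String resolve(String endpointFactory) {
        if (endpointFactory != null) {
            return endpointFactory;               // 1. factory given together with the endpoint
        }
        if (defaultFactory != null) {
            return defaultFactory;                // 2. default factory already set or cached
        }
        if (defaultFactoryBeanName != null) {
            String found = beans.get(defaultFactoryBeanName);
            if (found == null) {
                throw new IllegalStateException("No bean named '" + defaultFactoryBeanName + "'");
            }
            defaultFactory = found;               // 3. look up by bean name and cache the result
            return found;
        }
        throw new IllegalStateException("no factory was given and no default is set");
    }
}
```

Note the caching in step 3: once the default has been resolved by name it is reused for later endpoints, which is what the "Consider changing this if live change of the factory is required" comment in the original source refers to.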

containerFactoryBeanName comes from this constant in RabbitListenerAnnotationBeanPostProcessor:

public static final String DEFAULT_RABBIT_LISTENER_CONTAINER_FACTORY_BEAN_NAME = "rabbitListenerContainerFactory";

That is the name of the bean defined in RabbitAnnotationDrivenConfiguration, the class mentioned in 1.1.2:

@Bean
@ConditionalOnMissingBean(name = "rabbitListenerContainerFactory")
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(
        SimpleRabbitListenerContainerFactoryConfigurer configurer, ConnectionFactory connectionFactory) {
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    configurer.configure(factory, connectionFactory);
    return factory;
}

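2.4 Idea: calling stop and then start on rabbitListenerEndpointRegistry restarts all listeners. A context refresh matters too: once the ContextRefreshedEvent has been received, start() starts every container regardless of its autoStartup setting.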

@Override
public void start() {
    for (MessageListenerContainer listenerContainer : getListenerContainers()) {
        startIfNecessary(listenerContainer);
    }
}

// ----------------

private void startIfNecessary(MessageListenerContainer listenerContainer) {
    if (this.contextRefreshed || listenerContainer.isAutoStartup()) {
        listenerContainer.start();
    }
}

// ----------------

@Override
public void onApplicationEvent(ContextRefreshedEvent event) {
    if (event.getApplicationContext().equals(this.applicationContext)) {
        this.contextRefreshed = true;
    }
}
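The interaction between contextRefreshed and autoStartup is the subtle part of the code above, so here is a minimal plain-Java sketch of it. This is a toy model and not Spring code: ToyContainer and ToyRegistry are hypothetical names, and "starting" a container only flips a boolean.

```java
import java.util.ArrayList;
import java.util.List;

// Toy stand-in for a listener container (hypothetical, not Spring AMQP code).
final class ToyContainer {
    final boolean autoStartup;
    boolean running;

    ToyContainer(boolean autoStartup) {
        this.autoStartup = autoStartup;
    }
}

// Toy stand-in for the registry lifecycle shown above.
final class ToyRegistry {
    private final List<ToyContainer> containers = new ArrayList<>();
    private boolean contextRefreshed;

    void register(ToyContainer container) {
        containers.add(container);
    }

    // Mirrors onApplicationEvent(ContextRefreshedEvent): remember that the context is ready.
    void onContextRefreshed() {
        contextRefreshed = true;
    }

    // Mirrors start() and startIfNecessary(): before the refresh only auto-startup containers start.
    void start() {
        for (ToyContainer container : containers) {
            if (contextRefreshed || container.autoStartup) {
                container.running = true;
            }
        }
    }

    // Stops every container unconditionally.
    void stop() {
        for (ToyContainer container : containers) {
            container.running = false;
        }
    }
}
```

In this model a container created with autoStartup set to false stays stopped if start() is called before the refresh, but a stop() followed by start() after the refresh brings every container up.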

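This article is reposted from: https://blog.csdn.net/m0_46978151/article/details/124875003
Copyright belongs to the original author, 阔爱喵了个咪. If this repost infringes your rights, please contact us and we will remove it.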
