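springboot-rabbitmq: dynamically configuring listener containers
1. Understanding the MQ startup configuration flow from the source code
1.1 Configuration entry point
1.1.1 In the factories file (spring.factories) we can see the auto-configuration class that starts MQ.
1.1.2 Next, open RabbitAutoConfiguration. It imports the RabbitAnnotationDrivenConfiguration configuration class: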
@Configuration
@ConditionalOnClass({ RabbitTemplate.class, Channel.class })
@EnableConfigurationProperties(RabbitProperties.class)
@Import(RabbitAnnotationDrivenConfiguration.class)
public class RabbitAutoConfiguration {
}
1.1.3 Open RabbitAnnotationDrivenConfiguration and scroll to the bottom. It applies the @EnableRabbit annotation, and that annotation in turn imports the RabbitBootstrapConfiguration configuration class:
@EnableRabbit
@ConditionalOnMissingBean(name = RabbitListenerConfigUtils.RABBIT_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME)
protected static class EnableRabbitConfiguration {
}

//---------------------------------------------

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(RabbitBootstrapConfiguration.class)
public @interface EnableRabbit {
}
1.1.4 This class defines two beans. One of them, RabbitListenerEndpointRegistry, is the class that implements registration of listener containers:
@Configuration
public class RabbitBootstrapConfiguration {

    @Bean(name = RabbitListenerConfigUtils.RABBIT_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME)
    @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
    public RabbitListenerAnnotationBeanPostProcessor rabbitListenerAnnotationProcessor() {
        return new RabbitListenerAnnotationBeanPostProcessor();
    }

    @Bean(name = RabbitListenerConfigUtils.RABBIT_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME)
    public RabbitListenerEndpointRegistry defaultRabbitListenerEndpointRegistry() {
        return new RabbitListenerEndpointRegistry();
    }
}
1.1.5 RabbitListenerEndpointRegistry provides getListenerContainerIds, which returns the ids of all registered containers, and registerListenerContainer, which registers a listener container:
/**
 * Create a message listener container for the given {@link RabbitListenerEndpoint}.
 * <p>This create the necessary infrastructure to honor that endpoint
 * with regards to its configuration.
 * @param endpoint the endpoint to add
 * @param factory the listener factory to use
 * @see #registerListenerContainer(RabbitListenerEndpoint, RabbitListenerContainerFactory, boolean)
 */
public void registerListenerContainer(RabbitListenerEndpoint endpoint, RabbitListenerContainerFactory<?> factory) {
    registerListenerContainer(endpoint, factory, false);
}
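To make the two registry operations concrete, here is a minimal plain-Java sketch of the idea. It is a simplified model, not Spring AMQP's implementation: SketchContainer and SketchEndpointRegistry are hypothetical names, and a queue name stands in for the endpoint and factory. It only illustrates one container per endpoint id, the two-argument overload delegating with startImmediately = false, and listing the registered ids.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical stand-in for a listener container; it only tracks a running flag. */
final class SketchContainer {
    private final String queueName;
    private boolean running;

    SketchContainer(String queueName) {
        this.queueName = queueName;
    }

    void start() { this.running = true; }

    void stop() { this.running = false; }

    boolean isRunning() { return this.running; }

    String getQueueName() { return this.queueName; }
}

/** Hypothetical registry (not the Spring class): holds one container per endpoint id. */
final class SketchEndpointRegistry {
    private final Map<String, SketchContainer> containers = new LinkedHashMap<>();

    /** Two-argument form: register without starting the container. */
    void registerListenerContainer(String endpointId, String queueName) {
        registerListenerContainer(endpointId, queueName, false);
    }

    synchronized void registerListenerContainer(String endpointId, String queueName, boolean startImmediately) {
        if (endpointId == null || endpointId.isEmpty()) {
            throw new IllegalArgumentException("Endpoint id must not be empty");
        }
        if (this.containers.containsKey(endpointId)) {
            throw new IllegalStateException("Another endpoint is already registered with id '" + endpointId + "'");
        }
        SketchContainer container = new SketchContainer(queueName);
        this.containers.put(endpointId, container);
        if (startImmediately) {
            container.start();
        }
    }

    /** Returns a read-only snapshot of the registered ids, in registration order. */
    synchronized Set<String> getListenerContainerIds() {
        return Collections.unmodifiableSet(new LinkedHashSet<>(this.containers.keySet()));
    }

    synchronized SketchContainer getListenerContainer(String endpointId) {
        return this.containers.get(endpointId);
    }
}
```

Keying containers by endpoint id is what makes it possible to look up, stop, or restart one specific listener later on.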
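1.1.6 Registration of the listener containers is triggered in RabbitListenerEndpointRegistrar, inside afterPropertiesSet, the hook method called once bean initialization has completed. It registers all listener containers. PS: RabbitListenerEndpointRegistrar is a field of RabbitListenerAnnotationBeanPostProcessor, and RabbitListenerAnnotationBeanPostProcessor is the RabbitMQ listener-annotation post-processor that was created in 1.1.4.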
@Override
public void afterPropertiesSet() {
    registerAllEndpoints();
}

//---------------------------------------

public class RabbitListenerAnnotationBeanPostProcessor
        implements BeanPostProcessor, Ordered, BeanFactoryAware, BeanClassLoaderAware, EnvironmentAware,
        SmartInitializingSingleton {

    /**
     * The bean name of the default {@link org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory}.
     */
    public static final String DEFAULT_RABBIT_LISTENER_CONTAINER_FACTORY_BEAN_NAME = "rabbitListenerContainerFactory";

    private final RabbitListenerEndpointRegistrar registrar = new RabbitListenerEndpointRegistrar();
}
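The "collect first, register everything in the initialization hook" pattern can be sketched in plain Java as follows. This is a simplified model under stated assumptions, not the Spring class: DeferredRegistrarSketch is a hypothetical name, queue names stand in for endpoint descriptors, and the behavior of registering directly once the hook has run is an assumption of this sketch.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical registrar: endpoints are collected first and registered in one pass. */
final class DeferredRegistrarSketch {
    private final List<String> pendingQueues = new ArrayList<>();
    private final List<String> registeredQueues = new ArrayList<>();
    private boolean startImmediately;

    /** Before the hook runs, endpoints are only remembered; afterwards they are registered at once. */
    synchronized void registerEndpoint(String queueName) {
        if (this.startImmediately) {
            this.registeredQueues.add(queueName);
        } else {
            this.pendingQueues.add(queueName);
        }
    }

    /** Plays the role of the initialization hook: register everything collected so far. */
    synchronized void afterPropertiesSet() {
        this.registeredQueues.addAll(this.pendingQueues);
        this.pendingQueues.clear();
        this.startImmediately = true; // later endpoints skip the pending list (assumption of this sketch)
    }

    synchronized List<String> getRegisteredQueues() {
        return new ArrayList<>(this.registeredQueues);
    }

    synchronized int getPendingCount() {
        return this.pendingQueues.size();
    }
}
```

Deferring registration until the hook runs means every annotated listener has been discovered before any container is created.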
2. Creating a custom message handler and listener container
2.1 Inject RabbitListenerEndpointRegistry into a custom bean
@Component
public class RabbitmqCustomerConfiguration {

    @Resource
    RabbitListenerEndpointRegistry rabbitListenerEndpointRegistry;
}
2.2 Create the MyMessageListener class, implementing the MessageListener interface
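import java.nio.charset.StandardCharsets;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
import org.springframework.stereotype.Component;

@Component
public class MyMessageListener implements MessageListener {

    @Override
    public void onMessage(Message message) {
        // Decode explicitly as UTF-8 instead of relying on the platform default charset
        System.out.println(new String(message.getBody(), StandardCharsets.UTF_8));
    }
}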
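2.3 Create a SimpleRabbitListenerEndpoint and set the queue name to listen on and the custom message handler
@Component
public class RabbitmqCustomerConfiguration {

    @Resource
    RabbitListenerEndpointRegistry rabbitListenerEndpointRegistry;

    // Default factory bean, resolved by its name "rabbitListenerContainerFactory"
    @Resource
    RabbitListenerContainerFactory<?> rabbitListenerContainerFactory;

    @Autowired
    MyMessageListener myMessageListener;

    public void registryCustomerContain() {
        SimpleRabbitListenerEndpoint simpleRabbitListenerEndpoint = new SimpleRabbitListenerEndpoint();
        // The registry needs a non-empty, unique endpoint id; this value is only an illustration
        simpleRabbitListenerEndpoint.setId("testQueueEndpoint");
        simpleRabbitListenerEndpoint.setMessageListener(myMessageListener);
        simpleRabbitListenerEndpoint.setQueueNames("testQueue");
        // The third argument controls whether the listener container is started immediately
        rabbitListenerEndpointRegistry.registerListenerContainer(
                simpleRabbitListenerEndpoint, rabbitListenerContainerFactory, true);
    }
}
PS: the fallback to the default factory when no factory is given (null) is implemented in RabbitListenerEndpointRegistrar, not in the registry. RabbitListenerEndpointRegistry.registerListenerContainer validates its arguments (non-null factory, non-empty endpoint id), which is why the default factory bean is injected and passed explicitly above.
For endpoints registered through RabbitListenerEndpointRegistrar, the registrar checks whether a factory was supplied and, as a last resort, obtains one by containerFactoryBeanName: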
private RabbitListenerContainerFactory<?> resolveContainerFactory(AmqpListenerEndpointDescriptor descriptor) {
    if (descriptor.containerFactory != null) { // 1. factory given for this endpoint
        return descriptor.containerFactory;
    }
    else if (this.containerFactory != null) { // 2. default factory already set on the registrar
        return this.containerFactory;
    }
    else if (this.containerFactoryBeanName != null) { // 3. look up the default factory by bean name
        Assert.state(this.beanFactory != null, "BeanFactory must be set to obtain container factory by bean name");
        this.containerFactory = this.beanFactory.getBean(
                this.containerFactoryBeanName, RabbitListenerContainerFactory.class);
        return this.containerFactory; // Consider changing this if live change of the factory is required
    }
    else {
        throw new IllegalStateException("Could not resolve the " +
                RabbitListenerContainerFactory.class.getSimpleName() + " to use for [" +
                descriptor.endpoint + "] no factory was given and no default is set.");
    }
}
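The resolution order in resolveContainerFactory can be modeled with a small plain-Java sketch. It is an illustration under stated assumptions, not Spring code: ContainerFactoryResolverSketch is a hypothetical name, a Map plays the role of the BeanFactory, and String labels stand in for factory objects.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical resolver showing the order: endpoint factory, cached default, lookup by bean name. */
final class ContainerFactoryResolverSketch {
    private final Map<String, String> beans;       // bean name -> factory label (stands in for a BeanFactory)
    private final String containerFactoryBeanName; // may be null when no default name is configured
    private String containerFactory;               // default factory, cached after the first lookup

    ContainerFactoryResolverSketch(Map<String, String> beans, String containerFactoryBeanName) {
        this.beans = new HashMap<>(beans);
        this.containerFactoryBeanName = containerFactoryBeanName;
    }

    String resolve(String endpointFactory) {
        if (endpointFactory != null) {
            return endpointFactory;                // 1. factory given for this endpoint wins
        }
        if (this.containerFactory != null) {
            return this.containerFactory;          // 2. default that was set or cached earlier
        }
        if (this.containerFactoryBeanName != null) {
            String found = this.beans.get(this.containerFactoryBeanName);
            if (found == null) {
                throw new IllegalStateException("No bean named '" + this.containerFactoryBeanName + "'");
            }
            this.containerFactory = found;         // 3. look up by bean name and cache the result
            return found;
        }
        throw new IllegalStateException("no factory was given and no default is set");
    }
}
```

Because the looked-up factory is cached, replacing the bean later would not affect endpoints resolved afterwards, which matches the "live change" remark in the quoted source comment.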
containerFactoryBeanName in turn comes from RabbitListenerAnnotationBeanPostProcessor:
public static final String DEFAULT_RABBIT_LISTENER_CONTAINER_FACTORY_BEAN_NAME = "rabbitListenerContainerFactory";
That is the bean defined in RabbitAnnotationDrivenConfiguration, mentioned in 1.1.2:
@Bean
@ConditionalOnMissingBean(name = "rabbitListenerContainerFactory")
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(
        SimpleRabbitListenerContainerFactoryConfigurer configurer, ConnectionFactory connectionFactory) {
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    configurer.configure(factory, connectionFactory);
    return factory;
}
2.4 Idea: calling stop and then start on rabbitListenerEndpointRegistry restarts and refreshes all listeners. Refreshing the application context also leads to a refresh.
@Override
public void start() {
    for (MessageListenerContainer listenerContainer : getListenerContainers()) {
        startIfNecessary(listenerContainer);
    }
}

//----------------

private void startIfNecessary(MessageListenerContainer listenerContainer) {
    if (this.contextRefreshed || listenerContainer.isAutoStartup()) {
        listenerContainer.start();
    }
}

//----------------

@Override
public void onApplicationEvent(ContextRefreshedEvent event) {
    if (event.getApplicationContext().equals(this.applicationContext)) {
        this.contextRefreshed = true;
    }
}
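The effect of the contextRefreshed flag on a stop and start cycle can be shown with a plain-Java model. This is a simplified sketch, not the Spring implementation: LifecycleContainerSketch and LifecycleRegistrySketch are hypothetical names, and onContextRefreshed stands in for receiving the ContextRefreshedEvent.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical container with an autoStartup setting and a running flag. */
final class LifecycleContainerSketch {
    private final boolean autoStartup;
    private boolean running;

    LifecycleContainerSketch(boolean autoStartup) {
        this.autoStartup = autoStartup;
    }

    boolean isAutoStartup() { return this.autoStartup; }

    boolean isRunning() { return this.running; }

    void start() { this.running = true; }

    void stop() { this.running = false; }
}

/** Hypothetical registry that applies the same start condition as the code quoted above. */
final class LifecycleRegistrySketch {
    private final List<LifecycleContainerSketch> containers = new ArrayList<>();
    private boolean contextRefreshed;

    void add(LifecycleContainerSketch container) {
        this.containers.add(container);
    }

    /** Stands in for handling the context-refreshed event. */
    void onContextRefreshed() {
        this.contextRefreshed = true;
    }

    void start() {
        for (LifecycleContainerSketch container : this.containers) {
            startIfNecessary(container);
        }
    }

    void stop() {
        for (LifecycleContainerSketch container : this.containers) {
            container.stop();
        }
    }

    private void startIfNecessary(LifecycleContainerSketch container) {
        // Before the refresh only auto-startup containers start; afterwards every container does
        if (this.contextRefreshed || container.isAutoStartup()) {
            container.start();
        }
    }
}
```

One consequence worth noting: once the context has been refreshed, a stop followed by start brings up every registered container, including those configured with autoStartup set to false.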
Copyright belongs to the original author, 阔爱喵了个咪. In case of infringement, please contact us for removal.