Spring Boot RabbitMQ Source Code Analysis: The RabbitListener Annotation (Part 4)

1. Introduction to the RabbitListener annotation

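RabbitListener is one of the most frequently used annotations in Spring Boot RabbitMQ. Classes and methods annotated with RabbitListener are wrapped into a MessageListener, which is then set on a MessageListenerContainer.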
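  • When RabbitListener is placed on a method, that method is the listener for Rabbit messages.
  • When RabbitListener is placed on a class, it works together with the RabbitHandler annotation to dispatch messages of different types: each method in the class annotated with RabbitHandler is a listener for Rabbit messages.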
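
To make the class-level case more concrete, here is a minimal sketch in plain Java of how a message could be routed to one of several handler methods according to its payload type. It only models the idea and is not Spring AMQP's implementation: the @Handler annotation, OrderListener, and PayloadDispatcher are hypothetical names invented for this illustration.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;

// Hypothetical stand-in for @RabbitHandler; not the Spring AMQP annotation.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface Handler {
}

// A listener class whose handler methods differ only in their parameter type.
class OrderListener {

    @Handler
    public String onText(String payload) {
        return "text:" + payload;
    }

    @Handler
    public String onNumber(Integer payload) {
        return "number:" + payload;
    }
}

// Invokes the @Handler method whose single parameter accepts the payload.
class PayloadDispatcher {

    static Object dispatch(Object bean, Object payload) {
        for (Method method : bean.getClass().getDeclaredMethods()) {
            if (method.isAnnotationPresent(Handler.class)
                    && method.getParameterCount() == 1
                    && method.getParameterTypes()[0].isInstance(payload)) {
                try {
                    return method.invoke(bean, payload);
                } catch (ReflectiveOperationException ex) {
                    throw new IllegalStateException("Handler invocation failed", ex);
                }
            }
        }
        throw new IllegalStateException("No handler for " + payload.getClass().getName());
    }
}
```

Calling PayloadDispatcher.dispatch(new OrderListener(), "hi") reaches onText, while passing an Integer reaches onNumber. The real framework resolves handlers in a more elaborate way, so treat this purely as a mental model.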

2. EnableRabbit and RabbitBootstrapConfiguration

The auto-configuration class RabbitAutoConfiguration brings in EnableRabbit, and EnableRabbit in turn uses the @Import annotation to bring in the configuration class RabbitBootstrapConfiguration.

public class RabbitBootstrapConfiguration implements ImportBeanDefinitionRegistrar {

    @Override
    public void registerBeanDefinitions(@Nullable AnnotationMetadata importingClassMetadata,
            BeanDefinitionRegistry registry) {
        if (!registry.containsBeanDefinition(
                RabbitListenerConfigUtils.RABBIT_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME)) {
            registry.registerBeanDefinition(
                    RabbitListenerConfigUtils.RABBIT_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME,
                    new RootBeanDefinition(RabbitListenerAnnotationBeanPostProcessor.class));
        }
        if (!registry.containsBeanDefinition(
                RabbitListenerConfigUtils.RABBIT_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME)) {
            registry.registerBeanDefinition(
                    RabbitListenerConfigUtils.RABBIT_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME,
                    new RootBeanDefinition(RabbitListenerEndpointRegistry.class));
        }
    }
}

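This registers RabbitListenerAnnotationBeanPostProcessor and RabbitListenerEndpointRegistry in the IoC container, each only if no bean definition with the same name is already present.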

3. RabbitListenerAnnotationBeanPostProcessor

RabbitListenerAnnotationBeanPostProcessor implements the BeanPostProcessor, Ordered, BeanFactoryAware, BeanClassLoaderAware, EnvironmentAware, and SmartInitializingSingleton interfaces. Ordered determines the processing order; BeanFactoryAware, BeanClassLoaderAware, and EnvironmentAware are mainly used to obtain the corresponding BeanFactory, BeanClassLoader, and Environment. The methods we care about here are the ones inherited from SmartInitializingSingleton and BeanPostProcessor.

public void afterSingletonsInstantiated() {
    this.registrar.setBeanFactory(this.beanFactory);

    if (this.beanFactory instanceof ListableBeanFactory) {
        Map<String, RabbitListenerConfigurer> instances =
                ((ListableBeanFactory) this.beanFactory).getBeansOfType(RabbitListenerConfigurer.class);
        for (RabbitListenerConfigurer configurer : instances.values()) {
            configurer.configureRabbitListeners(this.registrar);
        }
    }

    if (this.registrar.getEndpointRegistry() == null) {
        if (this.endpointRegistry == null) {
            Assert.state(this.beanFactory != null,
                    "BeanFactory must be set to find endpoint registry by bean name");
            this.endpointRegistry = this.beanFactory.getBean(
                    RabbitListenerConfigUtils.RABBIT_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME,
                    RabbitListenerEndpointRegistry.class);
        }
        this.registrar.setEndpointRegistry(this.endpointRegistry);
    }

    if (this.containerFactoryBeanName != null) {
        this.registrar.setContainerFactoryBeanName(this.containerFactoryBeanName);
    }

    // Set the custom handler method factory once resolved by the configurer
    MessageHandlerMethodFactory handlerMethodFactory = this.registrar.getMessageHandlerMethodFactory();
    if (handlerMethodFactory != null) {
        this.messageHandlerMethodFactory.setMessageHandlerMethodFactory(handlerMethodFactory);
    }

    // Actually register all listeners
    this.registrar.afterPropertiesSet();

    // clear the cache - prototype beans will be re-cached.
    this.typeCache.clear();
}

This is the initialization step: RabbitListenerAnnotationBeanPostProcessor (and its registrar field in particular) is initialized based on any custom RabbitListenerConfigurer beans.
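
The order of operations in this method (let every configurer customize the registrar, fall back to defaults for whatever was left unset, then trigger registration) can be sketched in plain Java as follows. This is a simplified model under my own assumptions, not the Spring AMQP code: SimpleRegistrar, SimpleConfigurer, SimplePostProcessor, and the "defaultRegistry" name are all hypothetical.

```java
import java.util.List;

// Hypothetical, heavily simplified stand-in for RabbitListenerEndpointRegistrar.
class SimpleRegistrar {
    private String registryName;   // stays null until a configurer or the default sets it
    private boolean initialized;

    void setRegistryName(String registryName) {
        this.registryName = registryName;
    }

    String getRegistryName() {
        return registryName;
    }

    void afterPropertiesSet() {
        initialized = true;
    }

    boolean isInitialized() {
        return initialized;
    }
}

// Hypothetical stand-in for RabbitListenerConfigurer.
interface SimpleConfigurer {
    void configure(SimpleRegistrar registrar);
}

class SimplePostProcessor {
    static final String DEFAULT_REGISTRY_NAME = "defaultRegistry";

    static SimpleRegistrar initialize(List<SimpleConfigurer> configurers) {
        SimpleRegistrar registrar = new SimpleRegistrar();
        // 1. Give every configurer a chance to customize the registrar.
        for (SimpleConfigurer configurer : configurers) {
            configurer.configure(registrar);
        }
        // 2. Fall back to the default only if nothing was configured.
        if (registrar.getRegistryName() == null) {
            registrar.setRegistryName(DEFAULT_REGISTRY_NAME);
        }
        // 3. Only now trigger the actual registration step.
        registrar.afterPropertiesSet();
        return registrar;
    }
}
```

The point of the ordering is that user customization always wins over the defaults, and nothing is registered until both have been applied.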

The methods inherited from BeanPostProcessor are:

  • postProcessBeforeInitialization
  • postProcessAfterInitialization

@Override
public Object postProcessAfterInitialization(final Object bean, final String beanName) throws BeansException {
    Class<?> targetClass = AopUtils.getTargetClass(bean);
    final TypeMetadata metadata = this.typeCache.computeIfAbsent(targetClass, this::buildMetadata);
    for (ListenerMethod lm : metadata.listenerMethods) {
        for (RabbitListener rabbitListener : lm.annotations) {
            processAmqpListener(rabbitListener, lm.method, bean, beanName);
        }
    }
    if (metadata.handlerMethods.length > 0) {
        processMultiMethodListeners(metadata.classAnnotations, metadata.handlerMethods, bean, beanName);
    }
    return bean;
}

This method finds and parses the RabbitListener annotations, relying on:

  • RabbitListenerAnnotationBeanPostProcessor#buildMetadata
  • RabbitListenerAnnotationBeanPostProcessor#processAmqpListener
  • RabbitListenerAnnotationBeanPostProcessor#processMultiMethodListeners

4. Parsing the RabbitListener annotation

RabbitListenerAnnotationBeanPostProcessor#buildMetadata

private TypeMetadata buildMetadata(Class<?> targetClass) {
    Collection<RabbitListener> classLevelListeners = findListenerAnnotations(targetClass);
    final boolean hasClassLevelListeners = classLevelListeners.size() > 0;
    final List<ListenerMethod> methods = new ArrayList<>();
    final List<Method> multiMethods = new ArrayList<>();
    ReflectionUtils.doWithMethods(targetClass, method -> {
        Collection<RabbitListener> listenerAnnotations = findListenerAnnotations(method);
        if (listenerAnnotations.size() > 0) {
            methods.add(new ListenerMethod(method,
                    listenerAnnotations.toArray(new RabbitListener[listenerAnnotations.size()])));
        }
        if (hasClassLevelListeners) {
            RabbitHandler rabbitHandler = AnnotationUtils.findAnnotation(method, RabbitHandler.class);
            if (rabbitHandler != null) {
                multiMethods.add(method);
            }
        }
    }, ReflectionUtils.USER_DECLARED_METHODS);
    if (methods.isEmpty() && multiMethods.isEmpty()) {
        return TypeMetadata.EMPTY;
    }
    return new TypeMetadata(
            methods.toArray(new ListenerMethod[methods.size()]),
            multiMethods.toArray(new Method[multiMethods.size()]),
            classLevelListeners.toArray(new RabbitListener[classLevelListeners.size()]));
}

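RabbitListenerAnnotationBeanPostProcessor inspects every bean class: the RabbitListener annotations on the class, the RabbitHandler annotations on its methods, and the RabbitListener annotations on its methods are parsed and collected into a TypeMetadata object.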
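
The scanning-plus-caching pattern used by buildMetadata and typeCache can be reproduced with nothing but the JDK. The sketch below is a simplified model, assuming hypothetical annotations @QueueListener and @QueueHandler in place of the real ones and keeping only method names instead of a full TypeMetadata; it also ignores inherited methods and repeated annotations, which the real code handles.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-ins for @RabbitListener and @RabbitHandler.
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE, ElementType.METHOD})
@interface QueueListener {
    String queue();
}

@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface QueueHandler {
}

// Simplified counterpart of TypeMetadata: only method names are kept.
class ScanResult {
    final List<String> listenerMethods;
    final List<String> handlerMethods;

    ScanResult(List<String> listenerMethods, List<String> handlerMethods) {
        this.listenerMethods = listenerMethods;
        this.handlerMethods = handlerMethods;
    }
}

class ListenerScanner {
    private final Map<Class<?>, ScanResult> cache = new ConcurrentHashMap<>();
    private int scanCount;

    // Each class is scanned at most once; later calls are served from the cache.
    ScanResult metadataFor(Class<?> type) {
        return cache.computeIfAbsent(type, this::scan);
    }

    int scanCount() {
        return scanCount;
    }

    private ScanResult scan(Class<?> type) {
        scanCount++;
        boolean classLevel = type.isAnnotationPresent(QueueListener.class);
        List<String> listeners = new ArrayList<>();
        List<String> handlers = new ArrayList<>();
        for (Method method : type.getDeclaredMethods()) {
            if (method.isAnnotationPresent(QueueListener.class)) {
                listeners.add(method.getName());
            }
            // Handler methods only count when the class itself is a listener.
            if (classLevel && method.isAnnotationPresent(QueueHandler.class)) {
                handlers.add(method.getName());
            }
        }
        // Reflection does not guarantee method order, so sort for stable results.
        Collections.sort(listeners);
        Collections.sort(handlers);
        return new ScanResult(listeners, handlers);
    }
}

// Method-level style: the annotated method is the listener.
class MethodStyle {
    @QueueListener(queue = "a")
    public void onA(String payload) {
    }
}

// Class-level style: handler methods are collected because the class is annotated.
@QueueListener(queue = "b")
class ClassStyle {
    @QueueHandler
    public void onText(String payload) {
    }

    @QueueHandler
    public void onNumber(Integer payload) {
    }
}

// No class-level annotation, so the handler method is ignored.
class Plain {
    @QueueHandler
    public void ignored(String payload) {
    }
}
```

Scanning MethodStyle yields one listener method, ClassStyle yields two handler methods, and Plain yields nothing, which mirrors the hasClassLevelListeners guard in the listing above.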

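The annotations found by RabbitListenerAnnotationBeanPostProcessor#buildMetadata are packaged into a TypeMetadata, whose contents are then handed to processAmqpListener and processMultiMethodListeners respectively for further parsing.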

protected void processAmqpListener(RabbitListener rabbitListener, Method method, Object bean, String beanName) {
    Method methodToUse = checkProxy(method, bean);
    MethodRabbitListenerEndpoint endpoint = new MethodRabbitListenerEndpoint();
    endpoint.setMethod(methodToUse);
    endpoint.setBeanFactory(this.beanFactory);
    endpoint.setReturnExceptions(resolveExpressionAsBoolean(rabbitListener.returnExceptions()));
    String errorHandlerBeanName = resolveExpressionAsString(rabbitListener.errorHandler(), "errorHandler");
    if (StringUtils.hasText(errorHandlerBeanName)) {
        endpoint.setErrorHandler(this.beanFactory.getBean(errorHandlerBeanName, RabbitListenerErrorHandler.class));
    }
    processListener(endpoint, rabbitListener, bean, methodToUse, beanName);
}

private void processMultiMethodListeners(RabbitListener[] classLevelListeners, Method[] multiMethods,
        Object bean, String beanName) {
    List<Method> checkedMethods = new ArrayList<Method>();
    for (Method method : multiMethods) {
        checkedMethods.add(checkProxy(method, bean));
    }
    for (RabbitListener classLevelListener : classLevelListeners) {
        MultiMethodRabbitListenerEndpoint endpoint = new MultiMethodRabbitListenerEndpoint(checkedMethods, bean);
        endpoint.setBeanFactory(this.beanFactory);
        processListener(endpoint, classLevelListener, bean, bean.getClass(), beanName);
    }
}

RabbitListenerAnnotationBeanPostProcessor#processAmqpListener handles methods annotated with RabbitListener, while RabbitListenerAnnotationBeanPostProcessor#processMultiMethodListeners handles the RabbitHandler-annotated methods of classes annotated with RabbitListener.

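Each of them creates an endpoint object (a MethodRabbitListenerEndpoint in the first case, a MultiMethodRabbitListenerEndpoint in the second), initializes the few properties that differ between the two styles, and then hands the endpoint to RabbitListenerAnnotationBeanPostProcessor#processListener for the remaining processing.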

RabbitListenerAnnotationBeanPostProcessor#processListener

protected void processListener(MethodRabbitListenerEndpoint endpoint, RabbitListener rabbitListener,
        Object bean, Object adminTarget, String beanName) {
    endpoint.setBean(bean);
    endpoint.setMessageHandlerMethodFactory(this.messageHandlerMethodFactory);
    endpoint.setId(getEndpointId(rabbitListener));
    endpoint.setQueueNames(resolveQueues(rabbitListener));
    endpoint.setConcurrency(resolveExpressionAsStringOrInteger(rabbitListener.concurrency(), "concurrency"));

    String group = rabbitListener.group();
    if (StringUtils.hasText(group)) {
        Object resolvedGroup = resolveExpression(group);
        if (resolvedGroup instanceof String) {
            endpoint.setGroup((String) resolvedGroup);
        }
    }

    String autoStartup = rabbitListener.autoStartup();
    if (StringUtils.hasText(autoStartup)) {
        endpoint.setAutoStartup(resolveExpressionAsBoolean(autoStartup));
    }

    endpoint.setExclusive(rabbitListener.exclusive());

    String priority = resolve(rabbitListener.priority());
    if (StringUtils.hasText(priority)) {
        try {
            endpoint.setPriority(Integer.valueOf(priority));
        }
        catch (NumberFormatException ex) {
            throw new BeanInitializationException("Invalid priority value for " +
                    rabbitListener + " (must be an integer)", ex);
        }
    }

    String rabbitAdmin = resolve(rabbitListener.admin());
    if (StringUtils.hasText(rabbitAdmin)) {
        Assert.state(this.beanFactory != null, "BeanFactory must be set to resolve RabbitAdmin by bean name");
        try {
            endpoint.setAdmin(this.beanFactory.getBean(rabbitAdmin, RabbitAdmin.class));
        }
        catch (NoSuchBeanDefinitionException ex) {
            throw new BeanInitializationException("Could not register rabbit listener endpoint on [" +
                    adminTarget + "], no " + RabbitAdmin.class.getSimpleName() + " with id '" +
                    rabbitAdmin + "' was found in the application context", ex);
        }
    }

    RabbitListenerContainerFactory<?> factory = null;
    String containerFactoryBeanName = resolve(rabbitListener.containerFactory());
    if (StringUtils.hasText(containerFactoryBeanName)) {
        Assert.state(this.beanFactory != null, "BeanFactory must be set to obtain container factory by bean name");
        try {
            factory = this.beanFactory.getBean(containerFactoryBeanName, RabbitListenerContainerFactory.class);
        }
        catch (NoSuchBeanDefinitionException ex) {
            throw new BeanInitializationException("Could not register rabbit listener endpoint on [" +
                    adminTarget + "] for bean " + beanName + ", no " +
                    RabbitListenerContainerFactory.class.getSimpleName() + " with id '" +
                    containerFactoryBeanName + "' was found in the application context", ex);
        }
    }

    this.registrar.registerEndpoint(endpoint, factory);
}

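The properties of the MethodRabbitListenerEndpoint are set and validated from the attributes of the RabbitListener annotation. Finally, the endpoint is registered through RabbitListenerEndpointRegistrar#registerEndpoint, together with the RabbitListenerContainerFactory resolved from the annotation (which may be null at this point).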

5. RabbitListenerEndpointRegistrar

@Override
public void afterPropertiesSet() {
    registerAllEndpoints();
}

protected void registerAllEndpoints() {
    synchronized (this.endpointDescriptors) {
        for (AmqpListenerEndpointDescriptor descriptor : this.endpointDescriptors) {
            this.endpointRegistry.registerListenerContainer(
                    descriptor.endpoint, resolveContainerFactory(descriptor));
        }
        this.startImmediately = true;  // trigger immediate startup
    }
}

RabbitListenerEndpointRegistrar#registerEndpoint

public void registerEndpoint(RabbitListenerEndpoint endpoint, RabbitListenerContainerFactory<?> factory) {
    Assert.notNull(endpoint, "Endpoint must be set");
    Assert.hasText(endpoint.getId(), "Endpoint id must be set");
    // Factory may be null, we defer the resolution right before actually creating the container
    AmqpListenerEndpointDescriptor descriptor = new AmqpListenerEndpointDescriptor(endpoint, factory);
    synchronized (this.endpointDescriptors) {
        if (this.startImmediately) { // Register and start immediately
            this.endpointRegistry.registerListenerContainer(descriptor.endpoint,
                    resolveContainerFactory(descriptor), true);
        }
        else {
            this.endpointDescriptors.add(descriptor);
        }
    }
}

The listener container itself is registered by RabbitListenerEndpointRegistry#registerListenerContainer.
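
The interplay between registerEndpoint and afterPropertiesSet is a deferred-registration pattern: endpoints are queued until initialization has finished, and anything registered afterwards is processed right away. Below is a minimal plain-Java sketch of that pattern. DeferredRegistrar is a hypothetical class, endpoints are reduced to their ids, and (unlike the listing above) the sketch empties the queue after flushing it.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the startImmediately switch in the registrar.
class DeferredRegistrar {
    private final List<String> pending = new ArrayList<>();
    private final List<String> registered = new ArrayList<>();
    private boolean startImmediately;

    void registerEndpoint(String endpointId) {
        if (endpointId == null || endpointId.trim().isEmpty()) {
            throw new IllegalArgumentException("Endpoint id must be set");
        }
        synchronized (pending) {
            if (startImmediately) {
                // Initialization is over: register right away.
                registered.add(endpointId);
            } else {
                // Still initializing: remember the endpoint for later.
                pending.add(endpointId);
            }
        }
    }

    void afterPropertiesSet() {
        synchronized (pending) {
            registered.addAll(pending);
            pending.clear();          // design choice of this sketch only
            startImmediately = true;  // later registrations skip the queue
        }
    }

    List<String> registered() {
        synchronized (pending) {
            return new ArrayList<>(registered);
        }
    }

    int pendingCount() {
        synchronized (pending) {
            return pending.size();
        }
    }
}
```

An endpoint registered before afterPropertiesSet only shows up in registered() once that method has run, whereas one registered afterwards appears immediately.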

RabbitListenerEndpointRegistry#registerListenerContainer

public void registerListenerContainer(RabbitListenerEndpoint endpoint, RabbitListenerContainerFactory<?> factory,
        boolean startImmediately) {
    Assert.notNull(endpoint, "Endpoint must not be null");
    Assert.notNull(factory, "Factory must not be null");

    String id = endpoint.getId();
    Assert.hasText(id, "Endpoint id must not be empty");
    synchronized (this.listenerContainers) {
        Assert.state(!this.listenerContainers.containsKey(id),
                "Another endpoint is already registered with id '" + id + "'");
        MessageListenerContainer container = createListenerContainer(endpoint, factory);
        this.listenerContainers.put(id, container);
        if (StringUtils.hasText(endpoint.getGroup()) && this.applicationContext != null) {
            List<MessageListenerContainer> containerGroup;
            if (this.applicationContext.containsBean(endpoint.getGroup())) {
                containerGroup = this.applicationContext.getBean(endpoint.getGroup(), List.class);
            }
            else {
                containerGroup = new ArrayList<MessageListenerContainer>();
                this.applicationContext.getBeanFactory().registerSingleton(endpoint.getGroup(), containerGroup);
            }
            containerGroup.add(container);
        }
        if (startImmediately) {
            startIfNecessary(container);
        }
    }
}

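A listener container is created for the RabbitListenerEndpoint by the listener container factory. The whole registration runs inside a synchronized block, so at most one endpoint can be registered at any given time.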
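
The bookkeeping done here (one container per endpoint id, duplicates rejected, optional grouping, all under a single lock) can be modelled with plain collections. The following sketch is an assumption-laden simplification rather than the registry's code: SimpleContainerRegistry is a hypothetical class, containers are represented by strings, and groups live in a local map instead of being registered as singleton beans in the application context.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of the id and group bookkeeping in the endpoint registry.
class SimpleContainerRegistry {
    private final Map<String, String> containers = new HashMap<>();
    private final Map<String, List<String>> groups = new HashMap<>();

    void register(String id, String group) {
        // One lock guards the whole registration, so registrations never interleave.
        synchronized (containers) {
            if (containers.containsKey(id)) {
                throw new IllegalStateException(
                        "Another endpoint is already registered with id '" + id + "'");
            }
            String container = "container-for-" + id;   // stands in for a real container
            containers.put(id, container);
            if (group != null && !group.isEmpty()) {
                // Reuse the group's list if it exists, otherwise create it.
                groups.computeIfAbsent(group, key -> new ArrayList<>()).add(container);
            }
        }
    }

    int size() {
        synchronized (containers) {
            return containers.size();
        }
    }

    List<String> group(String name) {
        synchronized (containers) {
            return new ArrayList<>(groups.getOrDefault(name, new ArrayList<>()));
        }
    }
}
```

Registering two endpoints with the same group places both containers in one list, and registering an id a second time fails with the same message as in the listing above.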

RabbitListenerEndpointRegistry#start

@Override
public void start() {
    for (MessageListenerContainer listenerContainer : getListenerContainers()) {
        startIfNecessary(listenerContainer);
    }
}

private void startIfNecessary(MessageListenerContainer listenerContainer) {
    if (this.contextRefreshed || listenerContainer.isAutoStartup()) {
        listenerContainer.start();
    }
}

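For every container that qualifies (the context has been refreshed, or the container is marked for auto-startup), MessageListenerContainer#start is called, which starts the listener.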
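
The start condition is small but easy to misread, so here is a plain-Java sketch of it. SimpleContainer and SimpleLifecycleRegistry are hypothetical classes, and markContextRefreshed is an assumed helper: the listing above does not show where contextRefreshed is set, so the sketch simply exposes a method for flipping the flag.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a MessageListenerContainer.
class SimpleContainer {
    private final boolean autoStartup;
    private boolean running;

    SimpleContainer(boolean autoStartup) {
        this.autoStartup = autoStartup;
    }

    boolean isAutoStartup() {
        return autoStartup;
    }

    boolean isRunning() {
        return running;
    }

    void start() {
        running = true;
    }
}

// Hypothetical model of the registry's start logic.
class SimpleLifecycleRegistry {
    private final List<SimpleContainer> containers = new ArrayList<>();
    private boolean contextRefreshed;

    void add(SimpleContainer container) {
        containers.add(container);
    }

    // Assumed helper; not taken from the listing above.
    void markContextRefreshed() {
        contextRefreshed = true;
    }

    void start() {
        for (SimpleContainer container : containers) {
            startIfNecessary(container);
        }
    }

    private void startIfNecessary(SimpleContainer container) {
        // Same condition as in the listing: either flag is enough to start.
        if (contextRefreshed || container.isAutoStartup()) {
            container.start();
        }
    }
}
```

Before the flag is set, start() only starts containers with auto-startup enabled; once it is set, a further start() call starts the remaining ones as well.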


This article is reposted from: https://blog.csdn.net/qq_43141726/article/details/128165679
Copyright belongs to the original author, 959y. In case of infringement, please contact us for removal.
