Spring Initialization Order: RabbitMQ Fails to Auto-Create a Queue

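Our project uses RabbitMQ and is configured to auto-declare queues, exchanges, bindings, and so on. Testing showed that one queue was never created automatically. The cause turned up after debugging through the Spring source and the RabbitMQ client-side (Spring AMQP) source.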

RabbitMQ is configured for two environments. The code below is an example:

    import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
    import org.springframework.amqp.rabbit.connection.ConnectionFactory;
    import org.springframework.amqp.rabbit.core.RabbitAdmin;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Qualifier;
    import org.springframework.boot.context.properties.ConfigurationProperties;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.context.annotation.Primary;

    // RabbitMQProperties is the project's own properties class
    // (host, port, username, password, virtualHost); it is not shown here.
    @Configuration
    public class RabbitMqConfiguration {

        /**
         * Configuration for the "one" MQ
         */
        @Bean(name = "oneRabbitMQProperties")
        @ConfigurationProperties(prefix = "spring.rabbitmq.one")
        public RabbitMQProperties oneRabbitMQProperties() {
            return new RabbitMQProperties();
        }

        @Primary
        @Bean(name = "oneRabbitConnectionFactory")
        public ConnectionFactory oneRabbitConnectionFactory(
                @Qualifier("oneRabbitMQProperties") RabbitMQProperties rabbitMQProperties) {
            return connectionFactory(rabbitMQProperties, false);
        }

        @Primary
        @Bean("oneRabbitMqAdmin")
        public RabbitAdmin oneRabbitMqAdmin(
                @Qualifier("oneRabbitConnectionFactory") ConnectionFactory oneRabbitConnectionFactory) {
            return new RabbitAdmin(oneRabbitConnectionFactory);
        }

        @Primary
        @Bean(name = "oneRabbitTemplate")
        public RabbitTemplate oneRabbitTemplate(
                @Qualifier("oneRabbitConnectionFactory") ConnectionFactory connectionFactory) {
            return new RabbitTemplate(connectionFactory);
        }

        /**
         * Configuration for the "two" MQ
         */
        @Bean(name = "twoRabbitMQProperties")
        @ConfigurationProperties(prefix = "spring.rabbitmq.two")
        public RabbitMQProperties twoRabbitMQProperties() {
            return new RabbitMQProperties();
        }

        @Bean(name = "twoRabbitConnectionFactory")
        public ConnectionFactory twoRabbitConnectionFactory(
                @Qualifier("twoRabbitMQProperties") RabbitMQProperties rabbitMQProperties) {
            return connectionFactory(rabbitMQProperties, false);
        }

        @Bean("twoRabbitMqAdmin")
        public RabbitAdmin twoRabbitMqAdmin(
                @Qualifier("twoRabbitConnectionFactory") ConnectionFactory twoRabbitConnectionFactory) {
            return new RabbitAdmin(twoRabbitConnectionFactory);
        }

        @Bean(name = "twoRabbitTemplate")
        public RabbitTemplate twoRabbitTemplate(
                @Qualifier("twoRabbitConnectionFactory") ConnectionFactory connectionFactory) {
            return new RabbitTemplate(connectionFactory);
        }

        // Shared helper that builds a caching connection factory from the given properties.
        private ConnectionFactory connectionFactory(RabbitMQProperties rabbitMQProperties, boolean transaction) {
            CachingConnectionFactory factory =
                    new CachingConnectionFactory(rabbitMQProperties.getHost(), rabbitMQProperties.getPort());
            factory.setUsername(rabbitMQProperties.getUsername());
            factory.setPassword(rabbitMQProperties.getPassword());
            factory.setPublisherConfirms(!transaction);
            factory.setPublisherReturns(true);
            factory.setVirtualHost(rabbitMQProperties.getVirtualHost());
            return factory;
        }
    }

Auto-declaration configuration for the Queue, Exchange, and Binding beans:

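    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.BindingBuilder;
    import org.springframework.amqp.core.DirectExchange;
    import org.springframework.amqp.core.Queue;
    import org.springframework.amqp.rabbit.core.RabbitAdmin;
    import org.springframework.beans.factory.annotation.Qualifier;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;

    @Configuration
    public class RabbitMqQueueConfiguration {

        // Declarations for the "one" broker: each is bound to oneRabbitMqAdmin only.
        @Bean
        public Queue oneQueue(@Qualifier("oneRabbitMqAdmin") RabbitAdmin oneRabbitMqAdmin) {
            Queue queue = new Queue("one_auto_test");
            queue.setAdminsThatShouldDeclare(oneRabbitMqAdmin);
            return queue;
        }

        @Bean
        public DirectExchange oneExchange(@Qualifier("oneRabbitMqAdmin") RabbitAdmin oneRabbitMqAdmin) {
            DirectExchange exchange = new DirectExchange("one_auto_test");
            exchange.setAdminsThatShouldDeclare(oneRabbitMqAdmin);
            return exchange;
        }

        @Bean
        public Binding oneBinding(Queue oneQueue, DirectExchange oneExchange,
                @Qualifier("oneRabbitMqAdmin") RabbitAdmin oneRabbitMqAdmin) {
            Binding binding = BindingBuilder.bind(oneQueue).to(oneExchange).with("one_auto_test");
            binding.setAdminsThatShouldDeclare(oneRabbitMqAdmin);
            return binding;
        }

        // Declarations for the "two" broker: each is bound to twoRabbitMqAdmin only.
        // (The parameters were originally named oneRabbitMqAdmin; the qualifier decides
        // which bean is injected, so the rename does not change behavior.)
        @Bean
        public Queue twoQueue(@Qualifier("twoRabbitMqAdmin") RabbitAdmin twoRabbitMqAdmin) {
            Queue queue = new Queue("two_auto_test");
            queue.setAdminsThatShouldDeclare(twoRabbitMqAdmin);
            return queue;
        }

        @Bean
        public DirectExchange twoExchange(@Qualifier("twoRabbitMqAdmin") RabbitAdmin twoRabbitMqAdmin) {
            DirectExchange exchange = new DirectExchange("two_auto_test");
            exchange.setAdminsThatShouldDeclare(twoRabbitMqAdmin);
            return exchange;
        }

        @Bean
        public Binding twoBinding(Queue twoQueue, DirectExchange twoExchange,
                @Qualifier("twoRabbitMqAdmin") RabbitAdmin twoRabbitMqAdmin) {
            Binding binding = BindingBuilder.bind(twoQueue).to(twoExchange).with("two_auto_test");
            binding.setAdminsThatShouldDeclare(twoRabbitMqAdmin);
            return binding;
        }
    }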

Running the project shows that the queues, exchanges, and bindings are not all created correctly:

    2023-03-21 17:54:06.860 TRACE 16056 --- [           main] o.s.b.f.s.DefaultListableBeanFactory : Ignoring match to currently created bean 'twoExchange': Error creating bean with name 'twoExchange': Requested bean is currently in creation: Is there an unresolvable circular reference?

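This problem is a real headache. The message only appears at TRACE level (DEBUG level in Spring 5.0.6), and although every breakpoint in our configuration code is hit, the declarations still never reach the broker. Below is a record of the steps I took to track it down.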

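When the queue, exchange, and binding failed to appear, my first suspicion was that the RabbitMQ configuration itself was wrong. Declaring them with @Bean in a configuration class should normally lead to their creation, so somewhere in the RabbitMQ integration there must be a step that fetches the Queue, Exchange, and Binding beans from the IoC container and processes them.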

Enter the first key player: RabbitAdmin

    // RabbitAdmin implements InitializingBean; afterPropertiesSet runs after the bean's properties are populated
    @Override
    public void afterPropertiesSet() {

        synchronized (this.lifecycleMonitor) {

            if (this.running || !this.autoStartup) {
                return;
            }

            if (this.connectionFactory instanceof CachingConnectionFactory
                    && ((CachingConnectionFactory) this.connectionFactory).getCacheMode() == CacheMode.CONNECTION) {
                this.logger.warn("RabbitAdmin auto declaration is not supported with CacheMode.CONNECTION");
                return;
            }

            // Prevent stack overflow...
            final AtomicBoolean initializing = new AtomicBoolean(false);

            this.connectionFactory.addConnectionListener(connection -> {

                if (!initializing.compareAndSet(false, true)) {
                    // If we are already initializing, we don't need to do it again...
                    return;
                }
                try {
                    /*
                     * ...but it is possible for this to happen twice in the same ConnectionFactory (if more than
                     * one concurrent Connection is allowed). It's idempotent, so no big deal (a bit of network
                     * chatter). In fact it might even be a good thing: exclusive queues only make sense if they are
                     * declared for every connection. If anyone has a problem with it: use auto-startup="false".
                     */
                    initialize();
                }
                finally {
                    initializing.compareAndSet(true, false);
                }

            });

            this.running = true;

        }
    }

    public void initialize() {

        if (this.applicationContext == null) {
            this.logger.debug("no ApplicationContext has been set, cannot auto-declare Exchanges, Queues, and Bindings");
            return;
        }

        this.logger.debug("Initializing declarations");
        Collection<Exchange> contextExchanges = new LinkedList<Exchange>(
                this.applicationContext.getBeansOfType(Exchange.class).values());
        Collection<Queue> contextQueues = new LinkedList<Queue>(
                this.applicationContext.getBeansOfType(Queue.class).values());
        Collection<Binding> contextBindings = new LinkedList<Binding>(
                this.applicationContext.getBeansOfType(Binding.class).values());

        @SuppressWarnings("rawtypes")
        Collection<Collection> collections = this.declareCollections
            ? this.applicationContext.getBeansOfType(Collection.class, false, false).values()
            : Collections.emptyList();
        for (Collection<?> collection : collections) {
            if (collection.size() > 0 && collection.iterator().next() instanceof Declarable) {
                for (Object declarable : collection) {
                    if (declarable instanceof Exchange) {
                        contextExchanges.add((Exchange) declarable);
                    }
                    else if (declarable instanceof Queue) {
                        contextQueues.add((Queue) declarable);
                    }
                    else if (declarable instanceof Binding) {
                        contextBindings.add((Binding) declarable);
                    }
                }
            }
        }

        final Collection<Exchange> exchanges = filterDeclarables(contextExchanges);
        final Collection<Queue> queues = filterDeclarables(contextQueues);
        final Collection<Binding> bindings = filterDeclarables(contextBindings);

        for (Exchange exchange : exchanges) {
            if ((!exchange.isDurable() || exchange.isAutoDelete()) && this.logger.isInfoEnabled()) {
                this.logger.info("Auto-declaring a non-durable or auto-delete Exchange ("
                        + exchange.getName()
                        + ") durable:" + exchange.isDurable() + ", auto-delete:" + exchange.isAutoDelete() + ". "
                        + "It will be deleted by the broker if it shuts down, and can be redeclared by closing and "
                        + "reopening the connection.");
            }
        }

        for (Queue queue : queues) {
            if ((!queue.isDurable() || queue.isAutoDelete() || queue.isExclusive()) && this.logger.isInfoEnabled()) {
                this.logger.info("Auto-declaring a non-durable, auto-delete, or exclusive Queue ("
                        + queue.getName()
                        + ") durable:" + queue.isDurable() + ", auto-delete:" + queue.isAutoDelete() + ", exclusive:"
                        + queue.isExclusive() + ". "
                        + "It will be redeclared if the broker stops and is restarted while the connection factory is "
                        + "alive, but all messages will be lost.");
            }
        }

        if (exchanges.size() == 0 && queues.size() == 0 && bindings.size() == 0) {
            this.logger.debug("Nothing to declare");
            return;
        }
        this.rabbitTemplate.execute(channel -> {
            declareExchanges(channel, exchanges.toArray(new Exchange[exchanges.size()]));
            declareQueues(channel, queues.toArray(new Queue[queues.size()]));
            declareBindings(channel, bindings.toArray(new Binding[bindings.size()]));
            return null;
        });
        this.logger.debug("Declarations finished");

    }

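RabbitAdmin contains the logic that declares queues, exchanges, and bindings. With a breakpoint set in initialize(), you can see that it does not obtain the full set of exchanges, bindings, and queues.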

So why is the full set not retrieved? Let's dissect the getBeansOfType method, focusing on the code below:

This method looks up beans of the given type in the IoC container.

    @Override
    @SuppressWarnings("unchecked")
    public <T> Map<String, T> getBeansOfType(
            @Nullable Class<T> type, boolean includeNonSingletons, boolean allowEagerInit) throws BeansException {

        String[] beanNames = getBeanNamesForType(type, includeNonSingletons, allowEagerInit);
        Map<String, T> result = CollectionUtils.newLinkedHashMap(beanNames.length);
        for (String beanName : beanNames) {
            try {
                Object beanInstance = getBean(beanName);
                if (!(beanInstance instanceof NullBean)) {
                    result.put(beanName, (T) beanInstance);
                }
            }
            catch (BeanCreationException ex) {
                Throwable rootCause = ex.getMostSpecificCause();
                if (rootCause instanceof BeanCurrentlyInCreationException) {
                    BeanCreationException bce = (BeanCreationException) rootCause;
                    String exBeanName = bce.getBeanName();
                    if (exBeanName != null && isCurrentlyInCreation(exBeanName)) {
                        if (logger.isTraceEnabled()) {
                            logger.trace("Ignoring match to currently created bean '" + exBeanName + "': " +
                                    ex.getMessage());
                        }
                        onSuppressedException(ex);
                        // Ignore: indicates a circular reference when autowiring constructors.
                        // We want to find matches other than the currently created bean itself.
                        continue;
                    }
                }
                throw ex;
            }
        }
        return result;
    }

getBeansOfType first resolves the bean names for the requested type, then instantiates and initializes each bean by name. This second step is where the error occurs:

    2023-03-21 17:54:06.860 TRACE 16056 --- [           main] o.s.b.f.s.DefaultListableBeanFactory : Ignoring match to currently created bean 'twoExchange': Error creating bean with name 'twoExchange': Requested bean is currently in creation: Is there an unresolvable circular reference?

Debugging quickly shows that the exception behind this message is thrown from DefaultSingletonBeanRegistry.beforeSingletonCreation.

    protected void beforeSingletonCreation(String beanName) {
        if (!this.inCreationCheckExclusions.contains(beanName) && !this.singletonsCurrentlyInCreation.add(beanName)) {
            throw new BeanCurrentlyInCreationException(beanName);
        }
    }

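beforeSingletonCreation checks whether a bean is being created again while its first creation is still in progress. The exception therefore means that some bean was requested a second time before it had finished being created.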
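
To make the guard concrete, here is a minimal standalone sketch of the same idea using only the JDK. It is not Spring's code: the class name InCreationGuardSketch, the helper nestedRequestIsRejected, and the use of IllegalStateException in place of BeanCurrentlyInCreationException are all assumptions made for illustration.

```java
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for the "currently in creation" bookkeeping; not Spring's class.
public class InCreationGuardSketch {

    // Names of singletons whose creation has started but not yet finished.
    private final Set<String> singletonsCurrentlyInCreation =
            Collections.newSetFromMap(new ConcurrentHashMap<>());

    /** Marks a bean as "in creation"; fails if it is already marked. */
    void beforeSingletonCreation(String beanName) {
        // Set.add returns false when the element is already present.
        if (!singletonsCurrentlyInCreation.add(beanName)) {
            throw new IllegalStateException("Requested bean is currently in creation: " + beanName);
        }
    }

    /** Clears the mark once creation has finished. */
    void afterSingletonCreation(String beanName) {
        singletonsCurrentlyInCreation.remove(beanName);
    }

    /** Returns true if a nested request for a bean that is still being created is rejected. */
    static boolean nestedRequestIsRejected(String beanName) {
        InCreationGuardSketch guard = new InCreationGuardSketch();
        guard.beforeSingletonCreation(beanName);          // outer creation starts
        try {
            guard.beforeSingletonCreation(beanName);      // nested request for the same bean
            return false;
        } catch (IllegalStateException expected) {
            return true;
        } finally {
            guard.afterSingletonCreation(beanName);       // outer creation ends
        }
    }

    public static void main(String[] args) {
        System.out.println(nestedRequestIsRejected("twoExchange"));
    }
}
```

The whole check rests on the return value of Set.add: the first call records the name, and any call made before the matching remove sees false and fails.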

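At this point it is fairly clear that the bean creation order causes some beans to be requested while they are still being created. As a result, RabbitAdmin cannot obtain all of the queues, exchanges, and bindings, and so cannot auto-declare them.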

Next, we start from the code where Spring begins creating beans:

DefaultListableBeanFactory.preInstantiateSingletons

    public void preInstantiateSingletons() throws BeansException {
        if (logger.isTraceEnabled()) {
            logger.trace("Pre-instantiating singletons in " + this);
        }

        // Iterate over a copy to allow for init methods which in turn register new bean definitions.
        // While this may not be part of the regular factory bootstrap, it does otherwise work fine.
        List<String> beanNames = new ArrayList<>(this.beanDefinitionNames);

        // Trigger initialization of all non-lazy singleton beans...
        for (String beanName : beanNames) {
            RootBeanDefinition bd = getMergedLocalBeanDefinition(beanName);
            if (!bd.isAbstract() && bd.isSingleton() && !bd.isLazyInit()) {
                if (isFactoryBean(beanName)) {
                    Object bean = getBean(FACTORY_BEAN_PREFIX + beanName);
                    if (bean instanceof FactoryBean) {
                        FactoryBean<?> factory = (FactoryBean<?>) bean;
                        boolean isEagerInit;
                        if (System.getSecurityManager() != null && factory instanceof SmartFactoryBean) {
                            isEagerInit = AccessController.doPrivileged(
                                    (PrivilegedAction<Boolean>) ((SmartFactoryBean<?>) factory)::isEagerInit,
                                    getAccessControlContext());
                        }
                        else {
                            isEagerInit = (factory instanceof SmartFactoryBean &&
                                    ((SmartFactoryBean<?>) factory).isEagerInit());
                        }
                        if (isEagerInit) {
                            getBean(beanName);
                        }
                    }
                }
                else {
                    getBean(beanName);
                }
            }
        }

        // Trigger post-initialization callback for all applicable beans...
        for (String beanName : beanNames) {
            Object singletonInstance = getSingleton(beanName);
            if (singletonInstance instanceof SmartInitializingSingleton) {
                StartupStep smartInitialize = this.getApplicationStartup().start("spring.beans.smart-initialize")
                        .tag("beanName", beanName);
                SmartInitializingSingleton smartSingleton = (SmartInitializingSingleton) singletonInstance;
                if (System.getSecurityManager() != null) {
                    AccessController.doPrivileged((PrivilegedAction<Object>) () -> {
                        smartSingleton.afterSingletonsInstantiated();
                        return null;
                    }, getAccessControlContext());
                }
                else {
                    smartSingleton.afterSingletonsInstantiated();
                }
                smartInitialize.end();
            }
        }
    }

As the code shows, Spring collects all bean names and then creates the beans one by one in a loop, in the order of that list.

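Aha. The Queue, Exchange, and Binding beans come before RabbitAdmin in that order, yet the methods that create them depend on RabbitAdmin. Creating a Queue therefore triggers the creation of RabbitAdmin, and while RabbitAdmin initializes it scans for all Queue, Exchange, and Binding beans. Some of those beans are still in creation at that moment, so RabbitAdmin cannot obtain them and cannot auto-declare them.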
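
The chain of events described above can be reproduced with a toy container that uses only the JDK. This is a simplified sketch, not Spring: CreationOrderSketch, its methods, the single "admin" bean, and the extra bean "otherExchange" (which has no dependency on the admin) are all hypothetical. It deliberately omits Spring features such as early singleton references.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;

// Toy container (hypothetical, not Spring) that creates beans in registration order.
public class CreationOrderSketch {

    private final Map<String, Function<CreationOrderSketch, Object>> definitions = new LinkedHashMap<>();
    private final Map<String, Object> singletons = new LinkedHashMap<>();
    private final Set<String> inCreation = new HashSet<>();
    private final List<String> seenByAdmin = new ArrayList<>();

    void register(String name, Function<CreationOrderSketch, Object> factory) {
        definitions.put(name, factory);
    }

    Object getBean(String name) {
        Object existing = singletons.get(name);
        if (existing != null) {
            return existing;
        }
        // Same guard idea as beforeSingletonCreation: reject nested creation of the same bean.
        if (!inCreation.add(name)) {
            throw new IllegalStateException("Requested bean is currently in creation: " + name);
        }
        try {
            Object bean = definitions.get(name).apply(this);
            singletons.put(name, bean);
            return bean;
        } finally {
            inCreation.remove(name);
        }
    }

    // Loosely mirrors getBeansOfType: beans that are currently in creation are skipped.
    List<String> visibleExchanges() {
        List<String> visible = new ArrayList<>();
        for (String name : definitions.keySet()) {
            if (!name.endsWith("Exchange")) {
                continue;
            }
            try {
                getBean(name);
                visible.add(name);
            } catch (IllegalStateException skipped) {
                // Ignore the bean that is still being created, as the TRACE log line reports.
            }
        }
        return visible;
    }

    /** Registers the exchange before the admin it depends on, then creates all beans in order. */
    static List<String> runScenario() {
        CreationOrderSketch c = new CreationOrderSketch();
        c.register("twoExchange", ctx -> {
            ctx.getBean("admin");                 // the exchange needs the admin while being created
            return "exchange:two";
        });
        c.register("otherExchange", ctx -> "exchange:other");
        c.register("admin", ctx -> {
            ctx.seenByAdmin.addAll(ctx.visibleExchanges());   // the admin scans during its own init
            return "admin";
        });
        for (String name : new ArrayList<>(c.definitions.keySet())) {
            c.getBean(name);
        }
        return c.seenByAdmin;
    }

    public static void main(String[] args) {
        System.out.println(runScenario());        // prints [otherExchange]
    }
}
```

All three beans exist once the loop finishes, but the admin's scan ran while twoExchange was still in creation, so the scan only ever saw otherExchange. That mirrors why the declaration for the affected queue never reaches the broker.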

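Fix: inject RabbitAdmin as a class field instead of as a @Bean method parameter, which avoids the nested creation problem. Note that not every project will hit this issue, because it depends on the order in which the project's beans are loaded.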
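
One possible reading of this fix is sketched below for the "two" side only. It assumes that "class-field injection" means an @Autowired field with a @Qualifier on the configuration class, which the article does not spell out, and it needs the Spring and Spring AMQP dependencies of the surrounding project to compile. Whether it resolves the problem in a given project still depends on that project's bean order.

```java
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMqQueueConfiguration {

    // Assumption: the admin is injected into the configuration class itself,
    // so the @Bean methods below no longer take it as a method parameter.
    @Autowired
    @Qualifier("twoRabbitMqAdmin")
    private RabbitAdmin twoRabbitMqAdmin;

    @Bean
    public Queue twoQueue() {
        Queue queue = new Queue("two_auto_test");
        queue.setAdminsThatShouldDeclare(twoRabbitMqAdmin);
        return queue;
    }

    @Bean
    public DirectExchange twoExchange() {
        DirectExchange exchange = new DirectExchange("two_auto_test");
        exchange.setAdminsThatShouldDeclare(twoRabbitMqAdmin);
        return exchange;
    }

    @Bean
    public Binding twoBinding(Queue twoQueue, DirectExchange twoExchange) {
        Binding binding = BindingBuilder.bind(twoQueue).to(twoExchange).with("two_auto_test");
        binding.setAdminsThatShouldDeclare(twoRabbitMqAdmin);
        return binding;
    }
}
```

With this shape, requesting twoQueue or twoExchange no longer triggers the creation of RabbitAdmin from inside the queue's or exchange's own creation, which is the nesting the article identifies as the cause.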


This article is reposted from: https://blog.csdn.net/qq_27061805/article/details/130006697
Copyright belongs to the original author, 峰回路转心不死. In case of infringement, please contact us for removal.
