0


SpringBoot如何与RabbitMQ建立连接

文章目录

一. 引言

本文主要描述SpringBoot如何与RabbitMQ建立连接,建立多少个连接,以及如何接收消息

二. 相关概念

  • ConnectionFactory接口,是客户端与RabbitMQ服务器的tcp socket连接工厂,负责根据服务器地址创建Connection
  • Connection是客户端与RabbitMQ服务器的socket连接,它封装了socket协议相关的逻辑(比如接收和发送消息)
  • Channel是RabbitMQ客户端与服务器交互的最重要的一个接口,大部分的业务操作是在Channel接口中完成的,包括定义Queue、定义Exchange、绑定Queue与Exchange、发布消息、接收消息等。Channel之间是完全隔离的

Channel与Connection的区别?

  • 职责不同。Channel负责封装业务,Connection负责封装Socket连接
  • 多线程和性能因素。建立和销毁一个Connection的开销比较大,而Channel是一个建立在Connection内部的逻辑连接,建立和销毁的成本比较低。

注意:在Connection底层,当有多个Channel发送消息时,也是排队发送的。

三. 连接代码实现

  • https://blog.csdn.net/qq_43216019/article/details/128824328?spm=1001.2014.3001.5501中介绍了如何实现消息接收功能,其中RabbitListenerAnnotationBeanPostProcessor中类中,没扫描到一个消息接收方法,就定义为如下实例:```RabbitListenerEndpoint(接口)SimpleRabbitListenerEndpoint(实现:表示RabbitListenerConfigurer接口所实现类的endPointMethodRabbitListenerEndpoint(实现:表示@RabbitListener+@RabbitHandler注解方法对应的endpoint))```
  • 同时在每个RabbitListenerAnnotationBeanPostProcessor类中,对于每一个endpoint,最终都会回调RabbitListenerEndpointRegistery的doStart方法,伪代码:publicvoidregisterListenerContainer(RabbitListenerEndpoint endpoint,RabbitListenerContainerFactory<?> factory,boolean startImmediately){synchronized(this.listenerContainers){//创建SimpleMessageListenerContainer实例MessageListenerContainer container =this.createListenerContainer(endpoint, factory);this.listenerContainers.put(id, container);if(StringUtils.hasText(endpoint.getGroup())&&this.applicationContext !=null){Object containerGroup;if(this.applicationContext.containsBean(endpoint.getGroup())){ containerGroup =(List)this.applicationContext.getBean(endpoint.getGroup(),List.class);}else{ containerGroup =newArrayList();this.applicationContext.getBeanFactory().registerSingleton(endpoint.getGroup(), containerGroup);}((List)containerGroup).add(container);}if(this.contextRefreshed){ container.lazyLoad();}//启动调用SimpleMessageListenerContainer的start方法if(startImmediately){this.startIfNecessary(container);}}}
  • 在SimpleMessagelistenerContainer类中,start方法最终会调用的dostart方法,伪代码:protected void doStart() { Assert.state(!this.consumerBatchEnabled || this.getMessageListener() instanceof BatchMessageListener || this.getMessageListener() instanceof ChannelAwareBatchMessageListener, "When setting 'consumerBatchEnabled' to true, the listener must support batching"); this.checkListenerContainerAware(); super.doStart(); synchronized(this.consumersMonitor) { if (this.consumers != null) { throw new IllegalStateException("A stopped container should not have consumers"); } else { //根据@RabbitListener注解中定义的concurrency数量初始化consumer的个数 对于每个consumer生成一个BlockingQueueConsumer实例 int newConsumers = this.initializeConsumers(); if (this.consumers == null) { this.logger.info("Consumers were initialized and then cleared (presumably the container was stopped concurrently)"); } else if (newConsumers <= 0) { if (this.logger.isInfoEnabled()) { this.logger.info("Consumers are already running"); } } else { Set<SimpleMessageListenerContainer.AsyncMessageProcessingConsumer> processors = new HashSet(); Iterator var4 = this.consumers.iterator(); while(var4.hasNext()) { //对于每个BlockingQueueConsumer,生成AsyncMessageProcessingConsumer,并启动一个线程启动这个consumer,实际上调用了BlockingQueueConsumer实例的start方法 BlockingQueueConsumer consumer = (BlockingQueueConsumer)var4.next(); SimpleMessageListenerContainer.AsyncMessageProcessingConsumer processor = new SimpleMessageListenerContainer.AsyncMessageProcessingConsumer(consumer); processors.add(processor); this.getTaskExecutor().execute(processor); if (this.getApplicationEventPublisher() != null) { this.getApplicationEventPublisher().publishEvent(new AsyncConsumerStartedEvent(this, consumer)); } } this.waitForConsumersToStart(processors); } } } }
  • BlockingQueueConsumer的start方法启动consumerpublicvoidstart()throwsAmqpException{try{this.resourceHolder =ConnectionFactoryUtils.getTransactionalResourceHolder(this.connectionFactory,this.transactional);this.channel =this.resourceHolder.getChannel();ClosingRecoveryListener.addRecoveryListenerIfNecessary(this.channel);}catch(AmqpAuthenticationException var2){thrownewFatalListenerStartupException("Authentication failure", var2);}}

​ 注意:每个监听线程都会调用以上方法创建connection和channel,那么connection和channel的数量岂不是一致?

​ CachingConnectionFactory bean的createConnection负责创建connection:

public final Connection createConnection() throws AmqpException {
    ......
        Object var1 = this.connectionMonitor;
        synchronized(this.connectionMonitor) {
            if (this.cacheMode == CachingConnectionFactory.CacheMode.CHANNEL) {
                //当第一个线程创建connection时,this.connection.target为空,所以调用父类方法创建真实的connection。
                //当第二个以上的线程创建connection时,直接返回之前创建好的connection。
                if (this.connection.target == null) {
                    this.connection.target = super.createBareConnection();
                    ......
                }
                return this.connection;
            } 
            ......
        }
    }
}

所以系统即使有多个线程,也仅仅创建了一个connection,而每个线程创建了自己独立的channel。

后续继续学习记录AMQP协议相关知识


本文转载自: https://blog.csdn.net/qq_43216019/article/details/128842338
版权归原作者 pipape 所有, 如有侵权,请联系我们删除。

“SpringBoot如何与RabbitMQ建立连接”的评论:

还没有评论