0


敲详细的springboot中使用RabbitMQ的源码解析

这里介绍的源码主要是涉及springboot框架下的rabbitmq客户端代码(具体在springframework.amqp.rabbit包下,区分一下不由springboot直接接管的spring-rabbit的内容),springboot基于RabbitMQ的Java客户端建立了简便易用的框架。

springboot的框架下相对更多地使用消费者Consumer和监听器Listener的概念,这两个概念不注意区分容易混淆。默认情况下,springboot中消费者为单线程串行消费的模型,体现了队列的特性。


在springboot的框架下使用rabbitmq的一般步骤

  1. 启动rabbitmq服务器,springboot项目引入依赖
  2. 配置信息,有两种方式 1. 配置文件配置2. 配置类配置SimpleMessageListenerContainer
  3. 实现消息处理类ChannelAwareMessageListener处理业务逻辑,或用@RabbitListener注解

这两种方式其实异曲同工,@RabbitListener的方式在实际使用时创建MessagingMessageListenerAdapter,这个对象是ChannelAwareMessageListener接口的实现类,实现了onMessage()方法,这个方法利用了适配器模式,能够调用注解标注的方法,而实现ChannelAwareMessageListener的方式比较直白就是实现onMessage()方法


源码解析

关于SimpleMessageListenerContainer

SimpleMessageListenerContainer是在spring项目中使用RabbitMQ关键的类,用来接收并处理消息的。阅读源码可以从这个类入手。

  1. 首先关注构造器,需要传入ConnectionFactory用于获取连接,这跟原生rabbitmq是一致的,都从Connection连接开始。
  2. 关键属性concurrentConsumers:指定要创建的并发消费者的数量。默认值为1。建议增加并发使用者的数量,以便扩展从队列传入的消息的消耗。但是,请注意,一旦注册了多个消费者,将无法保证顺序。一般来说,对于低容量队列,坚持使用1个消费者。同时不能超过maxConcurrentConsumers(如果设置了)。maxConcurrentConsumers:设置消费者数量的上限。默认为concurrentConsumers。消费者可以根据需求增加,但不会小于concurrentConsumers。acknowledgeMode:消息确认模式// 自动确认消息container.setAcknowledgeMode(AcknowledgeMode.NONE);// 根据情况确认消息container.setAcknowledgeMode(AcknowledgeMode.AUTO);// 手动确认消息container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
  3. 绑定组件:
  4. 设置消费者的Consumer_tag和Arguments:container.setConsumerTagStrategy可以设置消费者的 Consumer_tag, container.setConsumerArguments可以设置消费者的 Argumentscontainer.setConsumerTagStrategy(queue ->"order_queue_"+(++count));//设置消费者的ArgumentsMap<String,Object> args =newHashMap<>();args.put("module","订单模块");args.put("fun","发送消息");container.setConsumerArguments(args);在这里插入图片描述

spring的亮点在于用注解简化了很多代码操作,其中最常用的当属@RabbitListener

@RabbitListener(queues ={BiMqConstant.BI_QUEUE_NAME}, ackMode ="MANUAL")publicvoidreceiveMessage(String message,Channel channel,@Header(AmqpHeaders.DELIVERY_TAG)long deliveryTag){}

从@RabbitListener入手

1、从spring开启RabbitMQ的注解模式,@EnableRabbit导入RabbitBootstrapConfiguration配置类。

2、这个配置类定义了RabbitListenerAnnotationBeanPostProcessor和RabbitListenerEndpointRegistry两个bean。前者用来扫描加了@RabbitListener 的类,通过反射找到带注解的类,再找到对应的方法,存为handlerMethods。后者在注册终端后用于构建ListenerContainer(继承了RabbitListener注解内的信息,包括监听的队列和注解所在的类和方法)。

@Target(ElementType.TYPE)@Retention(RetentionPolicy.RUNTIME)@Documented@Import(RabbitBootstrapConfiguration.class)public@interfaceEnableRabbit{}

3、RabbitListenerEndpointRegistry通过创建MethodRabbitListenerEndpoint对象和SimpleRabbitListenerContainerFactory工厂bean,生成SimpleMessageListenerContainer对象。

(RabbitListenerAnnotationBeanPostProcessor中拥有注解信息,如队列名,以及被标注注解的方法,所以endpoint的注册还是在processor类中)

(processor中有注册员成员变量registrar的registerEndpoint()注册endpoint,registrar有注册处registry成员变量注册利用registerListenerContainer()的createListenerContainer()注册container)

publicclassRabbitListenerEndpointRegistryimplementsSmartLifecycle{privatefinalMap<String,MessageListenerContainer> listenerContainers =newConcurrentHashMap<String,MessageListenerContainer>();//注册终端publicvoidregisterListenerContainer(RabbitListenerEndpoint endpoint,RabbitListenerContainerFactory<?> factory,boolean startImmediately){String id = endpoint.getId();synchronized(this.listenerContainers){//创建 listenerContainerMessageListenerContainer container =createListenerContainer(endpoint, factory);this.listenerContainers.put(id, container);
            ……
 
            if(startImmediately){startIfNecessary(container);}}}protectedMessageListenerContainercreateListenerContainer(RabbitListenerEndpoint endpoint,RabbitListenerContainerFactory<?> factory){//调用RabbitListener容器工厂的createListenerContainer方法获取RabbitListener容器MessageListenerContainer listenerContainer = factory.createListenerContainer(endpoint);return listenerContainer;}

4、SimpleMessageListenerContainer对象保存了要监听的队列名(可以是configuration时set的也可以是@RabbitListener中标注的),创建了用于处理消息的MessagingMessageListenerAdapter实例(实际上是一个listener)

publicclassMethodRabbitListenerEndpointextendsAbstractRabbitListenerEndpoint{......@OverrideprotectedMessagingMessageListenerAdaptercreateMessageListener(MessageListenerContainer container){Assert.state(this.messageHandlerMethodFactory !=null,"Could not create message listener - MessageHandlerMethodFactory not set");MessagingMessageListenerAdapter messageListener =createMessageListenerInstance();
        messageListener.setHandlerMethod(configureListenerAdapter(messageListener));String replyToAddress =getDefaultReplyToAddress();if(replyToAddress !=null){
            messageListener.setResponseAddress(replyToAddress);}MessageConverter messageConverter = container.getMessageConverter();if(messageConverter !=null){
            messageListener.setMessageConverter(messageConverter);}if(getBeanResolver()!=null){
            messageListener.setBeanResolver(getBeanResolver());}return messageListener;}protectedMessagingMessageListenerAdaptercreateMessageListenerInstance(){returnnewMessagingMessageListenerAdapter(this.bean,this.method);}......}

5、SimpleMessageListenerContainer的内部类AsyncMessageProcessingConsumer(区分,该类封装了BlockingQueueConsumer,由于该类实现了Runnable接口,可以视为一个线程任务放入线程池中执行)有一个run()方法,调用了receiveAndExecute(),这个方法会获取BlockingQueueConsumer,阻塞读取其消息(一次获取多条),完成消息读取。

6、接着调用listener进行消息处理,这里设置了代理,最终会执行actualInvokeListener所谓实际被执行的listener,溯源最终调用了listener.onMessage(message, channelToUse)。

SimpleMessageListenerContainer{//接受并执行privatebooleanreceiveAndExecute(finalBlockingQueueConsumer consumer)throwsThrowable{//do接受并执行returndoReceiveAndExecute(consumer);}//do接受并执行privatebooleandoReceiveAndExecute(BlockingQueueConsumer consumer)throwsThrowable{Channel channel = consumer.getChannel();for(int i =0; i <this.txSize; i++){//txSize为一次事务接受的消息个数//读取消息,这里阻塞的,但是有一个超时时间。Message message = consumer.nextMessage(this.receiveTimeout);if(message ==null){//阻塞超时break;}try{executeListener(channel, message);//消息接收已完成,现在开始处理消息。}catch(Exception e){}}return consumer.commitIfNecessary(isChannelLocallyTransacted());}//处理消息开始。该方法在其父类中protectedvoidexecuteListener(Channel channel,Message messageIn)throwsException{try{Message message = messageIn;if(……){//批处理信息,这个不研究}else{invokeListener(channel, message);}}catch(Exception ex){}}//在其父类中protectedvoidinvokeListener(Channel channel,Message message)throwsException{//这里this.proxy.invokeListener最终会调用actualInvokeListener方法。this.proxy.invokeListener(channel, message);}//在其父类中protectedvoidactualInvokeListener(Channel channel,Message message)throwsException{Object listener =getMessageListener();if(listener instanceofChannelAwareMessageListener){doInvokeListener((ChannelAwareMessageListener) listener, channel, message);}elseif(listener instanceofMessageListener){//……doInvokeListener((MessageListener) listener, message)}else{//……}}protectedvoiddoInvokeListener(ChannelAwareMessageListener listener,Channel channel,Message message)throwsException{Channel channelToUse = channel;try{
                listener.onMessage(message, channelToUse);}catch(Exception e){throwwrapToListenerExecutionFailedExceptionIfNeeded(e, message);}}}

7、关于第6点,根据这个listener实例的不同,有两种处理方式:

如果是前面所说的实现ChannelAwareMessageListener,就直接调用实现类的onMessage()。

如果是@RabbitListener注解,不同在于MessagingMessageListenerAdapter(ChannelAwareMessageListener的实现类,也是listen),基于适配器模式持有@RabbitListener注解的对象和方法(adapter实例中有HandlerMethod属性加入到adapter类中,HandlerMethod调用invoke()就能执行注解标注的方法)。

publicclassHandlerAdapter{privatefinalInvocableHandlerMethod invokerHandlerMethod;privatefinalDelegatingInvocableHandler delegatingHandler;publicObjectinvoke(Message<?> message,Object... providedArgs)throwsException{if(this.invokerHandlerMethod !=null){//InvocableHandlerMethod不为null,就调用invokerHandlerMethod.invoke方法。returnthis.invokerHandlerMethod.invoke(message, providedArgs);}elseif(this.delegatingHandler.hasDefaultHandler()){//……}else{//……}}}publicclassMessagingMessageListenerAdapterextendsAbstractAdaptableMessageListener{privateHandlerAdapter handlerMethod;}

现在就能把整个过程串起来了


关于关于endpoint和register

Endpoint为终端,像电脑、手机都是终端,他们都可以接受外部信息并响应,如手机来短信了就有提示。这里也用了终端的概念,被@RabbitListener注解修饰方法也有终端的特点可以接受外部信息并响应,即接到消息就执行对应方法。

registry姑且成为注册处用Map保存endpoint的id和对应的listenerContainer,注册处registerListenerContainer()利用endpoint和factory实例创建container,实际上是用了containerfactory的createListenerContainer(RabbitListenerEndpoint endpoint)方法

publicclassRabbitListenerEndpointRegistryimplementsDisposableBean,SmartLifecycle,ApplicationContextAware,ApplicationListener<ContextRefreshedEvent>{// 检查是否被注册过,注册过就不能注册第二次// 调用createListenerContainer创建消息监听// 关于分组消费的,我们不关心// 是否立即启动,是的话,同步调用startIfNecessary方法publicvoidregisterListenerContainer(RabbitListenerEndpoint endpoint,RabbitListenerContainerFactory<?> factory,boolean startImmediately){Assert.notNull(endpoint,"Endpoint must not be null");Assert.notNull(factory,"Factory must not be null");String id = endpoint.getId();Assert.hasText(id,"Endpoint id must not be empty");synchronized(this.listenerContainers){Assert.state(!this.listenerContainers.containsKey(id),"Another endpoint is already registered with id '"+ id +"'");MessageListenerContainer container =createListenerContainer(endpoint, factory);this.listenerContainers.put(id, container);if(StringUtils.hasText(endpoint.getGroup())&&this.applicationContext !=null){List<MessageListenerContainer> containerGroup;if(this.applicationContext.containsBean(endpoint.getGroup())){
                    containerGroup =this.applicationContext.getBean(endpoint.getGroup(),List.class);}else{
                    containerGroup =newArrayList<MessageListenerContainer>();this.applicationContext.getBeanFactory().registerSingleton(endpoint.getGroup(), containerGroup);}
                containerGroup.add(container);}if(startImmediately){startIfNecessary(container);}}// 其实就是调用了RabbitListenerContainerFactory的createListenerContainer生成了一个MessageListenerContainer对象protectedMessageListenerContainercreateListenerContainer(RabbitListenerEndpoint endpoint,RabbitListenerContainerFactory<?> factory){MessageListenerContainer listenerContainer = factory.createListenerContainer(endpoint);if(listenerContainer instanceofInitializingBean){try{((InitializingBean) listenerContainer).afterPropertiesSet();}catch(Exception ex){thrownewBeanInitializationException("Failed to initialize message listener container", ex);}}int containerPhase = listenerContainer.getPhase();if(containerPhase <Integer.MAX_VALUE){// a custom phase valueif(this.phase <Integer.MAX_VALUE&&this.phase != containerPhase){thrownewIllegalStateException("Encountered phase mismatch between container factory definitions: "+this.phase +" vs "+ containerPhase);}this.phase = listenerContainer.getPhase();}return listenerContainer;}}

把endpoint内的信息全部注入到container里。

@OverridepublicCcreateListenerContainer(RabbitListenerEndpoint endpoint){C instance =createContainerInstance();if(this.connectionFactory !=null){
        instance.setConnectionFactory(this.connectionFactory);}if(this.errorHandler !=null){
        instance.setErrorHandler(this.errorHandler);}if(this.messageConverter !=null){
        instance.setMessageConverter(this.messageConverter);}if(this.acknowledgeMode !=null){
        instance.setAcknowledgeMode(this.acknowledgeMode);}if(this.channelTransacted !=null){
        instance.setChannelTransacted(this.channelTransacted);}if(this.autoStartup !=null){
        instance.setAutoStartup(this.autoStartup);}if(this.phase !=null){
        instance.setPhase(this.phase);}
    instance.setListenerId(endpoint.getId());// 最重要的一行!!!
    endpoint.setupListenerContainer(instance);initializeContainer(instance);return instance;}

关于container和containerFactory

containerFactory也能配置并发消费者等参数。

@Configuration@EnableAsyncpublicclassThreadPoolConfig{@Bean("customContainerFactory")publicSimpleRabbitListenerContainerFactorycontainerFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer,ConnectionFactory connectionFactory){SimpleRabbitListenerContainerFactory factory =newSimpleRabbitListenerContainerFactory();
        factory.setConcurrentConsumers(10);//设置线程数
        factory.setMaxConcurrentConsumers(10);//最大线程数
        configurer.configure(factory, connectionFactory);return factory;}}

配置containerFactory能够创建container,但一般不在配置类中手动创建。一般是在注解中标记,然后让spring来生产container。

@RabbitListener(queues="demo.queue",containerFactory ="customContainerFactory")

直接配置container效果是相同的,同样可以设置队列,并发消费者等。


细说上面第5步container内的操作。

  1. container的启动入口是star()方法,然后进入doStart(),在该方法中会初始化consumer(BlockingQueueConsumer),每一个并发需要对应一个consumer,consumer的数量是根据前面所说的concurrentConsumers确定consumer =newBlockingQueueConsumer(getConnectionFactory(),getMessagePropertiesConverter(),this.cancellationLock,getAcknowledgeMode(),isChannelTransacted(), actualPrefetchCount,isDefaultRequeueRejected(),getConsumerArguments(),isNoLocal(),isExclusive(), queues);// 带有连接信息,数据转换器,确认模式,预取值,consumerArgs,监听的队列(可多个)等信息传入

区分一下consumer和listener,consumer是接收消息的消费者,listener是实际处理业务的执行者,consumer接收的每个消息都需要调用listener内的onMessage()方法来处理实际业务。

int newConsumers =initializeConsumers();
  1. 然后将consumer封装成AsyncMessageProcessingConsumer线程任务类型,然后就可以放入线程池中执行。
AsyncMessageProcessingConsumer processor =newAsyncMessageProcessingConsumer(consumer);
processors.add(processor);getTaskExecutor().execute(processor);
  1. 这里的线程池是SimpleAsyncTaskExecutor(也可以自定义传入),默认是不限制并发量的。每个container都有一个线程池,线程不足以支持consumer并发时就会超时报错。privateExecutor taskExecutor =newSimpleAsyncTaskExecutor();
  2. 进入AsyncMessageProcessingConsumer这个Runnable类的run()方法,如果consumer有监听的队列,就初始化initialize并开启mainloop()if(this.consumer.getQueueCount()<1){...}try{initialize();while(isActive(this.consumer)||this.consumer.hasDelivery()||!this.consumer.cancelled()){mainLoop();}}
  3. initialize()会创建exchangequeuebindings等实例,设置Qos,实现consumer与broker之间的对接,完成消息的订阅,并且会根据tag不同在每个BlockingQueueConsumer中再划分出internalConsumer,再放入BlockingQueueConsumer的queue中逐一处理。说明Qos流控指令包括prefetch-sizeprefetch-count参数。//该参数是设置在channel上的int prefetchCount =1;channel.basicQos(prefetchCount);broker的delivery指令在客户端会先打包成一个Envelope,所以consumertag是对应consumer一个,而deliveryTag是对应broker中的一条消息一个。Envelope envelope =newEnvelope(m.getDeliveryTag(), m.getRedelivered(), m.getExchange(), m.getRoutingKey());当然在broker执行delivery指令将消息推送到客户端Consumer之前还有channel,一个BlockingQueueConsumer对应一个channel,对应一个线程的调用。内部的consumer共用channel,channel会根据tag在dispatcher将消息推送至对应的consumer。一个channel对应了多个consumer在这里插入图片描述多个AsyncMessageProcessingConsumer对应不同的线程来处理在这里插入图片描述一个container可能监听多个队列。在这里插入图片描述
  4. mainLoop()相较于如何利用consumer接收消息,更侧重于最终的listener来进行业务处理。前面已经知道客户端会将消息存到Consume的queue中,简单来说,mainloop就是只要客户端正常启动就会无限循环来处理业务的,它主要就是完成从queue中提取消息数据然后经过一系列操作最终传递给业务逻辑处理MessageListener中。mainLoop()方法中就会从queue中提取消息,根据**batchSize确定每次提取消息数量,最后回调MessageListener,实现将消息传递到业务逻辑进行处理;多个AsyncMessageProcessingConsumer对应一个listener(一个container对应一个listener即是一套处理业务,共用一个线程池**,因为它们只是对应不同的并发, 处理的业务逻辑应是相同的。在这里插入图片描述

增加RabbitMQ并发的方法

  1. 增加并发消费者数量。并保障能提供充足的线程资源,虽然默认的线程池不设线程并发上线。示例:Redis与RabbitMQ配合使用多线程(多消费者)处理消息_多线程 处理 rabbitmq消息-CSDN博客
  2. 在listener方法上加上@Async(),这样会在异步的子线程下执行,如果提供线程池,就能实现并发。示例:线程池解决RabbitMQ消息堆积_rabbitmq线程池-CSDN博客
  3. 增大prefetchCount,prefetchCount是BlockingQueueConsumer内部维护的一个阻塞队列LinkedBlockingQueue的大小,其作用就是如果某个消费者队列阻塞,就无法接收新的消息
  4. 配置container的自定义线程池,但这个方法不推荐,示例:【RabbitMQ-9】自定义配置线程池(线程池资源不足-MQ初始化队列&&MQ动态扩容影响) - 简书 (jianshu.com)
  5. 当并发量确实无法短时间内提高时,也应尽可能提高消息队列的容量,并开启持久化。如设置惰性队列。RabbitMQ 从 3.6.0 版本开始引入了惰性队列的概念。惰性队列会尽可能的将消息存入磁盘,而在消费者消费到相应的消息时才会被加载到内存中,它的一个重要的设计目标是能够支持更长的队列,即支持更多的消息存储。当消费者由于各种各样的原因(比如消费者下线宕机亦或者是由于维护而关闭等)而致使长时间内不能消费消息造成消息堆积时,惰性队列就很有必要了。正常的队列会尽可能存储在内存中。
标签: java rabbitmq 中间件

本文转载自: https://blog.csdn.net/m0_72397750/article/details/140558744
版权归原作者 加糖苏打水 所有, 如有侵权,请联系我们删除。

“敲详细的springboot中使用RabbitMQ的源码解析”的评论:

还没有评论