0


(17)不重启服务动态调整RabbitMQ消费者数量

    我们使用springboot集成rabbitmq时会配置消费者数量,然而我们想调整这个数量时却每次都要重启,这样就很麻烦。如果能在不重启服务的情况下,可以动态调整消费者数量的话就会是分方便了。

    先看下springboot中关于rabbitmq的自动配置类,RabbitAutoConfiguration,
@Configuration
@ConditionalOnClass({ RabbitTemplate.class, Channel.class })
@EnableConfigurationProperties(RabbitProperties.class)
@Import(RabbitAnnotationDrivenConfiguration.class)
public class RabbitAutoConfiguration {
    。。。。。。
}
    @Import导入了RabbitAnnotationDrivenConfiguration
@Configuration
@ConditionalOnClass(EnableRabbit.class)
class RabbitAnnotationDrivenConfiguration {
    ......
}
    RabbitAnnotationDrivenConfiguration上面有个EnableRabbit,打开看一下EnableRabbit是一个注解,里面又导入了RabbitBootstrapConfiguration
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(RabbitBootstrapConfiguration.class)
public @interface EnableRabbit {
}
    RabbitBootstrapConfiguration类内容如下:
@Configuration
public class RabbitBootstrapConfiguration {

    @Bean(name = RabbitListenerConfigUtils.RABBIT_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME)
    @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
    public RabbitListenerAnnotationBeanPostProcessor rabbitListenerAnnotationProcessor() {
        return new RabbitListenerAnnotationBeanPostProcessor();
    }

    @Bean(name = RabbitListenerConfigUtils.RABBIT_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME)
    public RabbitListenerEndpointRegistry defaultRabbitListenerEndpointRegistry() {
        return new RabbitListenerEndpointRegistry();
    }

}
    RabbitListenerAnnotationBeanPostProcessor用于处理@RabbitListener注解修饰的方法

    RabbitListenerEndpointRegistry用于创建和管理消息监听容器MessageListenerContainer,重点看这里。。。。。

    RabbitListenerEndpointRegistry类中有个registerListenerContainer注册消息监听容器的方法,该方法被RabbitListenerEndpointRegistrar的registerAllEndpoints调用,endpointDescriptors是前面的RabbitListenerAnnotationBeanPostProcessor获取的@RabbitListener注解修饰的消息消费处理的方法集合。
@Override
public void afterPropertiesSet() {
    registerAllEndpoints();
}

protected void registerAllEndpoints() {
    Assert.state(this.endpointRegistry != null, "No registry available");
    synchronized (this.endpointDescriptors) {
        for (AmqpListenerEndpointDescriptor descriptor : this.endpointDescriptors) {
            this.endpointRegistry.registerListenerContainer(// NOSONAR never null
                    descriptor.endpoint, resolveContainerFactory(descriptor));
        }
        this.startImmediately = true;  // trigger immediate startup
    }
}
    这里不再深入探究具体的源码了,感兴趣的话可以自己翻看一下。大致调用顺序为:
  1. RabbitListenerAnnotationBeanPostProcessor.postProcessAfterInitialization(获取被@RabbitListener注解修饰的方法)————>RabbitListenerEndpointRegistrar.registerEndpoint(添加到endpointDescriptors list集合)

  2. RabbitListenerAnnotationBeanPostProcessor.afterSingletonsInstantiated()————>RabbitListenerEndpointRegistrar.afterPropertiesSet————>RabbitListenerEndpointRegistrar.registerAllEndpoints————>RabbitListenerEndpointRegistry.registerListenerContainer

      RabbitListenerEndpointRegistry.registerListenerContainer方法如下,将所有创建的消息监听容器MessageListenerContainer都放到了listenerContainers这个map中。
    
private final Map<String, MessageListenerContainer> listenerContainers =
            new ConcurrentHashMap<String, MessageListenerContainer>();
public void registerListenerContainer(RabbitListenerEndpoint endpoint, RabbitListenerContainerFactory<?> factory,
                boolean startImmediately) {
    Assert.notNull(endpoint, "Endpoint must not be null");
    Assert.notNull(factory, "Factory must not be null");

    String id = endpoint.getId();
    Assert.hasText(id, "Endpoint id must not be empty");
    synchronized (this.listenerContainers) {
        Assert.state(!this.listenerContainers.containsKey(id),
                "Another endpoint is already registered with id '" + id + "'");
        MessageListenerContainer container = createListenerContainer(endpoint, factory);
        this.listenerContainers.put(id, container);
        if (StringUtils.hasText(endpoint.getGroup()) && this.applicationContext != null) {
            List<MessageListenerContainer> containerGroup;
            if (this.applicationContext.containsBean(endpoint.getGroup())) {
                containerGroup = this.applicationContext.getBean(endpoint.getGroup(), List.class);
            }
            else {
                containerGroup = new ArrayList<MessageListenerContainer>();
                this.applicationContext.getBeanFactory().registerSingleton(endpoint.getGroup(), containerGroup);
            }
            containerGroup.add(container);
        }
        if (startImmediately) {
            startIfNecessary(container);
        }
    }
}
    我们遍历listenerContainers就能拿到对应的消息监听容器MessageListenerContainer,然后调用MessageListenerContainer的setConcurrentConsumers、setMaxConcurrentConsumers方法就可以调整消费者数量了。

    RabbitListenerEndpointRegistry.getListenerContainers可以获取所有消费监听容器
public void setConcurrentConsumers(final int concurrentConsumers) {
    Assert.isTrue(concurrentConsumers > 0, "'concurrentConsumers' value must be at least 1 (one)");
    Assert.isTrue(!isExclusive() || concurrentConsumers == 1,
            "When the consumer is exclusive, the concurrency must be 1");
    if (this.maxConcurrentConsumers != null) {
        Assert.isTrue(concurrentConsumers <= this.maxConcurrentConsumers,
                "'concurrentConsumers' cannot be more than 'maxConcurrentConsumers'");
    }
    synchronized (this.consumersMonitor) {
        if (logger.isDebugEnabled()) {
            logger.debug("Changing consumers from " + this.concurrentConsumers + " to " + concurrentConsumers);
        }
        int delta = this.concurrentConsumers - concurrentConsumers;
        this.concurrentConsumers = concurrentConsumers;
        if (isActive()) {
            adjustConsumers(delta);
        }
    }
}

public void setMaxConcurrentConsumers(int maxConcurrentConsumers) {
    Assert.isTrue(maxConcurrentConsumers >= this.concurrentConsumers,
            "'maxConcurrentConsumers' value must be at least 'concurrentConsumers'");
    Assert.isTrue(!isExclusive() || maxConcurrentConsumers == 1,
            "When the consumer is exclusive, the concurrency must be 1");
    Integer oldMax = this.maxConcurrentConsumers;
    this.maxConcurrentConsumers = maxConcurrentConsumers;
    if (oldMax != null && isActive()) {
        int delta = oldMax - maxConcurrentConsumers;
        if (delta > 0) { // only decrease, not increase
            adjustConsumers(delta);
        }
    }

}
    废话不多说,直接上代码示例:
@Resource
RabbitListenerEndpointRegistry rabbitListenerEndpointRegistry;

@RequestMapping(value = "/modifyMqConsumerNum")
@ApiOperation(value = "更新队列消费者数量接口")
public Response modifyMqConsumerNum(@RequestParam(value = "queueName", required = false) String queueName,
                                    @RequestParam(value = "concurrentConsumers") Integer concurrentConsumers,
                                    @RequestParam(value = "maxConcurrentConsumers") Integer maxConcurrentConsumers) {
    Collection<MessageListenerContainer> listenerContainers = rabbitListenerEndpointRegistry.getListenerContainers();
    for (MessageListenerContainer container : listenerContainers) {
        SimpleMessageListenerContainer con = (SimpleMessageListenerContainer) container;
        //消息监听容器要消费的队列名称集合
        List<String> queueNamesList = Arrays.asList(con.getQueueNames());
        //判断容器中的队列名称是否包含需要调整的队列名参数
        if (queueNamesList.contains(queueName)) {
            //注意先设置最大的消费者数量,再设置最小的消费者数量,因为先修改最小数量超过修改前的最大数量时会报异常修改失败
            con.setMaxConcurrentConsumers(maxConcurrentConsumers);
            con.setConcurrentConsumers(concurrentConsumers);
        }
    }
    return Response.success();
}
    调用RabbitListenerEndpointRegistry.getListenerContainers获取所有消费者监听容器,判断是否包含要调整的队列名称,如果包含则进行调整。

    注意:先设置最大的消费者数量,再设置最小的消费者数量,因为先修改最小数量超过修改前的最大数量时会报异常修改失败。例如之前最小最大分别是2和4,如果先将最小改成5则会报参数异常,即最小数量超过了最大数量。

java.lang.IllegalArgumentException: 'concurrentConsumers' cannot be more than 'maxConcurrentConsumers'
at org.springframework.util.Assert.isTrue(Assert.java:118)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.setConcurrentConsumers(SimpleMessageListenerContainer.java:161)

     另外,这里通过接口修改只能改服务的一个实例的消费者数量,生产上面一个服务都是集群部署的,可以结合配置中心(Nacos、Apollo等)进行处理。程序中监听配置中心的对应队列的消费者数量,如果数值发生了变化,则调用上面的方法进行变更就好了,这里就不再进行实现了。

本文转载自: https://blog.csdn.net/u012988901/article/details/126971334
版权归原作者 没头脑遇到不高兴 所有, 如有侵权,请联系我们删除。

“(17)不重启服务动态调整RabbitMQ消费者数量”的评论:

还没有评论