我们使用springboot集成rabbitmq时会配置消费者数量,然而我们想调整这个数量时却每次都要重启,这样就很麻烦。如果能在不重启服务的情况下,可以动态调整消费者数量的话就会是分方便了。
先看下springboot中关于rabbitmq的自动配置类,RabbitAutoConfiguration,
@Configuration
@ConditionalOnClass({ RabbitTemplate.class, Channel.class })
@EnableConfigurationProperties(RabbitProperties.class)
@Import(RabbitAnnotationDrivenConfiguration.class)
public class RabbitAutoConfiguration {
。。。。。。
}
@Import导入了RabbitAnnotationDrivenConfiguration
@Configuration
@ConditionalOnClass(EnableRabbit.class)
class RabbitAnnotationDrivenConfiguration {
......
}
RabbitAnnotationDrivenConfiguration上面有个EnableRabbit,打开看一下EnableRabbit是一个注解,里面又导入了RabbitBootstrapConfiguration
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(RabbitBootstrapConfiguration.class)
public @interface EnableRabbit {
}
RabbitBootstrapConfiguration类内容如下:
@Configuration
public class RabbitBootstrapConfiguration {
@Bean(name = RabbitListenerConfigUtils.RABBIT_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME)
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public RabbitListenerAnnotationBeanPostProcessor rabbitListenerAnnotationProcessor() {
return new RabbitListenerAnnotationBeanPostProcessor();
}
@Bean(name = RabbitListenerConfigUtils.RABBIT_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME)
public RabbitListenerEndpointRegistry defaultRabbitListenerEndpointRegistry() {
return new RabbitListenerEndpointRegistry();
}
}
RabbitListenerAnnotationBeanPostProcessor用于处理@RabbitListener注解修饰的方法
RabbitListenerEndpointRegistry用于创建和管理消息监听容器MessageListenerContainer,重点看这里。。。。。
RabbitListenerEndpointRegistry类中有个registerListenerContainer注册消息监听容器的方法,该方法被RabbitListenerEndpointRegistrar的registerAllEndpoints调用,endpointDescriptors是前面的RabbitListenerAnnotationBeanPostProcessor获取的@RabbitListener注解修饰的消息消费处理的方法集合。
@Override
public void afterPropertiesSet() {
registerAllEndpoints();
}
protected void registerAllEndpoints() {
Assert.state(this.endpointRegistry != null, "No registry available");
synchronized (this.endpointDescriptors) {
for (AmqpListenerEndpointDescriptor descriptor : this.endpointDescriptors) {
this.endpointRegistry.registerListenerContainer(// NOSONAR never null
descriptor.endpoint, resolveContainerFactory(descriptor));
}
this.startImmediately = true; // trigger immediate startup
}
}
这里不再深入探究具体的源码了,感兴趣的话可以自己翻看一下。大致调用顺序为:
RabbitListenerAnnotationBeanPostProcessor.postProcessAfterInitialization(获取被@RabbitListener注解修饰的方法)————>RabbitListenerEndpointRegistrar.registerEndpoint(添加到endpointDescriptors list集合)
RabbitListenerAnnotationBeanPostProcessor.afterSingletonsInstantiated()————>RabbitListenerEndpointRegistrar.afterPropertiesSet————>RabbitListenerEndpointRegistrar.registerAllEndpoints————>RabbitListenerEndpointRegistry.registerListenerContainer
RabbitListenerEndpointRegistry.registerListenerContainer方法如下,将所有创建的消息监听容器MessageListenerContainer都放到了listenerContainers这个map中。
private final Map<String, MessageListenerContainer> listenerContainers =
new ConcurrentHashMap<String, MessageListenerContainer>();
public void registerListenerContainer(RabbitListenerEndpoint endpoint, RabbitListenerContainerFactory<?> factory,
boolean startImmediately) {
Assert.notNull(endpoint, "Endpoint must not be null");
Assert.notNull(factory, "Factory must not be null");
String id = endpoint.getId();
Assert.hasText(id, "Endpoint id must not be empty");
synchronized (this.listenerContainers) {
Assert.state(!this.listenerContainers.containsKey(id),
"Another endpoint is already registered with id '" + id + "'");
MessageListenerContainer container = createListenerContainer(endpoint, factory);
this.listenerContainers.put(id, container);
if (StringUtils.hasText(endpoint.getGroup()) && this.applicationContext != null) {
List<MessageListenerContainer> containerGroup;
if (this.applicationContext.containsBean(endpoint.getGroup())) {
containerGroup = this.applicationContext.getBean(endpoint.getGroup(), List.class);
}
else {
containerGroup = new ArrayList<MessageListenerContainer>();
this.applicationContext.getBeanFactory().registerSingleton(endpoint.getGroup(), containerGroup);
}
containerGroup.add(container);
}
if (startImmediately) {
startIfNecessary(container);
}
}
}
我们遍历listenerContainers就能拿到对应的消息监听容器MessageListenerContainer,然后调用MessageListenerContainer的setConcurrentConsumers、setMaxConcurrentConsumers方法就可以调整消费者数量了。
RabbitListenerEndpointRegistry.getListenerContainers可以获取所有消费监听容器
public void setConcurrentConsumers(final int concurrentConsumers) {
Assert.isTrue(concurrentConsumers > 0, "'concurrentConsumers' value must be at least 1 (one)");
Assert.isTrue(!isExclusive() || concurrentConsumers == 1,
"When the consumer is exclusive, the concurrency must be 1");
if (this.maxConcurrentConsumers != null) {
Assert.isTrue(concurrentConsumers <= this.maxConcurrentConsumers,
"'concurrentConsumers' cannot be more than 'maxConcurrentConsumers'");
}
synchronized (this.consumersMonitor) {
if (logger.isDebugEnabled()) {
logger.debug("Changing consumers from " + this.concurrentConsumers + " to " + concurrentConsumers);
}
int delta = this.concurrentConsumers - concurrentConsumers;
this.concurrentConsumers = concurrentConsumers;
if (isActive()) {
adjustConsumers(delta);
}
}
}
public void setMaxConcurrentConsumers(int maxConcurrentConsumers) {
Assert.isTrue(maxConcurrentConsumers >= this.concurrentConsumers,
"'maxConcurrentConsumers' value must be at least 'concurrentConsumers'");
Assert.isTrue(!isExclusive() || maxConcurrentConsumers == 1,
"When the consumer is exclusive, the concurrency must be 1");
Integer oldMax = this.maxConcurrentConsumers;
this.maxConcurrentConsumers = maxConcurrentConsumers;
if (oldMax != null && isActive()) {
int delta = oldMax - maxConcurrentConsumers;
if (delta > 0) { // only decrease, not increase
adjustConsumers(delta);
}
}
}
废话不多说,直接上代码示例:
@Resource
RabbitListenerEndpointRegistry rabbitListenerEndpointRegistry;
@RequestMapping(value = "/modifyMqConsumerNum")
@ApiOperation(value = "更新队列消费者数量接口")
public Response modifyMqConsumerNum(@RequestParam(value = "queueName", required = false) String queueName,
@RequestParam(value = "concurrentConsumers") Integer concurrentConsumers,
@RequestParam(value = "maxConcurrentConsumers") Integer maxConcurrentConsumers) {
Collection<MessageListenerContainer> listenerContainers = rabbitListenerEndpointRegistry.getListenerContainers();
for (MessageListenerContainer container : listenerContainers) {
SimpleMessageListenerContainer con = (SimpleMessageListenerContainer) container;
//消息监听容器要消费的队列名称集合
List<String> queueNamesList = Arrays.asList(con.getQueueNames());
//判断容器中的队列名称是否包含需要调整的队列名参数
if (queueNamesList.contains(queueName)) {
//注意先设置最大的消费者数量,再设置最小的消费者数量,因为先修改最小数量超过修改前的最大数量时会报异常修改失败
con.setMaxConcurrentConsumers(maxConcurrentConsumers);
con.setConcurrentConsumers(concurrentConsumers);
}
}
return Response.success();
}
调用RabbitListenerEndpointRegistry.getListenerContainers获取所有消费者监听容器,判断是否包含要调整的队列名称,如果包含则进行调整。
注意:先设置最大的消费者数量,再设置最小的消费者数量,因为先修改最小数量超过修改前的最大数量时会报异常修改失败。例如之前最小最大分别是2和4,如果先将最小改成5则会报参数异常,即最小数量超过了最大数量。
java.lang.IllegalArgumentException: 'concurrentConsumers' cannot be more than 'maxConcurrentConsumers'
at org.springframework.util.Assert.isTrue(Assert.java:118)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.setConcurrentConsumers(SimpleMessageListenerContainer.java:161)
另外,这里通过接口修改只能改服务的一个实例的消费者数量,生产上面一个服务都是集群部署的,可以结合配置中心(Nacos、Apollo等)进行处理。程序中监听配置中心的对应队列的消费者数量,如果数值发生了变化,则调用上面的方法进行变更就好了,这里就不再进行实现了。
版权归原作者 没头脑遇到不高兴 所有, 如有侵权,请联系我们删除。