0


rabbitMq 针对于当前监听的队列,来控制消费者并发数量,不影响其他队列,代码示例

@Configuration@ConditionalOnClass(SimpleRabbitListenerContainerFactory.class)publicclassConsumerConfig{@Value("${rabbit.batch.num:100}")privateint batchNum;@Bean("batchQueueRabbitListenerContainerFactory")publicSimpleRabbitListenerContainerFactorybatchQueueRabbitListenerContainerFactory(ConnectionFactory connectionFactory){SimpleRabbitListenerContainerFactory factory =newSimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setBatchListener(true);
        factory.setConsumerBatchEnabled(true);
        factory.setBatchSize(batchNum);
        factory.setConcurrentConsumers(5);// 设置并发消费者数量为 5
        factory.setMaxConcurrentConsumers(10);// 设置最大并发消费者数量为 10return factory;}}

concurrentConsumers 和 maxConcurrentConsumers 属性的具体含义如下:
concurrentConsumers:指定同时运行的消费者数量,默认为1。
maxConcurrentConsumers:指定允许的最大并发消费者数量,默认为1。
因此,在上述示例中,设置了 concurrentConsumers 为 5,maxConcurrentConsumers 为 10,意味着 RabbitMQ 容器将维持一个初始的消费者池大小为 5,并在需要时最多扩展到 10 个并发消费者。

通过以上修改,你就可以在 batchQueueRabbitListenerContainerFactory 中控制消费者的并发数量了。根据你的实际需求,可以调整并发消费者的数量以满足系统性能和资源的要求。
需要注意的是,这种设置会影响到特定队列的消费者并发数量,而不会影响其他队列的消费者。因为你是针对特定的batchQueueRabbitListenerContainerFactory进行配置,所以只会影响使用该工厂的队列。
如果你想配置多个工厂,可以继续添加其他的@Bean方法。

例如,你可以添加另一个SimpleRabbitListenerContainerFactory bean,命名为anotherQueueRabbitListenerContainerFactory,并配置相应的属性:

@Bean("anotherQueueRabbitListenerContainerFactory")publicSimpleRabbitListenerContainerFactoryanotherQueueRabbitListenerContainerFactory(ConnectionFactory connectionFactory){SimpleRabbitListenerContainerFactory factory =newSimpleRabbitListenerContainerFactory();
    factory.setConnectionFactory(connectionFactory);// 配置其他属性return factory;}

通过这种方式,你可以定义多个SimpleRabbitListenerContainerFactory bean,并分别配置每个工厂需要的属性。然后在需要使用特定工厂的@RabbitListener注解中,通过containerFactory属性指定使用哪个工厂。

举个例子,如果你希望某个队列使用anotherQueueRabbitListenerContainerFactory工厂进行监听,可以这样设置:

@RabbitListener(queues ="another.queue", containerFactory ="anotherQueueRabbitListenerContainerFactory")publicvoidonAnotherMessage(Message message){// 处理消息}

通过这种方式,你可以根据需要定义多个工厂,并将它们分配给不同的队列进行监听。
以下是rabbitMq监听消息代码:

@RabbitListener(queues ="test.queue", containerFactory ="batchQueueRabbitListenerContainerFactory")@RabbitHandlerpublicvoidonReportMessage(List<Message> messages){List<Map<String,Object>> list = messages.stream().map(message ->(Map<String,Object>) message.getPayload()).collect(Collectors.toList());
    log.info("report收到数据:{}",JSON.toJSONString(list));
    service.handler(list);}

本文转载自: https://blog.csdn.net/weixin_44060488/article/details/134446877
版权归原作者 爱吃土豆的马铃薯ㅤㅤㅤㅤㅤㅤㅤㅤㅤ 所有, 如有侵权,请联系我们删除。

“rabbitMq 针对于当前监听的队列,来控制消费者并发数量,不影响其他队列,代码示例”的评论:

还没有评论