监听器类型 simple,direct区别
1、消费者,channel,connection的关系
首先明确,这里的consumer不是一台消费者机器,而是rabbitMq的最小消费单位,一台机器可以开启多个消费者,一个消费者总是对应一个channel。
一个TCP 被多个线程共享,每个线程对应一个信道,信道在rabbit都有唯一的ID,保证了信道的私有性,对应上唯一的线程使用。
也就是rabbitMq采用一个TCP连接处理多个消费者的多线程请求,实际上就是多路复用。
2、线程模型
simple
simple模式每个消费者都有其私有的线程,可以增加消费者,也会自动增加消费线程,不管消费者是不是在处理消息,可能会造成资源线程的浪费。
direct
看的出来direct的线程模型更简单,也因此压力集中在Connection线程池上,线程可以复用与多个消费者,但是如果采用这种模式,需要设置Connection线程池合适的参数。
看起来direct更好一些,那么如何选择
如何选择容器
看官网的说法
选择容器
2.0版本引入了DirectMessageListenerContainer(DMLC)。此前,仅SimpleMessageListenerContainer(SMLC) 可用。SMLC 对每个消费者使用一个内部队列和一个专用线程。如果容器配置为侦听多个队列,则使用同一个消费者线程来处理所有队列。并发控制由concurrentConsumers和其他属性。当消息从 RabbitMQ 客户端到达时,客户端线程通过队列将它们传递给消费者线程。之所以需要这种架构,是因为在 RabbitMQ 客户端的早期版本中,多个并发传送是不可能的。较新版本的客户端具有修订的线程模型,现在可以支持并发。这允许引入 DMLC,现在可以直接在 RabbitMQ 客户端线程上调用侦听器。因此,它的架构实际上比 SMLC“更简单”。然而,这种方法存在一些局限性,并且 DMLC 不具备 SMLC 的某些功能。consumersPerQueue此外,并发性由(以及客户端库的线程池)控制。concurrentConsumers此容器不提供 和关联的属性。
以下功能适用于 SMLC,但不适用于 DMLC:
- batchSize:使用SMLC,你可以设置这个来控制一个事务中投递多少条消息或者减少ack的数量,但是可能会导致失败后重复投递的数量增加。(DMLC 确实有,您可以使用它来减少确认,与 SMLCmessagesPerAck相同,但它不能与事务一起使用 - 每个消息都在单独的事务中传递和确认)。batchSize
- consumerBatchEnabled:允许在消费者中批量处理批量消息;
- maxConcurrentConsumers以及消费者缩放间隔或触发器——DMLC 中没有自动缩放功能。但是,它确实允许您以编程方式更改consumersPerQueue属性,并且消费者也会相应地进行调整。
然而,与 SMLC 相比,DMLC 具有以下优点:
- 在运行时添加和删除队列的效率更高。使用SMLC,整个消费者线程被重新启动(所有消费者被取消并重新创建)。通过 DMLC,不受影响的消费者不会被取消。
- 避免了 RabbitMQ 客户端线程和消费者线程之间的上下文切换。
- 线程在消费者之间共享,而不是为 SMLC 中的每个消费者拥有专用线程
并发配置
消费者数量配置
图中consumer数量可进行配置
simple配置
listener:simple:concurrency:50maxConcurrency:100
支持可伸缩的配置,根据前面的线程模型,我们知道simple配置的并发数实际上也是消费线程的数量。
direct配置
direct:consumersPerQueue:50#每个队列消费者数量
根据前面的线程模型,使用direct模式需要设置合理的连接线程池,因为连接线程池还需要进行业务逻辑的处理,配置如下
@Bean(name ="connectionFactory")@PrimarypublicConnectionFactoryconnectionFactory(@Value("${spring.rabbitmq.host}")String host,@Value("${spring.rabbitmq.port}")int port,@Value("${spring.rabbitmq.username}")String username,@Value("${spring.rabbitmq.password}")String password){CachingConnectionFactory connectionFactory =newCachingConnectionFactory();
connectionFactory.setHost(host);
connectionFactory.setPort(port);
connectionFactory.setUsername(username);
connectionFactory.setPassword(password);
connectionFactory.setExecutor(createThreadPool(coreSize,
maxSize,"mq-connection-","mq-connection-group"));return connectionFactory;}
配置后,rabbitMq控制台有会体现出来
批量消费
首先要明白批量消费的意义,消费者可以批处理,还可以批量确认。减少ack次数
simple配置
listener:simple:batch-size:10acknowledge-mode: auto
consumer-batch-enabled:true
配置批量消费
@PrimarypublicSimpleRabbitListenerContainerFactorynormalFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer,@Qualifier("connectionFactory")ConnectionFactory connectionFactory){SimpleRabbitListenerContainerFactory factory =newSimpleRabbitListenerContainerFactory();
factory.setBatchListener(true);
configurer.configure(factory, connectionFactory);return factory;}
监听类
@RabbitListener(queues = "QUEUE_DEMO_DIRECT")
public void ListenerQueue01(List<Message> message, Channel channel) throws IOException, InterruptedException {
logger.info("[onMessage][线程编号:{} 消息内容:{}]", Thread.currentThread().getId(), message);
}
这样在消息足够的情况下
List中就是10条消息。但是只需要确认最后一条消息就好了
原理
rabbitmq支持批量确认
channel.basicAck(deliveryTag,multiple)
在RabbitMQ中,channel.basicAck方法用于确认已经接收并处理了消息。该方法有两个参数:
- deliveryTag:表示消息的唯一标识。每个消息都有一个唯一的deliveryTag,用于标识消息在channel中的顺序。当消费者接收到消息后,需要调用channel.basicAck方法并传递deliveryTag来确认消息的处理。 2、 multiple:表示是否批量确认消息。当multiple为false时,只确认当前deliveryTag对应的消息;当multiple为true时,会确认当前deliveryTag及之前所有未确认的消息。
direct配置
看过前面我们知道DirectMessageListenerContainer并不像SimpleMessageListenerContainer能够支持批量消息,但是其支持一个参数
messagesPerAck,也可以在处理多少个消息之后进行ack,减少ack次数。
流量控制
首先要了解rabbitMq的channel.basicQos方法
/**
* Request a specific prefetchCount "quality of service" settings
* for this channel.
* <p>
* Note the prefetch count must be between 0 and 65535 (unsigned short in AMQP 0-9-1).
*
* @param prefetchCount maximum number of messages that the server
* will deliver, 0 if unlimited
* @param global true if the settings should be applied to the
* entire channel rather than each consumer
* @throws java.io.IOException if an error is encountered
* @see #basicQos(int, int, boolean)
*/voidbasicQos(int prefetchCount,boolean global)throwsIOException;
pprefetchCount,服务器一次请求将传递的最大消息数,如果没有限制,则为0。调用此方法时,该值必填。默认值:0,简单说就是一个消费者的最大处理消息数,broker服务器发现一个消费者还有prefetchCount个消息未ack,那么就不会再给它发送消息。防止消息堆积
global,是否将设置应用于整个频道,而不是每个消费者
- 默认值:false,应用于本身(一个消费者)
- true:应用于整个频道
direct和simple配置字段相同都是prefetchCount
注意在simple模式下如果prefetchCount配置小于batchSize,那么prefetchCount就会被batchSize覆盖。
如果在direct模式下prefetchCount配置小于messagesPerAck,那么prefetchCount就会被messagesPerAck覆盖。
重试配置
rabbitMq的消费端的重试机制指的是本地重试。
spring:
rabbitmq:
listener:
simple:
retry:
enabled:true # 开启消费者失败重试
initial-interval:1000 # 初识的失败等待时长为1秒
multiplier:1 # 失败的等待时长倍数,下次等待时长 = multiplier * last-interval
max-attempts:3 # 最大重试次数
stateless:true # true无状态;false有状态。如果业务中包含事务,这里改为false
重启consumer服务,重复之前的测试。可以发现:
在重试3次后,SpringAMQP会抛出异常AmqpRejectAndDontRequeueException,说明本地重试触发了
查看RabbitMQ控制台,发现消息被删除了,说明最后SpringAMQP返回的是ack,mq删除消息了
结论:
- 开启本地重试时,消息处理过程中抛出异常,不会requeue到队列,而是在消费者本地重试
- 重试达到最大次数后,Spring会返回ack,默认消息会被丢弃
在开启重试模式后,重试次数耗尽,如果消息依然失败,则需要有MessageRecovery接口来处理,它包含三种不同的实现:
- RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,丢弃消息。默认就是这种方式
- ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队
- RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机
版权归原作者 氵奄不死的鱼 所有, 如有侵权,请联系我们删除。