0


RabbitMQ不公平分发和预取值(channel.basicQos)

Qos(Quality of Service,服务质量)概念:

当网络发生拥塞的时候,所有的数据流都有可能被丢弃;为满足用户对不同应用不同服务质量的要求,就需要网络能根据用户的要求分配和调度资源,对不同的数据流提供不同的服务质量:对实时性强且重要的数据报文优先处理;对于实时性不强的普通数据报文,提供较低的处理优先级,网络拥塞时甚至丢弃。QoS应运而生。支持QoS功能的设备,能够提供传输品质服务;针对某种类别的数据流,可以为它赋予某个级别的传输优先级,来标识它的相对重要性,并使用设备所提供的各种优先级转发策略、拥塞避免等机制为这些数据流提供特殊的传输服务。配置了QoS的网络环境,增加了网络性能的可预知性,并能够有效地分配网络带宽,更加合理地利用网络资源。

为什么要设置Qos:

在RabbitMQ中,队列向消费者发送消息,如果没有设置Qos的值,那么队列中有多少消息就发送多少消息给消费者,完全不管消费者是否能够消费完,这样可能就会形成大量未ack的消息在缓存区堆积,因为这些消息未收到消费者发送的ack,所以只能暂时存储在缓存区中,等待ack,然后删除对应消息。这样的话,因此希望开发人员能限制此缓冲区的大小,以避免缓冲区里面无限制的未确认消息问题。这个时候就可以通过使用 basic.qos 方法设置“预取计数”值来完成的。该值定义通道上允许的未确认消息的最大数量,一旦数量达到配置的数量,RabbitMQ 将停止在通道上传递更多消息,除非至少有一个未处理的消息被确认,例如,假设在通道上有未确认的消息 5、6、7,8,并且通道的预取计数设置为 4,此时 RabbitMQ 将不会在该通道上再传递任何消息,除非至少有一个未应答的消息被 ack。比方说 tag=6 这个消息刚刚被确认 ACK,RabbitMQ 将会感知这个情况到并再发送一条消息。消息应答和 QoS 预取值对用户吞吐量有重大影响。通常,增加预取将提高向消费者传递消息的速度。虽然自动应答传输消息速率是最佳的,但是,在这种情况下已传递但尚未处理的消息的数量也会增加,从而增加了消费者的RAM 消耗(随机存取存储器)应该小心使用具有无限预处理的自动确认模式或手动确认模式,消费者消费了大量的消息如果没有确认的话,会导致消费者连接节点的内存消耗变大,所以找到合适的预取值是一个反复试验的过程,不同的负载该值取值也不同 100 到 300 范围内的值通常可提供最佳的吞吐量,并且不会给消费者带来太大的风险。预取值为 1 是最保守的。当然这将使吞吐量变得很低,特别是消费者连接延迟很严重的情况下,特别是在消费者连接等待时间较长的环境中。对于大多数应用来说,稍微高一点的值将是最佳的。

Qos的取值问题:

在传输效率和消费者消费速度之间做一个平衡。这个值是需要不断尝试的,因为太低,信道传输消息效率太低,如果太高,消费者来不及确认消息导致消息积累问题,内存消耗不断增大。

不公平分发

概念:
如果采用默认消息分发策略,消息是轮询发送的。但是消费者之前存在处理快慢问题,如果A处理慢,B处理快,他们接受同样数量的消息显然是不合理的。
引出不公平分发:
就是在这样情况下,不公平分发出现了,简而言之就是能者多劳,处理快的多处理,处理慢的少处理。
如何实现不公平分发:
那么如何实现呢?上面介绍了basicQos,如果我们将qos的值设为1,那么你想一想会出现什么情况呢?信道中只允许传输一条消息,那么当这条消息处理完后,队列会立马发送下一条消息,所以这个时候快的不断处理,慢的等待当前处理完再处理下一条。这样就实现了能者多劳。
在这里插入图片描述
在这里插入图片描述

代码实现:
生产者:

publicclassProducer{publicstaticfinalString QUEUE_NAME ="test_basic_qos";publicstaticfinalString EXCHANGE_NAME ="test_basic_qos";publicstaticvoidmain(String[] args)throwsIOException,TimeoutException{Channel channel =RabbitMqUtils.getChannel();
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        channel.exchangeDeclare(EXCHANGE_NAME,BuiltinExchangeType.DIRECT);
        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"basic.qos");Scanner scanner =newScanner(System.in);while(scanner.hasNext()){String message = scanner.next();System.out.println("发送消息为:"+ message);
            channel.basicPublish(EXCHANGE_NAME,"basic.qos",null,message.getBytes(StandardCharsets.UTF_8));}}}

C1(快的消费者):

publicclassConsumer02{publicstaticfinalString QUEUE_NAME ="test_basic_qos";publicstaticvoidmain(String[] args)throwsException{Channel channel =RabbitMqUtils.getChannel();DeliverCallback deliverCallback =(consumerTag, message)->{try{SleepUtils.sleep(2);}catch(InterruptedException e){
                e.printStackTrace();}System.out.println("高性能服务器接受:"+newString(message.getBody()));
            channel.basicAck(message.getEnvelope().getDeliveryTag(),false);};CancelCallback cancelCallback = consumerTag ->{};
        channel.basicConsume(QUEUE_NAME,false,deliverCallback,cancelCallback);}}

C2(慢的消费者):

publicclassConsumer01{publicstaticfinalString QUEUE_NAME ="test_basic_qos";publicstaticvoidmain(String[] args)throwsException{Channel channel =RabbitMqUtils.getChannel();
        channel.basicQos(1);DeliverCallback deliverCallback =(consumerTag,message)->{try{SleepUtils.sleep(30);}catch(InterruptedException e){
                e.printStackTrace();}System.out.println("低性能服务器接受:"+newString(message.getBody()));
            channel.basicAck(message.getEnvelope().getDeliveryTag(),true);};CancelCallback cancelCallback = consumerTag ->{};
        channel.basicConsume(QUEUE_NAME,false,deliverCallback,cancelCallback);}}

结果:
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

预取值

概念:
设置消费者信道最大传输信息数。
测试:
我们将慢的消费者preCount取值为5,快的消费者预取值为2,然后发送7条消息(为了保证快的消费者只处理2条,我们要在2s内能发送7条数据,这样保证后面的消息全部发送给慢的消费者,避免快的消费者处理完了消息,又将发送后续消息。)
代码:
参考上面代码,只是修改了qos值。
结果:
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
分析:
因为快的消费者信道满了,不能再发送消息,所以消息只能发送给慢的服务器,这就是basicQos用法。

标签: rabbitmq 网络 java

本文转载自: https://blog.csdn.net/ACMjiayou/article/details/122910159
版权归原作者 橙留香_ 所有, 如有侵权,请联系我们删除。

“RabbitMQ不公平分发和预取值(channel.basicQos)”的评论:

还没有评论