0


Rabbitmq入门与应用(六)-rabbitmq的消息确认机制

rabbitmq的消息确认机制

确认消息是否发送给交换机

配置
server:port:11111spring:rabbitmq:port:5672host: 192.168.201.81
    username: admin
    password:123publisher-confirm-type: correlated
编码RabbitTemplate.ConfirmCallback

ConfirmCallback 是一个回调接口,消息发送到 Broker 后触发回调,确认消息是否到达 Broker 服务器,也就是只确认是否正确到达 Exchange 中。

在配置类中编码确认回调函数。tips: 设置 rabbitTemplate.setMandatory(true);

配置类

rabbitTemplate.setConfirmCallback(ConfirmCallback confirmCallback);

CorrelationData:

1、消息ID需要封装到CorrelationData
2、correlationData.getFuture().addCallback(…)是一个回调函数:决定了每个业务处理confirm成功或失败的逻辑。

@BeanpublicMessageConvertermessageConverter(){returnnewJackson2JsonMessageConverter();}@BeanpublicRabbitTemplaterabbitTemplate(ConnectionFactory connectionFactory){RabbitTemplate rabbitTemplate =newRabbitTemplate(connectionFactory);
    log.debug("Rabbitmq配置启动成功,RabbitTemplate:{}设置完成",rabbitTemplate);
    rabbitTemplate.setMessageConverter(messageConverter());
    rabbitTemplate.setConfirmCallback(newRabbitConfirmCallbackImpl());return rabbitTemplate;}/**
 * 确保消息是否发送到交换机
 */classRabbitConfirmCallbackImplimplementsRabbitTemplate.ConfirmCallback{@Overridepublicvoidconfirm(CorrelationData correlationData,boolean ack,String cause){
         log.warn("****Exchange callback-检验是否发送成功********");
         log.warn("correlationData->相关数据:{}",correlationData);
         log.warn("ack->Exchange响应:{}",ack);
         log.warn("cause->错误原因:{}",cause);}}
测试发送

测试向交换机发送数据,测试交换机是否成功收到。

假设给一个错误的Exchange
@ServicepublicclassMqServiceImplimplementsIMqService{@AutowiredprivateRabbitTemplate rabbitTemplate;@OverridepublicvoidsendMessage(String msg){//错误的Exchange名称,实际名称为:ssc_sc_routing_exchangefinalStringEXCHANGE="ssc_sc_routing_exchangex";finalStringROUTING_KEY="ssc_sc_routing_key";

        rabbitTemplate.convertAndSend(EXCHANGE,ROUTING_KEY,
                msg
        );}}

image-20231218164926379

如果Exchange正确
@OverridepublicvoidsendMessage(String msg){finalStringEXCHANGE="ssc_sc_routing_exchange";finalStringROUTING_KEY="ssc_sc_routing_key";
    rabbitTemplate.convertAndSend(EXCHANGE,ROUTING_KEY,
            msg
    );}

image-20231218164425398

确认消息是否从交换机发送到队列RabbitTemplate.ReturnsCallback

设置ResturnsCallback

通过实现 ReturnCallback 接口,启动消息失败返回,此接口是在交换器路由不到队列时触发回调。

配置文件
spring:rabbitmq:publisher-confirm-type: correlated
    publisher-returns:true#检查是否绑定到队列中
配置
rabbitTemplate.setMandatory(true);
rabbitTemplate.setReturnsCallback(newRabbitConfirmReturnCallbackImpl());classRabbitConfirmReturnCallbackImplimplementsRabbitTemplate.ReturnsCallback{@OverridepublicvoidreturnedMessage(ReturnedMessage returnedMessage){
       log.warn("message:{}",returnedMessage.getMessage());
       log.warn("exchange:{}",returnedMessage.getExchange());
       log.warn("replyCode:{}",returnedMessage.getReplyCode());
       log.warn("replyText:{}",returnedMessage.getReplyText());
       log.warn("routingKey:{}",returnedMessage.getRoutingKey());}}
测试

修改routingkey的值,让交换机不能路由到指定Queue。

packagecom.wnhz.ssc.cloud.mq.service.impl;importcom.wnhz.ssc.cloud.mq.service.IMqService;importorg.springframework.amqp.rabbit.core.RabbitTemplate;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.stereotype.Service;@ServicepublicclassMqServiceImplimplementsIMqService{@AutowiredprivateRabbitTemplate rabbitTemplate;@OverridepublicvoidsendMessage(String msg){finalStringEXCHANGE="ssc_sc_routing_exchange";//修改routingkey,给一个错误的值,正确值为: ssc_sc_routing_keyfinalStringROUTING_KEY="ssc_sc_routing_keyx";
        rabbitTemplate.convertAndSend(EXCHANGE,ROUTING_KEY,
                msg
        );}}

image-20231218165907610

返回message:

message:(Body:'"hello confirm call back"'
    MessageProperties
    [
        headers={
                __TypeId__=java.lang.String
                 },
        contentType=application/json,
        contentEncoding=UTF-8,
        contentLength=0,
        receivedDeliveryMode=PERSISTENT,
        priority=0,
        deliveryTag=0])

消费确认信息

消费监听模式
  • Simple模式image-20231218155921090Simple模式即SMLC。simple模式每个消费者都有其私有的线程,可以增加消费者,也会自动增加消费线程,不管消费者是不是在处理消息,可能会造成资源线程的浪费。 对每个消费者使用一个内部队列和一个专用线程。如果容器配置为侦听多个队列,则使用同一个消费者线程来处理所有队列。并发控制由concurrentConsumers和其他属性。当消息从 RabbitMQ 客户端到达时,客户端线程通过队列将它们传递给消费者线程。
  • Direct模式image-20231218160106458压力集中在Connection线程池上,线程可以复用与多个消费者,但是如果采用这种模式,需要设置Connection线程池合适的参数。
Message对象结构

Message对象的结构,

消费者在订阅队列时,可以指定 autoAck 参数,当 autoAck 参数等于 false 时,RabbitMQ 会等待消费者显式地回复确认信号后才从内存(或者磁盘)中移除消息(实际上是先打上删除标记,之后在删除)。当 autoAck 参数等于 true 时,RabbitMQ 会自动把发送出去的消息置为确认,然后从内存(或者磁盘)中删除,而不管消费者是否真正地消费到了这些消息。

image-20231218173406699
image-20231218173218649

消息确认方式
  1. AcknowledgeMode.AUTO:自动确认。
  2. AcknowledgeMode.NONE:根据情况确认。
  3. AcknowledgeMode.MANUAL:手动确认。

direct模式:
image-20231218173612719
simple模式:
image-20231218173833493

消费端监听发送
@RabbitListener(queues ="data_confirm_queue")@OverridepublicvoidreceiveBookFromMq(Message message,Channel channel,Book book){

    log.debug("message:{}", message);
    log.debug("message.getMessageProperties().getHeaders()===>{}",
            message.getMessageProperties().getHeaders());
    log.debug("[order消费者:]接收到消息: {}", book);try{
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);
        log.debug("消息队列确认: {},{}",
                message.getMessageProperties().getConsumerQueue(),"接收到回调方法");}catch(IOException e){
        e.printStackTrace();}}
手动确认方式
  1. Basic.Ack 命令:用于确认当前消息。
  2. Basic.Nack 命令:用于否定当前消息(注意:这是AMQP 0-9-1的RabbitMQ扩展) 。
  3. Basic.Reject 命令:用于拒绝当前消息
channel.basicAck(long deliveryTag,boolean multiple)

basicAck 方法用于确认当前消息。

publicvoidbasicAck(long deliveryTag,boolean multiple)throwsIOException{this.delegate.basicAck(deliveryTag, multiple);}
  • deliveryTag(唯一标识 ID):当一个消费者向 RabbitMQ 注册后,会建立起一个 Channel ,RabbitMQ 会用 basic.deliver 方法向消费者推送消息,这个方法携带了一个 delivery tag, 它代表了 RabbitMQ 向该 Channel 投递的这条消息的唯一标识 ID,是一个单调递增的正整数,delivery tag 的范围仅限于 Channel。
  • multiple:为了减少网络流量,手动确认可以被批处理。- true: 代表批量应答 channel 上未应答的消息,比当前tag小的未应答的也一并应答(如5,6,7未应答)。image-20240221084206673- false: 只会应答 tag=8 的消息 5,6,7 这三个消息依然不会被确认收到消息应答image-20240221084249669
basicNack

basicNack 方法用于否定当前消息。 由于 basicReject 方法一次只能拒绝一条消息,如果想批量拒绝消息,则可以使用 basicNack 方法。消费者客户端可以使用 channel.basicNack 方法来实现

publicvoidbasicNack(long deliveryTag,boolean multiple,boolean requeue)throwsIOException{this.delegate.basicNack(deliveryTag, multiple, requeue);}
basicReject(long deliveryTag, boolean requeue)

basicNack 方法用于否定当前消息。basicReject 方法用于明确拒绝当前的消息而不是确认。

publicvoidbasicReject(long deliveryTag,boolean requeue)throwsIOException{this.delegate.basicReject(deliveryTag, requeue);}

消息遗弃或入队,一般建议消息丢弃重新发。

  • requeue: true :重回队列,false :丢弃,我们在nack方法中必须设置 false,否则重发没有意义。
出现异常的解决方案
packagecom.wnhz.mq.order.service.impl;importcom.rabbitmq.client.Channel;importcom.wnhz.domain.Book;importcom.wnhz.mq.order.service.IOrderService;importlombok.extern.slf4j.Slf4j;importorg.springframework.amqp.core.Message;importorg.springframework.amqp.rabbit.annotation.RabbitListener;importorg.springframework.stereotype.Service;importjava.io.IOException;@Slf4j@ServicepublicclassOrderServiceImplimplementsIOrderService{privatevoidbuildException(){thrownewRuntimeException("[消费者:] 消费出现异常......");}@RabbitListener(queues ="data_confirm_queue")@OverridepublicvoidreceiveBookFromMq(Message message,Channel channel,Book book){try{//制造异常测试buildException();
            log.debug("message:{}", message);
            log.debug("message.getMessageProperties().getHeaders()===>{}",
                    message.getMessageProperties().getHeaders());
            log.debug("[order消费者:]接收到消息: {}", book);
            channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);

            log.debug("消息队列确认: {},{}",
                    message.getMessageProperties().getConsumerQueue(),"接收到回调方法");}catch(Exception e){
          log.debug("消费异常: {}",e.getMessage());try{
                log.debug("尝试丢弃:{}消息.....................",book);
                channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);}catch(IOException ex){
                ex.printStackTrace();}}}}
标签: rabbitmq ruby 分布式

本文转载自: https://blog.csdn.net/qq_36115196/article/details/136167472
版权归原作者 自信人间三百年 所有, 如有侵权,请联系我们删除。

“Rabbitmq入门与应用(六)-rabbitmq的消息确认机制”的评论:

还没有评论