0


rabbitMq确认机制之ConfirmType

配置方式

@Bean(name ="connectionFactory")@PrimarypublicConnectionFactorynormalConnectionFactory(@Value("${spring.rabbitmq.username}")String username,@Value("${spring.rabbitmq.password}")String password,@Value("${spring.rabbitmq.addresses}")String address){CachingConnectionFactory connectionFactory =newCachingConnectionFactory();
 
        connectionFactory.setAddresses(address);
        connectionFactory.setUsername(username);
        connectionFactory.setPassword(password);//   connectionFactory.setPublisherConfirms(true);
        connectionFactory.setPublisherReturns(true);
        connectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);
        connectionFactory.setExecutor(createThreadPool(10,20,"mq-connection-","mq-connection-group"));return connectionFactory;}

或者配置文件里配置

spring:
  # RabbitMQ 配置项,对应 RabbitProperties 配置类
  rabbitmq:
    publisher-confirm-type: correlated

publisher-confirm-type属性有三个可选值:

  1. none(默认):关闭发布确认模式。
  2. correlated:消息从生产者发送到交换机后触发回调方法。
  3. simple:会触发回调方法,相当于单个确认(发一条确认一条)。在发布消息成功后使用rabbitTemplate调用waitForConfirms或waitForConfirmsOrDie方法等待broker节点返回发送结果,根据返回结果来判定下一步的逻辑,要注意的点是 waitForConfirmsOrDie方法如果返回false则会关闭channel,则接下来无法发送消息到broker。

使用方式

SIMPLE模式

开启simple模式需要在invoke方法中一起执行 rabbitTemplate.waitForConfirms
同时也会收到回调,回调后结束阻塞,同时可以获取到返回结果。

RabbitTemplate.ConfirmCallback confirmCallback =newRabbitTemplate.ConfirmCallback(){@Overridepublicvoidconfirm(CorrelationData correlationData,boolean ack,String cause){if(ack){System.out.println(correlationData.toString()+"发送成功");}else{System.out.println(correlationData.toString()+"发送失败, 原因: "+ cause);}}};
        rabbitTemplate.setConfirmCallback(confirmCallback);Boolean invoke = rabbitTemplate.invoke(operations ->{
            rabbitTemplate.convertAndSend("direct_exchange","ROUTING_KEY_01", message, correlationData);return rabbitTemplate.waitForConfirms(1000l);});

CORRELATED模式

RabbitTemplate.ConfirmCallback confirmCallback =newRabbitTemplate.ConfirmCallback(){@Overridepublicvoidconfirm(CorrelationData correlationData,boolean ack,String cause){if(ack){System.out.println(correlationData.toString()+"发送成功");}else{System.out.println(correlationData.toString()+"发送失败, 原因: "+ cause);}}};
        rabbitTemplate.setConfirmCallback(confirmCallback);
    
        rabbitTemplate.convertAndSend("direct_exchange","ROUTING_KEY_01", message, correlationData);//    correlationData.getFuture().get();sleep(1000*60);System.out.println("发送消息boot mq hello Direct成功");

实现通过callback实现保证消息发送成功。

可以看出来,在开启publisher-confirm的情况下,如果不自行实现ConfirmCallback的逻辑,也无法做到保证消息成功发送。

可以在发送消息时更新为发送中。
收到callback更新为发送成功,或者发送失败。
对于发送失败的安排重试,可以在消息头加上重试次数记录重试次数,达到指定次数,更新为发送失败。

标签: rabbitmq ruby 分布式

本文转载自: https://blog.csdn.net/qq_37436172/article/details/134616086
版权归原作者 氵奄不死的鱼 所有, 如有侵权,请联系我们删除。

“rabbitMq确认机制之ConfirmType”的评论:

还没有评论