0


高级篇-rabbitmq的高级特性

1.消息可靠性

三种丢失的情形:

1.1 生产者确认机制

启动MQ

创建Queues:

两种Callback:

1.ReturnCallback:全局callback

2.ComfirmCallback: 发送信息时候设置

 @Test
    public void testSendMessage2SimpleQueue() throws InterruptedException {
        // 1.准备消息
        String message = "hello, spring amqp!";
        // 2.准备CorrelationData
        // 2.1.消息ID
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        // 2.2.准备ConfirmCallback
        correlationData.getFuture().addCallback(result -> {
            // 判断结果
            if (result.isAck()) {
                // ACK
                log.debug("消息成功投递到交换机!消息ID: {}", correlationData.getId());
            } else {
                // NACK
                log.error("消息投递到交换机失败!消息ID:{}", correlationData.getId());
                // 重发消息
            }
        }, ex -> {
            // 记录日志
            log.error("消息发送失败!", ex);
            // 重发消息
        });
        // 3.发送消息
        rabbitTemplate.convertAndSend("amq.topic", "a.simple.test", message, correlationData);
    }

执行成功:

监控页面:

模拟失败:

1.投递到交互机失败

2.投递到交换机了,但是没有进入队列

1.2 消息持久化

注意: 生产者确认只能保证数据放到队列当中,但是无法保证数据不丢失(比如所在的机器宕机了),
所以还需要保证数据的持久化

@Configuration
public class CommonConfig {
    @Bean
    public DirectExchange simpleDirect(){
        return new DirectExchange("simple.direct");
    }
    @Bean
    public Queue simpleQueue(){
        return QueueBuilder.durable("simple.queue").build();
    }
}
@Test
public void testDurableMessage() {
   // 1.准备消息 消息持久化
   Message message = MessageBuilder.withBody("hello, spring".getBytes(StandardCharsets.UTF_8))
                .setDeliveryMode(MessageDeliveryMode.PERSISTENT)
                .build();
   // 2.发送消息
   rabbitTemplate.convertAndSend("simple.queue", message);
}

注意:

    //交换机不传值默认就是持久化  
    //交换机、队列、消息默认都是持久化   
    @Bean
    public DirectExchange simpleDirect(){
        return new DirectExchange("simple.direct");
    }

    public AbstractExchange(String name) {
        this(name, true, false);
    }

演示数据是否默认持久化:

 重启mq:

1. 交互机、队列、消息都做持久化

2.消费者端关闭防止被消费

3.重启mq后看队列中数据是否还在(是否持久化)

1.3 消费者消息确认

生产者确认:能确定消息投递到队列
消息持久化:能避免MQ宕机造成的消息丢失
生产者确认和消息持久化能保证消息能投递到消费者,但是无法保证消息被消费者消费(比如投递消费者的
同时,消费者所在机器宕机了)

1.manual:不推荐 代码侵入
try{
  //业务逻辑
  ack
} catch(ex){
  nack
}
2.auto:推荐 spring全权完成,不需要手动写代码
3.none:不推荐 投递完成立马删除消息,是否成功都不管
@Slf4j
@Component
public class SpringRabbitListener { 
    @RabbitListener(queues = "simple.queue")
    public void listenSimpleQueue(String msg) {
        log.debug("消费者接收到simple.queue的消息:【" + msg + "】");
        //模拟出现异常情况
        System.out.println(1 / 0);
        log.info("消费者处理消息成功!");
    }
}

默认为none:抛出异常后消息立即被删除:

修改为auto模式:

队列返回nack会再去发送信息:

1.4 失败重试机制

演示失败重试机制:

listener:
      simple:
        prefetch: 1
        acknowledge-mode: auto
        retry:
          enabled: true
          initial-interval: 1000
          multiplier: 3
          max-attempts: 4

默认重试到达最大次数后消息就丢弃:

   但是对于一些比较重要不能丢弃的消息需要使用以下策略:    ![](https://img-blog.csdnimg.cn/17584e4a42f34937b7219627399e6711.png) 
推荐使用第三种方案:将失败的消息发送到失败的交换机和失败的队列中,后面可以告知管理员然后重新
人工去处理

@Configuration
public class ErrorMessageConfig {

    @Bean
    public DirectExchange errorMessageExchange(){
        return new DirectExchange("error.direct");
    }

    @Bean
    public Queue errorQueue(){
        return new Queue("error.queue");
    }

    @Bean
    public Binding errorMessageBinding(){
        return BindingBuilder.bind(errorQueue()).to(errorMessageExchange()).with("error");
    }

    @Bean
    public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
        return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
    }
}

演示:

发送消息:

面试题:最后一分钟的总结

2. 死信交换机

2.1 初识死信交换机

1.发送信息到消费者默认的retry重试机制,达到最大次数就会被reject
2.队列中绑定一个死信交换机,接收被reject的信息,然后发送到dl.queue
3.这样就不担心死信会丢失

对比消息失败信息处理策略:

2.2 TTL

注意: 存活时间取消息所在队列中存货时间 、消息本身存活时间的以短的时间为准

@Slf4j
@Component
public class SpringRabbitListener {
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "dl.queue", durable = "true"),
            exchange = @Exchange(name = "dl.direct"),
            key = "dl"
    ))
    public void listenDlQueue(String msg) {
        log.info("消费者接收到了dl.queue的延迟消息");
    }
}

@Configuration
public class TTLMessageConfig {

    @Bean
    public DirectExchange ttlDirectExchange(){
        return new DirectExchange("ttl.direct");
    }

    @Bean
    public Queue ttlQueue(){
        return QueueBuilder
                .durable("ttl.queue")
                .ttl(10000)
                .deadLetterExchange("dl.direct")
                .deadLetterRoutingKey("dl")
                .build();
    }

    @Bean
    public Binding ttlBinding(){
        return BindingBuilder.bind(ttlQueue()).to(ttlDirectExchange()).with("ttl");
    }
}

    @Test
    public void testTTLMessage() {
        // 1.准备消息
        Message message = MessageBuilder
                .withBody("hello, ttl messsage".getBytes(StandardCharsets.UTF_8))
                .setDeliveryMode(MessageDeliveryMode.PERSISTENT)
                .setExpiration("5000")
                .build();
        // 2.发送消息
        rabbitTemplate.convertAndSend("ttl.direct", "ttl", message);
        // 3.记录日志
        log.info("消息已经成功发送!");
    }

演示延时队列:

1.启动消费者

2.发送消息:testTTLMessage()

2.3 延迟队列

p159 27:18

@Slf4j
@Component
public class SpringRabbitListener {
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "delay.queue", durable = "true"),
            exchange = @Exchange(name = "delay.direct", delayed = "true"),
            key = "delay"
    ))
    public void listenDelayExchange(String msg) {
        log.info("消费者接收到了delay.queue的延迟消息");
    }
}

    @Test
    public void testSendDelayMessage() throws InterruptedException {
        // 1.准备消息
        Message message = MessageBuilder
                .withBody("hello, ttl messsage".getBytes(StandardCharsets.UTF_8))
                .setDeliveryMode(MessageDeliveryMode.PERSISTENT)
                .setHeader("x-delay", 5000)
                .build();
        // 2.准备CorrelationData
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        // 3.发送消息
        rabbitTemplate.convertAndSend("delay.direct", "delay", message, correlationData);

        log.info("发送消息成功");
    }

演示延时队列:

1.启动消费者

2.运行testSendDelayMessage

报错原因:消息没有做路由

如何不报错:添加延迟的判断:

@Slf4j
@Configuration
public class CommonConfig implements ApplicationContextAware {

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        // 获取RabbitTemplate对象
        RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
        // 配置ReturnCallback
        rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
            // 判断是否是延迟消息
            Integer receivedDelay = message.getMessageProperties().getReceivedDelay();
            if (receivedDelay != null && receivedDelay > 0) {
                // 是一个延迟消息,忽略这个错误提示
                return;
            }
            // 记录日志
            log.error("消息发送到队列失败,响应码:{}, 失败原因:{}, 交换机: {}, 路由key:{}, 消息: {}",
                     replyCode, replyText, exchange, routingKey, message.toString());
            // 如果有需要的话,重发消息
        });
    }
}

标签: mq rabbitmq

本文转载自: https://blog.csdn.net/wanglvip/article/details/128273475
版权归原作者 黑冰vip 所有, 如有侵权,请联系我们删除。

“高级篇-rabbitmq的高级特性”的评论:

还没有评论