0


微服务RabbitMQ高级篇

一.消息可靠性传递概述

生产者发送消息到交换机,交换机将消息路由到队列,消费者从队列获取消息。哪些环节会导致消息丢失。

1.生产者发送消息丢失

  • 生产者没有将消息发送到交换机
  • 交换机没有成功将消息路由到队列

2.MQ宕机导致消息丢失

3.消费者处理消息丢失

消费者获取到消息后,未来得及处理,宕机

消费者获取到消息后,处理消息抛异常。

二.生产者消息确认机制

生产者消息确认机制一共有2种方式

  • publisher-comfirm
  • publisher-return

在publisher这个微服务的application.yml中添加配置

spring:  
  rabbitmq:
    publisher-confirm-type: correlated 
    publisher-returns: true 
      template:
        mandatory: true

1.publish-confirm-type:

开启publisher-confirm,这里支持两种类型:

  • simple:同步等待confirm结果,直到超时
  • correlated:异步回调,定义ConfirmCallback,MQ返回结果时会回调这个ConfirmCallback

2.publish-returns:

开启publish-return功能,同样是基于callback机制,不过是定义ReturnCallback

3.template.mandatory:

定义消息路由失败时的策略。true,则调用ReturnCallback;false:则直接丢弃消息

三.publisher-comfirm

作用:开启后,生产者发送消息到RabbitMQ交换机,RabbitMQ会进行结果返回。

  • ack:生产者成功将消息发送到队列
  • nack:生产者发送到交换机失败

如何使用

1.在生产者配置文件中开启

spring:
  rabbitmq:
    publisher-confirm-type: correlated  # 异步回调

2.如何接受RabbitMQ结果返回

rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {

@Override

public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                
            }
        });

correlationData:消息的唯一标识

ack

  • true:生产者成功将消息发送到交换机
  • false:生产者发送到交换机失败

cause:失败原因

注意点:rabbitTemplate.setConfirmCallback方法的调用,只能调用一次。

3.如何保证rabbitTemplate.setConfirmCallback方法只会被调用一次

方案1:初始化方法

@PostConstruct

public void init() {
    rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
         @Override
         public void confirm(CorrelationData correlationData, boolean ack, String cause) {

            }
        });
    }

方案2:CommandLineRunner(推荐)

@Component
public class MyComandLineRunner implements CommandLineRunner {
    
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Override
    public void run(String... args) throws Exception {
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                
            }
        });
    }
}

方案3:实现ApplicationContextAware实现类

@Component
public class MyApplicationContext implements ApplicationContextAware {
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                
            }
        });
    }
}

4.发送消息的时候,需要给消息指定一个唯一标识

rabbitTemplate.convertAndSend(exchangeName,routingKey,message,new CorrelationData(id));

5.逻辑问题:如果生产者发送消息到交换机失败了?怎么重发

  • 在发送消息之前,将消息先存储到数据库(MySQL,Redis)
  • 如果消息发送交换机失败,读取Redis中信息,重新发送

6.测试

1.成功向交换机发送消息:观察

  • correlationData
  • ack
  • cause

2.发送消息到交换机失败:删除交换机

四.publisher-return

作用:开启后,交换机将消息路由到消息队列失败,RabbitMQ会进行结果返回。

如何使用

1.在生产者配置文件中开启

spring:
  rabbitmq:
    publisher-returns: true
      template:
        mandatory: true

template:mandatory: true定义消息路由失败时的策略。true,则调用ReturnCallback;false:则直接丢弃消息

2.如何接受RabbitMQ结果返回

rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
            @Override
            public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
                
            }
        });
  • message:交换机路由队列失败的那个消息
  • replyCode:错误码
  • replyText:错误信息
  • exchange:交换机
  • routingKey:路由key

3.注意点

rabbitTemplate.setReturnCallback方法的调用,只能调用一次。

方案1:初始化方法

@PostConstruct
    public void init() {
        rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
            @Override
            public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
                
            }
        });
    }

方案2:

@Component
public class MyComandLineRunner implements CommandLineRunner {
    
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Override
    public void run(String... args) throws Exception {
            rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
            @Override
            public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
                
            }
        });
    }
}

方案3:实现ApplicationContextAware实现类

@Component
public class MyApplicationContext implements ApplicationContextAware {
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
         rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
            @Override
            public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
                
            }
        });
    }
}

4.逻辑问题:交换机路由消息到队列失败了,如何重新发送

直接调用rabbitTemplate.convertAndSet发送

5.测试

将路由key故意改错

  • message:交换机路由队列失败的那个消息
  • replyCode:错误码
  • replyText:错误信息
  • exchange:交换机
  • routingKey:路由key

五.消息持久化

交换机、队列、消息持久化(都默认持久化)

交换机

ExchangeBuilder.directExchange(ITCAST_DIRECT ).durable(true).build();
new DirectExchange(ITCAST_DIRECT,true,false); 

队列

QueueBuilder.durable(DIRECT_QUEUE1).build()
new Queue(DIRECT_QUEUE2,true);

消息持久化

1.如果发送普通字符串,默认持久化

2.如果期望消息不持久化。

Message msg = MessageBuilder.withBody(message.getBytes("utf-8"))
                            .setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT)
                            .build();

六.消费者消息确认机制

解决问题:

  • 消费者处理消息丢失
  • 消费者获取到消息后,未来得及处理,宕机
  • 消费者获取到消息后,处理消息抛异常。

1.开启消费者消息确认机制

spring:
  rabbitmq:
    listener:
      simple:
        acknowledge-mode: auto
  • manual:手动ack,需要在业务代码结束后,调用api发送ack。
  • auto:自动ack,由spring监测listener代码是否出现异常,没有异常则返回ack;抛出异常则返回nack
  • none:关闭ack,MQ假定消费者获取消息后会成功处理,因此消息投递后立即被删除

作用:消费者获取到消息后,如果处理消息出现异常,会给MQ返回nack. MQ将消息放入队列头部再给消费者。
消费者获取到消息后,如果处理消息正常,会给MQ返回ack. MQ将消息从消息队列中删除。

2.消费者消息确认机制-问题

如果消费者代码写的有问题

无限重试,导致MQ压力过大

3.开启消费者消息重试机制

优势

1.重试在消费者本地重试。

2.重试可以有延迟时间。

3.重试有次数限制

如何使用

rabbitmq:
    listener:
      simple:
        retry:
          enabled: true  #开启失败重试
          initial-interval: 100 # 初次失败,间隔时间
          multiplier: 2 # 间隔时间倍数
          max-attempts: 3 #最大重试次数
          stateless: true #是否是无状态,true无状态,和事务相关,有事务写false

重试耗尽

触发重试耗尽策略

MessageRecover

RejectAndDonotMessageRecover(默认)重试耗尽后,直接reject,丢弃消息。默认就是这种方式

ImmediaRequeueMessageRecover重试耗尽后,返回nack,消息重新入队

RepublishMessageRecover

1.创建错误交换机

@Bean
public DirectExchange errorMessageExchange(){
    return new DirectExchange("error.direct");}

2.错误队列

@Bean
public Queue errorQueue(){
    return new Queue("error.queue", true); }

3.绑定

@Bean
public Binding errorBinding(){
    return BindingBuilder.bind(errorQueue()).to(errorMessageExchange()).with("error");}

4.RepublishMessageRecover交由spring管理,进行重发。

@Bean
public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
    return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error"); }

5.重试耗尽后,将失败消息投递到指定的交换机

七.如何确保RabbitMQ消息的可靠性?

  • 开启生产者确认机制,确保生产者的消息能到达队列
  • 开启持久化功能,确保消息未消费前在队列中不会丢失
  • 开启消费者确认机制为auto,由spring确认消息处理成功后完成ack
  • 开启消费者失败重试机制,并设置MessageRecoverer,多次重试失败后将消息投递到异常交换机,交由人工处理

八.死信交换机

**死信 **

死信满足之一

  • 消息被消费者拒绝,不让重新入队
  • 消息队列满了,溢出的消息。
  • 消息在消息队列中超时过期

去哪里?

  • 被丢弃
  • 如果队列指定了死信交换机。

死信交换机

普通交换机

怎么给队列指定死信交换机

  • 给队列设置dead-letter-exchange属性,指定一个交换机
  • 给队列设置dead-letter-routing-key属性,设置死信交换机与死信队列的RoutingKey

死信交换机 + 消息ttl实现延迟消息队列

延迟消息队列

生产者----->消息,消费者不能立即消费,需要等待一定时间才能消费。

如何实现

给消息设置ttl有2种方式

1.创建队列设置消息过期时间 ttl() x-message-ttl

2.创建消息的时候可以指定过期时间

Message msg = MessageBuilder.withBody(message.getBytes(StandardCharsets.UTF_8))
                .setContentType("text/plain")
                .setExpiration("5000").build();

实现方式

我们声明消费者一组死信交换机和队列,基于注解方式:

@RabbitListener(bindings = @QueueBinding(       
                value = @Queue(name = "dl.queue", durable = "true"),
                exchange = @Exchange(name = "dl.direct"),      
                key = "dl"))
public void listenDlQueue(String msg){ log.info("接收到 dl.queue的延迟消息:{}", msg);}

消费者config中要给队列设置超时时间,需要在声明队列时配置x-message-ttl属性:

@Bean
public DirectExchange ttlExchange(){   
    return new DirectExchange("ttl.direct");
}

@Bean
public Queue ttlQueue(){   
    return QueueBuilder.durable("ttl.queue") // 指定队列名称,并持久化
          .ttl(10000) // 设置队列的超时时间,10秒       
          .deadLetterExchange("dl.direct") // 指定死信交换机
          .deadLetterRoutingKey("dl") // 指定死信RoutingKey 
          .build();}

@Bean  public Binding simpleBinding(){

return BindingBuilder.bind(ttlQueue()).to(ttlExchange()).with("ttl");}

发送消息时,给消息本身设置超时时间

@Test  public void testTTLMsg() {   // 创建消息

Message message = MessageBuilder.withBody("hello, ttl message".getBytes(StandardCharsets.UTF_8))
                                .setExpiration("5000") 
                                .build();  // 消息ID,需要封装到CorrelationData中

CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString()); // 发送消息

rabbitTemplate.convertAndSend("ttl.direct", "ttl", message, correlationData);}

如何实现发送一个消息20秒后消费者才收到消息?

  1. 给消息的目标队列指定死信交换机
  2. 消费者监听与死信交换机绑定的队列
  3. 发送消息时给消息设置ttl为20秒

九.延迟队列

使用场景包括:

  • 延迟发送短信
  • 用户下单,如果用户在15 分钟内未支付,则自动取消
  • 预约工作会议,20分钟后自动通知所有参会人员

延迟队列插件的使用步骤包括哪些?

  1. 声明一个交换机,添加delayed属性为true
  2. 发送消息时,添加x-delay头,值为超时时间

安装DelayExchange插件

DelayExchange的本质还是官方的三种交换机,只是添加了延迟功能。因此使用时只需要声明一个交换机,交换机的类型可以是任意类型,然后设定delayed属性为true即可。

1.基于注解方式:

2.基于java代码的方式:

然后我们向这个delay为true的交换机中发送消息,一定要给消息添加一个header:x-delay,值为延迟的时间,单位为毫秒:

十.惰性队列

消息堆积问题

  • 生产者>消费者消费速度
  • 如果消息堆积超过队列容量上限,溢出的消息就会称为死信。死信会被丢弃。

怎么解决

  • 增加更多消费者,提高消费速度
  • 在消费者内开启线程池加快消息处理速度
  • 扩大队列容积(使用惰性队列),提高堆积上限

惰性队列特点

  • 将消息直接存入磁盘,不存储内存
  • 支持海量消息存储
  • 消费者要获取消息,MQ将消息加载到内容。

创建

  • lazy()
  • 注解
  • 管理控制台

优点

  • 基于磁盘存储,消息上限高
  • 没有间歇性的page-out,性能比较稳定

缺点

  • 基于磁盘存储,消息时效性会降低
  • 性能受限于磁盘的IO

十一.MQ集群

普通集群(分布式)

  1. 会在集群的各个节点间共享部分数据,包括:交换机、队列元信息。不包含队列中的消息。
  2. 当访问集群某节点时,如果队列不在该节点,会从数据所在节点传递到当前节点并返回
  3. 队列所在节点宕机,队列中的消息就会丢失

镜像集群(主从)数据存在延迟

1.主从架构集群,队列可以在多个节点上有

2.主节点:在那个节点上创建队列,那个节点就是主节点

3.镜像节点:备份主节点上队列的节点

4.创建备份策略:
exactly
all
nodes

5.创建队列,根据队列名称,指定那些节点作为镜像节点。

仲裁队列代替镜像集群

1.生产者----->主节点队列------->镜像节点队列

2.与镜像队列一样,都是主从模式,支持主从数据同步

3.使用非常简单,没有复杂的配置

4.主从同步基于Raft协议,强一致

5.创建队列

    指定类型quorum

    java代码 quorum();

    默认5个镜像节点

java代码怎么操作集群

和单机区别;

spring:
  rabbitmq:
    addresses: 192.168.200.128:8071,192.168.200.128:8072,192.168.200.128:8073
    username: itcast
    password: 123
    virtual-host: /


本文转载自: https://blog.csdn.net/weixin_48605536/article/details/136122515
版权归原作者 小毅西 所有, 如有侵权,请联系我们删除。

“微服务RabbitMQ高级篇”的评论:

还没有评论