0


RabbitMQ高级特性

消息的可靠性

消息的可靠性保证:保证消息在传输过程中不会出现消息丢失的情况,确保发送的消息至少被消费一次。

消息的可靠性问题

消息从生产者生成,到消费者消费,大致可分为三个阶段,这三阶段都有可能出现消息丢失的情况

  • 阶段一中:如果生产者实现代码中的交换机名称填写错误,那么在mq上找不到对应的交换机,发送的消息会出现丢失。
  • 阶段二中:生产者实现代码中的routingKey为“a”,交换机与队列绑定的routingKey为“b”,这时交换机将消息发送到队列时,由于两个key不相等,找不到对应的队列,消息存储失败,丢失
  • 阶段二中:消息默认存储在内存中,在消费者消费之前,如果mq服务器宕机,内存就会释放,消息出现丢失。
  • 阶段三中:消费者消费消息后,会自动给mq服务端返回一个ack标志,然后mq将消息从队列中删除。如果消费者在获取到消息以后,然后在进行业务处理中消费者宕机了,这时这个消息没有被消费,但是由于之前已经返回了ack,所以mq中删除了这个消息,这时消息出现丢失。

保证生产者发送的消息不丢失

生产者确认机制:可以让生产者感知到消息是否正确发送给了交换机,如果消息正常到mq服务端的交换机,那么mq会返回一个ack给生产者表示发送消息接收到了。如果消息没有正常到交换机,那么mq服务端会给生产者一个nack。当返回nack时,在生产者端可重试消息的发送或其它处理。

生产者回退机制:可以让生产者感知到消息是否正确投递给了队列,交换机并不存储消息,消息只有到达队列才算是发送成功。如果交换机投递消息到队列失败了,这时队列会返回一个nack到生产者,在生产者端可重试消息的发送或其它处理。

配置文件中添加确认机制的配置

spring:
  rabbitmq:
    # 解决生产者发送消息到交换机失败的问题
    # 异步回调,MQ返回结果到生产者端时会回调这个ConfirmCallback接口的实现
    publisher-confirm-type: correlated 
    # 解决交换机到队列路由失败的问题,失败则调用ReturnCallback函数
    publisher-returns: true  

自定义RabbitTemplate,实现回调机制,配置确认交换机和队列

@Slf4j
@Configuration
public class RabbitmqConfig {
    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        /**
         * 接口ConfirmCallback confirmCallback:生产者把消息发送到交换机的结果回调
         * correlationData:可以封装消息的ID,需要在发送消息时传入此参数,这里才能接收到,否则是null
         * boolean ack:消息发送的结果状态,发送成功是true,失败是false
         * String cause:发送失败的描述信息,如果发送成功是null。
         */
        rabbitTemplate.setConfirmCallback((correlationData,ack,cause) -> {
            if (ack) {
                // ack 消息发送到交换机成功
                log.info("消息发送到交换机成功, ID:{}", correlationData.getId());
            } else {
                // nack 消息发送到交换机失败
                log.info("消息发送到交换机失败, ID:{}, 原因{}",correlationData.getId(), cause);
                /**
                 * 消息发送失败后 --> 根据业务具体分析,然后处理
                 * 比如:所有要发送的消息在发送前都会记录在数据库的一张表里
                 *      记录字段包括:消息状态表示是否发送成功,消息ID,消息重试次数,消息内容等
                 *      发送失败后在这里可以根据ID从数据库重新获取到这条消息,然后如果重试次数为3,
                 *      那么这里就重试发送3次,如果中间有一次成功了,在成功逻辑里将状态改为成功,
                 *      或是超过3次,还是失败则不再重试。
                 */
            }
        });
        //定义消息从交换机路由到队列时失败的策略。true,则调用ReturnCallback;false:则直接丢弃消息
        rabbitTemplate.setMandatory(true);
        //解决交换机到队列路由失败的问题,失败则调用ReturnCallback函数
        rabbitTemplate.setReturnCallback((message,replyCode,replyText,exchange,routingKey) -> {
            log.info("交换机发送消息到队列失败,应答码{},原因{},交换机{},路由键{},消息{}",
                    replyCode, replyText, exchange, routingKey, message.toString());
            // 如果有业务需要,可以重发消息
        });
        return rabbitTemplate;
    }

    @Bean
    public DirectExchange confirmDirectExchange(){
        return new DirectExchange("confirm_direct_exchange");
    }

    @Bean
    public Queue confirmDirectQueue(){
        return new Queue("confirm_direct_queue");
    }
    @Bean
    public Binding bindingConfirmQueue(){
        return BindingBuilder.bind(confirmDirectQueue())
                             .to(confirmDirectExchange())
                             .with("info_confirm");
    }
}

生产者实现代码

@SpringBootTest
public class PublisherTest {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void testConfirmDirectExchange() {
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        String message = "hello confirm !";
        //输出:消息发送到交换机成功, ID:4d435bc4-0c92-4f59-91ba-a592cc12af40
        //rabbitTemplate.convertAndSend("confirm_direct_exchange","info_confirm",message,correlationData);
        /**
         * 消息发送到交换机失败, ID:a076d4bf-ae3d-4c3b-ac22-4a38d5095287,
         * 原因channel error; protocol method: #method<channel.close>(reply-code=404,
         * reply-text=NOT_FOUND - no exchange 'confirm_direct_exchange_1' in vhost '/',
         * class-id=60, method-id=40)
         * 指在虚拟主机'/'中,没有交换机'confirm_direct_exchange_1'
         */
        //rabbitTemplate.convertAndSend("confirm_direct_exchange_1","info_confirm",message,correlationData);
        /**
         * 消息发送到交换机成功, ID:3dc943df-c2cf-4562-937b-1485764413c5
         * 交换机发送消息到队列失败,应答码312,原因NO_ROUTE,交换机confirm_direct_exchange,
         * 路由键info_confirm_1,消息(Body:'hello confirm !' ......)
         */
        rabbitTemplate.convertAndSend("confirm_direct_exchange","info_confirm_1",message,correlationData);
    }

}

消息持久化

生产者确认可以确保消息投递到RabbitMQ的队列中,但是消息发送到RabbitMQ以后,如果突然宕机,也可能导致消息丢失。要想确保消息在RabbitMQ中安全保存,必须开启消息持久化机制。

对交换机,队列,消息三者持久化。

交换机与队列持久化

SpringAMQP中声明的队列和交换机默认都是持久化的

@Bean
public DirectExchange confirmDirectExchange(){
   return new DirectExchange("confirm_direct_exchange");
}

如上交换机默认持久化,也可以选择多参数的构造方法自己设置,队列也是同样的设置。

消息的持久化

利用SpringAMQP发送消息时,可以设置消息的属性(MessageProperties),指定delivery-mode:持久化和非持久化,代码如下:

@Test
public void testDurableMessage() {
    Message message = MessageBuilder.withBody("hello,durable,message"
                                    .getBytes(StandardCharsets.UTF_8))
                                    //消息持久化模式
                                    .setDeliveryMode(MessageDeliveryMode.PERSISTENT)
                                    .build();
    CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
    rabbitTemplate.convertAndSend("confirm_direct_exchange","info_confirm",message,correlationData);
}

所有发送的消息都会转为Message类型,底层默认情况下,SpringAMQP发出的任何消息都是持久化的,不用特意创建设置。

保证消费者消费的消息不丢失

消费者与mq之间的消息丢失问题描述:

RabbitMQ投递消息给消费者 ---> 消费者获取消息后,返回ack给RabbitMQ ---> RabbitMQ删除消息 ---> 消费者宕机,消息尚未处理,消息丢失。

所以消费者返回ack的时机非常重要,如果能等到处理完消息后再返回ack,消息就不会丢失。

SpringAMQP则允许配置三种消息确认模式:

  • none:mq假定消费者获取消息后会成功处理,因此消息投递后立即被删除
  • auto:由spring监测listener代码是否出现异常,没有异常则返回ack,删除队列消息;抛出异常则返回nack,不会将队列的消息删除。
  • manual:手动实现,需要在业务代码结束后,调用api发送ack。

none模式

yml文件配置新增

spring:
  rabbitmq:
    listener:
      simple:
        acknowledge-mode: none 

模拟一个消息处理异常

@RabbitListener(queues = "simple.queue")
public void listenSimpleQueue(String msg) {
    log.info("消费者接收到simple.queue的消息:{}", msg);
    // 模拟异常
    System.out.println(1 / 0);
    log.debug("消息处理完成!");
}

测试可以发现,当消息处理抛异常时,消息依然被mq删除了。

auto模式(默认模式)

yml配置新增(或者不配置,默认就是auto模式)

spring:
  rabbitmq:
    listener:
      simple:
        acknowledge-mode: auto

测试,抛出异常后,mq上消息没有删除

当消费者出现异常后,消息会不断(重入队)到队列,再重新发送给消费者,然后再次异常,再次入队到队列,无限循环,导致mq的消息处理飙升,带来不必要的压力:

可以使用Spring的retry机制,在消费者出现异常时利用本地重试,而不是无限制的到mq队列。

application.yml文件,添加内容:

spring:
  rabbitmq:
    listener:
      simple:
        acknowledge-mode: auto
        retry:
          enabled: true # 开启消费者失败重试
          initial-interval: 1000 # 初始的失败等待时长为1秒
          max-attempts: 3 # 最大重试次数

重新启动消费者服务,消费者会消费之前因错误未正常消费的消息。因为设置了重试机制,所以在重试完后,不会在循环发送消息,mq上会直接删除这条消息。所以重试达到最大次数后,Spring会返回ack,消息会被丢弃。 最后抛出重试耗尽的异常。

在以上的测试中,达到最大重试次数后,消息会被丢弃,这是由Spring内部机制决定的。

那么如果不想丢失这些发送失败的消息,而需要另做处理的话,可以通过MessageRecovery接口来实现。将失败后的消息投递到一个指定的,专门存放异常消息的队列,后续由人员自由处理。

实现代码如下:

@Bean
public DirectExchange errorMessageExchange(){
    return new DirectExchange("errorMessageExchange");
}
@Bean
public Queue errorMessageQueue(){
    return new Queue("errorMessageQueue");
}
@Bean
public Binding errorBinding(){
    return BindingBuilder.bind(errorMessageQueue())
                         .to(errorMessageExchange())
                         .with("error");
}
@Bean
public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
    //重试耗尽后,将失败消息投递到指定的交换机
    return new RepublishMessageRecoverer(rabbitTemplate, "errorMessageExchange", "error");
}

测试,可以看到所有因异常导致消息处理错误,并重试完最大次数后还是失败的消息,都发送到了这个专门处理错误的队列里,这样即使是重试后仍然失败的消息也不会丢失。

消费端限流问题

实际项目中,可能在mq中堆积了成千上万的消息,如果不进行限流,当我们打开消费端时,这些消息都会一下子过来,造成服务器的宕机,所以需要进行消费者限流处理。

Rabbitmq提供了一种QoS(服务质量保证功能),来应对这种巨量的消息瞬间全部喷涌推送过来,通过Rabbitmq控制消费端的消费速度,进行必要的限流。

在yml配置中新增配置如下:

spring:
  rabbitmq:
    listener:
      simple:
        acknowledge-mode: manual # 开启手动ack
        prefetch: 3 # 每次只能获取3条消息,处理完成才能获取下次的3个消息
        concurrency: 1 # 消费者最小数量为1
        max-concurrency: 10 # 消费者最大数量为10

消费者端代码实现,新增两个消费者

@RabbitListener(bindings = @QueueBinding(
        value = @Queue("current_limit_queue"),
        exchange = @Exchange(value = "current_limit_exchange",type = "topic"),
        key = {"current.limit.#"}
))
/**
 * message:封装了消息的相关信息,包括消息ID
 * channel:表示信道,封装Rabbitmq通信的相关消息的配置信息,如果当前消息被成功消费,
 *          通过信道进行标记(已消费),Rabbitmq获取到响应ack的确认信息。
 */
public void currentLimitConsumer1(Message message, Channel channel) throws Exception {
    //模拟业务耗时3秒
    TimeUnit.SECONDS.sleep(3);
    //消息的唯一标识
    long deliveryTag = message.getMessageProperties().getDeliveryTag();
    //手动确认消息是否接收,通过消息的id来指定这条消息被成功处理了,true表示这条消息被成功消费
    channel.basicAck(deliveryTag,true);
    log.info("消费者currentLimitConsumer1接收到消息:{}", new String(message.getBody()));
}

@RabbitListener(bindings = @QueueBinding(
        value = @Queue("current_limit_queue"),
        exchange = @Exchange(value = "current_limit_exchange",type = "topic"),
        key = {"current.limit.#"}
))
public void currentLimitConsumer2(Message message, Channel channel) throws Exception {
    TimeUnit.SECONDS.sleep(3);
    long deliveryTag = message.getMessageProperties().getDeliveryTag();
    channel.basicAck(deliveryTag,true);
    log.info("消费者currentLimitConsumer2接收到消息:{}", new String(message.getBody()));
}

生产者端代码实现

@Test
public void testMessageLimitPushlisher() {
    String exchange = "current_limit_exchange";
    String key = "current.limit.key";
    for (int i = 1; i <= 1000; i++) {
        String message = "消息限流实现,这是第【" + i + "】条消息";
        rabbitTemplate.convertAndSend(exchange,key,message);
    }
}

测试:先启动消费者监听队列消息,然后启动生产者发送消息。限流效果如下:

Ready:表示处于队列中等待被消费者消费的消息数量。

Unacked:表示已被消费者取出但还未确认的消息数量。

死信交换机

死信交换机

当一个队列中的消息满足下列情况之一时,可以成为死信(dead letter):

  • 消费者使用basic.reject或 basic.nack声明消费失败,并且消息的requeue参数设置为false,表示消费失败的消息不重新加入队列,这样这个消息可以成为死信。
  • 消息是一个过期消息,超时无人消费
  • 要投递的队列消息满了,无法投递

如果这个包含死信的队列配置了**

dead-letter-exchange

**属性,指定了一个交换机,那么队列中的死信就会投递到这个交换机中,而这个交换机称为死信交换机(DLX)。

大致流程如下:

给队列绑定一个死信交换机,这样这个队列如果出现死信的消息,就将这个消息发送到死信交换机

  • 给队列设置dead-letter-exchange属性,指定一个交换机
  • 给队列设置dead-letter-routing-key属性,设置死信交换机与死信队列的RoutingKey

配置文件中,配置ack模式acknowledge-mode=auto,配置retry重试机制。

定义普通队列,死信交换机,死信队列

@Bean
public Queue commonQueue(){
    return QueueBuilder.durable("common.queue")
            .deadLetterExchange("dl.direct") // 指定死信交换机
            .deadLetterRoutingKey("dl_key")
            .build();
}
@Bean
public DirectExchange dlExchange(){
    return new DirectExchange("dl.direct");
}
@Bean
public Queue dlQueue(){
    return new Queue("dl.queue");
}
@Bean
public Binding dlBinding(){
    return BindingBuilder.bind(dlQueue()).to(dlExchange()).with("dl_key");
}

监听common.queue队列,由于有异常,因为模式为auto,设置了重试机制,所以消费失败后会按设置的重试次数去重试,重试完还是失败,这时这个消息会成为死信,因为这个队列绑定了死信交换机,所以这个死信的消息会发送到死信交换机里,间接传递到死信队列里

@RabbitListener(queues = "common.queue")
public void listenCommonQueue(String msg) {
    log.info("消费者listenCommonQueue接收到消息:{}", msg);
    int i = 1 / 0;
    log.debug("消息处理完成!");
}

发送消息代码

@Test
public void testCommonQueue() {
    String queueName = "common.queue";
    String message = "hello, common.queue!";
    rabbitTemplate.convertAndSend(queueName, message);
}

最终消息到了死信队列里,防止了消息的丢失

消息的延迟消费

要求:发送消息后,消费者需要过10秒后才能消费到。

实现流程:消息发送到延迟队列(ttl.queue),过10秒后消息过期,将这个消息发送到死信交换机,转到死信队列,消费者监听死信队列,这样就可以在10s后获取到这个消息。

配置延迟队列,并绑定死信交换机

@Bean
public Queue ttlQueue(){
    return QueueBuilder.durable("ttl.queue")
            .ttl(10000) // 设置队列的超时时间,10秒
            .deadLetterExchange("dl.direct")
            .deadLetterRoutingKey("dl_key")
            .build();
}
@Bean
public DirectExchange ttlExchange(){
    return new DirectExchange("ttl.exchange");
}
@Bean
public Binding ttlBinding(){
    return BindingBuilder.bind(ttlQueue()).to(ttlExchange()).with("ttl");
}

发送消息到延迟队列,队列设置了延迟时间10s,10s后这条消息成为死信,传递到死信交换机

@Test
public void testTtlQueue() {
    String message = "这是一条延迟的消息";
    rabbitTemplate.convertAndSend("ttl.exchange","ttl",message);
}

消费端监听死信队列,10s后获取到过期消息。

@RabbitListener(queues = "dl.queue")
public void listenDlQueue(String msg){
    log.info("消费者从死信队列里获取到消息:{}",msg);
}

消息超时方法:除了给队列设置ttl属性也可以在消息处设置过期时间

如果两个都设置了,哪个时间短就使用哪个。

@Test
public void testTTLMsg() {
    Message message = MessageBuilder
            .withBody("hello, ttl message".getBytes(StandardCharsets.UTF_8))
            //设置这条消息的过期时间为5秒
            .setExpiration("5000")
            .build();
    rabbitTemplate.convertAndSend("ttl.exchange","ttl",message);
}

延迟队列

通过死信交换机和TTL实现消息的延时消费,配置起来相对麻烦,可以直接用延迟队列实现。

RabbitMQ的官方也推出了DelayExchange插件,实现延迟队列效果。

  1. 生产者将消息(msg)和路由键(routekey)发送指定的延时交换机(exchange)上
  2. 延时交换机(exchange)存储消息等待消息到期根据路由键(routekey)找到绑定自己的队列 (queue)并把消息给它
  3. 队列(queue)再把消息发送给监听它的消费者(customer)

下载地址:Releases · rabbitmq/rabbitmq-delayed-message-exchange · GitHub

将插件拷贝到rabbitmq-server的安装路径:/usr/lib/rabbitmq/lib/rabbitmq_server-
3.8.4/plugins

启用插件:rabbitmq-plugins enable rabbitmq_delayed_message_exchange

查看插件:rabbitmq-plugins list

重启Rabbitmq服务:systemctl restart rabbitmq-server

交换机指定属性 delayed = " true " ,代码实现如下:

@RabbitListener(bindings = @QueueBinding(
        value = @Queue("delay_queue"),
        //交换机的类型可以是任意类型,只需要设定delayed属性为true即可。
        exchange = @Exchange(value = "delay_exchange",delayed = "true"),
        key = "delay"
))
public void listenDelayedQueue(String msg) {
    log.info("消费者listenDelayedQueue接收到消息:{}",msg);
}

发送消息,要携带x-delay属性,指定延迟的时间:

@Test
public void testDelayedMsg() {
    Message message = MessageBuilder
            .withBody("hello, delayed message".getBytes(StandardCharsets.UTF_8))
            .setHeader("x-delay",10000)//延迟10s
            .build();
    rabbitTemplate.convertAndSend("delay_exchange","delay",message);
}

消息的重复消费问题

消息的重复消费问题:指一条消息被消费者多次消费,在一些特殊的业务中是不能允许的。

重复消费场景示例如下:

解决消息重复消费问题:需要在消费端考虑消息的幂等性

幂等性:指对一个接口的多次调用其结果都是一样的。

解决方法:让生产者发送每条消息的时候,里面加一个全局唯一的 id,类似订单 id 之类的东西,然后你这里消费到了之后,先根据这个 id 去比如 redis 里查一下,之前消费过吗?(或是直接设计一张数据库表,表字段设为唯一约束存这个id,查到就表示消费过了),如果没有消费过,你就处理,然后这个 id 写 redis。如果消费过了,那就不处理了,保证别重复处理相同的消息即可。

标签: rabbitmq 分布式

本文转载自: https://blog.csdn.net/dingmb/article/details/135217562
版权归原作者 dingmb 所有, 如有侵权,请联系我们删除。

“RabbitMQ高级特性”的评论:

还没有评论