0


如何确保消息的可靠性?RabbitMQ 在Springboot中的应用案例

案例介绍

使用mq发送邮件的优点在于:

  1. 能实现异步处理,提高系统的并发性和相应速度
  2. 更加灵活,只需要一个邮件系统就能和其他系统共用
  3. 能够确保消息可靠,提供了消息持久化消息确认机制等特性

这里我们以用户注册后需要同时发送邮件和短信这个场景做为示例,流程图如下所示。

以注册系统、邮件系统、短信系统为例

不介绍rabbitMQ的基础信息了,直接进入代码环节。

案例实操

生产者(注册系统)

pom文件中引入相关依赖

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
                    <!-- 版本继承springboot -->
        </dependency>

application.yml文件配置

# rabbitmq
spring:
  rabbitmq:
    port: 5672
    host: localhost
    username: guest
    password: guest
    virtual-host: /
    publisher-returns: true #开启生产者手动确认
    publisher-confirm-type: correlated #消息确认类型

做完这些就已经成功将rabbitMQ引入到Springboot中了,接下来是生产者中的配置类,这里使用的推送方式是topic

topic交换器是指按照正则表达式模糊匹配:用消息的Routing Key与 Exchange和Queue 之间的BindingKey进行模糊匹配,如果匹配成功,将消息分发到该Queue。 Routing Key是一个句点号“. ”分隔的字符串(我们将被句点号“.”分隔开的每一段独立的字符串称为一个单词)。Binding Key与Routing Key一样也是句点号“. ”分隔的字符串。Binding
Key中可以存在两种特殊字符“ * ”与“#”,用于做模糊匹配,其中“*”用于匹配一个单词,“#”用于匹配多个单词(可以是零个)

import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class LoginRabbitConfig {

    private static final String EXCHANGE_NAME_TOPIC = "测试用Topic交换机";

    // 声明交换机 topic
    @Bean
    public TopicExchange topicExchange() {
        // 是否持久化、是否自动删除
        return new TopicExchange(EXCHANGE_NAME_TOPIC, true, false);
    }

}

模拟一下注册的接口,因为注册后需要发送邮件和短信提醒用户,如果按照平时的顺序调用不仅耗时长,并且一旦邮件或短信发送失败没有进行异常处理话会导致注册失败,因此采用消息队列能够很好的解决这一问题,代码中UserPOJO只是定义的一个实体类

import com.alibaba.fastjson.JSON;
import gwc.mq.pojo.UserPOJO;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.UUID;

@RestController
public class LoginController {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @RequestMapping("/register")
    public String register(UserPOJO userPOJO) {
        Object msg = JSON.toJSONString(userPOJO);
        // 设置ConfirmCallback
        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
            if (ack) {
                // 消息发送成功
                System.out.println("消息发送成功, correlationData: " + correlationData.getReturnedMessage());
            } else {
                // 消息发送失败
                System.out.println("消息发送失败, cause: " + cause);
            }
        });
        try {
            rabbitTemplate.convertAndSend("测试用Topic交换机", "*", msg, new CorrelationData(UUID.randomUUID().toString()));
        } catch (Exception e) {
            e.printStackTrace();
            // 失败处理 无论出现那种情况都将错误消息存到redis中 然后用定时任务统一发送
        }

        return "用户-" + userPOJO.getName() + "-注册成功!";
    }

}

在配置类中,我们打开了消息发送方的消息确认机制,因此在这里我们需要setConfirmCallback函数,其中correlationData是具体的消息,ack表示是否发送成功,cause则是失败的具体原因。

发送方发送失败的原因有三种可归为

(1)producter连接mq失败,消息没有发送到mq
(2)producter连接mq成功,但是发送到exchange失败
(3)消息发送到exchange成功,但是路由到queue失败

无论出现哪一种异常,我们都可以通过try catch来进行错误消息的处理,我采用的是捕获到错误后将消息存入db中(redis),再通过springboot的定时任务进行统一的重发,存入db代码就不再描述。

消费者(以邮件系统为例)

application.yml文件配置

#rabbitMQ
spring:
  rabbitmq:
    port: 5672
    host: localhost
    username: guest
    password: guest
    virtual-host: /
    listener:
      simple:
        acknowledge-mode: manual #开启手动确认机制

邮件系统的配置类

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class MailRabbitConfig {
    private static final String EXCHANGE_NAME_TOPIC = "测试用Topic交换机";

    private static final String QUEUE_NAME = "email_queue";

    private static final String ERR_EXCHANGE_NAME_DIRECT = "死信交换机";

    private static final String ERR_QUEUE_NAME = "err_email_queue";

    // 声明交换机 topic
    @Bean
    public TopicExchange topicExchange() {
        // 是否持久化、是否自动删除
        return new TopicExchange(EXCHANGE_NAME_TOPIC, true, false);
    }

    // 声明队列
    @Bean
    public Queue queue() {
        // 是否持久化、是否当前连接对象独占、是否自动删除
        return new Queue(QUEUE_NAME, true, false, false);
    }

    // 声明绑定关系
    @Bean
    public Binding queueBinding(Queue queue, TopicExchange topicExchange) {
        return BindingBuilder.bind(queue).to(topicExchange).with("*.mail");
    }

    // 声明死信交换机 direct
    @Bean
    public DirectExchange directExchange() {
        // 是否持久化、是否自动删除
        return new DirectExchange(ERR_EXCHANGE_NAME_DIRECT, true, false);
    }

    // 声明死信队列
    @Bean
    public Queue errQueue() {
        // 是否持久化、是否当前连接对象独占、是否自动删除
        return new Queue(ERR_QUEUE_NAME, true, false, false);
    }

    // 声明绑定关系
    @Bean
    public Binding errQueueBinding(@Qualifier("errQueue")Queue errQueue, @Qualifier("directExchange")DirectExchange directExchange) {
        return BindingBuilder.bind(errQueue).to(directExchange).with("err.mail");
    }

}

邮件发送服务进行监听

import com.alibaba.fastjson.JSON;
import com.rabbitmq.client.Channel;
import gwc.mq.pojo.UserPOJO;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

// 死信队列
// 消息出现问题后则会进入死信交换机,然后进入死信队列
// 建立一个消费者根据routingkey监听死信队列  即可处理不同的死信队列中的数据

@Component
public class MailListener {

    private static final int MAX_RETRY = 3;
    private static final String ERR_EXCHANGE_NAME_DIRECT = "死信交换机";

    private static final String ERR_QUEUE_NAME = "err_email_queue";

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Autowired
    private TopicExchange topicExchange;

    @Autowired
    private AmqpAdmin amqpAdmin;

    @RabbitListener(queues = "email_queue")
    public void sendMail(Message message, Channel channel) throws IOException {
        try {

            // 睡眠1秒
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            String s = new String(message.getBody());
            UserPOJO userPOJO = JSON.parseObject(s, UserPOJO.class);

            int a = 1 / 0; // 造成异常

            System.out.println("Mail系统收到 userPOJO=" + userPOJO);

        } catch (Exception e) {
            int retryCount = getRetryCount(message);
            System.out.println("Mail系统出现异常 当前retryCount =" + retryCount);
            if (retryCount < MAX_RETRY) {
                // 重试次数未达到最大次数,将消息重新发送到主队列,并增加重试次数
                MessageProperties properties = message.getMessageProperties();
                properties.setHeader("retry_count", retryCount + 1);
                rabbitTemplate.send(topicExchange.getName(), "info.mail", message);
            } else {

                // 设置队列属性
                Map<String, Object> arguments = new HashMap<String, Object>();
                // 设置队列的TTL
                arguments.put("x-message-ttl", 10000);
                arguments.put("x-dead-letter-exchange", ERR_EXCHANGE_NAME_DIRECT);// 设置死信队列的交换器名称
                arguments.put("x-dead-letter-routing-key", "err.mail");// 设置死信队列的路由键

                // 发送给TTL队列
                amqpAdmin.declareQueue(new Queue("TTL_email_queue", true, false, false, arguments));

                amqpAdmin.declareBinding(new Binding("TTL_email_queue", Binding.DestinationType.QUEUE, "死信交换机", "ttl_email", null));

                // 发送消息到TTL队列 此队列无消费者 在消息过期后会自动转发到配置的死信队列中去
                rabbitTemplate.send(ERR_EXCHANGE_NAME_DIRECT, "ttl_email", message);
            }
            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
            return;
        }

        // 需要消息确认ACK
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); // 这里false表示是否开启批量应答
        /*
            何为批量应答?
            比如说channel上有传送tag的消息5,6,7,8,当前tag是8,那么此时5-8的这些还未应答的消息都会被确认收到消息应答 即批量应答
        */
    }

    // 死信消费
    @RabbitListener(queues = ERR_QUEUE_NAME)
    public void doFailedInformation(Message message, Channel channel) throws IOException {

        // 再次消费信息
        try {
            String s = new String(message.getBody());
            UserPOJO userPOJO = JSON.parseObject(s, UserPOJO.class);
            System.out.println("死信消费者消费死信队列: " + userPOJO);
            // 消费消息

        } catch (Exception e) {
            e.printStackTrace();
            // 表中字段 success修改为0
            // 发送一封邮件给操作人
        }

        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);

    }

    // 获取重试次数
    private int getRetryCount(Message message) {
        Object retryCountObj = message.getMessageProperties().getHeaders().get("retry_count");
        if (retryCountObj instanceof Integer) {
            return (Integer) retryCountObj;
        }
        return 0;
    }

}

两个方法分别监听业务队列和死信队列,如果消息消费出现异常,则重新将消息放入队列尾部,如果重试次数达到三次则将此消息放入TTL队列中,TTL队列中的消息会根据配置的过期时间、死信交换机、以及死信交换机上的routingkey对消息进行投送,进入相应的死信队列,然后再通过死信消费者进行消费处理,此时若再次发送失败,则发送邮件提醒人员进行手工发送来确保消息的有效性。

消费者确保消息的可靠性通过下示代码进行消息的确认

/*
   (1)channel.basicAck 用于肯定确认,RabbitMQ已经知道该消息并且成功地处理消息,可以将其丢弃了
   (2)channel.basicNack 用于否定确认
   (3)channel.basicReject 用于否定确认,与channel.basicNack相比少一个参数,不处理该消息了直接        
        拒绝,可以将其丢弃了
*/
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);

总结

消息发送的流程大致可化为 生产者(业务接口) -> mq -> 交换机 -> 队列 -> 消费者

生产者消息确认机制可以确保在前半部分的有效性,消费者手动确认机制可以确保在后半部分的有效性,而一旦消息连续失败多次,我们还有保底方案通过定时任务扫描DB获取失败的消息转而通过人工发送,这样就可以在全流程上确保消息的可靠性了,这里仅仅是我个人的一套保证可靠性的方案,如果有其他更为可行的方案欢迎评论区补充


本文转载自: https://blog.csdn.net/gwc_2000/article/details/134665771
版权归原作者 dejavu_0716 所有, 如有侵权,请联系我们删除。

“如何确保消息的可靠性?RabbitMQ 在Springboot中的应用案例”的评论:

还没有评论