0


RabbitMQ基本原理

一、基本结构

在这里插入图片描述
所有中间件技术都是基于 TCP/IP 协议基础之上进行构建新的协议规范,RabbitMQ遵循的是AMQP协议(Advanced Message Queuing Protocol - 高级消息队列协议)。

生产者发送消息流程:
1、生产者和Broker建立TCP连接;
2、生产者和Broker建立通道;
3、生产者通过通道消息发送给Broker,由Exchange将消息进行转发;
4、Exchange将消息转发到指定的Queue(队列)。

【详细】

1、消息生产者连接到RabbitMQ Broker,建立链接(Connection),在链接(Connection)上开启一个信道(Channel);
2、声明一个交换机(Exchange),并设置相关属性,比如交换机类型、是否持久化等;
3、声明一个队列(Queue),并设置相关属性,比如是否排他、是否持久化、是否自动删除等;
4、使用路由键(RoutingKey)将队列(Queue)和交换机(Exchange)绑定起来;
5、生产者发送消息至 RabbitMQ Broker,其中包含路由键、交换器等信息,根据路由键(RoutingKey)发送消息到交换机(Exchange);
6、相应的交换器(Exchange)根据接收到的路由键(RoutingKey)查找相匹配的队列如果找到 ,则将从生产者发送过来的消息存入相应的队列中;
7、如果没有找到 ,则根据生产者配置的属性选择丢弃还是回退给生产者;
8、关闭信道(Channel);
9、关闭链接(Connection);

消费者接收消息流程:
1、消费者和Broker建立TCP连接;
2、消费者和Broker建立通道;
3、消费者监听指定的Queue(队列);
4、当有消息到达Queue时Broker默认将消息推送给消费者;
5、消费者接收到消息;
6、ack回复。

【详细】

1、建立链接(Connection);
2、在链接(Connection)上开启一个信道(Channel);
3、请求消费指定队列(Queue)的消息,并设置回调函数(onMessage);
4、[MQ]将消息推送给消费者,消费者接收消息;
5、消费者发送消息确定(Ack[acknowledge]);
6、[MQ]删除被确认的消息;
7、关闭信道(Channel);
8、关闭链接(Connection);

MQ消费消息分发原理

1)一种是Pull模式,对应的方法是basicGet。
消息存放在服务端,只有消费者主动获取才能拿到消息。如果每搁一段时间获取一次消息,消息的实时性会降低。
但是好处是可以根据自己的消费能力决定消息的频率。

2)另一种是push,对应的方法是BasicConsume,只要生产者发消息到服务器,就马上推送给消费者,
消息保存客户端,实时性很高,如果消费不过来有可能会造成消息积压。Spring AMQP是push方式,
通过事件机制对队列进行监听,只要有消息到达队列,就会触发消费消息的方法。

二、RabbitMQ组成部分说明

  • Producer: 消息生产者,即生产方客户端,生产方客户端将消息发送;
  • Connection:TCP连接,生产者或消费者与消息队列RabbitMQ版间的物理TCP连接;

1)Connection会执行认证、IP解析、路由等底层网络任务。
2)应用与消息队列RabbitMQ版完成Connection建立大约需要15个TCP报文交互,因而会消耗大量的网络资源和消息队列RabbitMQ版资源。
3)一个进程对应一个Connection,一个进程中的多个线程则分别对应一个Connection中的多个Channel。
4)Producer和Consumer分别使用不同的Connection进行消息发送和消费;

  • Channel:在客户端的每个物理TCP连接里,可建立多个Channel,每个Channel代表一个会话任务。

1)Channel是物理TCP连接中的虚拟连接。
2)当应用通过Connection与消息队列RabbitMQ版建立连接后,所有的AMQP协议操作(例如创建队列、发送消息、接收消息等)都会通过Connection中的Channel完成。
3) Channel可以复用Connection,即一个Connection下可以建立多个Channel。
4) Channel不能脱离Connection独立存在,而必须存活在Connection中。
5) 当某个Connection断开时,该Connection下的所有Channel都会断开。

  • Broker:消息队列服务进程,此进程包括两个部分:Exchange和Queue;
  • Exchange(交换器):生产者将消息发送到Exchange,由Exchange将消息路由到一个或多个Queue中。Exchange根据消息的属性或内容路由消息。在RabbitMQ中,生产者发送消息不会直接将消息投递到队列中,而是先将消息投递到交换机中,在由交换机转发到具体的队列,队列再将消息以推送或者拉取方式给消费者进行消费。Exchange 有四种类型:1)Direct Exchange : 【路由规则】 Direct Exchange根据Binding Key和Routing Key完全匹配的规则路由消息。 【使用场景】 Direct Exchange适用于通过简单字符标识符区分消息的场景。 Direct Exchange常用于单播路由。 2)Topic Exchange 【路由规则】 Topic Exchange根据Binding Key和Routing Key通配符匹配的规则路由消息。 Topic Exchange支持的通配符包括星号(*)和井号(#)。 星号(*)代表一个英文单词(例如cn)。 井号(#)代表零个、一个或多个英文单词,英文单词间通过英文句号(.)分隔,例如cn.zj.hz。 【使用场景】 Topic Exchange适用于通过通配符区分消息的场景。 Topic Exchange常用于多播路由。例如,使用Topic Exchange分发有关于特定地理位置的数据。3)Fanout Exchange 【路由规则】 Fanout Exchange忽略Routing Key和Binding Key的匹配规则,将消息路由到所有绑定的Queue。【使用场景】 Fanout Exchange适用于广播消息的场景。例如,分发系统使用Fanout Exchange来广播各种状态和配置更新。 4)Headers Exchange 【路由规则】 Headers Exchange可以被视为Direct Exchange的另一种表现形式。 Headers Exchange可以像Direct Exchange一样工作,不同之处在于Headers Exchange使用Headers属性代替Routing Key进行路由匹配。在绑定Headers Exchange和Queue时,可以设置绑定属性的键值对。然后,在向Headers Exchange发送消息时,设置消息的Headers属性键值对。Headers Exchange将根据消息Headers属性键值对和绑定属性键值对的匹配情况路由消息。匹配算法由一个特殊的绑定属性键值对控制。该属性为x-match,只有以下两种取值: 1)all:所有除x-match以外的绑定属性键值对必须和消息Headers属性键值对匹配才会路由消息。2)any:只要有一组除x-match以外的绑定属性键值对和消息Headers属性键值对匹配就会路由消息。 以下两种情况下,认为消息Headers属性键值对和绑定属性键值对匹配: 1、 消息Headers属性的键和值与绑定属性的键和值完全相同; 2、 消息Headers属性的键和绑定属性的键完全相同,但绑定属性的值为空。【使用场景】 Headers Exchange适用于通过多组Headers属性区分消息的场景。Headers Exchange常用于多播路由。例如,涉及到分类或者标签的新闻更新。
  • Queue:消息队列,存储消息的队列,每个消息都会被投入到一个或多个Queue里;
  • Consumer:消息消费者,即消费方客户端,接收MQ转发的消息;
  • Routing Key(路由键):生产者在向Exchange发送消息时,需要指定一个Routing Key来设定该消息的路由规则。 Routing Key需要与Exchange类型及Binding Key联合使用才能生效。一般情况下,生产者在向Exchange发送消息时,可以通过指定Routing Key来决定消息被路由到哪个或哪些Queue;
  • Binding:一套绑定规则,用于告诉Exchange消息应该被存储到哪个Queue。它的作用是把Exchange和Queue按照路由规则绑定起来。
  • Binding Key(绑定键):用于告知Exchange应该将消息投递到哪些Queue中(生产者将消息发送给哪个Exchange是需要由RoutingKey决定的,生产者需要将Exchange与哪个队列绑定时需要由BindingKey决定的);

三、代码示例

生产者产生的消息并不会直接发送到队列上面,生产者只能将消息发送到交换机(Exchange)上面,然后由交换机来处理这些消息,是放到特定队列还是丢弃这些消息,这就由交换机来决定。

1. work模型

简单工作模型相当于一个生产者生产一条消息就只能推到一个队列里面然后被消费,也就是direct。

1)队列绑定
(说明:不管是生产者还是消费者,应当尽量避免手动创建队列,应使用程序自动创建队列(使用@QueueBinding注解),并绑定与之对应的交换器。 这里手动创建并绑定队列是为了明了看出Exchange、 Routing Key、 Queue 这三者之间的绑定关系)

 @Configuration
    public class RabbitMQConfig {
        public static final String ROUTING_QUEUE01 = "routing.queue01";
        public static final String ROUTING_QUEUE02 = "routing.queue02";
        public static final String ROUTING_EXCHANGE = "routing.exchange";

        /**
         * direct模式
         * direct与Fanout模式不同点在于direct不能模糊匹配规则,只能按准确的规则路由。
         * @return
         */

        @Bean
        public Queue routingQueue01(){return new Queue(ROUTING_QUEUE01,true);}
        @Bean
        public Queue routingQueue02(){return new Queue(ROUTING_QUEUE02,true);}
        @Bean
        public DirectExchange routingExchange(){return new DirectExchange(ROUTING_EXCHANGE);}
        @Bean
        public Binding routingBinding1(){
            return BindingBuilder.bind(routingQueue01()).to(routingExchange()).with("queue01");
        }
        @Bean
        public Binding routingBinding2(){
            return BindingBuilder.bind(routingQueue02()).to(routingExchange()).with("queue02");
        }

    }

2)生产者消费者

@RestController
public class TestMqController {
    @Resource
    ConfirmPublish confirmPublish;

    @Resource
    TestReturnCallBack testReturnCallBack;

    @Resource
    private RabbitTemplate rabbitTemplate;  
    
    /**
     * 生产者
     * @param code
     * @return
     */
    @GetMapping("/mq/send/{code}")
    public Result<String> sendWork(@PathVariable("code") String code) {
        SysUser datapermBO = new SysUser();
        datapermBO.setName("你好");
        datapermBO.setPassword(code);

        //Confirm确认
        rabbitTemplate.setConfirmCallback(confirmPublish);

        //Return确认
        rabbitTemplate.setReturnCallback(testReturnCallBack);

        CorrelationData correlationData =  new CorrelationData(UUID.randomUUID().toString());
        //routing.exchange 和 queue01 绑定一个唯一的队列 routing.queue01
        rabbitTemplate.convertAndSend("routing.exchange","queue01", datapermBO,correlationData);
        System.out.println("给mq发送消息:" + code);
        return Result.succeed("发送成功...");
    }

    /**
     *  消费者(监听器模式)
     * @param msg
     */
    @RabbitListener(queues = "routing.queue01")
    public void receiveMessage(Message message, Channel channel, SysUser msg) throws IOException {
        // 告诉服务器收到这条消息 已经被我消费了 可以在队列删掉 这样以后就不会再发了 否则消息服务器以为这条消息没处理掉 重启应用后还会在发
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
        // 只包含发送的消息
        System.out.println("1接收到消息:" + JSON.toJSONString(msg));
    }
    
  }

2. 发布订阅模型

发布订阅模型:简单工作模型相当于一个生产者生产一条消息就只能推到一个队列里面然后被消费,发布订阅模型相当于生产者生产一条消息,通过交换机就可以推到多个队列里面,然后被消费,相当于广播的模式。

Fanout模式就是我们熟悉的广播模式或者订阅模式,给Fanout交换机发送消息,绑定了这个交换机的所有队列都接收到这个消息。

1)声明队列

 @SpringBootConfiguration
    public class RabbitMQConfig2 {
        /**
         *  声明两个队列
         * @return
         */
        @Bean
        public Queue queue_one() {
            return new Queue("queue_one");
        }
        @Bean
        public Queue queue_two() {
            return new Queue("queue_two");
        }
        /**
         * 准备一个交换机
         * @return
         */
        @Bean
        public FanoutExchange exchangeFanout() {
            return new FanoutExchange("exchange_fanout");
        }
        /**
         * 将交换机和队列进行绑定
         * @return
         */
        @Bean
        public Binding bindingExchange1() {
            return BindingBuilder.bind(queue_one()).to(exchangeFanout());
        }
        @Bean
        public Binding bindingExchange2() {
            return BindingBuilder.bind(queue_two()).to(exchangeFanout());
        }
    }

2)生产者消费者

    @GetMapping("/mq/producer/{code}")
    public Result<String> producer(@PathVariable("code") String code) {
        SysUser datapermBO = new SysUser();
        datapermBO.setName("生产者/消费者模式");
        datapermBO.setPassword(code);
        /**
         * 参数
         * 1.交换机 exchange_fanout
         * 2.路由key
         * 3.传输对象
         */
        rabbitTemplate.convertAndSend("exchange_fanout","", datapermBO);
        return Result.succeed("发送成功...");
    }

    @RabbitListener(queues = "queue_one")
    public void receiveMessage_one(Message message, Channel channel,SysUser msg) throws IOException {
        // 只包含发送的消息
        System.out.println("1接收到消息:" + JSON.toJSONString(msg));

        //Return确认:从交换机到队列也有可能出现路由失败导致消息丢失情景,Return机制可解决这个问题,路由失败时可以通过Return回调来将路由失败的消息记录下来
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
    }

    @RabbitListener(queues = "queue_two")
    public void receiveMessage_two(SysUser msg) {
        // 只包含发送的消息
        System.out.println("2接收到消息:" + JSON.toJSONString(msg));
    }

3.Topic模型

通过通配符匹配的规则路由消息。

1)绑定队列

@SpringBootConfiguration
    public class RabbitMQConfig3 {
        /**
         *  声明两个队列
         * @return
         */
        @Bean
        public Queue queueTopic1() {
            return new Queue("queue_topic1");
        }
        @Bean
        public Queue queueTopic2() {
            return new Queue("queue_topic2");
        }

        /**
         * 创建topic类型交换器
         * @return
         */
        @Bean
        public TopicExchange exchangeTopic() {
            return new TopicExchange("exchange_topic");
        }

        /**
         *  #(井号)可以替代零个或多个单词   匹配0个或者多个标识符
         *  topic.# 能匹配topic.test.one 和 topic.test
         * @return
         */
        @Bean
        public Binding bindingTopic1() {
            return BindingBuilder.bind(queueTopic1()).to(exchangeTopic()).with("topic.#");
        }

        /**
         *  *(星号)可以代替一个单词  匹配一个标识符
         * @return 仅匹配topic.test
         */
        @Bean
        public Binding bindingTopic2() {
            return BindingBuilder.bind(queueTopic2()).to(exchangeTopic()).with("topic.*");
        }

    }

2)生产者和消费者

  @GetMapping("/mq/topic/{code}")
    public Result<String> topic(@PathVariable("code") String code) {
        SysUser datapermBO = new SysUser();
        datapermBO.setName("topic模型");
        datapermBO.setUsername(code);
        datapermBO.setPassword("topic.test.one");
        /**
         * 参数
         * 1.交换机
         * 2.路由key
         * 3.传输对象
         */
        rabbitTemplate.convertAndSend("exchange_topic","topic.test.one", datapermBO);
        datapermBO.setProjectCode("topic.test");
        rabbitTemplate.convertAndSend("exchange_topic","topic.test", datapermBO);
        return Result.succeed("发送成功...");
    }

    @RabbitListener(queues = "queue_topic1")
    public void receiveMessage_one1(SysUser msg) {
        // 代表  topic.#
        System.out.println("1接收到消息:" + JSON.toJSONString(msg));
    }

    @RabbitListener(queues = "queue_topic2")
    public void receiveMessage_two1(SysUser msg) {
        // 代表  topic.*
        System.out.println("2接收到消息:" + JSON.toJSONString(msg));
    }

四、生产者确认机制

1、确认原理

生产者将消息发送到exchange,exchange根据路由规则将消息投递到了queue。

1)Confirm确认

  1. 生产者发送消息到交换机时会存在消息丢失的情景,开启事务会导致吞吐量下降,Confirm机制就是消息发送到交换机(Exchange)时会触发Confirm回调。
  2. 通过 publisher confirm (发送方确认机制)可以确定消息是否被成功路由到MQ broker从而选择是否重发等步骤。
  3. 当生产者开启 publisher confirm 消息发送到MQ端之后,MQ会回一个ack给生产者,ack是个boolean值,为true消息成功发送到MQ。反之发送失败。

2)Return确认:从交换机到队列也有可能出现路由失败导致消息丢失情景(可能是MQ出问题导致queue和exchange绑定丢失,或者失误删除了绑定关系等),Return机制可解决这个问题,路由失败时可以通过Return回调来将路由失败的消息记录下来。

2、代码示例

生产者

 @GetMapping("/mq/send/{code}")
    public Result<String> sendWork(@PathVariable("code") String code) {
        SysUser datapermBO = new SysUser();
        datapermBO.setName("你好");
        datapermBO.setPassword(code);

        //Confirm确认
        rabbitTemplate.setConfirmCallback(confirmPublish);

        //Return确认
        rabbitTemplate.setReturnCallback(testReturnCallBack);

        CorrelationData correlationData =  new CorrelationData(UUID.randomUUID().toString());
        //routing.exchange 和 queue01 绑定一个唯一的队列 routing.queue01
        rabbitTemplate.convertAndSend("routing.exchange","queue01", datapermBO,correlationData);
        System.out.println("给mq发送消息:" + code);
        return Result.succeed("发送成功...");
    }

Confirm确认

@Component
class ConfirmPublish implements RabbitTemplate.ConfirmCallback{

    @Override
    public void confirm(CorrelationData correlationData, boolean b, String s) {
        if (b){
            System.out.println("---生产者发送消息到交换机时  message publish success---");
        }else{
            System.out.println("---生产者发送消息到交换机时  message publish failed---");
            System.out.println("reason: "+s);
        }
    }
}

return确认

@Component
 class TestReturnCallBack implements RabbitTemplate.ReturnCallback{

    /**
     * 消息补偿
     * 处理当前的exchange不存在或者指定路由的key不存在的情况
     * */
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        System.out.println("收到return回调:投递消息到队列失败");
    }
}

五、消费者确认机制

1、消费者确认原理

消费者确认是指当一条消息投递到消费者处理后,消费者发送给MQ broker的确认
通俗的说就是 告知服务器这条消息已经被我消费了,可以在队列删掉 ,这样以后就不会再发了, 否则消息服务器以为这条消息没处理掉 重启应用后还会在发)。

有auto和manual两种
1)auto则由broker自行选择时机,一般可认为消息发送到消费者后就直接被ack,也即消息会被从队列中移除掉而不顾消息的处理逻辑是否成功;

2)manual则是需要消费者显式的去手动ack后消息才会被从队列中移除掉,通过这个机制可以限制在消息处理完之后再Ack或者nack; 开启手动确认模式,即由消费方自行决定何时应该ack,通过设置autoAck=false开启手动确认模式;

2、代码示例

  //RabbitMq懒加载, 配置了监听的队列,服务启动时会自动创建绑定的队列
    @RabbitListener(
            bindings = @QueueBinding(
                    value = @Queue("queue-test"),
                    exchange = @Exchange(value = "exchange.test"),
                    key = "routingkey.test"
            ), ackMode = "MANUAL"
    )
    @RabbitHandler
    public void receive(Message message, Channel channel, AllTest msg) throws IOException {
        //ack确认 消费者获取消息后告知RabbitMq此消息已接收,可从队列删除此消息
        // 告诉服务器收到这条消息 已经被我消费了 可以在队列删掉 这样以后就不会再发了 否则消息服务器以为这条消息没处理掉 重启应用后还会在发
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
        // 只包含发送的消息
        System.out.println("1接收到消息:" + JSON.toJSONString(msg));
    }

六、消息持久化

消息发送并保存到队列之后如果不做特殊处理是保存在内存中,当节点宕机重启或者内存故障等,会导致消息丢失,通过对消息进行持久化到磁盘可以降低这种风险, 除了对消息进行持久化还是不够,还需要对queue、exchange进行持久化。


本文转载自: https://blog.csdn.net/Little_Arya/article/details/129390890
版权归原作者 Little-Arya 所有, 如有侵权,请联系我们删除。

“RabbitMQ基本原理”的评论:

还没有评论