0


RabbitMQ-交换机

文章目录


一、交换机是什么?

英文名称Exchange,生产者发送消息时,先将消息投递到交换机中,再由交换机转发到具体的队列,队列再将消息以推送或者拉取方式给消费之进行消费,相当于一个中间商或者说是媒介
在这里插入图片描述

1.路由键

生产者将消息发给交换机的时候会指定RoutingKey,也就是说生产者将消息发给哪个交换机是由RoutingKey决定的。

2.绑定键

项正确的将消息路由到队列,需通过绑定键将交换机与队列关联起来。

二、交换机的类型

1.直连交换机:Direct exchange

规则

将消息推送到binding key与该消息的routing key相同的队列。

理解

据下图所示,如果生产者发了一条black到交换机处,交换机会根据black去找与它同名的线路,最终将消息路由到了Q2。
在这里插入图片描述
备注:同一个绑定键可以绑定到不同的队列上。
缺点:如果希望一条消息发送给多个队列,交换机就需要绑定多个routingkey,管理会比较困难。

2.主题交换机:Topic exchange

规则

routing key必须时由点号分开的一串单词,例如“a.a.a”,“a.b.c”等,不能是中文,最大限制是255bytes。

符号的含义

*标识任意一个单词
#标识任意一个或多个单词

小测试:根据图片判断进Q1还是Q2

在这里插入图片描述

3.扇形交换机:Fanout exchange

广播消息,意思就是扇形交换机把接收到的消息全部发送给绑定在自己身上的队列,速度是交换机类型里面最快的。

4.首部交换机:Headers exchange

不需要路由键,交换时通过Headers头部来将消息映射到队列中。

5.默认交换机

每一个新建的队列都会自动绑定到默认交换机上,绑定的路由键的名称与队列名相同。
类似amq.*的名称的交换机,是RabbitMQ默认创建的,不要动它们!!!(如下图)
在这里插入图片描述

6.死信交换机(延迟队列):Dead Letter Exchange

应用场景:订单超时处理
消息变成死信的三种情况:

  • 消息被拒绝,平且设置requeue参数为false;
  • 消息过期(默认情况下Rabbit中的消息不过期,但可以手动设置过期时间);
  • 队列达到最大长度;

在这里插入图片描述

消息消费者如何通知 Rabbit 消息消费成功?

  • 消息通过 ACK 确认是否被正确接收,每个 Message 都要被确认(acknowledged),可以手动去 ACK 或自动 ACK
  • 自动确认会在消息发送给消费者后立即确认,但存在丢失消息的可能,如果消费端消费逻辑抛出异常,也就是消费端没有处理成功这条消息,那么就相当于丢失了消息
  • 如果消息已经被处理,但后续代码抛出异常,使用 Spring 进行管理的话消费端业务逻辑会进行回滚,这也同样造成了实际意义的消息丢失
  • 如果手动确认则当消费者调用 ack、nack、reject 几种方法进行确认,手动确认可以在业务失败后进行一些操作,如果消息未被 ACK 则会发送到下一个消费者
  • 如果某个服务忘记 ACK 了,则 RabbitMQ 不会再发送数据给它,因为 RabbitMQ 认为该服务的处理能力有限
  • ACK 机制还可以起到限流作用,比如在接收到某条消息时休眠几秒钟
  • 消息确认模式有: - AcknowledgeMode.NONE:自动确认- AcknowledgeMode.AUTO:根据情况确认- AcknowledgeMode.MANUAL:手动确认

如果要将确认消息的方式改为手动确认,则需要将确认模式修改为 manual

在application.yml中加入以下代码:

spring:
  rabbitmq:
    listener:
      simple:
        acknowledge-mode: manual

如图所示:加载消费者的application.yml中
在这里插入图片描述

三、实战代码

接收者1:

@Component
 @SuppressWarnings("all")
 @Slf4j
 @RabbitListener(queues ="queue1")
 public class ReceiverQ1 {
    @RabbitHandler
    public void process(String msg){
        log.warn("Q1接收到:" + msg);}}

接收者2:

@Component
 @SuppressWarnings("all")
 @Slf4j
 @RabbitListener(queues ="queue1")
 public class ReceiverQ2 {
    @RabbitHandler
    public void process(String msg){
        log.warn("Q2接收到:" + msg);}}

1.直连交换机

//------------直连交换机--------------------
    @Bean
    public Queue queue1(){return new Queue("queue1");}

    @Bean
    public Queue queue2(){return new Queue("queue2");}

    //交换机
    public DirectExchange directExchange(){return new DirectExchange("directExchange");}

    //绑定关系
    public Binding binding1(){return BindingBuilder
                .bind(queue1()) //绑定一个队列
                .to(directExchange())  //绑定到哪个交换机
                .with("aa");  //规则
    }

    //绑定关系
    public Binding binding2(){return BindingBuilder
                .bind(queue1()) //绑定一个队列
                .to(directExchange())  //绑定到哪个交换机
                .with("bb");  //规则
    }

controller

@RequestMapping("/send3")
    public String send3(){
        //向消息队列发送消息
        template.convertAndSend("directExchange","bb","hello");return"😀";}

2.主题交换机

//--------------------主题交换机------------------------
    //规则如下:
    // *.*.aa  ->Q1
    // *.*.bb  ->Q2
    // mq.# ->Q1,Q2
    @Bean
    public TopicExchange topicExchange(){return new TopicExchange("topicExchange");}

    @Bean
    public Binding binding3(){return BindingBuilder
                .bind(queue1())
                .to(topicExchange())
                .with("*.*.aa");}

    @Bean
    public Binding binding4(){return BindingBuilder
                .bind(queue2())
                .to(topicExchange())
                .with("*.*.bb");}

    @Bean
    public Binding binding5(){return BindingBuilder
                .bind(queue1())
                .to(topicExchange())
                .with("mq.#");}

    @Bean
    public Binding binding6(){return BindingBuilder
                .bind(queue2())
                .to(topicExchange())
                .with("mq.#");}

controller

@RequestMapping("/send4")
    public String send4(String rex){
        //向消息队列发送消息
        template.convertAndSend("topicExchange",rex,"hello");return"😀";}

3.扇形交换机

//-----------------扇形交换机--------------------
    //群发,不需要Bingding key
    @Bean
    public FanoutExchange fanoutExchange(){return new FanoutExchange("fanoutExchange");}

    @Bean
    public Binding binding7(){return BindingBuilder
                .bind(queue1())
                .to(fanoutExchange());}

    @Bean
    public Binding binding8(){return BindingBuilder
                .bind(queue2())
                .to(fanoutExchange());}

controller

 @RequestMapping("/send5")
    public String send4(){
        //向消息队列发送消息
        template.convertAndSend("fanoutExchange","","hello");return"😀";}

4.死信交换机

//-----------------------------死信,延迟交换机------------------------------
    @Bean
    public Queue queueA(){//正常队列
        Map<String, Object> config = new HashMap<>();
        config.put("x-message-ttl", 20000);//message在该队列queue的存活时间最大为10秒
        config.put("x-dead-letter-exchange", "BExchange"); //x-dead-letter-exchange参数是设置该队列的死信交换器(DLX)
        config.put("x-dead-letter-routing-key", "bb");//x-dead-letter-routing-key参数是给这个DLX指定路由键
        return new Queue("queueA",true,false,false,config);}

    @Bean
    public DirectExchange AExchange(){return new DirectExchange("AExchange");}

    @Bean
    public Binding bindingA(){return BindingBuilder
                .bind(queueA())
                .to(AExchange())
                .with("aa");}

    @Bean
    public Queue queueB(){//死信
        return new Queue("queueB");}

    @Bean
    public DirectExchange BExchange(){return new DirectExchange("BExchange");}

    @Bean
    public Binding bindingB(){return BindingBuilder
                .bind(queueB())
                .to(BExchange())
                .with("bb");}

ReceiverQA

@Component
 @SuppressWarnings("all")
 @Slf4j
 @RabbitListener(queues ="queueA")
 public class ReceiverQA {
    @RabbitHandler
    public void process(String msg, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception {
        log.error("QA接收到:" + msg);
        //channel.basicAck(tag,true);确认
        //手动拒绝
        channel.basicReject(tag,false);
        //延迟一秒钟
        Thread.sleep(1000);}}

ReceiverQB

@Component
 @SuppressWarnings("all")
 @Slf4j
 @RabbitListener(queues ="queueB")
 public class ReceiverQB {
    @RabbitHandler
    public void process(String id){
        log.warn("QB接收到:" + id);
        //拿到id去数据库做修改
    }}

controller

@RequestMapping("/send6")
    public String send6(){
        //向消息队列发送消息
        template.convertAndSend("AExchange","aa","12233");return"😀";}

四、总结

RabbitMQ 中的交换机(Exchange)是消息路由的关键组件,它负责将消息路由到相应的队列。不同类型的交换机在消息路由时具有不同的规则。以下是 RabbitMQ 交换机的一些常见应用场景:

1.直连交换机(Direct Exchange):

2.场景: 当需要将消息直接路由到与消息的路由键完全匹配的队列时。
3.应用: 点对点通信,需要特定消息传递给特定队列的情况。

4.扇出交换机(Fanout Exchange):

5.场景: 当需要将消息广播到所有绑定到交换机的队列时。
6.应用: 发布/订阅模式,消息需要发送给所有订阅者的情况。

7.主题交换机(Topic Exchange):

8.场景: 当需要根据通配符规则将消息路由到匹配的队列时。
9.应用: 多对多通信,消息根据一定规则发送到匹配的队列。

10.头交换机(Headers Exchange):

11.场景: 当需要根据消息的头部属性来进行匹配路由时。
12.应用: 根据消息头的属性进行复杂的匹配规则。

13.默认交换机(Default Exchange):

14.场景: 当没有指定交换机时,消息会被默认发送到默认交换机。
15.应用: 简化配置,适用于点对点通信的简单场景。

这些场景仅是 RabbitMQ 交换机应用的一部分,实际应用中可以根据具体需求选择合适的交换机类型。通过灵活配置交换机和队列的绑定关系,可以实现多样化的消息路由和分发机制,满足不同业务场景的需求。

五、死信交换机的总结

死信交换机(Dead Letter Exchange)是消息中间件中的一种特殊交换机,用于处理无法被正常消费的消息。以下是死信交换机的一些常见应用场景:

1.消息重试机制: 当消息在队列中由于某些原因无法被正常消费时,死信交换机可以将这些消息重新发送到指定的队列,以便进行重试。这对于处理临时的、可恢复的错误非常有用。
2.超时处理: 当消息在队列中等待的时间超过预定的超时时间,可以被认为是死信。死信交换机可以将这些超时的消息重新定向到指定的队列,以便进一步处理。
3.业务异常处理: 在某些情况下,业务层面的异常可能导致消息无法正常处理。通过死信交换机,可以将出现业务异常的消息转移到专门的队列中,进行详细的分析和处理。
4.队列满溢处理: 当队列达到一定的容量限制时,新的消息可能会被认为是死信,因为队列已经无法再容纳更多的消息。死信交换机可以将这些消息定向到其他队列或者进行适当的报警。
5.消息过期处理: 当消息设置了过期时间,但在此时间内没有被消费时,可以将其认定为死信,并通过死信交换机进行处理,以防止过期消息一直占用队列资源。
6.处理未知路由: 当消息被发送到一个不存在的队列时,可以将这些消息定向到死信交换机,以便进行处理,而不是让这些消息消失或者被忽略。
7.处理消费者异常退出: 如果消费者异常退出或崩溃,可能会导致正在处理的消息无法正常确认,这些消息可以被发送到死信交换机,以便重新处理。

总体而言,死信交换机为消息中间件系统提供了一种有效的机制来处理无法正常消费的消息,帮助系统在出现异常情况时进行优雅的处理和恢复。这对于确保消息可靠性和系统稳定性非常重要。


本文转载自: https://blog.csdn.net/2301_78435981/article/details/135746241
版权归原作者 幸运儿~ 所有, 如有侵权,请联系我们删除。

“RabbitMQ-交换机”的评论:

还没有评论