0


中间件_RabbitMQ五种消息模型

文章目录

RabbitMQ官方文档

RabbitMQ 提供了5种常用消息模型。但是其实3、4、5这三种都属于订阅模型,只不过进行路由的方式不同。

在这里插入图片描述
在这里插入图片描述

1.简单消息队列模型

简单消息队列官方文档

1、创建简单消息队列

在这里插入图片描述

2、导入依赖

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>

3、编写生产者测试类SpringAmqpTest,并利用 RabbitTemplate 实现消息发送

@SpringBootTestpublicclassSpringAMQPTest{//SpringAMQP提供的三个功能之一:封装了 RabbitTemplate 工具,用于发送消息 (生产者)@AutowiredprivateRabbitTemplate template;@Testvoidtest(){//利用RabbitTemplate提供的方法向指定队列发送消息
        template.convertAndSend("simple.queue","Hello SpringAQMP!");}}

4、编写消费者,监听队列消息

@ComponentpublicclassMessageListener{/**
     * SpringAMQP提供的三个功能之二:基于注解的监听器模式,异步接收消息 (消费者)
     * @RabbitListener是SpringAMQP提供的监听消息队列的注解
     * @param message
     */@RabbitListener(queues ="simple.queue")publicvoidbasicQueueListener(String message){System.out.println("消费者接收到消息:"+ message);}}

在这里插入图片描述

5、编写生产者和消费者的application.yml文件

spring:rabbitmq:host: 192.168.6.131
    port:5672# 虚拟主机virtual-host: /
    username: rabbitmq
    password:123456

在这里插入图片描述

6、测试

运行生产者代码后,simple.queue 队列中产生了一条待消费的消息。
在这里插入图片描述
运行消费者代码后,队列中的消息被消费
在这里插入图片描述
在这里插入图片描述

2.Work工作队列模型

work工作队列官方文档

1、 创建 work.queue 队列

在这里插入图片描述

2、创建生产者测试类

@TestvoidtestWorkQueue(){String queueName ="work.queue";String message ="Hello SpringAQMP-";for(int i =0; i <20; i++){
            template.convertAndSend(queueName,message+i);}}

3、创建消费者测试类

@RabbitListener(queues ="work.queue")publicvoidworkQueueListener01(String message)throwsInterruptedException{System.out.println("消费者01接收到消息:"+ message +" - "+LocalTime.now());Thread.sleep(20);}@SneakyThrows@RabbitListener(queues ="work.queue")publicvoidworkQueueListener02(String message){System.out.println("消费者02接收到消息:"+ message +" - "+LocalTime.now());

4、修改消费者的application.yml

正常情况下每个消费者要消费的消息数量是一样的。消息是平均分配给每个消费者,并没有考虑到消费者的处理能力。这样显然不合理,因此修改消费者的配置文件使得每个消费者按照能力顺序处理信息

spring:rabbitmq:listener:simple:# 简单队列和work队列的配置prefetch:1# 每次只能获取一条消息,处理完成才获取下一条消息

5、测试

运行生产者和消费者类,两个消费者收到的同一个队列中的消息肯定是不一样的。
在这里插入图片描述

3.发布订阅模型

在发布订阅模型中,多了一个exchange角色,而且过程略有变化:

  • producer:生产者,也就是要发送消息的程序,但是不再发送到队列中,而是发给X(交换机)
  • exchange:交换机,一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于exchange的类型。exchange 有以下3种类型:- fanout:广播,将消息交给所有绑定到交换机的队列- direct:定向,把消息交给符合指定 routing key 的队列- topic:通配符,把消息交给符合 routing pattern 的队列
  • consumer:消费者,订阅队列
  • queue:消息队列,接收消息、缓存消息。

exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与 exchange 绑定,或者没有符合路由规则的队列,那么消息会被丢弃!
在这里插入图片描述

3.1.Fanout广播

Fanout广播官方文档
在这里插入图片描述

在广播模式下,消息发送流程是这样的:

  • 可以有多个消费者
  • 每个消费者有自己的queue(队列)
  • 每个队列都要绑定到Exchange(交换机)
  • 生产者发送的消息,只能发送到交换机,交换机来决定要发给哪个队列,生产者无法决定。
  • 交换机把消息发送给绑定过的所有队列
  • 队列的消费者都能拿到消息。实现一条消息被多个消费者消费
  • 使用内置的 amq.fanout 交换机在这里插入图片描述
  • 两个队列不用手动创建,是在下面

1、创建生产者测试类

@TestvoidtestFanoutExchange(){
    rabbitTemplate.convertAndSend("amq.fanout","","Hello amq.fanout");}

2、创建消费者测试类

@ComponentpublicclassSpringRabbitListener{@RabbitListener(bindings =@QueueBinding(
            value =@Queue(name ="fanout.queue01"),
            exchange =@Exchange(name ="amq.fanout", type =ExchangeTypes.FANOUT)))publicvoidfanoutQueueListener01(String message){System.out.println("消费者01接收到fanout.queue01的消息:"+ message);}@RabbitListener(bindings =@QueueBinding(
            value =@Queue(name ="fanout.queue02"),//使用@Queue(name = "fanout.queue01")RabbitMQ自动生成队列
            exchange =@Exchange(name ="amq.fanout", type =ExchangeTypes.FANOUT)))publicvoidfanoutQueueListener02(String message){System.out.println("消费者02接收到fanout.queue02的消息:"+ message);}}

3、测试

运行生产者和消费者
在这里插入图片描述
在这里插入图片描述

在这里插入图片描述

3.2.Direct路由

Direct路由官方文档
在这里插入图片描述
在 Fanout 模式中,一条消息,会被所有订阅的队列都消费。但是,在某些场景下,我们希望不同的消息被不同的队列消费。这时就要用到 direct 类型的 exchange。

Direct模型下:

  • 队列与交换机的绑定要指定一个 RoutingKey
  • 消息的发送方在向 exchange 发送消息时,也必须指定消息的 RoutingKey。
  • exchange 不再把消息交给每一个绑定的队列,而是根据消息的Routing Key进行判断,只有队列的Routingkey 与消息的 Routing key 完全一致,才会接收到消息

1、创建生产者测试类

@TestvoidtestDirectExchange(){
    rabbitTemplate.convertAndSend("amq.direct","save","新增通知");
    rabbitTemplate.convertAndSend("amq.direct","delete","删除通知");}

2、创建消费者测试类

@RabbitListener(bindings =@QueueBinding(
    value =@Queue(name ="direct.queue01"),
    exchange =@Exchange(name ="amq.direct", type =ExchangeTypes.DIRECT),
    key ={"save"}))// 处理新增的业务publicvoiddirectQueueListener01(String message){System.out.println("消费者01接收到direct.queue01的消息:"+ message);}@RabbitListener(bindings =@QueueBinding(
    value =@Queue(name ="direct.queue02"),
    exchange =@Exchange(name ="amq.direct", type =ExchangeTypes.DIRECT),
    key ={"delete"}))// 处理删除的业务publicvoiddirectQueueListener02(String message){System.out.println("消费者02接收到direct.queue02的消息:"+ message);}

3、测试

运行生产者和消费者测试类
在这里插入图片描述
在这里插入图片描述

3.3.Topics通配符

Topics通配符官方文档
在这里插入图片描述
Topics 类型的 Exchange 与 Direct 相比,都是可以根据 RoutingKey 把消息路由到不同的队列。

不同的是 Topic类型的 Exchange 可以让队列在绑定 Routingkey 的时候使用通配符

通配符规则:

  • #:匹配一个或多个单词
  • *:匹配一个单词

举例:

  • user.#:能够匹配 user.add 或者 user.detail.add
  • user.*:只能匹配 user.add

1、创建生产者测试类

@TestpublicvoidtestTopicExchange(){
    rabbitTemplate.convertAndSend("amq.topic","user.add","新增用户通知");
    rabbitTemplate.convertAndSend("amq.topic","user.update","更新户通知");
    rabbitTemplate.convertAndSend("amq.topic","dept.add","新增部门通知");
    rabbitTemplate.convertAndSend("amq.topic","dept.update","更新部门通知");}

2、创建消费者测试类

@RabbitListener(bindings =@QueueBinding(
    value =@Queue(name ="topic.queue01"),
    exchange =@Exchange(name ="amq.topic", type =ExchangeTypes.TOPIC),
    key ="user.*"))publicvoidtopicQueueListener01(String message){System.out.println("消费者01接收到topic.queue01的消息:"+ message);}@RabbitListener(bindings =@QueueBinding(
    value =@Queue(name ="topic.queue02"),
    exchange =@Exchange(name ="amq.topic", type =ExchangeTypes.TOPIC),
    key ="dept.*"))publicvoidtopicQueueListener02(String message){System.out.println("消费者02接收到topic.queue02的消息:"+ message);}

3、测试

运行生产者和消费者测试类
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述


本文转载自: https://blog.csdn.net/XinShou___/article/details/130752800
版权归原作者 码农小白123 所有, 如有侵权,请联系我们删除。

“中间件_RabbitMQ五种消息模型”的评论:

还没有评论