0


RabbitMQ:订阅模型-消息订阅模式

订阅模型-消息订阅模式,也可以称为广播模式,生产者将消息发送到 Exchange,Exchange 再转发到与之绑定的 Queue中,每个消费者再到自己的 Queue 中取消息。

RabbitMQ 单生产单消费模型主要有以下五个角色构成:

  • 生产者(producer/ publisher):一个发送消息的用户应用程序。
  • 交换机(Exchange) :在 RabbitMQ 的消息传递模型中,对于 Exchange 的核心思想就是:生产者生产的消息从不会直接发送到队列,生产者只能将消息发送到交换机。交换机工作的内容非常简单,一方面它接收来自生产者的消息,另一方面将它们推入队列。Exchanges 的类型:直接(direct)、主题(topic)、标题(headers)、扇出(fanout)
  • 消费者1(consumer):消费和接收有类似的意思,消费者是一个主要用来等待接收消息的用户应用程序,领取任务并完成
  • 消费者2(consumer):领取任务并完成…
  • 队列:RabbitMQ 内部类似于邮箱的一个概念。虽然消息流经 RabbitMQ 和你的应用程序,但是它们只能存储在队列中。队列只受主机的内存和磁盘限制,实质上是一个大的消息缓冲区。许多生产者可以发送消息到一个队列,许多消费者可以尝试从一个队列接收数据。

文章目录


一、RabbitMQ 订阅模型-消息订阅(Fanout)模式

1、RabbitMQ 消息订阅(Fanout)模式

订阅模型-消息订阅模式,也可以称为广播模式,生产者将消息发送到 Exchange,Exchange 再转发到与之绑定的 Queue中,每个消费者再到自己的 Queue 中取消息。

image-20221201012238266

2、消息订阅(Fanout)模式组成

RabbitMQ 订阅模型-消息订阅(Fanout)模式主要有以下五个角色构成:

  • 生产者(producer/ publisher):一个发送消息的用户应用程序。
  • 交换机(Exchange) :在 RabbitMQ 的消息传递模型中,对于 Exchange 的核心思想就是:生产者生产的消息从不会直接发送到队列,生产者只能将消息发送到交换机。交换机工作的内容非常简单,一方面它接收来自生产者的消息,另一方面将它们推入队列。Exchanges 的类型:直接(direct)、主题(topic)、标题(headers)、扇出(fanout)
  • 消费者1(consumer):消费和接收有类似的意思,消费者是一个主要用来等待接收消息的用户应用程序,领取任务并完成
  • 消费者2(consumer):领取任务并完成…
  • 队列:RabbitMQ 内部类似于邮箱的一个概念。虽然消息流经 RabbitMQ 和你的应用程序,但是它们只能存储在队列中。队列只受主机的内存和磁盘限制,实质上是一个大的消息缓冲区。许多生产者可以发送消息到一个队列,许多消费者可以尝试从一个队列接收数据。

3、消息订阅(Fanout)模式流程

消息订阅(Fanout)模式流程:

  1. 消息订阅(Fanout)模式 可以有多个消费者
  2. 每个消费者有自己的 queue(队列)
  3. 每个队列都要绑定到 Exchange(交换机)
  4. 生产者发送的消息,只能发送到交换机,交换机来决定要发给那个队列,生产者无法决定
  5. 交换机把消息发送给绑定过的所有队列
  6. 队列的消费者都能拿到消息,实现一条消息被多个消费者消费

二、RabbitMQ 订阅模型-消息订阅(Fanout)模式实现

1、添加 Maven 依赖

在 pom.xml 文件中添加以下依赖

<dependencies><dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.16.0</version></dependency><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>4.13.2</version><scope>test</scope></dependency>
      ...
</dependencies>

2、封装工具类 ConnectionUtil

packagecom.lizhengi.fanout;importcom.rabbitmq.client.Channel;importcom.rabbitmq.client.Connection;importcom.rabbitmq.client.ConnectionFactory;/**
 * @author liziheng
 * @version 1.0.0
 * @description 连接工具类封装
 * @date 2022-12-26 11:39 上午
 **/publicclassConnectionUtil{/**
     * 创建 MQ 连接工厂对象
     */privatestaticfinalConnectionFactoryCONNECTION_FACTORY;// 类加载时,重量级资源加载static{CONNECTION_FACTORY=newConnectionFactory();//设置连接MQ主机CONNECTION_FACTORY.setHost("127.0.0.1");//设置连接MQ端口号CONNECTION_FACTORY.setPort(5672);//设置连接MQ虚拟主机CONNECTION_FACTORY.setVirtualHost("/test");//设置连接MQ虚拟主机的用户名密码CONNECTION_FACTORY.setUsername("admin");CONNECTION_FACTORY.setPassword("123456");}/**
     * 建立与RabbitMQ连接 工具方法
     *
     * @return Connection
     */publicstaticConnectiongetConnection(){try{//返回连接对象returnCONNECTION_FACTORY.newConnection();}catch(Exception e){
            e.printStackTrace();}returnnull;}/**
     * 关闭通道和连接 工具方法
     *
     * @param channel    Channel
     * @param connection Connection
     */publicstaticvoidcloseConnectionAndChanel(Channel channel,Connection connection){try{if(channel !=null){
                channel.close();}if(connection !=null){
                connection.close();}}catch(Exception e){
            e.printStackTrace();}}}

3、生产者实现

packagecom.lizhengi.fanout;importcom.rabbitmq.client.Channel;importcom.rabbitmq.client.Connection;importjava.io.IOException;/**
 * @author liziheng
 * @version 1.0.0
 * @description 订阅模型-消息订阅(Fanout)模式 生产者
 * @date 2022-12-26 11:44 上午
 **/publicclassProducer{publicstaticvoidmain(String[] args)throwsIOException{// 获取连接对象Connection connection =ConnectionUtil.getConnection();assert connection !=null;Channel channel = connection.createChannel();/*  为通道声明交换机
         *  参数1:交换机名称;参数2:交换机类型
         */
        channel.exchangeDeclare("logs","fanout");//发送消息for(int i =0; i <100; i++){
            channel.basicPublish("logs","",null,("fanout type message"+ i).getBytes());}//释放资源ConnectionUtil.closeConnectionAndChanel(channel, connection);}}

4、消费者-1 实现

packagecom.lizhengi.fanout;importcom.rabbitmq.client.*;importjava.io.IOException;/**
 * @author liziheng
 * @version 1.0.0
 * @description 订阅模型-消息订阅(Fanout)模式 消费者
 * @date 2022-12-26 11:45 上午
 **/publicclassCustomer1{publicstaticvoidmain(String[] args)throwsIOException{// 获取连接对象Connection connection =ConnectionUtil.getConnection();assert connection !=null;Channel channel = connection.createChannel();//  绑定交换机
        channel.exchangeDeclare("logs","fanout");//  为单个 Customer 创建临时队列String queueName = channel.queueDeclare().getQueue();//  绑定交换机和队列//  参数1:队列名字;参数2:交换机名称;参数3:routingKey么啥意义
        channel.queueBind(queueName,"logs","");//  消费消息
        channel.basicConsume(queueName,true,newDefaultConsumer(channel){@OverridepublicvoidhandleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body){System.out.println("消费者1:"+newString(body));}});}}

5、消费者-2 实现

packagecom.lizhengi.fanout;importcom.rabbitmq.client.*;importjava.io.IOException;/**
 * @author liziheng
 * @version 1.0.0
 * @description 订阅模型-消息订阅(Fanout)模式 消费者
 * @date 2022-12-26 11:45 上午
 **/publicclassCustomer2{publicstaticvoidmain(String[] args)throwsIOException{// 获取连接对象Connection connection =ConnectionUtil.getConnection();assert connection !=null;Channel channel = connection.createChannel();//  绑定交换机
        channel.exchangeDeclare("logs","fanout");//  为单个 Customer 创建临时队列String queueName = channel.queueDeclare().getQueue();//  绑定交换机和队列//  参数1:队列名字;参数2:交换机名称;参数3:routingKey么啥意义
        channel.queueBind(queueName,"logs","");//  消费消息
        channel.basicConsume(queueName,true,newDefaultConsumer(channel){@OverridepublicvoidhandleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body){System.out.println("消费者2:"+newString(body));}});}}

6、消费者-3 实现

packagecom.lizhengi.fanout;importcom.rabbitmq.client.*;importjava.io.IOException;/**
 * @author liziheng
 * @version 1.0.0
 * @description 订阅模型-消息订阅(Fanout)模式 消费者
 * @date 2022-12-26 11:45 上午
 **/publicclassCustomer3{publicstaticvoidmain(String[] args)throwsIOException{// 获取连接对象Connection connection =ConnectionUtil.getConnection();assert connection !=null;Channel channel = connection.createChannel();//  绑定交换机
        channel.exchangeDeclare("logs","fanout");//  为单个 Customer 创建临时队列String queueName = channel.queueDeclare().getQueue();//  绑定交换机和队列//  参数1:队列名字;参数2:交换机名称;参数3:routingKey么啥意义
        channel.queueBind(queueName,"logs","");//  消费消息
        channel.basicConsume(queueName,true,newDefaultConsumer(channel){@OverridepublicvoidhandleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body){System.out.println("消费者3:"+newString(body));}});}}

三、订阅模型 三种模式区别

1、RabbitMQ 消息订阅(Fanout)模式

RabbitMQ 消息订阅(Fanout)模式把交换机(Exchange)收到的消息发送给所有绑定了该交换机的队列,忽略路由(RoutingKey)。

这种模式下,消息会被所有消费者消费。也就是说,只要是"绑定"到某个交换机的队列,都会收到生产者发送到该交换机的消息。

2、RabbitMQ 路由(direct)模式

RabbitMQ 路由(direct)模式生产者发送信息时,需要指定一个路由(RoutingKey),交换机(Exchange)会根据路由将消息发送到绑定了此路由的队列中。

3、RabbitMQ 主题(topic)模式

在实际的运用中,广播模式(fanout)和路由模式(direct)虽然功能能支持一定场景,但是任然有一定的局限性,比如不能根据多重条件来进行路由选择。

发送给主题模式交换机的信息不能是任意设置的选择键,必须是用小数点隔开的一系列的标识符,标识符可以随意填充,但是一般是与某些特征相关联。

选择键不能超过 255 个字符。合法的键比如 “vip.user.order”,“common.user.pay”。 绑定键必须以相同的格式,特殊情况:“*” (星号)可以代替任意一个标识符;“#”(井号)可以代替0个或多个标识符。


本文转载自: https://blog.csdn.net/weixin_45187434/article/details/128445184
版权归原作者 栗筝i 所有, 如有侵权,请联系我们删除。

“RabbitMQ:订阅模型-消息订阅模式”的评论:

还没有评论