0


【RabbitMQ六】——RabbitMQ主题模式(Topic)

RabbitMQ主题模式(通配符模式)

前言

通过本篇博客能够简单使用RabbitMQ的主题模式。
本篇博客主要是博主通过官网总结出的RabbitMQ主题模式。其中如果有误欢迎大家及时指正。

什么是Topic模式

Topic模式与Direct模式相比,他们都可以根据Routing key把消息路由到对应的队列上,但是Topic模式相较于Direct来说,它可以基于多个标准进行路由。也就是在队列绑定Routing key的时候使用通配符。这使我们相较于Direct模式灵活性更大。

使用Topic模式的要点

routing key必须是由"."进行分隔的单词列表,最大限制为255字节

通配符规则

  • "*"可以代替一个单词。
  • "#"可以代替零个或多个单词。

示例

创建了三个绑定:Q1绑定了绑定键“*.orange”。和Q2的"..rabbit"和“lazy.#”。
在这里插入图片描述

1.一个消息的路由键为"quick.orange.rabbit" 时,它将会被送到队列Q1和Q2。
2.一个消息的路由键为"quick.orange.fox"时,它将会背诵到队列Q1
3.一个消息的路由键为"lazy.brown.fox"时,它将被送到队列Q2
4.一个消息的路由键为"quick.brown.fox",没有匹配任何队列,消息将会丢失。
5.一个消息的路由键为"lazy.orange.new.rabbit",它将被送到队列Q2.
6.一个消息的路由键为"orang"或者"quick.orange.new.rabbit"没有匹配到任何队列消息将丢失。

代码示例

Pom文件引入RabbtiMQ依赖

<dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.10.0</version></dependency>

RabbitMQ工具类

importcom.rabbitmq.client.Channel;importcom.rabbitmq.client.Connection;importcom.rabbitmq.client.ConnectionFactory;importjava.io.IOException;importjava.util.concurrent.TimeoutException;/**
 * @author : [WangWei]
 * @version : [v1.0]
 * @className : RabbitMQUtils
 * @description : [rabbitmq工具类]
 * @createTime : [2023/1/17 8:49]
 * @updateUser : [WangWei]
 * @updateTime : [2023/1/17 8:49]
 * @updateRemark : [描述说明本次修改内容]
 */publicclassRabbitMQUtils{/*
     * @version V1.0
     * Title: getConnection
     * @author Wangwei
     * @description 创建rabbitmq连接
     * @createTime  2023/1/17 8:52
     * @param []
     * @return com.rabbitmq.client.Connection
     */publicstaticConnectiongetConnection()throwsIOException,TimeoutException{ConnectionFactory factory =newConnectionFactory();
        factory.setHost("ip");
        factory.setPort(5672);
        factory.setVirtualHost("虚拟主机");
        factory.setUsername("用户名");
        factory.setPassword("密码");//创建连接Connection connection=factory.newConnection();return  connection;}/*
     * @version V1.0
     * Title: getChannel
     * @author Wangwei
     * @description 创建信道
     * @createTime  2023/1/17 8:55
     * @param []
     * @return com.rabbitmq.client.Channel
     */publicstaticChannelgetChannel()throwsIOException,TimeoutException{Connection connection=getConnection();Channel channel=connection.createChannel();return channel;}}

生产者

importcom.rabbitmq.client.Channel;importjava.io.IOException;importjava.nio.charset.StandardCharsets;importjava.util.concurrent.TimeoutException;/**
 * @author : [WangWei]
 * @version : [v1.0]
 * @className : Producer
 * @description : [生产者]
 * @createTime : [2023/2/1 9:38]
 * @updateUser : [WangWei]
 * @updateTime : [2023/2/1 9:38]
 * @updateRemark : [描述说明本次修改内容]
 */publicclassProducer{privatestaticfinalStringEXCHANGE_NAME="topic_logs";publicstaticvoidmain(String[] args)throwsIOException,TimeoutException{//建立连接RabbitMQUtils.getConnection();//声明通道Channel channel =RabbitMQUtils.getChannel();//创建topic类型交换机并命名为logs
        channel.exchangeDeclare(EXCHANGE_NAME,"topic");//声明routingKeyString severityInfo="info.log.test";String severityError="error.test";String severityError2="log.error.test";//循环发送2条消息for(int i =0; i <2; i++){String msg="info.log.test:"+i;/*推送消息
             *交换机命名,不填写使用默认的交换机
             * routingKey -路由键-
             * props:消息的其他属性-路由头等正文
             * msg消息正文
             */
            channel.basicPublish(EXCHANGE_NAME,severityInfo,null,msg.getBytes(StandardCharsets.UTF_8));System.out.println(msg);}//循环发送2条消息for(int i =0; i <2; i++){String msg="主题模式error.test:"+i;/*推送消息
             *交换机命名,不填写使用默认的交换机
             * routingKey -路由键-
             * props:消息的其他属性-路由头等正文
             * msg消息正文
             */
            channel.basicPublish(EXCHANGE_NAME,severityError,null,msg.getBytes(StandardCharsets.UTF_8));System.out.println(msg);}//循环发送2条消息for(int i =0; i <2; i++){String msg="log.error.test:"+i;/*推送消息
             *交换机命名,不填写使用默认的交换机
             * routingKey -路由键-
             * props:消息的其他属性-路由头等正文
             * msg消息正文
             */
            channel.basicPublish(EXCHANGE_NAME,severityError2,null,msg.getBytes(StandardCharsets.UTF_8));System.out.println(msg);}}}

消费者1

importcom.rabbitmq.client.Channel;importcom.rabbitmq.client.DeliverCallback;importjava.io.IOException;importjava.util.concurrent.TimeoutException;/**
 * @author : [WangWei]
 * @version : [v1.0]
 * @className : ConsumerOne
 * @description : [消费者1]
 * @createTime : [2023/2/1 9:39]
 * @updateUser : [WangWei]
 * @updateTime : [2023/2/1 9:39]
 * @updateRemark : [描述说明本次修改内容]
 */publicclassConsumerOne{privatestaticfinalStringEXCHANGE_NAME="topic_logs";publicstaticvoidmain(String[] args)throwsIOException,TimeoutException{RabbitMQUtils.getConnection();Channel channel =RabbitMQUtils.getChannel();

        channel.exchangeDeclare(EXCHANGE_NAME,"topic");String queueName = channel.queueDeclare().getQueue();//声明routingKey (error)String severityError="error.*";//交换机与队列进行绑定-如果没有队列与交换机进行绑定,那么消费者接受不到生产者的消息,消息会丢失//queueName绑定了direct_logs交换机并且绑定了routingKey
        channel.queueBind(queueName,EXCHANGE_NAME,severityError );//因为Rabbitmq服务器将异步地向我们推送消息,所以我们以对象的形式提供了一个回调,该回调将缓冲消息,直到我们准备好使用它们。DeliverCallback deliverCallback =(consumerTag, delivery)->{String message =newString(delivery.getBody(),"UTF-8");System.out.println(" [x] Received '"+ message +"'");};
        channel.basicConsume(queueName,true, deliverCallback, consumerTag ->{});}}

消费者2

importcom.rabbitmq.client.Channel;importcom.rabbitmq.client.DeliverCallback;importjava.io.IOException;importjava.util.concurrent.TimeoutException;/**
 * @author : [WangWei]
 * @version : [v1.0]
 * @className : ConsumerTwo
 * @description : [消费者2]
 * @createTime : [2023/2/1 9:38]
 * @updateUser : [WangWei]
 * @updateTime : [2023/2/1 9:38]
 * @updateRemark : [描述说明本次修改内容]
 */publicclassConsumerTwo{privatestaticfinalStringEXCHANGE_NAME="topic_logs";publicstaticvoidmain(String[] args)throwsIOException,TimeoutException{RabbitMQUtils.getConnection();Channel channel =RabbitMQUtils.getChannel();//创建fanout类型交换机并命名为logs
        channel.exchangeDeclare(EXCHANGE_NAME,"topic");//创建了一个非持久的、排他的、自动删除的队列,并生成了一个名称String queueName = channel.queueDeclare().getQueue();//声明routingKey (info,error,warning)String severityInfo="info.#";String severityError="*.error.*";//交换机与队列进行绑定-如果没有队列与交换机进行绑定,那么消费者接受不到生产者的消息,消息会丢失//queueName绑定了direct_logs交换机并且绑定了3个routingKey
        channel.queueBind(queueName,EXCHANGE_NAME,severityInfo );
        channel.queueBind(queueName,EXCHANGE_NAME,severityError );//因为Rabbitmq服务器将异步地向我们推送消息,所以我们以对象的形式提供了一个回调,该回调将缓冲消息,直到我们准备好使用它们。DeliverCallback deliverCallback =(consumerTag, delivery)->{String message =newString(delivery.getBody(),"UTF-8");System.out.println(" [x] Received '"+ message +"'");};
        channel.basicConsume(queueName,true, deliverCallback, consumerTag ->{});}}

效果

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

总结

通过使用通配符实现灵活性的应用有很多,例如nginx的请求转发,gateway为请求过滤等等都是使用了统配符的技术。通过这种联想来对知识进行结构化,找相同和不同,思考能力和学习力也会有很大的提高。


本文转载自: https://blog.csdn.net/wangwei021933/article/details/129097400
版权归原作者 王卫——David 所有, 如有侵权,请联系我们删除。

“【RabbitMQ六】——RabbitMQ主题模式(Topic)”的评论:

还没有评论