前言:
本文章将介绍RabbitMQ中的Topic(主题)模式,其中还会涉及 ‘#’ 和 ‘*’ 两个通配符在RabbitMQ中的区别。
什么是Topic模式
RabbitMQ的Topic模式是一种基于主题的消息传递模式。它允许发送者向一个特定的主题(topic)发布消息,同时,订阅者也可以针对自己感兴趣的主题进行订阅。
在Topic模式中,主题通过一个由单词和点号组成的字符串来描述。例如,“*.china.#”表示匹配所有以“china”为结尾的主题,比如“bj.china”或“shanghai.china.weather”等。( ‘ # ’ 和 ‘ * ’ 会再后面介绍)
当一个消息被发布到Topic交换机(Exchange)时,交换机会将消息转发给所有与该主题匹配的队列。消费者(即订阅者)可以对队列进行绑定,通过指定自己感兴趣的主题进行绑定。
通过使用Topic模式,我们可以实现高度灵活的信息交换模式,同时,确保只有感兴趣的消费者才会收到消息,提高了系统的效率和可靠性。
‘ # ’ 和 ‘ * ’二者的区别
在RabbitMQ的Topic模式中,符号“#”和“*”都用于匹配主题,但它们的意义是不同的。
符号“#”表示通配符可以匹配0个或者多个单词。例如,“china.#”可以匹配所有的以“china”为前缀的主题,例如“china.beijing”,“china.shanghai.weather”等等。
符号“ * ”表示通配符:可以匹配一个单词。例如,“china.*”可以匹配所有的以“china”为前缀并且后面只有一个单词的主题,例如“china.beijing”,“china.shanghai”,但是“china.shanghai.weather”不会被匹配。
总的来说,“#”更加灵活,可以匹配更多的情况,而“*”则更加具体,只能匹配一个单词。但是,使用通配符需要注意,因为它可能会匹配到不可预测的主题,可能会导致消息被传递到错误的队列或者丢失。因此,在设计主题时需要慎重考虑,并尽量减少通配符的使用。
Topic模式实操
老规矩,我们先到RabbitMQ的web管理界面去创建一个Topic的交换机
效果如下:
点击该topic_exchange,进入到下图界面,并绑定消息队列,如果队列不存在需要先创建在过来绑定
最终效果:
接下来就是代码部分了,我们需要创建一个maven项目,然后将下面的依赖导入:
<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><!--RabbitMQ依赖--><dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.10.0</version></dependency></dependencies>
然后创建生产者,代码如下:
importcom.rabbitmq.client.Channel;importcom.rabbitmq.client.Connection;importcom.rabbitmq.client.ConnectionFactory;/**
* @description: Producer 简单队列生产者
*/publicclassProducer{publicstaticvoidmain(String[] args){// 1: 创建连接工厂ConnectionFactory connectionFactory =newConnectionFactory();// 2: 设置连接属性
connectionFactory.setHost("ip地址");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");Connection connection =null;Channel channel =null;try{// 3: 从连接工厂中获取连接
connection = connectionFactory.newConnection("生产者");// 4: 从连接中获取通道channel
channel = connection.createChannel();// 6: 准备发送消息的内容String message ="超级无敌爱学习";String exchangeName ="topic-exchange";String routingKey1 ="pz.class.student";String routingKey2 ="class.user.student";// 7: 发送消息给中间件rabbitmq-server// @params1: 交换机exchange// @params2: 队列名称/routingkey// @params3: 属性配置// @params4: 发送消息的内容
channel.basicPublish(exchangeName, routingKey1,null, message.getBytes());System.out.println("消息发送成功!");}catch(Exception ex){
ex.printStackTrace();System.out.println("发送消息出现异常...");}finally{// 7: 释放连接关闭通道if(channel !=null&& channel.isOpen()){try{
channel.close();}catch(Exception ex){
ex.printStackTrace();}}if(connection !=null){try{
connection.close();}catch(Exception ex){
ex.printStackTrace();}}}}}
创建消费者,代码如下:
importcom.rabbitmq.client.*;importjava.io.IOException;/**
* @description: Consumer
* @Date : 2021/3/2
*/publicclassConsumer{privatestaticRunnable runnable =()->{// 1: 创建连接工厂ConnectionFactory connectionFactory =newConnectionFactory();// 2: 设置连接属性
connectionFactory.setHost("43.139.42.244");
connectionFactory.setPort(5678);
connectionFactory.setVirtualHost("/");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");//获取队列的名称finalString queueName =Thread.currentThread().getName();Connection connection =null;Channel channel =null;try{// 3: 从连接工厂中获取连接
connection = connectionFactory.newConnection("生产者");// 4: 从连接中获取通道channel
channel = connection.createChannel();// 5: 申明队列queue存储消息/*
* 如果队列不存在,则会创建
* Rabbitmq不允许创建两个相同的队列名称,否则会报错。
*
* @params1: queue 队列的名称
* @params2: durable 队列是否持久化
* @params3: exclusive 是否排他,即是否私有的,如果为true,会对当前队列加锁,其他的通道不能访问,并且连接自动关闭
* @params4: autoDelete 是否自动删除,当最后一个消费者断开连接之后是否自动删除消息。
* @params5: arguments 可以设置队列附加参数,设置队列的有效期,消息的最大长度,队列的消息生命周期等等。
* */// 这里如果queue已经被创建过一次了,可以不需要定义//channel.queueDeclare("queue1", false, false, false, null);// 6: 定义接受消息的回调Channel finalChannel = channel;
finalChannel.basicConsume(queueName,true,newDeliverCallback(){@Overridepublicvoidhandle(String s,Delivery delivery)throwsIOException{System.out.println(queueName +":收到消息是:"+newString(delivery.getBody(),"UTF-8"));}},newCancelCallback(){@Overridepublicvoidhandle(String s)throwsIOException{}});System.out.println(queueName +":开始接受消息");System.in.read();}catch(Exception ex){
ex.printStackTrace();System.out.println("发送消息出现异常...");}finally{// 7: 释放连接关闭通道if(channel !=null&& channel.isOpen()){try{
channel.close();}catch(Exception ex){
ex.printStackTrace();}}if(connection !=null&& connection.isOpen()){try{
connection.close();}catch(Exception ex){
ex.printStackTrace();}}}};publicstaticvoidmain(String[] args){// 启动三个线程去执行newThread(runnable,"queue1").start();newThread(runnable,"queue2").start();newThread(runnable,"queue3").start();}}
接下来执行生产者代码,在这段代码中,我们先对路由key1进行发送消息并通过web管理界面查看效果:
执行结果:
web管理界面查看结果:
通过上面的图,我们可以发现,我们消息通过topic_exchange这个交换机通过指定路由key发送到了绑定的消息队列中,由于routingkey使用的是通配符发方式,其中“queue2 -> #.class.* ” , “ queue3 -> #.student.#”,又由于通配符,# 号是指0个及以上,* 号是仅匹配一个,那么结果就是符合预期的,因为routingkey1= pz.class.student,class前有一个,后面有一个,会映射到queue2,student前面有多个,后面没有可以映射到queue3,结果就和图示一样啦~~
接下来就执行routekey2的路由key来看看会发生什么效果:
执行结果
web管理界面查看结果:
可以看到这次queue1和queue3,接收到了消息,可以自己尝试分析一下,这里不做过多赘述。
以上便是本章全部内容,感谢阅读ovo
如有错误,感谢指正
版权归原作者 夜莺philomel 所有, 如有侵权,请联系我们删除。