前言
本文章主要介绍RabbitMQ的发布订阅模式,该模式下,消息为广播形式,一经发布则会进入交换机绑定的队列中,详细介绍可以阅读官方文档。
什么是发布与订阅模式
RabbitMQ中的发布与订阅模式是一种消息传递的方式,用于在分布式系统中传递消息。
在该模式中,发送者(发布者)通过将消息发送到一个称为Exchange(交换机)的组件,消息将被路由到一个或多个称为Queue(队列)的组件。每个队列都有一个名称和一组绑定(bindings),指定接收哪些消息。消费者(订阅者)可以在指定的队列上进行侦听,以获取消息。
该模式的一个重要特点是支持多个消费者接收相同的消息,通过使用多个队列和绑定实现。这种方式进一步提高了系统的可伸缩性和健壮性,从而能够满足大规模分布式系统对高效消息传递的需求。
代码实操
在实操之前,我们可以先到RabbitMQ的管理界面先定义好该模式的交换机与绑定交换机的消息队列。
定义交换机如下图:
然后点击All Exchange查看是否创建成功
点击定义好的交换机fanout_exchange,会出现下图中的界面:
这时我们就可以进行绑定队列了,下面是字段解释
在配置好queue消息后,点击绑定即可。
需要注意的是,我们绑定的队列是必须在Queue模块已经创建好的对列,不然是无法绑定的,也就是说,交换机和队列是两个分离开的模块,Exchange只负责将信息送到Queue中,而且Queue负责消息的存储和消费。
到这里交换机的工作就准备好了,接下来就是创建项目和导入依赖了。要使用RabbitMQ我们可以导入下面的这个依赖
<!--RabbitMQ依赖--><dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.10.0</version></dependency>
生产者代码
publicclassProducer{publicstaticvoidmain(String[] args){// 1: 创建连接工厂ConnectionFactory connectionFactory =newConnectionFactory();// 2: 设置连接属性
connectionFactory.setHost("IP地址");
connectionFactory.setPort(端口号);
connectionFactory.setVirtualHost("/");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");Connection connection =null;Channel channel =null;try{// 3: 从连接工厂中获取连接
connection = connectionFactory.newConnection("生产者");// 4: 从连接中获取通道channel
channel = connection.createChannel();// 6: 准备发送消息的内容String message ="宇宙无敌爱学习!!!";String exchangeName ="fanout_exchange";String routingKey ="";// 7: 发送消息给中间件rabbitmq-server// @params1: 交换机exchange// @params2: 队列名称/routingkey// @params3: 属性配置// @params4: 发送消息的内容
channel.basicPublish(exchangeName, routingKey,null, message.getBytes());System.out.println("消息发送成功!");}catch(Exception ex){
ex.printStackTrace();System.out.println("发送消息出现异常...");}finally{// 7: 释放连接关闭通道if(channel !=null&& channel.isOpen()){try{
channel.close();}catch(Exception ex){
ex.printStackTrace();}}if(connection !=null){try{
connection.close();}catch(Exception ex){
ex.printStackTrace();}}}}}
执行代码效果如下:
我们到管理界面查看是否有消息被写入了,结果如下:
发现在这个模式下,三个队列都被放入了同一个消息,这就是所谓的发布与订阅,类似广播,一经发布就全部人都收到。
接下来我执行生产者代码,生产者代码使用了线程来进行处理,代码如下:
publicclassConsumer{privatestaticRunnable runnable =()->{// 1: 创建连接工厂ConnectionFactory connectionFactory =newConnectionFactory();// 2: 设置连接属性
connectionFactory.setHost("IP地址");
connectionFactory.setPort(端口号);
connectionFactory.setVirtualHost("/");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");//获取队列的名称finalString queueName =Thread.currentThread().getName();Connection connection =null;Channel channel =null;try{// 3: 从连接工厂中获取连接
connection = connectionFactory.newConnection("生产者");// 4: 从连接中获取通道channel
channel = connection.createChannel();// 5: 申明队列queue存储消息/*
* 如果队列不存在,则会创建
* Rabbitmq不允许创建两个相同的队列名称,否则会报错。
*
* @params1: queue 队列的名称
* @params2: durable 队列是否持久化
* @params3: exclusive 是否排他,即是否私有的,如果为true,会对当前队列加锁,其他的通道不能访问,并且连接自动关闭
* @params4: autoDelete 是否自动删除,当最后一个消费者断开连接之后是否自动删除消息。
* @params5: arguments 可以设置队列附加参数,设置队列的有效期,消息的最大长度,队列的消息生命周期等等。
* */// 这里如果queue已经被创建过一次了,可以不需要定义//channel.queueDeclare("queue1", false, false, false, null);// 6: 定义接受消息的回调Channel finalChannel = channel;
finalChannel.basicConsume(queueName,true,newDeliverCallback(){@Overridepublicvoidhandle(String s,Delivery delivery)throwsIOException{System.out.println(queueName +":收到消息是:"+newString(delivery.getBody(),"UTF-8"));}},newCancelCallback(){@Overridepublicvoidhandle(String s)throwsIOException{}});System.out.println(queueName +":开始接受消息");System.in.read();}catch(Exception ex){
ex.printStackTrace();System.out.println("发送消息出现异常...");}finally{// 7: 释放连接关闭通道if(channel !=null&& channel.isOpen()){try{
channel.close();}catch(Exception ex){
ex.printStackTrace();}}if(connection !=null&& connection.isOpen()){try{
connection.close();}catch(Exception ex){
ex.printStackTrace();}}}};publicstaticvoidmain(String[] args){// 启动三个线程去执行newThread(runnable,"queue1").start();newThread(runnable,"queue2").start();newThread(runnable,"queue3").start();}}
结果如下:
可以看出,消息被消费掉了。
以上便是发布与订阅的全部内容,仅作为个学习笔记使用
感谢观看
版权归原作者 夜莺philomel 所有, 如有侵权,请联系我们删除。