概念
RabbitMQ 是一种流行的开源消息代理(Message Broker)软件,它实现了高级消息队列协议(AMQP - Advanced Message Queuing Protocol)。RabbitMQ 通过高效的消息传递机制,主要应用于分布式系统中解耦应用组件、异步消息发送、流量削峰等场景,可提高系统扩展性和稳定性。
RabbitMQ 的核心功能
- 消息队列 RabbitMQ 提供消息队列功能,用于存储和转发消息,确保生产者和消费者之间的解耦。
- 可靠性 RabbitMQ 支持消息持久化、确认机制和高可用性集群,保证消息在传递过程中的可靠性。
- 灵活的路由 借助交换机(Exchange)机制,RabbitMQ 支持灵活的消息路由规则,包括广播(fanout)、直连(direct)、主题(topic)等。
- 消息确认机制 消费者需要确认消息已被成功消费,未确认的消息可以重新投递(避免消息丢失)。
- 扩展性和高可用性 RabbitMQ 支持集群部署,可根据负载动态扩展,同时提供镜像队列功能,实现高可用性。
- 插件机制 RabbitMQ 支持丰富的插件,用于监控、身份验证、消息追踪等扩展功能。
RabbitMQ 的核心概念
- 消息(Message): RabbitMQ 处理的最小单位,包含消息头和消息体。 消息头描述消息的属性(如优先级、过期时间等),消息体是实际的数据内容。
- 生产者(Producer): 发送消息到 RabbitMQ 的应用程序。
- 消费者(Consumer): 从 RabbitMQ 中接收并处理消息的应用程序。
- 队列(Queue): 存储消息的容器,遵循先进先出(FIFO)规则。消息只能存储到队列中,消费者从队列中取出消息进行处理。
- 交换机(Exchange): 用于接收生产者发送的消息,并根据绑定规则将消息路由到队列。 常见交换机类型:
direct
:根据精确匹配的路由键转发消息。fanout
:将消息广播到所有绑定的队列。topic
:按模式匹配的路由键转发消息。headers
:根据消息头的属性匹配路由。 - 绑定(Binding): 交换机与队列之间的关系,指定消息如何从交换机路由到队列。
- 路由键(Routing Key): 用于匹配交换机和队列的绑定规则。
- 虚拟主机(Virtual Host,vhost): 类似于一个命名空间,用于隔离队列、交换机等资源。
- 连接和通道(Connection & Channel): 生产者和消费者通过连接与 RabbitMQ 交互,每个连接可包含多个通道,通道是实际读写消息的通信路径。
- ACK 确认机制: 生产者和消费者可确认消息是否成功投递或处理。确认机制分为:
生产者确认
:确保消息发送到队列。消费者确认
:确保消息成功处理。
RabbitMQ 的工作流程
- 生产者发送消息到交换机 生产者通过指定交换机和路由键发送消息。
- 交换机将消息路由到队列 根据绑定规则,交换机将消息路由到一个或多个队列。
- 消费者从队列接收消息 消费者监听队列,并从队列中取出消息进行处理。
- 消息确认 消费者处理完成后,向 RabbitMQ 发送确认,RabbitMQ 删除该消息。若消费者未确认,RabbitMQ 可重新投递消息。
RabbitMQ 的应用场景
- 解耦微服务 RabbitMQ 在分布式架构中充当消息桥梁,避免服务之间的直接依赖。
- 异步任务处理 将耗时的任务放入队列,消费者后台处理,提升系统响应速度。
- 日志收集 使用 RabbitMQ 作为日志消息的中间件,集中处理和分析日志数据。
- 分布式系统的负载均衡 RabbitMQ 可将消息分发给多个消费者,实现任务的均衡处理。
- 实时消息推送 支持高并发的实时消息推送场景,如在线聊天、通知系统。
RabbitMQ 的优缺点
优点:
- 支持多种协议(AMQP、STOMP、MQTT 等)。
- 功能强大,支持复杂的消息路由。
- 高可靠性,支持持久化和集群模式。
- 插件机制丰富,便于扩展。
- 广泛支持多种编程语言(Java、Python、Go 等)。
缺点:
- 性能较 ActiveMQ、Kafka 稍低,不适合大数据流场景。
- 配置复杂,需要一定的学习成本。
- 占用资源较高,尤其是大量队列和消息积压时。
RabbitMQ 是一个功能强大、易用的消息中间件,适合需要可靠消息传递、灵活路由和高可用性的场景。通过其简单直观的架构,开发者可以轻松实现消息解耦、异步处理和分布式通信功能,从而大大提高系统的可扩展性和可靠性。
接下来通过java代码展示其简单应用。关于rabbitmq服务的安装,请参考linux安装rabbitmq
创建连接
创建一个maven项目,在
pom
添加如下依赖
<dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.21.0</version></dependency><!--
导入slf4j相关,为解决控制台出现如下信息:
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
--><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-api</artifactId><version>1.7.32</version><!-- 或使用其他版本 --></dependency><dependency><groupId>ch.qos.logback</groupId><artifactId>logback-classic</artifactId><version>1.2.6</version><!-- 或使用其他版本 --></dependency><dependency><groupId>ch.qos.logback</groupId><artifactId>logback-core</artifactId><version>1.2.6</version></dependency>
使用rabbitmq的连接工厂,来创建对
rabbitmq-server
的连接:
importcom.rabbitmq.client.Connection;importcom.rabbitmq.client.ConnectionFactory;/**
* 连接工具,建立与RabbitMQ服务的连接
*
*/publicclassConnectionUtil{publicstaticConnectiongetConnection()throwsException{//定义连接工厂ConnectionFactory factory =newConnectionFactory();//设置服务地址,也就是安装rabbitmq的服务器ip
factory.setHost("192.168.137.200");//端口
factory.setPort(5672);//设置虚拟机名称,一个mq服务可以设置多个虚拟机,每个虚拟机就相当于一个独立的mq,这里使用默认虚拟机
factory.setVirtualHost("/");//设置用户名
factory.setUsername("admin");//设置密码
factory.setPassword("admin");// 通过工厂获取连接Connection connection = factory.newConnection();return connection;}}
虚拟机信息在管理控制台页面中如下
在
Admin
页签的
Virtual Host
一栏中,
Name
即为虚拟机名字(‘’/" 为rabbitmq的默认虚拟机),
Users
为该虚拟机中的用户:
想要添加新的虚拟机,可以通过上图中的
Add virtual host
按钮进行添加
用户信息在管理控制台页面中如下
在
Admin
页签的
Users
一栏中,想要添加新的用户,可以通过下图中的
Add user
按钮进行添加,用户可分配的权限为Admin、Monitoring、Policymaker、Management、Impersonator、None:
Admin (管理员权限)
赋予用户完全的管理权限,可以执行几乎所有操作,适用于需要对 RabbitMQ 系统进行全面管理和配置的用户。
包括的权限:
- 创建、删除、管理队列、交换机、绑定和虚拟主机。
- 配置和管理用户权限和角色。
- 配置 RabbitMQ 集群、插件和策略。
- 查看和修改 RabbitMQ 的所有设置。
Monitoring (监控权限)
允许用户查看 RabbitMQ 的监控信息,但不能进行任何修改操作。适用于需要查看 RabbitMQ 系统运行状况、但不需要做出修改的用户(如运维人员、监控人员)
包括的权限:
- 查看队列、交换机、连接、通道的状态信息。
- 查看消息流、消息队列的深度和消费者等监控数据。
- 查看系统的资源使用情况(如内存、磁盘、CPU 使用等)。
Policymaker (策略管理权限)
允许用户管理 RabbitMQ 中的策略,适用于负责 RabbitMQ 策略配置(如队列策略、镜像策略等)的用户。
包括的权限:
- 创建、删除和修改虚拟主机的策略。
- 配置消息队列的生命周期、镜像策略、磁盘空间限制等。
- 不允许进行其他管理操作,如修改队列、交换机、绑定等。
Management (管理界面权限)
允许用户访问和使用 RabbitMQ 的管理控制台,查看系统状态和配置信息,但不包括修改操作。适用于需要监控和查看 RabbitMQ 系统状态,但不需要对系统做修改的用户。
包括的权限:
- 访问 RabbitMQ 的管理界面。
- 查看管理控制台的所有信息(如队列、交换机、连接、用户等)。
- 不允许执行创建、删除、修改等操作。
Impersonator (伪装权限)
允许用户以其他用户的身份执行操作,但不具有实际的权限修改能力。适用于需要代替其他用户执行操作或进行调试的用户。
包括的权限:
- 可以 "伪装" 成为其他用户,从而以该用户的权限来执行操作。通常,用于临时授予某些操作权限。
- 这种权限通常用于管理审计或系统调试。
None (无权限)
此权限不授予任何权限,适用于不需要访问 RabbitMQ 系统的用户,或者是仅用作某些临时操作的用户。
包括的权限:
- 不允许用户访问管理页面,执行任何操作。
- 该用户几乎不具有任何权限,不能进行查看或修改操作。
简单模式
RabbitMQ 的简单模式(Simple模式) 是消息队列的一种基本模式,该模式对应一个生产者与一个消费者。在简单模式下,消息生产者将消息发送到队列中,然后由消费者从队列中取出消息进行处理。
简单模式的基本概念
- 生产者(Producer):负责发送消息的应用程序或服务。生产者将消息发送到指定的队列中。
- 队列(Queue):消息的存储区域。队列在 RabbitMQ 服务器上,生产者将消息发送到队列,消费者从队列中获取消息。
- 消费者(Consumer):负责接收和处理消息的应用程序或服务。消费者从队列中获取消息进行处理。
- RabbitMQ 服务器:负责管理队列并协调消息的发送和接收。
简单模式的工作流程如下:
- 生产者连接到 RabbitMQ 服务器。
- 创建通道:生产者通过连接创建一个通道,用于声明队列及发布消息。
- 声明队列:生产者在发送消息之前,会利用通道声明一个队列。声明队列的作用是确保队列存在,如果队列不存在会创建队列;如果队列已存在,则跳过创建。
- 生产者发送消息到队列:生产者使用
basicPublish
方法将消息发送到指定的队列中。 - 消费者连接到 RabbitMQ 服务器。
- 消费者从队列中获取消息:消费者从指定队列中接收消息,进行消费处理。
- 确认消息(ACK):在消费完成后,消费者会发送确认信号(ACK),告知 RabbitMQ 消息已经处理完毕。这样 RabbitMQ 可以将消息从队列中删除。ACK可设置自动回复或手动回复。
生产者代码
importcom.rabbitmq.client.Channel;importcom.rabbitmq.client.Connection;importutil.ConnectionUtil;publicclassSimpleProducer{//队列名称privatefinalstaticString QUEUE_NAME ="simple_queue";publicstaticvoidmain(String[] args){//使用try-with-resources模式,确保无论是否发生异常,最后都会关闭Connection与Channeltry(//使用上面创建的连接工具,来获取到连接Connection connection =ConnectionUtil.getConnection();//创建会话通道,生产者和mq服务所有通信都在channel通道中完成Channel channel = connection.createChannel()){/**
* 声明队列,参数明细如下:
* 1、queue 队列名称
* 2、durable 是否持久化,如果持久化,mq重启后队列还在
* 3、exclusive 是否独占连接,队列只允许在该连接中访问,如果connection连接关闭队列则自动删除,如果将此参数设置true可用于临时队列的创建
* 4、autoDelete 自动删除,队列不再使用时是否自动删除此队列,如果将此参数和exclusive参数设置为true就可以实现临时队列(队列不用了就自动删除)
* 5、arguments 参数,可以设置一个队列的扩展参数,比如:可设置存活时间
*/
channel.queueDeclare(QUEUE_NAME,true,false,false,null);// 消息内容String message ="Hello World";/**
* 使用通道将消息发布到队列中,参数说明如下:
* 1.第一个参数是exchange,即交换机名称,简单模式不需要设置交换机,此处为空即可(为空时,使用的是rabbitmq的默认交换机AMQP default)
* 2.第二个参数是routingKey,即路由键,简单模式下不使用交换机时,此处参数与队列名相同,会自动匹配目标队列
* 3.第三个参数为BasicProperties,即消息属性,可以为要发送的消息设置一些条件,比如消息类型、过期时间等等
* 4.第四个参数是消息本身,即消息的内容,这里是一个字节数组,表示消息的正文
* */
channel.basicPublish("", QUEUE_NAME,null, message.getBytes());}catch(Exception ex){
ex.printStackTrace();System.out.println("发送消息出现异常...");}}}
消费者代码
消费者连接rabbitmq后,依然需要声明队列,因为需要确保队列的存在
importcom.rabbitmq.client.*;importutil.ConnectionUtil;importjava.io.IOException;publicclassSimpleConsumer{//要与生产者的队列名保持一致privatefinalstaticString QUEUE_NAME ="simple_queue";publicstaticvoidmain(String[] args){//使用try-with-resources模式,确保无论是否发生异常,最后都会关闭Connection与Channeltry(//使用之前创建的连接工具,来获取到连接Connection connection =ConnectionUtil.getConnection();//创建会话通道,生产者和mq服务所有通信都在channel通道中完成Channel channel = connection.createChannel()){// 声明队列
channel.queueDeclare(QUEUE_NAME,true,false,false,null);//实现消费方法DefaultConsumer consumer =newDefaultConsumer(channel){/**
* 获取消息,并且处理,这个方法类似事件监听,当接收到消息后此方法将被自动调用
* @param consumerTag 消费者标签,用来标识消费者的,在监听队列时设置channel.basicConsume
* @param envelope 信封,通过envelope
* @param properties 消息属性
* @param body 消息内容
* @throws IOException
*/@OverridepublicvoidhandleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throwsIOException{//交换机String exchange = envelope.getExchange();//消息id,mq在channel中用来标识消息的id,可用于确认消息已接收long deliveryTag = envelope.getDeliveryTag();// body 即消息体String msg =newString(body,"utf-8");System.out.println("接收消息: "+ msg +"!");}};/**
* 通过通道监听队列消息并实现消费,参数明细:
* 1、queue 队列名称
* 2、autoAck 自动回复,当消费者接收到消息后要告诉mq消息已接收,如果将此参数设置为tru表示会自动回复mq,如果设置为false要通过编程实现手动回复
* 3、callback,消费方法,当消费者接收到消息要执行的方法,将上面的consumer传入其中
*/
channel.basicConsume(QUEUE_NAME,true, consumer);}catch(Exception ex){
ex.printStackTrace();System.out.println("接收消息出现异常...");}}}
测试
先运行生产者主方法,运行后查看管理页面的
Queues and Streams
页签:
红框
Messages
部分即代表队列中的消息,
Ready
、
Unacked
和
Total
的含义如下:
- Ready:表示队列中已经准备好可以被消费者消费的消息数量。即这些消息还没有被任何消费者接收。
- Unacked(未确认的消息):表示已经被消费者接收但还没有被确认(acknowledged)的消息数量。这意味着这些消息被消费者消费后,还未发送确认,因此 RabbitMQ 会等待消费者确认消息处理完成。如果消费者断开连接或未确认,RabbitMQ 会将这些消息重新放回队列中,以便被其他消费者重新消费。
- Total:表示队列中的消息总数,是
Ready
和Unacked
两者的总和。
图中:
Ready
为 1,表示有 1 条消息在队列中等待被消费。Unacked
为 0,表示没有消息被消费者接收且未确认。Total
为 1,表示队列中的消息总数为 1。
再运行消费者主方法,运行后查看管理页面的
Queues and Streams
页签:
红框
Messages
部分都为0,代表消息都已被消费
消费者代码控制台输出:
接收消息: Hello World!
工作模式
在 RabbitMQ 的工作队列模式(Work Queue / Task Queue)中,一个生产者会对应多个消费者。
消息分发给多个消费者的方式主要有两种:轮询分配 和 公平分配。
轮询分配(Round-robin Dispatching)
原理:
- 默认情况下,RabbitMQ 将消息以 轮询的方式 均匀地分发给所有消费者,消息的分配模式是一个消费者分配一条,直至消息消费完成。
- 每个消费者都会轮流收到消息,而不会考虑消费者当前的工作负载。
特点:
- 无视消费者处理能力: RabbitMQ 不会关心某个消费者是否已经忙碌或是否处理得更快,而是严格地轮流发送消息。
- 简单高效: 实现方式简单,但在消费者性能不均的情况下,可能导致某些消费者负载过高或过低。
公平分配(Fair Dispatching)
原理:
- 公平分配遵循能者多劳的原则,核心是基于 消费者的繁忙程度 分发消息。
- RabbitMQ 通过 消息确认(ACK)机制 来检测消费者是否空闲。
- 如果消费者在当前未完成上一个任务,则不会分配新的任务给该消费者。
特点:
- 消费者负载感知: RabbitMQ 根据消费者的负载情况分发消息,而不是简单地轮流发送,这种方式确保消息只发送到空闲的消费者,避免让忙碌的消费者承担额外的负担。
- 消息确认机制(ACK):消费者需要显式地向 RabbitMQ 确认(ACK)已成功处理一条消息。未确认的消息(比如因消费者挂掉或处理时间过长)会重新投递到其他消费者,确保消息不会丢失。
- 基于 QoS 的限流控制:使用
basicQos
参数(如basicQos(1)
)限制 RabbitMQ 在未收到消费者确认时不发送新的消息。
轮询发送
创建一个生产者,两个消费者来演示轮询发送效果:
- 每个消费者都添加
channel.basicQos(1)
,来保证每次只接收一条消息,用于演示轮询效果- 消费者一添加线程阻塞1秒,消费者二不添加,观察二者得到的消息数量是否一致
生产者代码
importcom.rabbitmq.client.Channel;importcom.rabbitmq.client.Connection;importutil.ConnectionUtil;publicclassWorkProducer{//队列名称privatefinalstaticString QUEUE_NAME ="work_queue";publicstaticvoidmain(String[] args){//使用try-with-resources模式,确保无论是否发生异常,最后都会关闭Connection与Channeltry(//使用上面创建的连接工具,来获取到连接Connection connection =ConnectionUtil.getConnection();//创建会话通道,生产者和mq服务所有通信都在channel通道中完成Channel channel = connection.createChannel()){/**
* 声明队列,参数明细如下:
* 1、queue 队列名称
* 2、durable 是否持久化,如果持久化,mq重启后队列还在
* 3、exclusive 是否独占连接,队列只允许在该连接中访问,如果connection连接关闭队列则自动删除,如果将此参数设置true可用于临时队列的创建
* 4、autoDelete 自动删除,队列不再使用时是否自动删除此队列,如果将此参数和exclusive参数设置为true就可以实现临时队列(队列不用了就自动删除)
* 5、arguments 参数,可以设置一个队列的扩展参数,比如:可设置存活时间
*/
channel.queueDeclare(QUEUE_NAME,true,false,false,null);//循环发送10个消息for(int i =0; i <10; i++){// 消息内容String message ="工作模式消息-Hello World-"+ i;/**
* 使用通道将消息发布到队列中,参数说明如下:
* 1.第一个参数是exchange,即交换机名称,工作模式不需要设置交换机,此处为空即可(为空时,使用的是rabbitmq的默认交换机AMQP default)
* 2.第二个参数是routingKey,即路由键,工作模式下不使用交换机时,此处参数与队列名相同,会自动匹配目标队列
* 3.第三个参数为BasicProperties,即消息属性,可以为要发送的消息设置一些条件,比如消息类型、过期时间等等
* 4.第四个参数是消息本身,即消息的内容,这里是一个字节数组,表示消息的正文
* */
channel.basicPublish("", QUEUE_NAME,null, message.getBytes());}}catch(Exception ex){
ex.printStackTrace();System.out.println("发送消息出现异常...");}}}
消费者一
importcom.rabbitmq.client.*;importutil.ConnectionUtil;importjava.io.IOException;publicclassWorkConsumerOne{//要与生产者的队列名保持一致privatefinalstaticString QUEUE_NAME ="work_queue";publicstaticvoidmain(String[] args){//这里不关闭连接,否则接收不到消息,测试完成后在IDEA中手动结束主方法即可try{//使用之前创建的连接工具,来获取到连接Connection connection =ConnectionUtil.getConnection();//创建会话通道,生产者和mq服务所有通信都在channel通道中完成Channel channel = connection.createChannel();// 声明队列
channel.queueDeclare(QUEUE_NAME,true,false,false,null);//同一时刻服务器只会发一条消息给消费者
channel.basicQos(1);//实现消费方法DefaultConsumer consumer =newDefaultConsumer(channel){/**
* 获取消息,并且处理,这个方法类似事件监听,当接收到消息后此方法将被自动调用
* @param consumerTag 消费者标签,用来标识消费者的,在监听队列时设置channel.basicConsume
* @param envelope 信封,通过envelope
* @param properties 消息属性
* @param body 消息内容
* @throws IOException
*/@OverridepublicvoidhandleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throwsIOException{//交换机String exchange = envelope.getExchange();//消息id,mq在channel中用来标识消息的id,可用于确认消息已接收long deliveryTag = envelope.getDeliveryTag();// body 即消息体String msg =newString(body,"utf-8");System.out.println("工作模式消费者1接收消息: "+ msg +"!");try{Thread.sleep(1000);}catch(InterruptedException e){
e.printStackTrace();}}};/**
* 通过通道监听队列消息并实现消费,参数明细:
* 1、queue 队列名称
* 2、autoAck 自动回复,当消费者接收到消息后要告诉mq消息已接收,如果将此参数设置为tru表示会自动回复mq,如果设置为false要通过编程实现手动回复
* 3、callback,消费方法,当消费者接收到消息要执行的方法,将上面的consumer传入其中
*/
channel.basicConsume(QUEUE_NAME,true, consumer);}catch(Exception ex){
ex.printStackTrace();System.out.println("接收消息出现异常...");}}}
消费者二
importcom.rabbitmq.client.*;importutil.ConnectionUtil;importjava.io.IOException;publicclassWorkConsumerTwo{//要与生产者的队列名保持一致privatefinalstaticString QUEUE_NAME ="work_queue";publicstaticvoidmain(String[] args){//这里不关闭连接,否则接收不到消息,测试完成后在IDEA中手动结束主方法即可try{//使用之前创建的连接工具,来获取到连接Connection connection =ConnectionUtil.getConnection();//创建会话通道,生产者和mq服务所有通信都在channel通道中完成Channel channel = connection.createChannel();// 声明队列
channel.queueDeclare(QUEUE_NAME,true,false,false,null);//同一时刻服务器只会发一条消息给消费者
channel.basicQos(1);//实现消费方法DefaultConsumer consumer =newDefaultConsumer(channel){/**
* 获取消息,并且处理,这个方法类似事件监听,当接收到消息后此方法将被自动调用
* @param consumerTag 消费者标签,用来标识消费者的,在监听队列时设置channel.basicConsume
* @param envelope 信封,通过envelope
* @param properties 消息属性
* @param body 消息内容
* @throws IOException
*/@OverridepublicvoidhandleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throwsIOException{//交换机String exchange = envelope.getExchange();//消息id,mq在channel中用来标识消息的id,可用于确认消息已接收long deliveryTag = envelope.getDeliveryTag();// body 即消息体String msg =newString(body,"utf-8");System.out.println("工作模式消费者2接收消息: "+ msg +"!");}};/**
* 通过通道监听队列消息并实现消费,参数明细:
* 1、queue 队列名称
* 2、autoAck 自动回复,当消费者接收到消息后要告诉mq消息已接收,如果将此参数设置为tru表示会自动回复mq,如果设置为false要通过编程实现手动回复
* 3、callback,消费方法,当消费者接收到消息要执行的方法,将上面的consumer传入其中
*/
channel.basicConsume(QUEUE_NAME,true, consumer);}catch(Exception ex){
ex.printStackTrace();System.out.println("接收消息出现异常...");}}}
测试
先启动两个消费者,最后启动生产者
消费者一控制台输出
工作模式消费者1接收消息: 工作模式消息-Hello World-0!
工作模式消费者1接收消息: 工作模式消息-Hello World-2!
工作模式消费者1接收消息: 工作模式消息-Hello World-4!
工作模式消费者1接收消息: 工作模式消息-Hello World-6!
工作模式消费者1接收消息: 工作模式消息-Hello World-8!
消费者二控制台输出
工作模式消费者2接收消息: 工作模式消息-Hello World-1!
工作模式消费者2接收消息: 工作模式消息-Hello World-3!
工作模式消费者2接收消息: 工作模式消息-Hello World-5!
工作模式消费者2接收消息: 工作模式消息-Hello World-7!
工作模式消费者2接收消息: 工作模式消息-Hello World-9!
这里看到,生产者一共发送了10条消息到队列中,即便消费者一添加了线程阻塞方法来延缓执行,两个消费者接收到的消息数量依然相同。
公平分发
启用公平分配的设置:
要实现公平分配,需要修改以下两个参数:
basicQos
参数:- 用于限制 RabbitMQ 在消费者未确认消息时,不会发送新的消息。- 设置为basicQos(1)
表示每次只分发一条消息,消费者处理并确认(ACK)后,才会继续分发下一条消息。- 消息确认(Manual ACK):- 需要将消费者改为 手动确认模式。- 当消费者处理完消息后,手动发送一个 ACK 来告诉 RabbitMQ,消息已经处理完成。- 如果消息没有确认(如消费者挂掉),RabbitMQ 会重新将消息发送给其他消费者。
创建一个生产者,两个消费者来演示轮询发送效果:
- 每个消费者都添加
channel.basicQos(1)
,来保证每次只接收一条消息,用于演示轮询效果- 每个消费者都添加
channel.basicAck(envelope.getDeliveryTag(),false)
实现手动确认- 消费者一添加线程阻塞1秒,消费者二不添加,观察二者得到的消息数量
生产者代码
生产者代码与轮询模式的生产者相同
importcom.rabbitmq.client.Channel;importcom.rabbitmq.client.Connection;importutil.ConnectionUtil;publicclassWorkProducer{//队列名称privatefinalstaticString QUEUE_NAME ="work_queue";publicstaticvoidmain(String[] args){//使用try-with-resources模式,确保无论是否发生异常,最后都会关闭Connection与Channeltry(//使用上面创建的连接工具,来获取到连接Connection connection =ConnectionUtil.getConnection();//创建会话通道,生产者和mq服务所有通信都在channel通道中完成Channel channel = connection.createChannel()){/**
* 声明队列,参数明细如下:
* 1、queue 队列名称
* 2、durable 是否持久化,如果持久化,mq重启后队列还在
* 3、exclusive 是否独占连接,队列只允许在该连接中访问,如果connection连接关闭队列则自动删除,如果将此参数设置true可用于临时队列的创建
* 4、autoDelete 自动删除,队列不再使用时是否自动删除此队列,如果将此参数和exclusive参数设置为true就可以实现临时队列(队列不用了就自动删除)
* 5、arguments 参数,可以设置一个队列的扩展参数,比如:可设置存活时间
*/
channel.queueDeclare(QUEUE_NAME,true,false,false,null);//循环发送10个消息for(int i =0; i <10; i++){// 消息内容String message ="工作模式消息-Hello World-"+ i;/**
* 使用通道将消息发布到队列中,参数说明如下:
* 1.第一个参数是exchange,即交换机名称,工作模式不需要设置交换机,此处为空即可(为空时,使用的是rabbitmq的默认交换机AMQP default)
* 2.第二个参数是routingKey,即路由键,工作模式下不使用交换机时,此处参数与队列名相同,会自动匹配目标队列
* 3.第三个参数为BasicProperties,即消息属性,可以为要发送的消息设置一些条件,比如消息类型、过期时间等等
* 4.第四个参数是消息本身,即消息的内容,这里是一个字节数组,表示消息的正文
* */
channel.basicPublish("", QUEUE_NAME,null, message.getBytes());}}catch(Exception ex){
ex.printStackTrace();System.out.println("发送消息出现异常...");}}}
消费者一
在消费方法中加入
Thread.sleep(1000)
,让消费者一的消息处理变慢。
importcom.rabbitmq.client.*;importutil.ConnectionUtil;importjava.io.IOException;publicclassPubWorkConsumerOne{//要与生产者的队列名保持一致privatefinalstaticString QUEUE_NAME ="work_queue";publicstaticvoidmain(String[] args){//这里不关闭连接,否则接收不到消息,测试完成后在IDEA中手动结束主方法即可try{//使用之前创建的连接工具,来获取到连接Connection connection =ConnectionUtil.getConnection();//创建会话通道,生产者和mq服务所有通信都在channel通道中完成Channel channel = connection.createChannel();// 声明队列
channel.queueDeclare(QUEUE_NAME,true,false,false,null);//同一时刻服务器只会发一条消息给消费者
channel.basicQos(1);//实现消费方法DefaultConsumer consumer =newDefaultConsumer(channel){/**
* 获取消息,并且处理,这个方法类似事件监听,当接收到消息后此方法将被自动调用
* @param consumerTag 消费者标签,用来标识消费者的,在监听队列时设置channel.basicConsume
* @param envelope 信封,通过envelope
* @param properties 消息属性
* @param body 消息内容
* @throws IOException
*/@OverridepublicvoidhandleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throwsIOException{//交换机String exchange = envelope.getExchange();//消息id,mq在channel中用来标识消息的id,可用于确认消息已接收long deliveryTag = envelope.getDeliveryTag();// body 即消息体String msg =newString(body,"utf-8");System.out.println("工作模式消费者1接收消息: "+ msg +"!");//手动返回ack
channel.basicAck(envelope.getDeliveryTag(),false);try{Thread.sleep(1000);}catch(InterruptedException e){
e.printStackTrace();}}};/**
* 通过通道监听队列消息并实现消费,参数明细:
* 1、queue 队列名称
* 2、autoAck 自动回复,当消费者接收到消息后要告诉mq消息已接收,如果将此参数设置为tru表示会自动回复mq,如果设置为false要通过编程实现手动回复
* 3、callback,消费方法,当消费者接收到消息要执行的方法,将上面的consumer传入其中
*/
channel.basicConsume(QUEUE_NAME,false, consumer);}catch(Exception ex){
ex.printStackTrace();System.out.println("接收消息出现异常...");}}}
消费者二
无线程阻塞,正常处理消息,以观察处理结果
importcom.rabbitmq.client.*;importutil.ConnectionUtil;importjava.io.IOException;publicclassPubWorkConsumerTwo{//要与生产者的队列名保持一致privatefinalstaticString QUEUE_NAME ="work_queue";publicstaticvoidmain(String[] args){//这里不关闭连接,否则接收不到消息,测试完成后在IDEA中手动结束主方法即可try{//使用之前创建的连接工具,来获取到连接Connection connection =ConnectionUtil.getConnection();//创建会话通道,生产者和mq服务所有通信都在channel通道中完成Channel channel = connection.createChannel();// 声明队列
channel.queueDeclare(QUEUE_NAME,true,false,false,null);//同一时刻服务器只会发一条消息给消费者
channel.basicQos(1);//实现消费方法DefaultConsumer consumer =newDefaultConsumer(channel){/**
* 获取消息,并且处理,这个方法类似事件监听,当接收到消息后此方法将被自动调用
* @param consumerTag 消费者标签,用来标识消费者的,在监听队列时设置channel.basicConsume
* @param envelope 信封,通过envelope
* @param properties 消息属性
* @param body 消息内容
* @throws IOException
*/@OverridepublicvoidhandleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throwsIOException{//交换机String exchange = envelope.getExchange();//消息id,mq在channel中用来标识消息的id,可用于确认消息已接收long deliveryTag = envelope.getDeliveryTag();// body 即消息体String msg =newString(body,"utf-8");System.out.println("工作模式消费者2接收消息: "+ msg +"!");//手动返回ack
channel.basicAck(envelope.getDeliveryTag(),false);}};/**
* 通过通道监听队列消息并实现消费,参数明细:
* 1、queue 队列名称
* 2、autoAck 自动回复,当消费者接收到消息后要告诉mq消息已接收,如果将此参数设置为tru表示会自动回复mq,如果设置为false要通过编程实现手动回复
* 3、callback,消费方法,当消费者接收到消息要执行的方法,将上面的consumer传入其中
*/
channel.basicConsume(QUEUE_NAME,false, consumer);}catch(Exception ex){
ex.printStackTrace();System.out.println("接收消息出现异常...");}}}
测试
先启动两个消费者,最后启动生产者
消费者一
工作模式消费者1接收消息: 工作模式消息-Hello World-1!
工作模式消费者1接收消息: 工作模式消息-Hello World-2!
消费者二
工作模式消费者2接收消息: 工作模式消息-Hello World-0!
工作模式消费者2接收消息: 工作模式消息-Hello World-3!
工作模式消费者2接收消息: 工作模式消息-Hello World-4!
工作模式消费者2接收消息: 工作模式消息-Hello World-5!
工作模式消费者2接收消息: 工作模式消息-Hello World-6!
工作模式消费者2接收消息: 工作模式消息-Hello World-7!
工作模式消费者2接收消息: 工作模式消息-Hello World-8!
工作模式消费者2接收消息: 工作模式消息-Hello World-9!
由于消费者一中存在线程阻塞,消费者二没有,消费者二处理更快。所以根据能者多劳原则,消费者二会处理更多的消息。
两者对比
轮询与公平分发对比
特性轮询分配公平分配****分配机制严格轮流,无视消费者负载根据消费者工作量分配消息确认机制可选(默认自动 ACK)必须手动确认适用场景消费者处理能力相当的场景消费者处理能力差异大的场景优点实现简单,消息均匀分配分配更智能,避免过载缺点消费者容易过载或空闲稍微复杂,需要手动 ACK
- 轮询分配 适用于简单任务,且消费者负载相近的场景。
- 公平分配 适用于任务复杂度不同、消费者能力差异较大的场景,是生产中更常见的做法,因为它可以更好地利用系统资源并避免消息堆积。
广播模式
RabbitMQ 的 广播模式 是一种特殊的消息分发模式,使用 Fanout Exchange(扇形交换机) 实现。它可以将消息广播到所有绑定到该交换机的队列中,所有消费者都会接收到消息。
注意:广播模式下是一个消费者对应一个队列(如上图),并通过一个交换机将消息分发给多个绑定的队列来实现广播
工作机制
- 交换机类型:
fanout
(扇形交换机)。 - 消息分发规则: - 扇形交换机忽略路由键(Routing Key),不关心消息的具体内容。- 绑定到交换机的所有队列都能接收到消息,进而将消息分发给队列的消费者,无论绑定时是否指定了路由键。
- 消息流程: - 生产者:发送消息到 Fanout Exchange。- 交换机:将消息复制并广播到绑定的所有队列中。- 消费者:从对应队列中获取消息进行消费。
特性
- 无条件广播:所有绑定到交换机的队列都能接收消息,队列对应的消费者都会消费消息。
- 路由键无效:Fanout Exchange 不会检查或使用路由键。
- 动态绑定:队列可以在交换机创建后动态绑定或解绑。
场景举例
- 群发通知:多个消费者需要同时收到一条通知,比如发布新闻、推送更新等。
- 日志处理:多个系统模块需要接收相同的日志信息以进行分析或处理。
- 实时监控:比如系统状态的实时监控,需要广播到多个模块进行处理。
优缺点
优点
- 高效广播:生产者只需发送一次消息,交换机负责广播,降低了生产者的复杂性。
- 解耦设计:生产者无需知道消费者的具体信息,消费者动态绑定队列即可。
- 支持多消费者:一个消息可以被多个消费者消费。
缺点
- 所有绑定队列都接收消息:无选择性,可能导致某些消费者接收到不需要的消息。
- 消息积压风险:如果某个队列的消费者处理速度较慢,可能导致队列堆积。
特点总结
- Fanout Exchange 忽略路由键,直接广播消息。
- 消息广播给所有绑定队列,支持多个消费者消费相同消息。
- 常用于群发通知、日志处理等场景。
生产者
importcom.rabbitmq.client.Channel;importcom.rabbitmq.client.Connection;importutil.ConnectionUtil;publicclassFanoutProducer{//交换机名称privatefinalstaticString EXCHANGE_NAME ="fanout_exchange";publicstaticvoidmain(String[] args){//使用try-with-resources模式,确保无论是否发生异常,最后都会关闭Connection与Channeltry(//使用上面创建的连接工具,来获取到连接Connection connection =ConnectionUtil.getConnection();//创建会话通道,生产者和mq服务所有通信都在channel通道中完成Channel channel = connection.createChannel()){/**
* 从通道声明指定交换机
* 参数1: 交换机名称,没有自动创建
* 参数2: 交换机类型 fanout-广播类型
*
* */
channel.exchangeDeclare(EXCHANGE_NAME,"fanout");// 消息内容String message ="广播模式消息-Hello World";/**
* 使用通道将消息发布到队列中,参数说明如下:
* 1.第一个参数是exchange,即交换机名称,广播模式下需要指定一个交换机来进行消息广播
* 2.第二个参数是routingKey,即路由键,广播模式下不使用路由键,会把消息发布给所有绑定交换机的队列
* 3.第三个参数为BasicProperties,即消息属性,可以为要发送的消息设置一些条件,比如消息类型、过期时间等等
* 4.第四个参数是消息本身,即消息的内容,这里是一个字节数组,表示消息的正文
* */
channel.basicPublish(EXCHANGE_NAME,"",null, message.getBytes());}catch(Exception ex){
ex.printStackTrace();System.out.println("发送消息出现异常...");}}}
消费者一
importcom.rabbitmq.client.*;importutil.ConnectionUtil;importjava.io.IOException;publicclassFanoutConsumerOne{//交换机名称privatefinalstaticString EXCHANGE_NAME ="fanout_exchange";publicstaticvoidmain(String[] args){//这里不关闭连接,否则接收不到消息,测试完成后在IDEA中手动结束主方法即可try{//使用之前创建的连接工具,来获取到连接Connection connection =ConnectionUtil.getConnection();//创建会话通道,生产者和mq服务所有通信都在channel通道中完成Channel channel = connection.createChannel();/**
* 从通道声明指定交换机
* 参数1: 交换机名称,没有自动创建
* 参数2: 交换机类型 fanout-广播类型
*
* */
channel.exchangeDeclare(EXCHANGE_NAME,"fanout");//声明临时队列String queueName = channel.queueDeclare().getQueue();/**
* 绑定交换机和队列,以实现消息路由,通过交换机发布的消息只会分配给其绑定的队列
* 从左到右的参数分别是:队列名、交换机名、路由键
*
* */
channel.queueBind(queueName,EXCHANGE_NAME,"");//实现消费方法DefaultConsumer consumer =newDefaultConsumer(channel){/**
* 获取消息,并且处理,这个方法类似事件监听,当接收到消息后此方法将被自动调用
* @param consumerTag 消费者标签,用来标识消费者的,在监听队列时设置channel.basicConsume
* @param envelope 信封,通过envelope
* @param properties 消息属性
* @param body 消息内容
* @throws IOException
*/@OverridepublicvoidhandleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throwsIOException{//交换机String exchange = envelope.getExchange();//消息id,mq在channel中用来标识消息的id,可用于确认消息已接收long deliveryTag = envelope.getDeliveryTag();// body 即消息体String msg =newString(body,"utf-8");System.out.println("广播模式消费者1接收消息: "+ msg +"!");}};/**
* 通过通道监听队列消息并实现消费,参数明细:
* 1、queue 队列名称
* 2、autoAck 自动回复,当消费者接收到消息后要告诉mq消息已接收,如果将此参数设置为tru表示会自动回复mq,如果设置为false要通过编程实现手动回复
* 3、callback,消费方法,当消费者接收到消息要执行的方法,将上面的consumer传入其中
*/
channel.basicConsume(queueName,true, consumer);}catch(Exception ex){
ex.printStackTrace();System.out.println("接收消息出现异常...");}}}
运行消费者一后查看管理界面,会多一个临时队列:
消费者二
importcom.rabbitmq.client.*;importutil.ConnectionUtil;importjava.io.IOException;publicclassFanoutConsumerTwo{//交换机名称privatefinalstaticString EXCHANGE_NAME ="fanout_exchange";publicstaticvoidmain(String[] args){//这里不关闭连接,否则接收不到消息,测试完成后在IDEA中手动结束主方法即可try{//使用之前创建的连接工具,来获取到连接Connection connection =ConnectionUtil.getConnection();//创建会话通道,生产者和mq服务所有通信都在channel通道中完成Channel channel = connection.createChannel();/**
* 从通道声明指定交换机
* 参数1: 交换机名称,没有自动创建
* 参数2: 交换机类型 fanout-广播类型
*
* */
channel.exchangeDeclare(EXCHANGE_NAME,"fanout");//声明临时队列临时队列String queueName = channel.queueDeclare().getQueue();/**
* 绑定交换机和队列,以实现消息路由,通过交换机发布的消息只会分配给其绑定的队列
* 从左到右的参数分别是:队列名、交换机名、路由键
*
* */
channel.queueBind(queueName,EXCHANGE_NAME,"");//实现消费方法DefaultConsumer consumer =newDefaultConsumer(channel){/**
* 获取消息,并且处理,这个方法类似事件监听,当接收到消息后此方法将被自动调用
* @param consumerTag 消费者标签,用来标识消费者的,在监听队列时设置channel.basicConsume
* @param envelope 信封,通过envelope
* @param properties 消息属性
* @param body 消息内容
* @throws IOException
*/@OverridepublicvoidhandleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throwsIOException{//交换机String exchange = envelope.getExchange();//消息id,mq在channel中用来标识消息的id,可用于确认消息已接收long deliveryTag = envelope.getDeliveryTag();// body 即消息体String msg =newString(body,"utf-8");System.out.println("广播模式消费者2接收消息: "+ msg +"!");}};/**
* 通过通道监听队列消息并实现消费,参数明细:
* 1、queue 队列名称
* 2、autoAck 自动回复,当消费者接收到消息后要告诉mq消息已接收,如果将此参数设置为tru表示会自动回复mq,如果设置为false要通过编程实现手动回复
* 3、callback,消费方法,当消费者接收到消息要执行的方法,将上面的consumer传入其中
*/
channel.basicConsume(queueName,true, consumer);}catch(Exception ex){
ex.printStackTrace();System.out.println("接收消息出现异常...");}}}
运行消费者二后查看管理界面,也会多一个临时队列:
测试
先启动两个消费者,再启动生产者,可以看到两个消费者都得到了消息
消费者一控制台输出:
广播模式消费者1接收消息: 广播模式消息-Hello World!
消费者二控制台输出:
广播模式消费者2接收消息: 广播模式消息-Hello World!
此时去管理界面查看交换机,多了新创建的
fanout_exchange
:
结束两个消费者的运行后,临时队列消失:
Direct模式
RabbitMQ 的 Direct 模式 是最常用的消息路由模式之一,适用于精确匹配路由键的场景。在 Direct 模式下,队列会通过路由键与交换机进行绑定。发布消息时,需要指定路由键进行发布,交换机会将消息发送到与该路由键精确匹配的队列。
注意: Direct 模式中一个队列对应一个消费者,交换机通过路由键将消息发布到不同的队列中由消费者消费
Direct 模式的特点
- 精确匹配: 消息发布的路由键必须与队列绑定的路由键完全一致,消息才能被路由到该队列。 不支持模糊匹配。
- 消息定向投递: 用于发送消息到特定的队列,实现消息的点对点投递。 如果消息发布使用的路由键没有任何对应绑定的队列,消息会被丢弃(除非使用备用交换机)。
- 支持多个队列绑定: 多个队列可以使用相同的路由键绑定到同一个交换机,消息会同时发送到所有匹配的队列。
Direct 模式的核心概念
- 路由键(Routing Key):消息发送时指定的字符串,用于指示消息的目标。是 Direct 模式中消息路由的唯一依据。
- 交换机(Exchange):Direct 模式使用
direct
类型的交换机。 - 队列(Queue):消息最终被路由到的存储位置,消费者从队列中获取消息进行处理。
Direct 模式的工作原理
- 创建交换机和队列:生产者创建一个类型为
direct
的交换机,并创建需要的队列。 - 绑定队列:队列通过路由键绑定到交换机。
- 发送消息:生产者发送消息时指定路由键。
- 路由消息:交换机会根据消息的路由键将消息路由到对应绑定的队列。
Direct 模式的应用场景
- 任务分发:将不同类型的任务发送到不同的队列,由专门的消费者处理。
- 日志系统:按日志级别(如
info
、error
、debug
)发送消息到不同的队列。 - 定向通知:给特定用户或特定服务发送消息。
Direct 模式的优缺点
优点:
- 简单易用,逻辑清晰。
- 精确匹配路由键,适合点对点场景。
- 高效且易于维护。
缺点:
- 灵活性相对较低,仅支持精确匹配。
- 无法实现广播或模糊匹配场景(需要结合其他模式)。
生产者
声明交换机后,通过两个路由键来发布不同的消息
importcom.rabbitmq.client.Channel;importcom.rabbitmq.client.Connection;importutil.ConnectionUtil;publicclassDirectProducer{//交换机名称privatefinalstaticString EXCHANGE_NAME ="direct_exchange";publicstaticvoidmain(String[] args){//使用try-with-resources模式,确保无论是否发生异常,最后都会关闭Connection与Channeltry(//使用上面创建的连接工具,来获取到连接Connection connection =ConnectionUtil.getConnection();//创建会话通道,生产者和mq服务所有通信都在channel通道中完成Channel channel = connection.createChannel()){/**
* 从通道声明指定交换机
* 参数1: 交换机名称,没有自动创建
* 参数2: 交换机类型 direct-路由模式
*
* */
channel.exchangeDeclare(EXCHANGE_NAME,"direct");//创建两个路由键String routingkey1 ="info";String routingkey2 ="error";/**
* 使用通道将消息发布到队列中,参数说明如下:
* 1.第一个参数是exchange,即交换机名称,广播模式下需要指定一个交换机来进行消息广播
* 2.第二个参数是routingKey,即路由键,direct模式下交换机会通过路由键发布消息,只有通过该路由键绑定到交换机的队列才会接收到消息
* 3.第三个参数为BasicProperties,即消息属性,可以为要发送的消息设置一些条件,比如消息类型、过期时间等等
* 4.第四个参数是消息本身,即消息的内容,这里是一个字节数组,表示消息的正文
*
* 这里使用两个路由键来发布两个消息
* */
channel.basicPublish(EXCHANGE_NAME,routingkey1,null,"路由模式消息-info".getBytes());
channel.basicPublish(EXCHANGE_NAME,routingkey2,null,"路由模式消息-error".getBytes());}catch(Exception ex){
ex.printStackTrace();System.out.println("发送消息出现异常...");}}}
消费者一
声明两个队列,通过与生产者相同的路由键来绑定生产者的交换机,来接收生产者消息
importcom.rabbitmq.client.*;importutil.ConnectionUtil;importjava.io.IOException;publicclassDirectConsumerOne{//交换机名称privatefinalstaticString EXCHANGE_NAME ="direct_exchange";//用于接收info路由键所发布消息的队列privatefinalstaticString QUEUE_INFO ="queue_info";//用于接收error路由键所发布消息的队列privatefinalstaticString QUEUE_ERROR ="queue_error";publicstaticvoidmain(String[] args){//这里不关闭连接,否则接收不到消息,测试完成后在IDEA中手动结束主方法即可try{//使用之前创建的连接工具,来获取到连接Connection connection =ConnectionUtil.getConnection();//创建会话通道,生产者和mq服务所有通信都在channel通道中完成Channel channel = connection.createChannel();/**
* 从通道声明指定交换机
* 参数1: 交换机名称,没有自动创建
* 参数2: 交换机类型 direct-路由模式
*
* */
channel.exchangeDeclare(EXCHANGE_NAME,"direct");// 声明队列
channel.queueDeclare(QUEUE_INFO,true,false,false,null);
channel.queueDeclare(QUEUE_ERROR,true,false,false,null);/**
* 绑定交换机和队列,以实现消息路由,通过交换机发布的消息只会分配给其绑定的队列
* 从左到右的参数分别是:队列名、交换机名、绑定的路由键
*
* 这里使用不同的路由键,同时将两个队列绑定到交换机
* */
channel.queueBind(QUEUE_INFO, EXCHANGE_NAME,"info");
channel.queueBind(QUEUE_ERROR, EXCHANGE_NAME,"error");//实现消费方法DefaultConsumer consumer =newDefaultConsumer(channel){/**
* 获取消息,并且处理,这个方法类似事件监听,当接收到消息后此方法将被自动调用
*
* @param consumerTag 消费者标签,用来标识消费者的,在监听队列时设置channel.basicConsume
* @param envelope 信封,通过envelope
* @param properties 消息属性
* @param body 消息内容
* @throws IOException
*/@OverridepublicvoidhandleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throwsIOException{//交换机String exchange = envelope.getExchange();//消息id,mq在channel中用来标识消息的id,可用于确认消息已接收long deliveryTag = envelope.getDeliveryTag();// body 即消息体String msg =newString(body,"utf-8");System.out.println("路由模式消费者1接收消息: "+ msg +"!");}};/**
* 通过通道监听队列消息并实现消费,参数明细:
* 1、queue 队列名称
* 2、autoAck 自动回复,当消费者接收到消息后要告诉mq消息已接收,如果将此参数设置为tru表示会自动回复mq,如果设置为false要通过编程实现手动回复
* 3、callback,消费方法,当消费者接收到消息要执行的方法,将上面的consumer传入其中
*
* 这里同时消费两个队列的消息
*/
channel.basicConsume(QUEUE_INFO,true, consumer);
channel.basicConsume(QUEUE_ERROR,true, consumer);}catch(Exception ex){
ex.printStackTrace();System.out.println("接收消息出现异常...");}}}
消费者二
声明一个队列,使用生产者不存在的路由键来绑定生产者的交换机,观察是否会收到生产者发布的消息
importcom.rabbitmq.client.*;importutil.ConnectionUtil;importjava.io.IOException;publicclassDirectConsumerTwo{//交换机名称privatefinalstaticString EXCHANGE_NAME ="direct_exchange";//队列privatefinalstaticString QUEUE_OTHER ="queue_other";publicstaticvoidmain(String[] args){//这里不关闭连接,否则接收不到消息,测试完成后在IDEA中手动结束主方法即可try{//使用之前创建的连接工具,来获取到连接Connection connection =ConnectionUtil.getConnection();//创建会话通道,生产者和mq服务所有通信都在channel通道中完成Channel channel = connection.createChannel();/**
* 从通道声明指定交换机
* 参数1: 交换机名称,没有自动创建
* 参数2: 交换机类型 direct-路由模式
*
* */
channel.exchangeDeclare(EXCHANGE_NAME,"direct");//声明队列
channel.queueDeclare(QUEUE_OTHER,true,false,false,null);/**
* 绑定交换机和队列,以实现消息路由,通过交换机发布的消息只会分配给其绑定的队列
* 从左到右的参数分别是:队列名、交换机名、绑定的路由键
*
* */
channel.queueBind(QUEUE_OTHER, EXCHANGE_NAME,"other");//实现消费方法DefaultConsumer consumer =newDefaultConsumer(channel){/**
* 获取消息,并且处理,这个方法类似事件监听,当接收到消息后此方法将被自动调用
*
* @param consumerTag 消费者标签,用来标识消费者的,在监听队列时设置channel.basicConsume
* @param envelope 信封,通过envelope
* @param properties 消息属性
* @param body 消息内容
* @throws IOException
*/@OverridepublicvoidhandleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throwsIOException{//交换机String exchange = envelope.getExchange();//消息id,mq在channel中用来标识消息的id,可用于确认消息已接收long deliveryTag = envelope.getDeliveryTag();// body 即消息体String msg =newString(body,"utf-8");System.out.println("路由模式消费者2接收消息: "+ msg +"!");}};/**
* 通过通道监听队列消息并实现消费,参数明细:
* 1、queue 队列名称
* 2、autoAck 自动回复,当消费者接收到消息后要告诉mq消息已接收,如果将此参数设置为tru表示会自动回复mq,如果设置为false要通过编程实现手动回复
* 3、callback,消费方法,当消费者接收到消息要执行的方法,将上面的consumer传入其中
*/
channel.basicConsume(QUEUE_OTHER,true, consumer);}catch(Exception ex){
ex.printStackTrace();System.out.println("接收消息出现异常...");}}}
测试
先启动两个消费者,再启动生产者,效果为消费者一控制台有输出,消费者二控制台无输出。
消费者一控制台输出如下:
路由模式消费者1接收消息: 路由模式消息-info!
路由模式消费者1接收消息: 路由模式消息-error!
因为direct路由模式根据路由键来进行精确匹配,生产者并没有用与消费者二相同的路由键发布消息,所以消费者二没有收到任何消息。
下图是路由模式创建的交换机及队列
Topic模式
RabbitMQ 的 Topic 模式 是一种基于主题路由的消息模式,允许使用路由键进行模糊匹配来发布消息。相比 Direct 模式,Topic 模式提供了更灵活的消息路由机制,适用于更复杂的场景。
Topic 模式的核心特点
- 模糊匹配:- 消息的路由键可以是一个点分隔的字符串(如
order.created.us
),通过绑定键中的通配符来实现模糊匹配。 - 支持通配符:-
*
:匹配一个单词(由点.
分隔)。-#
:匹配零个或多个单词。 - 灵活性高:- 消息可以根据多级主题(如区域、服务类型、操作类型等)进行分类和路由。
- 广播与定向的结合:- 可以实现精确匹配(类似 Direct 模式)或主题广播(类似 Fanout 模式)。
Topic 模式的核心概念
- 交换机(Exchange):Topic 模式使用
topic
类型的交换机。 - 路由键(Routing Key):消息发送、绑定交换机与队列时指定的键,通常是点分隔的多级字符串(如
log.info
、order.created.us
)。 - 队列(Queue):接收和存储消息。
Topic 模式的工作原理
- 生产者发送消息:生产者向
topic
类型的交换机发送消息,并指定路由键。 - 队列绑定规则:队列通过路由键与交换机绑定,路由键可以使用通配符来定义匹配规则。
- 交换机路由消息:交换机会根据消息的路由键与队列进行匹配,将符合条件的消息发送到对应队列。
Topic 模式的通配符规则
- **
*
(星号)**:- 匹配一个单词(由.
分隔)。- 例如: - 消息路由键:log.info
- 队列绑定路由键:log.*
- 匹配成功。 - **
#
(井号)**:- 匹配零个或多个单词。- 例如: - 路由键:order.created.us
- 队列绑定路由键:order.#
- 匹配成功。
Topic 模式的应用场景
- 日志系统:按照日志的类别(如
info
、error
、warning
)或模块(如auth
、order
)路由消息。 - 分布式任务:根据任务的类型或区域分发任务(如
order.created.us
)。 - 通知系统:按照不同主题(如用户通知、系统警报)发送消息。
Topic 模式的优缺点
优点:
- 支持复杂的消息路由规则。
- 灵活性高,适合动态场景。
- 支持广播和定向投递的结合。
缺点:
- 配置复杂度略高。
- 对通配符匹配的性能要求高,可能影响路由效率。
生产者
importcom.rabbitmq.client.Channel;importcom.rabbitmq.client.Connection;importutil.ConnectionUtil;publicclassTopicProducer{//交换机名称privatefinalstaticString EXCHANGE_NAME ="topic_exchange";publicstaticvoidmain(String[] args){//使用try-with-resources模式,确保无论是否发生异常,最后都会关闭Connection与Channeltry(//使用上面创建的连接工具,来获取到连接Connection connection =ConnectionUtil.getConnection();//创建会话通道,生产者和mq服务所有通信都在channel通道中完成Channel channel = connection.createChannel()){/**
* 从通道声明指定交换机
* 参数1: 交换机名称,没有自动创建
* 参数2: 交换机类型 topic-模糊匹配路由模式
*
* */
channel.exchangeDeclare(EXCHANGE_NAME,"topic");//创建两个路由键String routingkey1 ="message.info.one";String routingkey2 ="message.error.one";/**
* 使用通道将消息发布到队列中,参数说明如下:
* 1.第一个参数是exchange,即交换机名称,广播模式下需要指定一个交换机来进行消息广播
* 2.第二个参数是routingKey,即路由键,topic模式下交换机会通过路由键发布消息,队列绑定时可通过模糊匹配路由键来接收消息
* 3.第三个参数为BasicProperties,即消息属性,可以为要发送的消息设置一些条件,比如消息类型、过期时间等等
* 4.第四个参数是消息本身,即消息的内容,这里是一个字节数组,表示消息的正文
*
* 这里使用两个路由键来发布两个消息
* */
channel.basicPublish(EXCHANGE_NAME,routingkey1,null,"topic模式消息-info".getBytes());
channel.basicPublish(EXCHANGE_NAME,routingkey2,null,"topic模式消息-error".getBytes());}catch(Exception ex){
ex.printStackTrace();System.out.println("发送消息出现异常...");}}}
消费者一
importcom.rabbitmq.client.*;importutil.ConnectionUtil;importjava.io.IOException;publicclassTopicConsumerOne{//交换机名称privatefinalstaticString EXCHANGE_NAME ="topic_exchange";//用于接收info路由键所发布消息的队列privatefinalstaticString QUEUE_INFO ="topic_queue_info";//用于接收error路由键所发布消息的队列privatefinalstaticString QUEUE_ERROR ="topic_queue_error";publicstaticvoidmain(String[] args){//这里不关闭连接,否则接收不到消息,测试完成后在IDEA中手动结束主方法即可try{//使用之前创建的连接工具,来获取到连接Connection connection =ConnectionUtil.getConnection();//创建会话通道,生产者和mq服务所有通信都在channel通道中完成Channel channel = connection.createChannel();/**
* 从通道声明指定交换机
* 参数1: 交换机名称,没有自动创建
* 参数2: 交换机类型 topic-模糊匹配路由模式
*
* */
channel.exchangeDeclare(EXCHANGE_NAME,"topic");// 声明队列
channel.queueDeclare(QUEUE_INFO,true,false,false,null);
channel.queueDeclare(QUEUE_ERROR,true,false,false,null);/**
* 绑定交换机和队列,以实现消息路由,通过交换机发布的消息只会分配给其绑定的队列
* 从左到右的参数分别是:队列名、交换机名、绑定的路由键
*
* 这里使用不同的路由键,同时将两个队列绑定到交换机
* *//**
* 使用通配符路由键来绑定交换机与队列:
*
* 通配符
* * (star) can substitute for exactly one word. 匹配不多不少恰好1个词
* # (hash) can substitute for zero or more words. 匹配零个、一个或多个词
* 如:
* audit.# 匹配audit、audit.irs 、或者audit.irs.corporate等
* audit.* 只能匹配 audit.irs
* */
channel.queueBind(QUEUE_INFO, EXCHANGE_NAME,"*.info.#");
channel.queueBind(QUEUE_ERROR, EXCHANGE_NAME,"*.error.#");//实现消费方法DefaultConsumer consumer =newDefaultConsumer(channel){/**
* 获取消息,并且处理,这个方法类似事件监听,当接收到消息后此方法将被自动调用
*
* @param consumerTag 消费者标签,用来标识消费者的,在监听队列时设置channel.basicConsume
* @param envelope 信封,通过envelope
* @param properties 消息属性
* @param body 消息内容
* @throws IOException
*/@OverridepublicvoidhandleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throwsIOException{//交换机String exchange = envelope.getExchange();//消息id,mq在channel中用来标识消息的id,可用于确认消息已接收long deliveryTag = envelope.getDeliveryTag();// body 即消息体String msg =newString(body,"utf-8");System.out.println("topic模式消费者1接收消息: "+ msg +"!");}};/**
* 通过通道监听队列消息并实现消费,参数明细:
* 1、queue 队列名称
* 2、autoAck 自动回复,当消费者接收到消息后要告诉mq消息已接收,如果将此参数设置为tru表示会自动回复mq,如果设置为false要通过编程实现手动回复
* 3、callback,消费方法,当消费者接收到消息要执行的方法,将上面的consumer传入其中
*
* 这里同时消费两个队列的消息
*/
channel.basicConsume(QUEUE_INFO,true, consumer);
channel.basicConsume(QUEUE_ERROR,true, consumer);}catch(Exception ex){
ex.printStackTrace();System.out.println("接收消息出现异常...");}}}
消费者二
importcom.rabbitmq.client.*;importutil.ConnectionUtil;importjava.io.IOException;publicclassTopicConsumerTwo{//交换机名称privatefinalstaticString EXCHANGE_NAME ="topic_exchange";//用于接收info路由键所发布消息的队列privatefinalstaticString QUEUE_INFO ="topic_queue_info";//用于接收error路由键所发布消息的队列privatefinalstaticString QUEUE_ERROR ="topic_queue_error";publicstaticvoidmain(String[] args){//这里不关闭连接,否则接收不到消息,测试完成后在IDEA中手动结束主方法即可try{//使用之前创建的连接工具,来获取到连接Connection connection =ConnectionUtil.getConnection();//创建会话通道,生产者和mq服务所有通信都在channel通道中完成Channel channel = connection.createChannel();/**
* 从通道声明指定交换机
* 参数1: 交换机名称,没有自动创建
* 参数2: 交换机类型 topic-模糊匹配路由模式
*
* */
channel.exchangeDeclare(EXCHANGE_NAME,"topic");// 声明队列
channel.queueDeclare(QUEUE_INFO,true,false,false,null);
channel.queueDeclare(QUEUE_ERROR,true,false,false,null);/**
* 绑定交换机和队列,以实现消息路由,通过交换机发布的消息只会分配给其绑定的队列
* 从左到右的参数分别是:队列名、交换机名、绑定的路由键
*
* 这里使用不同的路由键,同时将两个队列绑定到交换机
* *//**
* 使用通配符路由键来绑定交换机与队列:
*
* 通配符
* * (star) can substitute for exactly one word. 匹配不多不少恰好1个词
* # (hash) can substitute for zero or more words. 匹配零个、一个或多个词
* 如:
* audit.# 匹配audit、audit.irs 、或者audit.irs.corporate等
* audit.* 只能匹配 audit.irs
* */
channel.queueBind(QUEUE_INFO, EXCHANGE_NAME,"*.info");
channel.queueBind(QUEUE_ERROR, EXCHANGE_NAME,"*.error");//实现消费方法DefaultConsumer consumer =newDefaultConsumer(channel){/**
* 获取消息,并且处理,这个方法类似事件监听,当接收到消息后此方法将被自动调用
*
* @param consumerTag 消费者标签,用来标识消费者的,在监听队列时设置channel.basicConsume
* @param envelope 信封,通过envelope
* @param properties 消息属性
* @param body 消息内容
* @throws IOException
*/@OverridepublicvoidhandleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throwsIOException{//交换机String exchange = envelope.getExchange();//消息id,mq在channel中用来标识消息的id,可用于确认消息已接收long deliveryTag = envelope.getDeliveryTag();// body 即消息体String msg =newString(body,"utf-8");System.out.println("topic模式消费者2接收消息: "+ msg +"!");}};/**
* 通过通道监听队列消息并实现消费,参数明细:
* 1、queue 队列名称
* 2、autoAck 自动回复,当消费者接收到消息后要告诉mq消息已接收,如果将此参数设置为tru表示会自动回复mq,如果设置为false要通过编程实现手动回复
* 3、callback,消费方法,当消费者接收到消息要执行的方法,将上面的consumer传入其中
*
* 这里同时消费两个队列的消息
*/
channel.basicConsume(QUEUE_INFO,true, consumer);
channel.basicConsume(QUEUE_ERROR,true, consumer);}catch(Exception ex){
ex.printStackTrace();System.out.println("接收消息出现异常...");}}}
测试
先启动两个消费者,再启动生产者,效果为消费者一控制台有输出,消费者二控制台无输出。
生产者发布消息的路由键为
message.info.one
message.error.one
消费者一的路由键为
*.info.#
*.error.#
*
匹配不多不少恰好1个词,
#
匹配零个、一个或多个词,所以消费者一可以匹配成功。而消费者二的路由键:
*.info
*.error
只能匹配类似于message.info与message.error格式的路由键,故无法接收到消息。
消费者一控制台输出:
topic模式消费者1接收消息: topic模式消息-info!
topic模式消费者1接收消息: topic模式消息-error!
下图为管理界面topic模式创建的交换机及队列
Rpc模式
RabbitMQ的RPC(Remote Procedure Call) 模式允许生产者发布消息后,接收消费者的回调信息(类似http的请求与响应),就像调用本地方法一样接收返回值。RabbitMQ 提供了一个简单但强大的机制来实现 RPC 功能。
RPC 模式的核心概念
- 客户端(Client):即生产者,发起 RPC 请求,发送消息到队列并等待服务端返回结果。
- 服务端(Server):即消费者,接收 RPC 请求,对请求进行处理并将结果返回给客户端。
- 队列:客户端(生产者)将请求发送到队列中,服务端监听该队列以接收请求。
- 回调队列(Callback Queue):客户端为接收服务端(消费者)返回的结果而设置的专用队列。
- Correlation ID(相关 ID):用于标识每个 RPC 请求和其对应的响应,使客户端能正确处理返回结果。
RPC 模式的工作流程
- 客户端发送请求:- 创建一个唯一的回调队列。- 生成一个唯一的
Correlation ID
,用于标识请求。- 将消息发送到指定的请求队列,并设置消息的replyTo
属性为回调队列。 - 服务端处理请求:- 从请求队列中获取消息。- 执行处理逻辑并生成结果。- 将结果发送到客户端指定的回调队列,带上原始消息的
Correlation ID
。 - 客户端接收响应:- 监听回调队列。- 检查返回消息的
Correlation ID
是否与请求的Correlation ID
匹配。- 返回结果给调用方。
RPC 模式的优缺点
优点:
- 实现简单:通过 RabbitMQ 提供的基本功能可以实现完整的 RPC 流程。
- 松耦合:客户端和服务端无需直接通信,降低了依赖。
- 支持并发:多个服务端可以监听同一请求队列,实现任务负载均衡。
缺点:
- 性能限制:消息的发送和接收增加了额外的延迟,不适合高实时性要求的场景。
- 资源开销:每个请求需要单独的回调队列,消耗更多的资源。
- 缺乏强一致性保证:消息的丢失或服务端失败可能导致请求无响应。
应用场景
- 任务分发和结果收集:将复杂的任务分发到多个服务处理,收集处理结果。
- 远程调用:在微服务架构中,调用其他服务的接口。
- 计算密集型任务:将大规模计算任务分发到多个节点执行。
生产者
importcom.rabbitmq.client.AMQP;importcom.rabbitmq.client.Channel;importcom.rabbitmq.client.Connection;importutil.ConnectionUtil;importjava.util.UUID;importjava.util.concurrent.ArrayBlockingQueue;importjava.util.concurrent.BlockingQueue;publicclassRpcProducer{//rpc队列privatefinalstaticString REQUEST_QUEUE ="rpc_queue";publicstaticvoidmain(String[] args){//使用try-with-resources模式,确保无论是否发生异常,最后都会关闭Connection与Channeltry(//使用上面创建的连接工具,来获取到连接Connection connection =ConnectionUtil.getConnection();//创建会话通道,生产者和mq服务所有通信都在channel通道中完成Channel channel = connection.createChannel()){//定义临时队列,用于消息回调String replyQueueName = channel.queueDeclare().getQueue();finalString corrId = UUID.randomUUID().toString();// 生成唯一的 Correlation ID// 设置请求属性AMQP.BasicProperties props =newAMQP.BasicProperties
.Builder().correlationId(corrId).replyTo(replyQueueName)// 设置回调队列.build();//发送请求到请求队列
channel.basicPublish("", REQUEST_QUEUE, props,"Rpc模式消息-Hello World".getBytes("UTF-8"));//创建阻塞队列用于接收响应finalBlockingQueue<String> response =newArrayBlockingQueue<>(1);//消费回调队列中的消息String ctag = channel.basicConsume(replyQueueName,true,(consumerTag, delivery)->{if(delivery.getProperties().getCorrelationId().equals(corrId)){
response.offer(newString(delivery.getBody(),"UTF-8"));// 放入响应队列}}, consumerTag ->{});//等待响应并返回String result = response.take();//取消订阅回调队列
channel.basicCancel(ctag);System.out.println("回调消息:"+result);}catch(Exception ex){
ex.printStackTrace();System.out.println("发送消息出现异常...");}}}
消费者
importcom.rabbitmq.client.*;importutil.ConnectionUtil;importjava.io.IOException;publicclassRpcConsumer{//rpc队列privatefinalstaticString REQUEST_QUEUE ="rpc_queue";publicstaticvoidmain(String[] args){//这里不关闭连接,否则接收不到消息并无法回调,测试完成后在IDEA中手动结束主方法即可try{//使用上面创建的连接工具,来获取到连接Connection connection =ConnectionUtil.getConnection();//创建会话通道,生产者和mq服务所有通信都在channel通道中完成Channel channel = connection.createChannel();// 声明队列
channel.queueDeclare(REQUEST_QUEUE,false,false,false,null);// 设置每次只处理一个消息
channel.basicQos(1);//实现消费方法DefaultConsumer consumer =newDefaultConsumer(channel){/**
* 获取消息,并且处理,这个方法类似事件监听,当接收到消息后此方法将被自动调用
*
* @param consumerTag 消费者标签,用来标识消费者的,在监听队列时设置channel.basicConsume
* @param envelope 信封,通过envelope
* @param properties 消息属性
* @param body 消息内容
* @throws IOException
*/@OverridepublicvoidhandleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throwsIOException{// body 即消息体String msg =newString(body,"utf-8");System.out.println("rpc模式消费者接收消息: "+ msg +"!");// 发送响应到回调队列AMQP.BasicProperties replyProps =newAMQP.BasicProperties
.Builder().correlationId(properties.getCorrelationId()).build();
channel.basicPublish("", properties.getReplyTo(), replyProps,"Rpc消费者接收成功".getBytes("UTF-8"));
channel.basicAck(envelope.getDeliveryTag(),false);}};/**
* 通过通道监听队列消息并实现消费,参数明细:
* 1、queue 队列名称
* 2、autoAck 自动回复,当消费者接收到消息后要告诉mq消息已接收,如果将此参数设置为tru表示会自动回复mq,如果设置为false要通过编程实现手动回复
* 3、callback,消费方法,当消费者接收到消息要执行的方法,将上面的consumer传入其中
*
* 这里同时消费两个队列的消息
*/
channel.basicConsume(REQUEST_QUEUE,false, consumer);}catch(Exception ex){
ex.printStackTrace();System.out.println("接收消息出现异常...");}}}
测试
先启动生产者,再启动消费者
消费者控制台输出:
rpc模式消费者接收消息: Rpc模式消息-Hello World!
生产者控制台输出:
回调消息:Rpc消费者接收成功
生产者启动并发布消息后,会等待消费者的回调消息,当消费者成功消费后,生产者接收到回调消息并打印控制台
过期时间设置
过期时间
TTL
表示可以对消息设置预期的时间,在这个时间内都可以被消费者接收获取;过了之后消息将自动被删除。RabbitMQ可以对消息和队列设置TTL。目前有两种方法可以设置。
- 第一种方法是通过队列属性设置,队列中所有消息都有相同的过期时间。
- 第二种方法是对消息进行单独设置,每条消息TTL可以不同。
如果上述两种方法同时使用,则消息的过期时间以两者之间
TTL
较小的那个数值为准。消息在队列的生存时间一旦超过设置的
TTL
值,就成为dead message被投递到死信队列, 消费者将无法再收到该消息。
设置队列过期时间
这里以简单模式为例演示
生产者
importcom.rabbitmq.client.Channel;importcom.rabbitmq.client.Connection;importutil.ConnectionUtil;importjava.util.HashMap;importjava.util.Map;publicclassQueueTtlProducer{//队列名称privatefinalstaticString QUEUE_NAME ="ttl_queue";publicstaticvoidmain(String[] args){//使用try-with-resources模式,确保无论是否发生异常,最后都会关闭Connection与Channeltry(//使用上面创建的连接工具,来获取到连接Connection connection =ConnectionUtil.getConnection();//创建会话通道,生产者和mq服务所有通信都在channel通道中完成Channel channel = connection.createChannel()){/**
* channel.queueDeclare()方法为声明队列,参数明细如下:
* 1、queue 队列名称
* 2、durable 是否持久化,如果持久化,mq重启后队列还在
* 3、exclusive 是否独占连接,队列只允许在该连接中访问,如果connection连接关闭队列则自动删除,如果将此参数设置true可用于临时队列的创建
* 4、autoDelete 自动删除,队列不再使用时是否自动删除此队列,如果将此参数和exclusive参数设置为true就可以实现临时队列(队列不用了就自动删除)
* 5、arguments 参数,可以设置一个队列的扩展参数,比如:可设置存活时间
*/Map<String,Object> arguments =newHashMap<>();
arguments.put("x-message-ttl",10000);//x-message-ttl为队列的超时属性,这里设置为10秒过期
channel.queueDeclare(QUEUE_NAME,true,false,false, arguments);// 消息内容String message ="Hello World";/**
* 使用通道将消息发布到队列中,参数说明如下:
* 1.第一个参数是exchange,即交换机名称,简单模式不需要设置交换机,此处为空即可(为空时,使用的是rabbitmq的默认交换机AMQP default)
* 2.第二个参数是routingKey,即路由键,简单模式下不使用交换机时,此处参数与队列名相同,会自动匹配目标队列
* 3.第三个参数为BasicProperties,即消息属性,可以为要发送的消息设置一些条件,比如消息类型、过期时间等等
* 4.第四个参数是消息本身,即消息的内容,这里是一个字节数组,表示消息的正文
* */
channel.basicPublish("", QUEUE_NAME,null, message.getBytes());}catch(Exception ex){
ex.printStackTrace();System.out.println("发送消息出现异常...");}}}
消费者
importcom.rabbitmq.client.*;importutil.ConnectionUtil;importjava.io.IOException;importjava.util.HashMap;importjava.util.Map;publicclassQueueTtlConsumer{//要与生产者的队列名保持一致privatefinalstaticString QUEUE_NAME ="ttl_queue";publicstaticvoidmain(String[] args){//使用try-with-resources模式,确保无论是否发生异常,最后都会关闭Connection与Channeltry(//使用之前创建的连接工具,来获取到连接Connection connection =ConnectionUtil.getConnection();//创建会话通道,生产者和mq服务所有通信都在channel通道中完成Channel channel = connection.createChannel()){// 声明队列Map<String,Object> arguments =newHashMap<>();
arguments.put("x-message-ttl",10000);//x-message-ttl为队列的超时属性,这里设置为10秒过期
channel.queueDeclare(QUEUE_NAME,true,false,false, arguments);//实现消费方法DefaultConsumer consumer =newDefaultConsumer(channel){/**
* 获取消息,并且处理,这个方法类似事件监听,当接收到消息后此方法将被自动调用
* @param consumerTag 消费者标签,用来标识消费者的,在监听队列时设置channel.basicConsume
* @param envelope 信封,通过envelope
* @param properties 消息属性
* @param body 消息内容
* @throws IOException
*/@OverridepublicvoidhandleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throwsIOException{//交换机String exchange = envelope.getExchange();//消息id,mq在channel中用来标识消息的id,可用于确认消息已接收long deliveryTag = envelope.getDeliveryTag();// body 即消息体String msg =newString(body,"utf-8");System.out.println("超时队列接收消息: "+ msg +"!");}};/**
* 通过通道监听队列消息并实现消费,参数明细:
* 1、queue 队列名称
* 2、autoAck 自动回复,当消费者接收到消息后要告诉mq消息已接收,如果将此参数设置为tru表示会自动回复mq,如果设置为false要通过编程实现手动回复
* 3、callback,消费方法,当消费者接收到消息要执行的方法,将上面的consumer传入其中
*/
channel.basicConsume(QUEUE_NAME,true, consumer);}catch(Exception ex){
ex.printStackTrace();System.out.println("接收消息出现异常...");}}}
进行测试
先启动生产者,查看管理页面,多出了声明的
ttl_queue
队列,拥有超时属性,并且队列内存在一个未消费的消息:
不启动消费者,等待10秒后页面自动刷新,消息已消失:
生产者启动10秒后再启动消费者,其控制台无任何输出信息,如果在10秒内启动消费者,则消费者会收到消息:
超时队列接收消息: Hello World!
设置消息过期时间
消息的过期时间;只需要在发送消息(可以发送到任何队列,不管该队列是否属于某个交换机)的时候设置过期时间即可。
生产者
依旧以简单模式为例,这里使用
BasicProperties
来为消息设置超时时间:
importcom.rabbitmq.client.AMQP;importcom.rabbitmq.client.Channel;importcom.rabbitmq.client.Connection;importutil.ConnectionUtil;publicclassMessageTtlProducer{//队列名称privatefinalstaticString QUEUE_NAME ="messageTtl_queue";publicstaticvoidmain(String[] args){//使用try-with-resources模式,确保无论是否发生异常,最后都会关闭Connection与Channeltry(//使用上面创建的连接工具,来获取到连接Connection connection =ConnectionUtil.getConnection();//创建会话通道,生产者和mq服务所有通信都在channel通道中完成Channel channel = connection.createChannel()){//声明队列时不设置超时
channel.queueDeclare(QUEUE_NAME,true,false,false,null);// 消息内容String message ="Hello World";//设置消息的过期时间,此处相当于为每一个消息单独设置属性AMQP.BasicProperties basicProperties =newAMQP.BasicProperties().builder().contentEncoding("UTF-8")// 编码方式.expiration("10000")// 过期时间.build();/**
* 使用通道将消息发布到队列中,参数说明如下:
* 1.第一个参数是exchange,即交换机名称,简单模式不需要设置交换机,此处为空即可(为空时,使用的是rabbitmq的默认交换机AMQP default)
* 2.第二个参数是routingKey,即路由键,简单模式下不使用交换机时,此处参数与队列名相同,会自动匹配目标队列
* 3.第三个参数为BasicProperties,即消息属性,可以为要发送的消息设置一些条件,比如消息类型、过期时间等等
* 4.第四个参数是消息本身,即消息的内容,这里是一个字节数组,表示消息的正文
* */
channel.basicPublish("", QUEUE_NAME, basicProperties, message.getBytes());}catch(Exception ex){
ex.printStackTrace();System.out.println("发送消息出现异常...");}}}
消费者
importcom.rabbitmq.client.*;importutil.ConnectionUtil;importjava.io.IOException;publicclassMessageTtlConsumer{//要与生产者的队列名保持一致privatefinalstaticString QUEUE_NAME ="messageTtl_queue";publicstaticvoidmain(String[] args){//使用try-with-resources模式,确保无论是否发生异常,最后都会关闭Connection与Channeltry(//使用之前创建的连接工具,来获取到连接Connection connection =ConnectionUtil.getConnection();//创建会话通道,生产者和mq服务所有通信都在channel通道中完成Channel channel = connection.createChannel()){// 声明队列
channel.queueDeclare(QUEUE_NAME,true,false,false,null);//实现消费方法DefaultConsumer consumer =newDefaultConsumer(channel){/**
* 获取消息,并且处理,这个方法类似事件监听,当接收到消息后此方法将被自动调用
* @param consumerTag 消费者标签,用来标识消费者的,在监听队列时设置channel.basicConsume
* @param envelope 信封,通过envelope
* @param properties 消息属性
* @param body 消息内容
* @throws IOException
*/@OverridepublicvoidhandleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throwsIOException{//交换机String exchange = envelope.getExchange();//消息id,mq在channel中用来标识消息的id,可用于确认消息已接收long deliveryTag = envelope.getDeliveryTag();// body 即消息体String msg =newString(body,"utf-8");System.out.println("超时队列接收消息: "+ msg +"!");}};/**
* 通过通道监听队列消息并实现消费,参数明细:
* 1、queue 队列名称
* 2、autoAck 自动回复,当消费者接收到消息后要告诉mq消息已接收,如果将此参数设置为tru表示会自动回复mq,如果设置为false要通过编程实现手动回复
* 3、callback,消费方法,当消费者接收到消息要执行的方法,将上面的consumer传入其中
*/
channel.basicConsume(QUEUE_NAME,true, consumer);}catch(Exception ex){
ex.printStackTrace();System.out.println("接收消息出现异常...");}}}
测试
先启动生产者,查看管理页面看到新创建的队列,由于只是为消息设置超时,队列没有了蓝色的TTL标识:
如果在生产者设置的超时时间内启动消费者,则其控制台输出如下,否则消息超时消费者接收不到:
超时队列接收消息: Hello World!
其他属性设置
上面列举了队列与消息的超时属性设置,除此之外还有很多其他可设置项
队列扩展属性
常见队列属性
在 RabbitMQ 中,队列可以通过
queueDeclare
的
arguments
参数来配置扩展属性。以下是常见的队列扩展属性及其含义:
**
x-message-ttl
**
- 含义:设置队列中消息的存活时间(以毫秒为单位)。
- 用法:
arguments.put("x-message-ttl", 10000);
消息超过这个时间未被消费就会过期并被移除队列。
**
x-expires
**
- 含义:设置队列的存活时间(以毫秒为单位)。如果队列在指定时间内未被使用(没有消费者连接、没有消息存储等),队列将被自动删除。
- 用法:
arguments.put("x-expires", 60000);
队列将在 60 秒后自动删除。
**
x-max-length
**
- 含义:限制队列中最大消息数量。如果队列中的消息数达到限制,新发布的消息将被丢弃或替换最早的消息(与
x-overflow
配合使用)。 - 用法:
arguments.put("x-max-length", 1000);
队列最多存储 1000 条消息。
**
x-max-length-bytes
**
- 含义:限制队列中消息总大小(以字节为单位)。如果总大小超过限制,新发布的消息将被丢弃或替换最早的消息。
- 用法:
arguments.put("x-max-length-bytes", 10485760);
队列的消息总大小限制为 10 MB。
**
x-overflow
**
- 含义:设置队列的溢出行为,当队列达到
x-max-length
或x-max-length-bytes
时的处理方式。 - 取值: -
"drop-head"
:丢弃最早的消息(FIFO 式移除)。-"reject-publish"
:拒绝新发布的消息。 - 用法:
arguments.put("x-overflow", "drop-head");
**
x-dead-letter-exchange
**
- 含义:设置队列的死信交换机。队列中的死信消息(如过期、被拒绝、队列满等)将被转发到指定的交换机。
- 用法:
arguments.put("x-dead-letter-exchange", "dead_exchange");
当消息变成死信时,它们将路由到dead_exchange
。
**
x-dead-letter-routing-key
**
- 含义:设置死信消息的路由键(配合
x-dead-letter-exchange
使用)。 - 用法:
arguments.put("x-dead-letter-routing-key", "dead_key");
死信消息将使用dead_key
进行路由。
**
x-max-priority
**
- 含义:设置队列的最大优先级,启用消息优先级队列。
- 用法:
arguments.put("x-max-priority", 10);
队列支持消息优先级,优先级范围为 0 到 10。
**
x-queue-mode
**
- 含义:设置队列模式。
- 取值: -
"default"
:默认模式,所有消息存储在内存和磁盘上。-"lazy"
:惰性模式,尽可能将消息存储到磁盘以减少内存消耗。 - 用法:
arguments.put("x-queue-mode", "lazy");
队列切换到惰性模式。
**
x-queue-master-locator
**
- 含义:在 RabbitMQ 集群环境中,指定队列主副本的位置策略。
- 取值: -
"min-masters"
:选择负载最低的节点作为主副本。-"client-local"
:选择与客户端最近的节点作为主副本。 - 用法:
arguments.put("x-queue-master-locator", "min-masters");
自定义插件属性
- RabbitMQ 支持通过插件扩展的属性,例如延迟消息插件(
rabbitmq-delayed-message-exchange
)会引入新的属性: - **x-delayed-type
**:指定延迟队列的交换机类型(如direct
、fanout
、topic
)。- 用法:arguments.put("x-delayed-type", "direct");
示例代码
以下代码示例展示了如何为队列配置多种属性:
importcom.rabbitmq.client.Channel;importcom.rabbitmq.client.Connection;importcom.rabbitmq.client.ConnectionFactory;importjava.util.HashMap;importjava.util.Map;publicclassQueueDeclareExample{privatefinalstaticString QUEUE_NAME ="example_queue";publicstaticvoidmain(String[] args)throwsException{ConnectionFactory factory =newConnectionFactory();
factory.setHost("localhost");try(Connection connection = factory.newConnection();Channel channel = connection.createChannel()){// 配置队列属性Map<String,Object> arguments =newHashMap<>();
arguments.put("x-message-ttl",60000);// 消息过期时间
arguments.put("x-max-length",100);// 最大消息数
arguments.put("x-dead-letter-exchange","dead_exchange");// 死信交换机
arguments.put("x-queue-mode","lazy");// 惰性队列// 声明队列
channel.queueDeclare(QUEUE_NAME,true,false,false, arguments);System.out.println("Queue declared with custom properties.");}}}
注意事项
- 属性顺序依赖:某些属性依赖其他属性,例如
x-dead-letter-routing-key
需要同时设置x-dead-letter-exchange
。 - 持久化:队列的持久化属性与扩展属性分开设置,队列扩展属性不会影响持久化行为。
- 集群环境:某些属性(如
x-queue-master-locator
)仅在集群环境中有效。
通过合理配置队列属性,可以更好地满足业务需求并提升 RabbitMQ 的性能和可靠性。
消息扩展属性
在 RabbitMQ 中,可以通过
AMQP.BasicProperties
配置发布消息时的多种属性。除了示例代码中设置的消息过期时间 (
expiration
) 外,还可以为消息配置其他重要属性。
常见消息属性
**
contentType
(内容类型)**
- 描述:指定消息的 MIME 类型,例如
text/plain
、application/json
等。 - 示例:
basicProperties =newAMQP.BasicProperties.Builder().contentType("application/json").build();
**
contentEncoding
(内容编码)**
- 描述:指定消息内容的编码方式,例如
UTF-8
、gzip
等。 - 示例:
basicProperties =newAMQP.BasicProperties.Builder().contentEncoding("UTF-8").build();
**
expiration
(过期时间)**
- 描述:设置消息的生存时间(以毫秒为单位)。消息在队列中超过指定时间后会变为死信。
- 示例:
basicProperties =newAMQP.BasicProperties.Builder().expiration("60000")// 60 秒.build();
**
priority
(优先级)**
- 描述:指定消息的优先级,配合队列的
x-max-priority
属性使用。值范围是 0(最低优先级)到队列配置的最大优先级。 - 示例:
basicProperties =newAMQP.BasicProperties.Builder().priority(5).build();
**
correlationId
(关联 ID)**
- 描述:用于将请求和响应进行关联,通常在 RPC 模式中使用。
- 示例:
basicProperties =newAMQP.BasicProperties.Builder().correlationId("12345").build();
**
replyTo
(回调队列名称)**
- 描述:指定响应消息的回调队列,用于 RPC 模式。
- 示例:
basicProperties =newAMQP.BasicProperties.Builder().replyTo("response_queue").build();
**
messageId
(消息 ID)**
- 描述:消息的唯一标识符,用于幂等性校验或追踪。
- 示例:
basicProperties =newAMQP.BasicProperties.Builder().messageId("msg-001").build();
**
timestamp
(时间戳)**
- 描述:消息的创建时间,通常由生产者设置。
- 示例:
basicProperties =newAMQP.BasicProperties.Builder().timestamp(newDate()).build();
**
type
(消息类型)**
- 描述:指定消息的类型,用于消费者区分不同消息的处理逻辑。
- 示例:
basicProperties =newAMQP.BasicProperties.Builder().type("order_created").build();
**
userId
(用户 ID)**
- 描述:用于验证发布消息的用户。RabbitMQ 会检查
userId
是否与连接的用户一致。 - 示例:
basicProperties =newAMQP.BasicProperties.Builder().userId("guest").build();
**
appId
(应用 ID)**
- 描述:标识发布消息的应用程序。
- 示例:
basicProperties =newAMQP.BasicProperties.Builder().appId("my_app").build();
**
headers
(自定义头部信息)**
- 描述:消息的元数据,存储为键值对格式,可以传递扩展信息。
- 示例:
Map<String,Object> headers =newHashMap<>();headers.put("source","web");headers.put("destination","api");basicProperties =newAMQP.BasicProperties.Builder().headers(headers).build();
完整示例代码
以下代码展示了如何为消息设置多种属性并发布到队列:
importcom.rabbitmq.client.AMQP;importcom.rabbitmq.client.Channel;importcom.rabbitmq.client.Connection;importutil.ConnectionUtil;importjava.util.Date;importjava.util.HashMap;importjava.util.Map;publicclassMessagePropertiesProducer{privatefinalstaticString QUEUE_NAME ="properties_queue";publicstaticvoidmain(String[] args){try(Connection connection =ConnectionUtil.getConnection();Channel channel = connection.createChannel()){// 声明队列
channel.queueDeclare(QUEUE_NAME,true,false,false,null);// 消息内容String message ="Hello RabbitMQ with properties!";// 配置消息属性Map<String,Object> headers =newHashMap<>();
headers.put("format","json");
headers.put("source","application");AMQP.BasicProperties properties =newAMQP.BasicProperties.Builder().contentType("application/json").contentEncoding("UTF-8").expiration("60000").priority(5).correlationId("12345").replyTo("response_queue").messageId("msg-001").timestamp(newDate()).type("custom_message").userId("guest").appId("my_app").headers(headers).build();// 发布消息
channel.basicPublish("", QUEUE_NAME, properties, message.getBytes());System.out.println("Message sent with properties!");}catch(Exception e){
e.printStackTrace();}}}
注意事项
- 属性优先级:队列的属性可能会覆盖消息的属性。例如,队列的
x-message-ttl
会优先于消息的expiration
。 - 类型匹配:某些属性需要特定格式,如
priority
必须是整数,headers
是键值对。 - 用户权限:使用
userId
属性时,RabbitMQ 会严格校验用户身份,需确保设置正确。
通过设置这些属性,可以增强消息的功能性和可靠性,满足更多业务需求。
死信队列
概念
死信队列是 RabbitMQ 中的一种特殊队列,用于存储被拒绝或无法被正常处理的消息。消息变成“死信”并被转发到死信队列,方便后续分析和处理。
死信队列是消息处理失败时的补救措施。可以使用死信队列进行日志分析、故障排查或重新投递机制。
消息变成死信的三种情况
- **消息被消费者拒绝且
requeue=false
**:- 消费者显式拒绝消息(使用channel.basicReject
或channel.basicNack
),并指定不重新入队。 - 消息在队列中TTL(Time-To-Live)过期:- 设置消息或队列的 TTL,当消息超时未被消费时,进入死信队列。
- 队列达到最大长度限制:- 队列中消息数量超过最大限制,最早的消息被丢弃,转发到死信队列。
死信队列配置
配置死信队列需要为队列设置以下参数:
- **
x-dead-letter-exchange
**: 指定死信队列对应的交换机。 - **
x-dead-letter-routing-key
**(可选): 指定死信消息在死信交换机上的路由键。
生产者示例
声明一个正常的交换机,用于投递消息,使用direct模式。
importcom.rabbitmq.client.Channel;importcom.rabbitmq.client.Connection;importutil.ConnectionUtil;publicclassDLXProducer{privatestaticfinalString NORMAL_EXCHANGE ="normal_exchange";publicstaticvoidmain(String[] args){try(Connection connection =ConnectionUtil.getConnection();Channel channel = connection.createChannel()){// 声明正常交换机
channel.exchangeDeclare(NORMAL_EXCHANGE,"direct");String routingKey ="normal_key";String message ="测试死信消息";// 发送消息
channel.basicPublish(NORMAL_EXCHANGE, routingKey,null, message.getBytes());System.out.println("发送消息: "+ message);}catch(Exception e){
e.printStackTrace();}}}
消费者一
声明正常的交换机与队列,再声明死信交换机与队列,然后进行绑定
私信队列使用步骤:
- 声明正常的交换机与正常队列,通过一个路由键将二者绑定。
- 声明死信交换机与死信队列,通过一个路由键将二者绑定。
- 为正常的队列配置扩展参数:
x-dead-letter-exchange
与x-dead-letter-routing-key
,分别为所声明的死信交换机与死信路由键。
代码展示
importcom.rabbitmq.client.*;importutil.ConnectionUtil;importjava.io.IOException;importjava.util.HashMap;importjava.util.Map;publicclassDLXConsumer{//普通队列privatestaticfinalString NORMAL_QUEUE ="normal_queue";//普通交换机privatestaticfinalString NORMAL_EXCHANGE ="normal_exchange";//普通路由键privatestaticfinalString NORMAL_KEY="normal_key";//死信队列privatestaticfinalString DLX_QUEUE ="dlx_queue";//死信交换机privatestaticfinalString DLX_EXCHANGE ="dlx_exchange";//死信路由键privatestaticfinalString DLX_KEY="dlx_key";publicstaticvoidmain(String[] args){try{//创建连接与通道Connection connection =ConnectionUtil.getConnection();Channel channel = connection.createChannel();// 声明普通交换机与死信交换机
channel.exchangeDeclare(DLX_EXCHANGE,"direct");
channel.exchangeDeclare(NORMAL_EXCHANGE,"direct");// 声明死信队列
channel.queueDeclare(DLX_QUEUE,true,false,false,null);// 声明普通队列并为其配置死信交换机及死信路由键,该队列的消息变为'死信'后会被投入死信交换机Map<String,Object> arguments =newHashMap<>();
arguments.put("x-dead-letter-exchange",DLX_EXCHANGE);// 指定死信交换机
arguments.put("x-dead-letter-routing-key",DLX_KEY);// 指定死信路由键
channel.queueDeclare(NORMAL_QUEUE,true,false,false,arguments);//绑定死信交换机与死信队列
channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE, NORMAL_KEY);//绑定普通交换机与普通队列
channel.queueBind(DLX_QUEUE, DLX_EXCHANGE, DLX_KEY);// 消费普通队列消息,并模拟拒绝消息,autoAck设置为false来进行手动消息确认
channel.basicConsume(NORMAL_QUEUE,false,newDefaultConsumer(channel){@OverridepublicvoidhandleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throwsIOException{String message =newString(body);System.out.println("接收到正常队列消息: "+ message);// 拒绝消息并将其发送到死信队列
channel.basicReject(envelope.getDeliveryTag(),false);}});}catch(Exception e){
e.printStackTrace();}}}
消费者二
监听死信队列,消费其中的消息
importcom.rabbitmq.client.*;importutil.ConnectionUtil;importjava.io.IOException;/**
* 用于消费死信队列中的消息
* */publicclassDQConsumer{//前面声明的死信队列privatestaticfinalString DLX_QUEUE ="dlx_queue";publicstaticvoidmain(String[] args){try{//使用之前创建的连接工具,来获取到连接Connection connection =ConnectionUtil.getConnection();//创建会话通道,生产者和mq服务所有通信都在channel通道中完成Channel channel = connection.createChannel();// 声明队列
channel.queueDeclare(DLX_QUEUE,true,false,false,null);//实现消费方法DefaultConsumer consumer =newDefaultConsumer(channel){@OverridepublicvoidhandleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throwsIOException{// body 即消息体String msg =newString(body,"utf-8");System.out.println("接收消息: "+ msg +"!");}};//消费消息
channel.basicConsume(DLX_QUEUE,true, consumer);}catch(Exception ex){
ex.printStackTrace();System.out.println("接收消息出现异常...");}}}
测试
先启动消费者DLXConsumer,然后启动生产者DLXProducer,生产者启动后会将消息进行发布,然后DLXConsumer中会拒绝消息,此时消息被放入声明的死信队列中,下图的
Ready
为1,表示死信队列中存有1条未消费的消息:
然后启动用于消费死信队列的消费者DQConsumer,其控制台输出:
接收消息: 测试死信消息!
然后在管理页面等待5秒自动刷新,结果如下:
版权归原作者 Mr. bigworth 所有, 如有侵权,请联系我们删除。