1. 常见消息中间件大 PK
说到消息中间件,估计大伙多多少少都能讲出来一些,ActiveMQ、RabbitMQ、RocketMQ、Kafka 等等各种以及 JMS、AMQP 等各种协议,然而这些消息中间件各自都有什么特点,我们在开发中又该选择哪种呢?
1.1 AMQP 简介
Message Queue 的需求由来已久,80 年代最早在金融交易中,高盛等公司采用 Teknekron 公司的产品,当时的 Message Queue 软件叫做:the information bus(TIB)。 TIB 被电信和通讯公司采用,路透社收购了 Teknekron 公司。之后,IBM 开发了 MQSeries,微软开发了 Microsoft Message Queue(MSMQ)。这些商业 MQ 供应商的问题是厂商锁定,价格高昂。2001 年,Java Message Service 试图解决锁定和交互性的问题,但对应用来说反而更加麻烦了。
于是 2004 年,摩根大通和 iMatrix 开始着手 Advanced Message Queuing Protocol (AMQP)开放标准的开发。2006 年,AMQP 规范发布。2007 年,Rabbit 技术公司基于 AMQP 标准开发的 RabbitMQ 1.0 发布。
目前 RabbitMQ 的最新版本为 3.5.7,基于 AMQP 0-9-1。
在 AMQP 协议中,消息收发涉及到如下一些概念:
- Broker: 接收和分发消息的应用,我们日常所用的 RabbitMQ 就是一个 Message Broker。
- Virtual host: 出于多租户和安全因素设计的,把 AMQP 的基本组件划分到一个虚拟的分组中,类似于网络中的 namespace 概念。当多个不同的用户使用同一个 RabbitMQ 提供的服务时,可以划分出多个 vhost,每个用户在自己的 vhost 中创建 exchange/queue 等。
- Connection: publisher/consumer 和 broker 之间的 TCP 连接,断开连接的操作只会在 client 端进行,Broker 不会断开连接,除非出现网络故障或 broker 服务出现问题。
- Channel: 如果每一次访问 RabbitMQ 都建立一个 Connection,在消息量大的时候建立 TCP Connection 的开销将是巨大的,效率也较低。Channel 是在 Connection 内部建立的逻辑连接,如果应用程序支持多线程,通常每个 Thread 创建单独的 Channel 进行通讯,AMQP method 包含了 Channel id 帮助客户端和 Message Broker 识别 Channel,所以 Channel 之间是完全隔离的。Channel 作为轻量级的 Connection 极大减少了操作系统建立 TCP Connection 的开销,关于 Channel,参考RabbitMQ 管理页面该如何使用
- Exchange: Message 到达 Broker 的第一站,根据分发规则,匹配查询表中的 routing key,分发消息到 queue 中去。常用的类型有:direct (点对点), topic(发布订阅) 以及 fanout (广播)。
- Queue: 消息最终被送到这里等待 Consumer 取走,一个 Message 可以被同时拷贝到多个 queue 中。
- Binding: Exchange 和 Queue 之间的虚拟连接,binding 中可以包含 routing key,Binding 信息被保存到 Exchange 中的查询表中,作为 Message 的分发依据。
1.1.1 AMQP 实现
来看看实现了 AMQP 协议的一些具体的消息中间件产品都有哪些。
Apache Qpid
Apache ActiveMQ
RabbitMQ
可能有小伙伴奇怪咋还有 ActiveMQ?其实 ActiveMQ 不仅支持 JMS,也支持 AMQP
另外还有大家熟知的阿里出品的 RocketMQ,这个是自定义了一套协议,社区也提供了 JMS,但是不太成熟
1.2. 重要产品
1.2.1 ActiveMQ
ActiveMQ 是 Apache 下的一个子项目,使用完全支持 JMS1.1 和 J2EE1.4 规范的 JMS Provider 实现,少量代码就可以高效地实现高级应用场景,并且支持可插拔的传输协议,如:in-VM, TCP, SSL, NIO, UDP, multicast, JGroups and JXTA transports。
ActiveMQ 支持常用的多种语言客户端如 C++、Java、.Net,、Python、 Php、 Ruby 等。
现在的 ActiveMQ 分为两个版本:
- ActiveMQ Classic
- ActiveMQ Artemis
这里的 ActiveMQ Classic 就是原来的 ActiveMQ,而 ActiveMQ Artemis 是在 RedHat 捐赠的 HornetQ 服务器代码的基础上开发的,两者代码完全不同,后者支持 JMS2.0,使用基于 Netty 的异步 IO,大大提升了性能,更为神奇的是,后者不仅支持 JMS 协议,还支持 AMQP 协议、STOMP 以及 MQTT,可以说后者的玩法相当丰富。
因此大家在使用时,建议直接选择 ActiveMQ Artemis。
1.2.2 RabbitMQ
RabbitMQ 算是 AMQP 体系下最为重要的产品了,它基于 Erlang 语言开发实现,估计很多人被 RabbitMQ 的安装折磨过,建议安装 RabbitMQ 直接用 Docker,省心省力(公号后台回复 docker 有教程)。
RabbitMQ 支持 AMQP、XMPP、SMTP、STOMP 等多种协议,功能强大,适用于企业级开发。
来看一张 RabbitMQ 的结构图:
1.2.3 RocketMQ
RocketMQ 是阿里开源的一款分布式消息中间件,原名 Metaq,从 3.0 版本开始改名为 RocketMQ,是阿里参照 Kafka 设计思想使用 Java 语言实现的一套 MQ。RocketMQ 将阿里内部多款 MQ 产品(Notify、Metaq)进行整合,只维护核心功能,去除了所有其他运行时依赖,保证核心功能最简化,在此基础上配合阿里上述其他开源产品实现不同场景下 MQ 的架构,目前主要用于订单交易系统。
RocketMQ 具有以下特点:
- 保证严格的消息顺序。
- 提供针对消息的过滤功能。
- 提供丰富的消息拉取模式。
- 高效的订阅者水平扩展能力。
- 实时的消息订阅机制。
- 亿级消息堆积能力
对于 Java 工程师而言,这也是一种经常会用到的 MQ。
1.2.4 Kafka
Kafka 是 Apache 下的一个开源流处理平台,由 Scala 和 Java 编写。Kafka 是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者在网站中的所有动作(网页浏览,搜索和其他用户的行动)流数据。Kafka 的目的是通过 Hadoop 的并行加载机制来统一线上和离线的消息处理,也是为了通过集群来提供实时的消息。
Kafka 具有以下特性:
- 快速持久化:通过磁盘顺序读写与零拷贝机制,可以在O(1)的系统开销下进行消息持久化。
- 高吞吐:在一台普通的服务器上既可以达到 10W/s 的吞吐速率。
- 高堆积:支持 topic 下消费者较长时间离线,消息堆积量大。 完全的分布式系统:Broker、Producer、Consumer 都原生自动支持分布式,通过 Zookeeper 可以自动实现更加复杂的负载均衡。
- 支持 Hadoop 数据并行加载。
大数据开发中大家可能会经常接触 Kafka,Java 开发中也会接触,但是相对来说可能接触的少一些。
1.2.5 ZeroMQ
ZeroMQ 号称最快的消息队列系统,它专门为高吞吐量/低延迟的场景开发,在金融界的应用中经常使用,偏重于实时数据通信场景。ZeroMQ 不是单独的服务,而是一个嵌入式库,它封装了网络通信、消息队列、线程调度等功能,向上层提供简洁的 API,应用程序通过加载库文件,调用 API 函数来实现高性能网络通信。
ZeroMQ 的特性:
- 无锁的队列模型:对于跨线程间的交互(用户端和 session)之间的数据交换通道 pipe,采用无锁的队列算法 CAS,在 pipe 的两端注册有异步事件,在读或者写消息到 pipe 时,会自动触发读写事件。
- 批量处理的算法:对于批量的消息,进行了适应性的优化,可以批量的接收和发送消息。
- 多核下的线程绑定,无须 CPU 切换:区别于传统的多线程并发模式,信号量或者临界区,ZeroMQ 充分利用多核的优势,每个核绑定运行一个工作者线程,避免多线程之间的 CPU 切换开销。
1.2.6 其他
另外还有如 Redis 也能做消息队列, Redis可以 做普通消息队列和延迟消息队列,这里也就不啰嗦了。
1.3. 比较
最后,我们再来通过一张图来比较下各个消息中间件。
2. RabbitMQ 管理页面
RabbitMQ 的 web 管理页面相信很多小伙伴都用过
2.1 概览
首先,这个 Web 管理页面大概就像下图这样:
首先一共有六个选项卡:
- Overview:这里可以概览 RabbitMQ 的整体情况,如果是集群,也可以查看集群中各个节点的情况。包括 RabbitMQ 的端口映射信息等,都可以在这个选项卡中查看。
- Connections:这个选项卡中是连接上 RabbitMQ 的生产者和消费者的情况。
- Channels:这里展示的是“通道”信息,关于“通道”和“连接”的关系,在后文再和大家详细介绍。
- Exchange:这里展示所有的交换机信息。
- Queue:这里展示所有的队列信息。
- Admin:这里展示所有的用户信息。
右上角是页面刷新的时间,默认是 5 秒刷新一次,展示的是所有的 Virtual host。
这是整个管理页面的一个大致情况,接下来我们来逐个介绍。
2.2 Overview
Overview 中分了如下一些功能模块:
分别是:
Totals:
Totals 里面有 准备消费的消息数、待确认的消息数、消息总数以及消息的各种处理速率(发送速率、确认速率、写入硬盘速率等等)。
Nodes:
Nodes 其实就是支撑 RabbitMQ 运行的一些机器,相当于集群的节点。
点击每个节点,可以查看节点的详细信息。
Churn statistics:
这个不好翻译,里边展示的是 Connection、Channel 以及 Queue 的创建/关闭速率。
Ports and contexts:
这个里边展示了端口的映射信息以及 Web 的上下文信息。
- 5672 是 RabbitMQ 通信端口。
- 15672 是 Web 管理页面端口。
- 25672 是集群通信端口。
Export definitions && Import definitions:
最后面这两个可以导入导出当前实例的一些配置信息:
2.3 Connections
这里主要展示的是当前连接上 RabbitMQ 的信息,无论是消息生产者还是消息消费者,只要连接上来了这里都会显示出来。
注意协议中的 AMQP 0-9-1 指的是 AMQP 协议的版本号。
其他属性含义如下:
- User name:当前连接使用的用户名。
- State:当前连接的状态,running 表示运行中;idle 表示空闲。
- SSL/TLS:表示是否使用 ssl 进行连接。
- Channels:当前连接创建的通道总数。
- From client:每秒发出的数据包。
- To client:每秒收到的数据包。
点击连接名称可以查看每一个连接的详情。
在详情中可以查看每一个连接的通道数以及其他详细信息,也可以强制关闭一个连接。
2.4 Channels
这个地方展示的是通道的信息:
那么什么是通道呢?
一个连接(IP)可以有多个通道,如上图,一共是两个连接,但是一共有 12 个通道。
一个连接可以有多个通道,这个多个通道通过多线程实现,一般情况下,我们在通道中创建队列、交换机等。
生产者的通道一般会立马关闭;消费者是一直监听的,通道几乎是会一直存在。
上面各项参数含义分别如下:
- Channel:通道名称。
- User name:该通道登录使用的用户名。
- Model:通道确认模式,C 表示 confirm;T 表示事务。
- State:通道当前的状态,running 表示运行中;idle 表示空闲。
- Unconfirmed:待确认的消息总数。
- Prefetch:Prefetch 表示每个消费者最大的能承受的未确认消息数目,简单来说就是用来指定一个消费者一次可以从 RabbitMQ 中获取多少条消息并缓存在消费者中,一旦消费者的缓冲区满了,RabbitMQ 将会停止投递新的消息到该消费者中直到它发出有消息被 ack 了。总的来说,消费者负责不断处理消息,不断 ack,然后只要 unAcked 数少于 prefetch * consumer 数目,RabbitMQ 就不断将消息投递过去。
- Unacker:待 ack 的消息总数。
- publish:消息生产者发送消息的速率。
- confirm:消息生产者确认消息的速率。
- unroutable (drop):表示未被接收,且已经删除了的消息。
- deliver/get:消息消费者获取消息的速率。
- ack:消息消费者 ack 消息的速率。
2.5 Exchange
这个地方展示交换机信息:
这里会展示交换机的各种信息。
Type 表示交换机的类型。
Features 有两个取值 D 和 I。
D 表示交换机持久化,将交换机的属性在服务器内部保存,当 MQ 的服务器发生意外或关闭之后,重启 RabbitMQ 时不需要重新手动或执行代码去建立交换机,交换机会自动建立,相当于一直存在。
I 表示这个交换机不可以被消息生产者用来推送消息,仅用来进行交换机和交换机之间的绑定。
Message rate in 表示消息进入的速率。
Message rate out 表示消息出去的速率。
点击下方的 Add a new exchange 可以创建一个新的交换机。
2.6 Queue
这个选项卡就是用来展示消息队列的:
各项含义如下:
- Name:表示消息队列名称。
- Type:表示消息队列的类型,除了上图的 classic,另外还有一种消息类型是 Quorum。两个区别如下图:
- Features:表示消息队列的特性,D 表示消息队列持久化。
- State:表示当前队列的状态,running 表示运行中;idle 表示空闲。
- Ready:表示待消费的消息总数。
- Unacked:表示待应答的消息总数。
- Total:表示消息总数 Ready+Unacked。
- incoming:表示消息进入的速率。
- deliver/get:表示获取消息的速率。
- ack:表示消息应答的速率。
点击下方的 Add a new queue 可以添加一个新的消息队列。
点击每一个消息队列的名称,可以进入到消息队列中。进入到消息队列后,可以完成对消息队列的进一步操作,例如:
- 将消息队列和某一个交换机进行绑定。
- 发送消息。
- 获取一条消息。
- 移动一条消息(需要插件的支持)。
- 删除消息队列。
- 清空消息队列中的消息。 …
如下图:
2.7 Admin
这里是做一些用户管理操作,如下图:
各项属性含义如下:
- Name:表示用户名称。
- Tags:表示角色标签,只能选取一个。
- Can access virtual hosts:表示允许进入的虚拟主机。
- Has password:表示这个用户是否设置了密码。
常见的两个操作时管理用户和虚拟主机。
点击下方的 Add a user 可以添加一个新的用户,添加用户的时候需要给用户设置 Tags,其实就是用户角色,如下:
- none: 不能访问 management plugin
- management: 用户可以通过 AMQP 做的任何事 列出自己可以通过 AMQP 登入的 virtual hosts 查看自己的 virtual hosts 中的 queues, exchanges 和 bindings 查看和关闭自己的 channels 和 connections 查看有关自己的 virtual hosts 的“全局”的统计信息,包含其他用户在这些 virtual hosts 中的活动
- policymaker: management 可以做的任何事 查看、创建和删除自己的 virtual hosts 所属的 policies 和 parameters
- monitoring: management 可以做的任何事 列出所有 virtual hosts,包括他们不能登录的 virtual hosts 查看其他用户的 connections 和 channels 查看节点级别的数据如 clustering 和 memory 使用情况 查看真正的关于所有 virtual hosts 的全局的统计信息
- administrator: policymaker 和 monitoring 可以做的任何事 创建和删除 virtual hosts 查看、创建和删除 users 查看创建和删除 permissions 关闭其他用户的 connections
- impersonator(模拟者) 模拟者,无法登录管理控制台。
另外,这里也可以进行虚拟主机 virtual host 的操作,后面小节会和大家介绍虚拟主机。
3. RabbitMQ 七种消息收发方式
本小节来和小伙伴们分享一下 RabbitMQ 的七种消息传递形式。一起来看看。
大部分情况下,我们可能都是在 Spring Boot 或者 Spring Cloud 环境下使用 RabbitMQ,因此本文我也主要从这两个方面来和大家分享 RabbitMQ 的用法。
3.1 RabbitMQ 架构简介
这张图中涉及到如下一些概念:
- 生产者(Publisher):发布消息到 RabbitMQ 中的交换机(Exchange)上。
- 交换机(Exchange):和生产者建立连接并接收生产者的消息。
- 消费者(Consumer):监听 RabbitMQ 中的 Queue 中的消息。
- 队列(Queue):Exchange 将消息分发到指定的 Queue,Queue 和消费者进行交互。
- 路由(Routes):交换机转发消息到队列的规则。
3.2 准备工作
大家知道,RabbitMQ 是 AMQP 阵营里的产品,Spring Boot 为 AMQP 提供了自动化配置依赖 spring-boot-starter-amqp,因此首先创建 Spring Boot 项目并添加该依赖,如下:
项目创建成功后,在 application.properties 中配置 RabbitMQ 的基本连接信息,如下:
spring.rabbitmq.host=localhost
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.port=5672
接下来进行 RabbitMQ 配置,在 RabbitMQ 中,所有的消息生产者提交的消息都会交由 Exchange 进行再分配,Exchange 会根据不同的策略将消息分发到不同的 Queue 中。
RabbitMQ 官网介绍了如下几种消息分发的形式:
这里我主要和大家介绍前六种消息收发方式。
3.3 消息收发
3.3.1 Hello World
咦?这个咋没有交换机?这个其实是默认的交换机,我们需要提供一个生产者一个队列以及一个消费者。消息传播图如下:
来看看代码实现:
先来看看队列的定义:
@ConfigurationpublicclassHelloWorldConfig{publicstaticfinalStringHELLO_WORLD_QUEUE_NAME="hello_world_queue";@BeanQueuequeue1(){returnnewQueue(HELLO_WORLD_QUEUE_NAME);}}
再来看看消息消费者的定义:
@ComponentpublicclassHelloWorldConsumer{@RabbitListener(queues =HelloWorldConfig.HELLO_WORLD_QUEUE_NAME)publicvoidreceive(String msg){System.out.println("msg = "+ msg);}}
消息发送:
@SpringBootTestclassRabbitmqdemoApplicationTests{@AutowiredRabbitTemplate rabbitTemplate;@TestvoidcontextLoads(){
rabbitTemplate.convertAndSend(HelloWorldConfig.HELLO_WORLD_QUEUE_NAME,"hello");}}
这个时候使用的其实是默认的直连交换机(DirectExchange),DirectExchange 的路由策略是将消息队列绑定到一个 DirectExchange 上,当一条消息到达 DirectExchange 时会被转发到与该条消息 routing key 相同的 Queue 上,例如消息队列名为 “hello-queue”,则 routingkey 为 “hello-queue” 的消息会被该消息队列接收。
3.3.2 Work queues
这种情况是这样的:
一个生产者,一个默认的交换机(DirectExchange),一个队列,两个消费者,如下图:
一个队列对应了多个消费者,默认情况下,由队列对消息进行平均分配,消息会被分到不同的消费者手中。消费者可以配置各自的并发能力,进而提高消息的消费能力,也可以配置手动 ack,来决定是否要消费某一条消息。
先来看并发能力的配置,如下:
@ComponentpublicclassHelloWorldConsumer{@RabbitListener(queues =HelloWorldConfig.HELLO_WORLD_QUEUE_NAME)publicvoidreceive(String msg){System.out.println("receive = "+ msg);}@RabbitListener(queues =HelloWorldConfig.HELLO_WORLD_QUEUE_NAME,concurrency ="10")publicvoidreceive2(String msg){System.out.println("receive2 = "+ msg+"------->"+Thread.currentThread().getName());}}
可以看到,第二个消费者我配置了 concurrency 为 10,此时,对于第二个消费者,将会同时存在 10 个子线程去消费消息。
启动项目,在 RabbitMQ 后台也可以看到一共有 11 个消费者。
此时,如果生产者发送 10 条消息,就会一下都被消费掉。
消息发送方式如下:
@SpringBootTestclassRabbitmqdemoApplicationTests{@AutowiredRabbitTemplate rabbitTemplate;@TestvoidcontextLoads(){for(int i =0; i <10; i++){
rabbitTemplate.convertAndSend(HelloWorldConfig.HELLO_WORLD_QUEUE_NAME,"hello");}}}
消息消费日志如下:
可以看到,消息都被第一个消费者消费了。但是小伙伴们需要注意,事情并不总是这样(多试几次就可以看到差异),消息也有可能被第一个消费者消费(只是由于第二个消费者有十个线程一起开动,所以第二个消费者消费的消息占比更大)。
当然消息消费者也可以开启手动 ack,这样可以自行决定是否消费 RabbitMQ 发来的消息,配置手动 ack 的方式如下:
spring.rabbitmq.listener.simple.acknowledge-mode=manual
消费代码如下:
@ComponentpublicclassHelloWorldConsumer{@RabbitListener(queues =HelloWorldConfig.HELLO_WORLD_QUEUE_NAME)publicvoidreceive(Message message,Channel channel)throwsIOException{System.out.println("receive="+message.getPayload());
channel.basicAck(((Long) message.getHeaders().get(AmqpHeaders.DELIVERY_TAG)),true);}@RabbitListener(queues =HelloWorldConfig.HELLO_WORLD_QUEUE_NAME, concurrency ="10")publicvoidreceive2(Message message,Channel channel)throwsIOException{System.out.println("receive2 = "+ message.getPayload()+"------->"+Thread.currentThread().getName());
channel.basicReject(((Long) message.getHeaders().get(AmqpHeaders.DELIVERY_TAG)),true);}}
此时第二个消费者拒绝了所有消息,第一个消费者消费了所有消息。
这就是 Work queues 这种情况。
3.3.3 Publish/Subscribe
再来看发布订阅模式,这种情况是这样:
一个生产者,多个消费者,每一个消费者都有自己的一个队列,生产者没有将消息直接发送到队列,而是发送到了交换机,每个队列绑定交换机,生产者发送的消息经过交换机,到达队列,实现一个消息被多个消费者获取的目的。需要注意的是,如果将消息发送到一个没有队列绑定的 Exchange上面,那么该消息将会丢失,这是因为在 RabbitMQ 中 Exchange 不具备存储消息的能力,只有队列具备存储消息的能力,如下图:
这种情况下,我们有四种交换机可供选择,分别是:
- Direct
- Fanout
- Topic
- Header
我分别来给大家举一个简单例子看下。
3.3.3.1 Direct
DirectExchange 的路由策略是将消息队列绑定到一个 DirectExchange 上,当一条消息到达 DirectExchange 时会被转发到与该条消息 routing key 相同的 Queue 上,例如消息队列名为 “hello-queue”,则 routingkey 为 “hello-queue” 的消息会被该消息队列接收。DirectExchange 的配置如下:
@ConfigurationpublicclassRabbitDirectConfig{publicfinalstaticStringDIRECTNAME="javaboy-direct";@BeanQueuequeue(){returnnewQueue("hello-queue");}@BeanDirectExchangedirectExchange(){returnnewDirectExchange(DIRECTNAME,true,false);}@BeanBindingbinding(){returnBindingBuilder.bind(queue()).to(directExchange()).with("direct");}}
- 首先提供一个消息队列Queue,然后创建一个DirectExchange对象,三个参数分别是名字,重启后是否依然有效以及长期未用时是否删除。
- 创建一个Binding对象将Exchange和Queue绑定在一起。
- DirectExchange和Binding两个Bean的配置可以省略掉,即如果使用DirectExchange,可以只配置一个Queue的实例即可。
再来看看消费者:
@ComponentpublicclassDirectReceiver{@RabbitListener(queues ="hello-queue")publicvoidhandler1(String msg){System.out.println("DirectReceiver:"+ msg);}}
通过 @RabbitListener 注解指定一个方法是一个消息消费方法,方法参数就是所接收到的消息。然后在单元测试类中注入一个 RabbitTemplate 对象来进行消息发送,如下:
@RunWith(SpringRunner.class)@SpringBootTestpublicclassRabbitmqApplicationTests{@AutowiredRabbitTemplate rabbitTemplate;@TestpublicvoiddirectTest(){
rabbitTemplate.convertAndSend("hello-queue","hello direct!");}}
最终执行结果如下:
3.3.3.2 Fanout
FanoutExchange 的数据交换策略是把所有到达 FanoutExchange 的消息转发给所有与它绑定的 Queue 上,在这种策略中,routingkey 将不起任何作用,FanoutExchange 配置方式如下:
@ConfigurationpublicclassRabbitFanoutConfig{publicfinalstaticStringFANOUTNAME="sang-fanout";@BeanFanoutExchangefanoutExchange(){returnnewFanoutExchange(FANOUTNAME,true,false);}@BeanQueuequeueOne(){returnnewQueue("queue-one");}@BeanQueuequeueTwo(){returnnewQueue("queue-two");}@BeanBindingbindingOne(){returnBindingBuilder.bind(queueOne()).to(fanoutExchange());}@BeanBindingbindingTwo(){returnBindingBuilder.bind(queueTwo()).to(fanoutExchange());}}
在这里首先创建 FanoutExchange,参数含义与创建 DirectExchange 参数含义一致,然后创建两个 Queue,再将这两个 Queue 都绑定到 FanoutExchange 上。接下来创建两个消费者,如下:
@ComponentpublicclassFanoutReceiver{@RabbitListener(queues ="queue-one")publicvoidhandler1(String message){System.out.println("FanoutReceiver:handler1:"+ message);}@RabbitListener(queues ="queue-two")publicvoidhandler2(String message){System.out.println("FanoutReceiver:handler2:"+ message);}}
两个消费者分别消费两个消息队列中的消息,然后在单元测试中发送消息,如下:
@RunWith(SpringRunner.class)@SpringBootTestpublicclassRabbitmqApplicationTests{@AutowiredRabbitTemplate rabbitTemplate;@TestpublicvoidfanoutTest(){
rabbitTemplate
.convertAndSend(RabbitFanoutConfig.FANOUTNAME,null,"hello fanout!");}}
注意这里发送消息时不需要 routingkey,指定 exchange 即可,routingkey 可以直接传一个 null。
最终执行日志如下:
3.3.3.3 Topic
TopicExchange 是比较复杂但是也比较灵活的一种路由策略,在 TopicExchange 中,Queue 通过 routingkey 绑定到 TopicExchange 上,当消息到达 TopicExchange 后,TopicExchange 根据消息的 routingkey 将消息路由到一个或者多个 Queue 上。TopicExchange 配置如下:
@ConfigurationpublicclassRabbitTopicConfig{publicfinalstaticStringTOPICNAME="sang-topic";@BeanTopicExchangetopicExchange(){returnnewTopicExchange(TOPICNAME,true,false);}@BeanQueuexiaomi(){returnnewQueue("xiaomi");}@BeanQueuehuawei(){returnnewQueue("huawei");}@BeanQueuephone(){returnnewQueue("phone");}@BeanBindingxiaomiBinding(){returnBindingBuilder.bind(xiaomi()).to(topicExchange()).with("xiaomi.#");}@BeanBindinghuaweiBinding(){returnBindingBuilder.bind(huawei()).to(topicExchange()).with("huawei.#");}@BeanBindingphoneBinding(){returnBindingBuilder.bind(phone()).to(topicExchange()).with("#.phone.#");}}
- 首先创建 TopicExchange,参数和前面的一致。然后创建三个 Queue,第一个 Queue 用来存储和 “xiaomi” 有关的消息,第二个 Queue 用来存储和 “huawei” 有关的消息,第三个 Queue 用来存储和 “phone” 有关的消息。
- 将三个 Queue 分别绑定到 TopicExchange 上,第一个 Binding 中的 “xiaomi.#” 表示消息的 routingkey 凡是以 “xiaomi” 开头的,都将被路由到名称为 “xiaomi” 的 Queue 上,第二个 Binding 中的 “huawei.#” 表示消息的 routingkey 凡是以 “huawei” 开头的,都将被路由到名称为 “huawei” 的 Queue 上,第三个 Binding 中的 “#.phone.#” 则表示消息的 routingkey 中凡是包含 “phone” 的,都将被路由到名称为 “phone” 的 Queue 上。
接下来针对三个 Queue 创建三个消费者,如下:
@ComponentpublicclassTopicReceiver{@RabbitListener(queues ="phone")publicvoidhandler1(String message){System.out.println("PhoneReceiver:"+ message);}@RabbitListener(queues ="xiaomi")publicvoidhandler2(String message){System.out.println("XiaoMiReceiver:"+message);}@RabbitListener(queues ="huawei")publicvoidhandler3(String message){System.out.println("HuaWeiReceiver:"+message);}}
然后在单元测试中进行消息的发送,如下:
@RunWith(SpringRunner.class)@SpringBootTestpublicclassRabbitmqApplicationTests{@AutowiredRabbitTemplate rabbitTemplate;@TestpublicvoidtopicTest(){
rabbitTemplate.convertAndSend(RabbitTopicConfig.TOPICNAME,"xiaomi.news","小米新闻..");
rabbitTemplate.convertAndSend(RabbitTopicConfig.TOPICNAME,"huawei.news","华为新闻..");
rabbitTemplate.convertAndSend(RabbitTopicConfig.TOPICNAME,"xiaomi.phone","小米手机..");
rabbitTemplate.convertAndSend(RabbitTopicConfig.TOPICNAME,"huawei.phone","华为手机..");
rabbitTemplate.convertAndSend(RabbitTopicConfig.TOPICNAME,"phone.news","手机新闻..");}}
根据 RabbitTopicConfig 中的配置,第一条消息将被路由到名称为 “xiaomi” 的 Queue 上,第二条消息将被路由到名为 “huawei” 的 Queue 上,第三条消息将被路由到名为 “xiaomi” 以及名为 “phone” 的 Queue 上,第四条消息将被路由到名为 “huawei” 以及名为 “phone” 的 Queue 上,最后一条消息则将被路由到名为 “phone” 的 Queue 上。
3.3.3.4 Header
HeadersExchange 是一种使用较少的路由策略,HeadersExchange 会根据消息的 Header 将消息路由到不同的 Queue 上,这种策略也和 routingkey无关,配置如下:
@ConfigurationpublicclassRabbitHeaderConfig{publicfinalstaticStringHEADERNAME="javaboy-header";@BeanHeadersExchangeheadersExchange(){returnnewHeadersExchange(HEADERNAME,true,false);}@BeanQueuequeueName(){returnnewQueue("name-queue");}@BeanQueuequeueAge(){returnnewQueue("age-queue");}@BeanBindingbindingName(){Map<String,Object> map =newHashMap<>();
map.put("name","sang");returnBindingBuilder.bind(queueName()).to(headersExchange()).whereAny(map).match();}@BeanBindingbindingAge(){returnBindingBuilder.bind(queueAge()).to(headersExchange()).where("age").exists();}}
这里的配置大部分和前面介绍的一样,差别主要体现的 Binding 的配置上,第一个 bindingName 方法中,whereAny 表示消息的 Header 中只要有一个 Header 匹配上 map 中的 key/value,就把该消息路由到名为 “name-queue” 的 Queue 上,这里也可以使用 whereAll 方法,表示消息的所有 Header 都要匹配。whereAny 和 whereAll 实际上对应了一个名为 x-match 的属性。bindingAge 中的配置则表示只要消息的 Header 中包含 age,不管 age 的值是多少,都将消息路由到名为 “age-queue” 的 Queue 上。
接下来创建两个消息消费者:
@ComponentpublicclassHeaderReceiver{@RabbitListener(queues ="name-queue")publicvoidhandler1(byte[] msg){System.out.println("HeaderReceiver:name:"+newString(msg,0, msg.length));}@RabbitListener(queues ="age-queue")publicvoidhandler2(byte[] msg){System.out.println("HeaderReceiver:age:"+newString(msg,0, msg.length));}}
注意这里的参数用 byte 数组接收。然后在单元测试中创建消息的发送方法,这里消息的发送也和 routingkey 无关,如下:
@RunWith(SpringRunner.class)@SpringBootTestpublicclassRabbitmqApplicationTests{@AutowiredRabbitTemplate rabbitTemplate;@TestpublicvoidheaderTest(){Message nameMsg =MessageBuilder.withBody("hello header! name-queue".getBytes()).setHeader("name","sang").build();Message ageMsg =MessageBuilder.withBody("hello header! age-queue".getBytes()).setHeader("age","99").build();
rabbitTemplate.send(RabbitHeaderConfig.HEADERNAME,null, ageMsg);
rabbitTemplate.send(RabbitHeaderConfig.HEADERNAME,null, nameMsg);}}
这里创建两条消息,两条消息具有不同的 header,不同 header 的消息将被发到不同的 Queue 中去。
最终执行效果如下:
3.3.4 Routing
这种情况是这样:
一个生产者,一个交换机,两个队列,两个消费者,生产者在创建 Exchange 后,根据 RoutingKey 去绑定相应的队列,并且在发送消息时,指定消息的具体 RoutingKey 即可。
如下图:
这个就是按照 routing key 去路由消息,我这里就不再举例子了,大家可以参考 3.3.1 小结。
3.3.5 Topics
这种情况是这样:
一个生产者,一个交换机,两个队列,两个消费者,生产者创建 Topic 的 Exchange 并且绑定到队列中,这次绑定可以通过 * 和 # 关键字,对指定 RoutingKey 内容,编写时注意格式 xxx.xxx.xxx 去编写。
如下图:
这个我也就不举例啦,前面 3.3.3 小节已经举过例子了,不再赘述。
3.3.6 RPC
RPC 这种消息收发形式,参考其他文章,传送门:
- SpringBoot+RabbitMQ 实现 RPC 调用
3.3.7 Publisher Confirms
传送门:
- 四种策略确保 RabbitMQ 消息发送可靠性!你用哪种?
- RabbitMQ 高可用之如何确保消息成功消费
4. RabbitMQ 消息有效期
RabbitMQ 中的消息长期未被消费会过期吗?用过 RabbitMQ 的小伙伴可能都有这样的疑问
4.1 默认情况
首先我们来看看默认情况。
默认情况下,消息是不会过期的,也就是我们平日里在消息发送时,如果不设置任何消息过期的相关参数,那么消息是不会过期的,即使消息没被消费掉,也会一直存储在队列中。
这种情况具体代码就不用我再演示了吧
4.2 TTL
TTL(Time-To-Live),消息存活的时间,即消息的有效期。如果我们希望消息能够有一个存活时间,那么我们可以通过设置 TTL 来实现这一需求。如果消息的存活时间超过了 TTL 并且还没有被消息,此时消息就会变成死信
TTL 的设置有两种不同的方式:
- 在声明队列的时候,我们可以在队列属性中设置消息的有效期,这样所有进入该队列的消息都会有一个相同的有效期。
- 在发送消息的时候设置消息的有效期,这样不同的消息就具有不同的有效期。
那如果两个都设置了呢?
以时间短的为准。
当我们设置了消息有效期后,消息过期了就会被从队列中删除了(进入到死信队列,后文一样,不再标注),但是两种方式对应的删除时机有一些差异:
- 对于第一种方式,当消息队列设置过期时间的时候,那么消息过期了就会被删除,因为消息进入 RabbitMQ 后是存在一个消息队列中,队列的头部是最早要过期的消息,所以 RabbitMQ 只需要一个定时任务,从头部开始扫描是否有过期消息,有的话就直接删除。
- 对于第二种方式,当消息过期后并不会立马被删除,而是当消息要投递给消费者的时候才会去删除,因为第二种方式,每条消息的过期时间都不一样,想要知道哪条消息过期,必须要遍历队列中的所有消息才能实现,当消息比较多时这样就比较耗费性能,因此对于第二种方式,当消息要投递给消费者的时候才去删除。
介绍完 TTL 之后,接下来我们来看看具体用法。
接下来所有代码都以 Spring Boot 中封装的 AMPQ 为例来讲解。
4.2.1 单条消息过期
我们先来看单条消息的过期时间。
首先创建一个 Spring Boot 项目,引入 Web 和 RabbitMQ 依赖,如下:
然后在 application.properties 中配置一下 RabbitMQ 的连接信息,如下:
然后在 application.properties 中配置一下 RabbitMQ 的连接信息,如下:
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/
接下来稍微配置一下消息队列:
@ConfigurationpublicclassQueueConfig{publicstaticfinalStringJAVABOY_QUEUE_DEMO="javaboy_queue_demo";publicstaticfinalStringJAVABOY_EXCHANGE_DEMO="javaboy_exchange_demo";publicstaticfinalStringHELLO_ROUTING_KEY="hello_routing_key";@BeanQueuequeue(){returnnewQueue(JAVABOY_QUEUE_DEMO,true,false,false);}@BeanDirectExchangedirectExchange(){returnnewDirectExchange(JAVABOY_EXCHANGE_DEMO,true,false);}@BeanBindingbinding(){returnBindingBuilder.bind(queue()).to(directExchange()).with(HELLO_ROUTING_KEY);}}
这个配置类主要干了三件事:配置消息队列、配置交换机以及将两者绑定在一起。
- 首先配置一个消息队列,new 一个 Queue:第一个参数是消息队列的名字;第二个参数表示消息是否持久化;第三个参数表示消息队列是否排他,一般我们都是设置为 false,即不排他;第四个参数表示如果该队列没有任何订阅的消费者的话,该队列会被自动删除,一般适用于临时队列。
- 配置一个 DirectExchange 交换机。
- 将交换机和队列绑定到一起。
这段配置应该很简单,没啥好解释的,有一个排他性,这里稍微多说两句:
关于排他性,如果设置为 true,则该消息队列只有创建它的 Connection 才能访问,
其他的 Connection 都不能访问该消息队列,如果试图在不同的连接中重新声明或者访问排他性队列,
那么系统会报一个资源被锁定的错误。另一方面,对于排他性队列而言,当连接断掉的时候,
该消息队列也会自动删除(无论该队列是否被声明为持久性队列都会被删除)。
接下来提供一个消息发送接口,如下:
@RestControllerpublicclassHelloController{@AutowiredRabbitTemplate rabbitTemplate;@GetMapping("/hello")publicvoidhello(){Message message =MessageBuilder.withBody("hello javaboy".getBytes()).setExpiration("10000").build();
rabbitTemplate.convertAndSend(QueueConfig.JAVABOY_QUEUE_DEMO, message);}}
在创建 Message 对象的时候我们可以设置消息的过期时间,这里设置消息的过期时间为 10 秒。
这就可以啦!
接下来我们启动项目,进行消息发送测试。当消息发送成功之后,由于没有消费者,所以这条消息并不会被消费。打开 RabbitMQ 管理页面,点击到 Queues 选项卡,10s 之后,我们会发现消息已经不见了:
很简单吧!
单条消息设置过期时间,就是在消息发送的时候设置一下消息有效期即可。
4.2.2 队列消息过期
给队列设置消息过期时间,方式如下:
@BeanQueuequeue(){Map<String,Object> args =newHashMap<>();
args.put("x-message-ttl",10000);returnnewQueue(JAVABOY_QUEUE_DEMO,true,false,false, args);}
设置完成后,我们修改消息的发送逻辑,如下:
@RestControllerpublicclassHelloController{@AutowiredRabbitTemplate rabbitTemplate;@GetMapping("/hello")publicvoidhello(){Message message =MessageBuilder.withBody("hello javaboy".getBytes()).build();
rabbitTemplate.convertAndSend(QueueConfig.JAVABOY_QUEUE_DEMO, message);}}
可以看到,消息正常发送即可,不用设置消息过期时间。
OK,启动项目,发送一条消息进行测试。查看 RabbitMQ 管理页面,如下:
可以看到,消息队列的 Features 属性为 D 和 TTL,D 表示消息队列中消息持久化,TTL 则表示消息会过期。
10s 之后刷新页面,发现消息数量已经恢复为 0。
这就是给消息队列设置消息过期时间,一旦设置了,所有进入到该队列的消息都有一个过期时间了。
4.2.3 特殊情况
还有一种特殊情况,就是将消息的过期时间 TTL 设置为 0,这表示如果消息不能立马消费则会被立即丢掉,这个特性可以部分替代 RabbitMQ3.0 以前支持的 immediate 参数,之所以所部分代替,是因为 immediate 参数在投递失败会有 basic.return 方法将消息体返回(这个功能可以利用死信队列来实现)。
4.3 死信队列
有小伙伴不禁要问,被删除的消息去哪了?真的被删除了吗?非也非也!这就涉及到死信队列了,接下来我们来看看死信队列。
4.3.1 死信交换机
死信交换机,Dead-Letter-Exchange 即 DLX。
死信交换机用来接收死信消息(Dead Message)的,那什么是死信消息呢?一般消息变成死信消息有如下几种情况:
- 消息被拒绝(Basic.Reject/Basic.Nack) ,井且设置requeue 参数为false
- 消息过期
- 队列达到最大长度
当消息在一个队列中变成了死信消息后,此时就会被发送到 DLX,绑定 DLX 的消息队列则称为死信队列。
DLX 本质上也是一个普普通通的交换机,我们可以为任意队列指定 DLX,当该队列中存在死信时,RabbitMQ 就会自动的将这个死信发布到 DLX 上去,进而被路由到另一个绑定了 DLX 的队列上(即死信队列)。
4.3.2 死信队列
这个好理解,绑定了死信交换机的队列就是死信队列。
4.3.3 实践
我们来看一个简单的例子。
首先我们来创建一个死信交换机,接着创建一个死信队列,再将死信交换机和死信队列绑定到一起:
publicstaticfinalStringDLX_EXCHANGE_NAME="dlx_exchange_name";publicstaticfinalStringDLX_QUEUE_NAME="dlx_queue_name";publicstaticfinalStringDLX_ROUTING_KEY="dlx_routing_key";/**
* 配置死信交换机
*
* @return
*/@BeanDirectExchangedlxDirectExchange(){returnnewDirectExchange(DLX_EXCHANGE_NAME,true,false);}/**
* 配置死信队列
* @return
*/@BeanQueuedlxQueue(){returnnewQueue(DLX_QUEUE_NAME);}/**
* 绑定死信队列和死信交换机
* @return
*/@BeanBindingdlxBinding(){returnBindingBuilder.bind(dlxQueue()).to(dlxDirectExchange()).with(DLX_ROUTING_KEY);}
这其实跟普通的交换机,普通的消息队列没啥两样。
接下来为消息队列配置死信交换机,如下:
@BeanQueuequeue(){Map<String,Object> args =newHashMap<>();//设置消息过期时间
args.put("x-message-ttl",0);//设置死信交换机
args.put("x-dead-letter-exchange",DLX_EXCHANGE_NAME);//设置死信 routing_key
args.put("x-dead-letter-routing-key",DLX_ROUTING_KEY);returnnewQueue(JAVABOY_QUEUE_DEMO,true,false,false, args);}
就两个参数:
- x-dead-letter-exchange:配置死信交换机。
- x-dead-letter-routing-key:配置死信 routing_key。
这就配置好了。
将来发送到这个消息队列上的消息,如果发生了 nack、reject 或者过期等问题,就会被发送到 DLX 上,进而进入到与 DLX 绑定的消息队列上。
死信消息队列的消费和普通消息队列的消费并无二致:
@RabbitListener(queues =QueueConfig.DLX_QUEUE_NAME)publicvoiddlxHandle(String msg){System.out.println("dlx msg = "+ msg);}
版权归原作者 SuperW. 所有, 如有侵权,请联系我们删除。