一、RabbitMQ 概述
1. 什么是 MQ?
MQ( Message queue ),从字面意思上看,本质是个队列,FIFO 先进先出,只不过队列中存放的内容是消息(message),消息可以非常简单,比如只包含文本字符串,JSON 等,也可以很复杂,比如内嵌对象,MQ 多用于分布式系统之间进行通信;
MQ 主要工作是接收并转发消息,在不同的应用场景下可以展现不同的作用:
异步解耦:在业务流程中,一些操作可能非常耗时,但并不需要即时返回结果。借助MQ可以将这些操作异步化。
流量削峰:在访问量剧增的情况下,应用仍然需要继续发挥作用,但这样的突发流量并不常见。如果以能处理这类峰值为标准而投入资源,无疑是巨大的浪费。使用MQ能够使关键组件支撑突发访问压力,不会因为突发流量而崩溃。
消息分发:当多个系统需要对同一数据做出响应时,可以使用MQ进行消息分发。例如,支付成功后,支付系统可以向MQ发送消息,其他系统订阅该消息,而无需轮询数据库。
2. RabbitMQ
(官网 RabbitMQ: One broker to queue them all | RabbitMQ)
RabbitMQ 是一个采用 Erlang 语言开发的消息队列系统,以其完备的功能、对多种主流语言的支持、友好的开源界面、良好的性能、以及活跃的社区而闻名。它特别适合中小型公司的应用场景,在数据量和并发量没有超大需求的情况下表现优异。
3. 其他 MQ 产品
Kafka:Kafka 是由 Apache 软件基金会开发的一种分布式流处理平台,其最初的目的是用于日志收集和传输。Kafka 以其高吞吐量和卓越的性能而著称,特别适用于需要处理大量数据流的场景。
RocketMQ:RocketMQ 是一个由阿里巴巴开发并捐赠给 Apache 基金会的分布式消息中间件。它基于 Java 开发,在设计上借鉴了 Kafka 的思想,但也引入了一些自己的改进,但支持的客户端语言不多,且社区活跃度一般。
二、RabbitMQ 安装
1. ubuntu 环境下安装
安装 erlang:RabbitMq 需要 erlang 语⾔的支持,在安装 RabbitMq 之前需要安装 erlang
sudo apt-get update #更新软件包sudo apt-get install erlang#安装erlang
安装完成之后输入 erl 命令查看 erlang 版本安装 rabbitmq
sudo apt-get install rabbitmq-server#安装rabbitmqsystemctl status rabbitmq-server#确认安装结果
安装rabbitmq管理界面
rabbitmq-plugins enable rabbitmq_management
添加管理员用户# rabbitmqctl add_user ${账号} ${密码}rabbitmqctl add_user admin admin#给用户添加权限#rabbitmqctl set_user_tags ${账号} ${⻆⾊名称}rabbitmqctl set_user_tags admin administrator
启动服务并访问
sudo service rabbitmq-server start
通过 IP:port 访问管理界面 公网ip + 15672(默认端口号)输入刚才添加的用户名和密码进行登录,来到管理界面
2. docker 环境下安装
#获取镜像
docker pull rabbitmq:management
#运⾏镜像
docker run -d -p 5672:5672 -p 15672:15672 --name rabbitmq rabbitmq:management
#查看正在运⾏的容器
docker ps
访问管理界面,可通过guest,guest 登录
添加用户
#查看正在运⾏的容器
docker ps
#进⼊容器内部
docker exec -it 容器ID /bin/bash
#添加⽤⼾admin
rabbitmqctl add_user admin admin
#给⽤⼾授权
rabbitmqctl set_user_tags admin administrator
三、RabbitMQ 核心概念
RabbitMQ 是一个消息中间件,也是一个生产者消费者模型,它负责接收,存储并转发消息;
1. Producer 和 Consumer
Producer:生产者,是 RabbitMQ Server 的客户端,向 RabbitMQ 发送消息
Consumer:消费者,也是 RabbitMQ Server 的客户端,从 RabbitMQ 接收消息
Broker:代理,其实就是 RabbitMQ Server,主要是接收、存储和转发消息
2. Connection 和 Channel
**Connection: **连接是客户端和 RabbitMQ 服务器之间的一个TCP连接,这个连接是建立消息传递的基础,它负责传输客户端和服务器之间的所有数据和控制信息
Channel:通道,信道,Channel 是在 Connection 之上的一个抽象层,在 RabbitMQ 中,一个TCP 连接可以有多个 Channel,每个 Channel 都是独立的虚拟连接,消息的发送和接收都是基于 Channel 的,通道的主要作用是将消息的读写操作复用到同一个TCP连接上,这样可以减少建立和关闭连接的开销提高性能
3. Virtual host
Virtual host:虚拟主机,这是一个虚拟概念,它为消息队列提供了一种逻辑上的隔离机制,对于RabbitMQ 而言,一个 BrokerServer 上可以存在多个 Virtual Host,当多个不同的用户使用同一个RabbitMo Server 提供的服务时,可以虚拟划分出多个 vhost,每个用户在自己的 vhost 创建exchange/queue等
类似MySOL的"database",是一个逻辑上的集合,一个MySOL服务器可以有多个database
4. Queue
Queue:队列,是 RabbitMQ 的内部对象,用于存储消息,多个消费者,可以订阅同一个队列
5. Exchange
Exchange: 交换机,message 到达 broker 的第一站,它负责接收生产者发送的消息,并根据特定的规则把这些消息路由到一个或多个 Queue 列中 Exchange 起到了消息路由的作用,它根据类型和规则来确定如何转发接收到的消息
四、RabbitMQ 工作流程
- Producer 生产了一条消息
- Producer 连接到 RabbitMQBroker,建立一个连接(Connection),开启一个信道(Channel)
- Producer 声明一个交换机(Exchange),路由消息
- Producer 声明一个队列(Queue),存放信息
- Producer 发送消息至 RabbitMQ Broker
- RabbitMQ Broker 接收消息,并存入相应的队列(Queue)中,如果未找到相应的队列,则根据生产者的配置,选择丢弃或者退回给生产者3
- 完成消息发送后,生产者关闭信道和连接
五、代码示例
1. 创建项目,引入依赖
创建一个Maven 项目,在 pom.xml 中引入 RabbitMQ 的依赖
<!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.20.0</version>
</dependency>
2. 生产者代码
package rabbitmq;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException {
// 1. 建立连接
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("101.42.44.62"); // 主机
factory.setPort(5672); // 端口号
factory.setUsername("rht"); // 账号
factory.setPassword("47298"); // 密码
factory.setVirtualHost("test"); // 虚拟主机
Connection connection = factory.newConnection();
// 2. 开启通道
Channel channel = connection.createChannel();
// 3. 声明交换机, 此处使用内置的交换机
// 4. 声明队列
/**
* queueDeclare(String queue 队列名称,
* boolean durable 是否可持久化,
* boolean exclusive 该队列是否被独占,
* boolean autoDelete 该队列没有消费者时 是否自动删除,
* Map<String, Object> arguments 参数) throws IOException
*/
channel.queueDeclare("hello", true, false, false, null);
// 5. 发送消息
/**
* basicPublish(String exchange 交换机名称,
* String routingKey 内置交换机 队列名,
* BasicProperties props 属性配置,
* byte[] body 消息)
*/
String message = "hello rabbitmq";
channel.basicPublish("", "hello", null, message.getBytes());
System.out.println("消息发送成功");
// 6. 资源释放
channel.close();
connection.close();
}
}
如果不进行资源释放,在管理界面可以看到 channel 和 connection 的信息
同时也可以看到生产的队列中未被消费的元素
3. 消费者代码
package rabbitmq;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Consumer {
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
// 1. 建立连接
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("101.42.44.62"); // 主机
factory.setPort(5672); // 端口号
factory.setUsername("rht"); // 账号
factory.setPassword("47298"); // 密码
factory.setVirtualHost("test"); // 虚拟主机
Connection connection = factory.newConnection();
// 2. 创建 channel
Channel channel = connection.createChannel();
// 3. 声明队列(如果存在, 可以不声明)
channel.queueDeclare("test", true, false, false, null);
// 4. 消费消息
/**
* String basicConsume(String queue 队列名称,
* boolean autoAck 是否自动确认,
* Consumer callback 接收到消息后执行的逻辑) throws IOException
*/
DefaultConsumer consumer = new DefaultConsumer(channel){
// 从队列中收到消息就会执行的方法
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("接收到的消息 " + new String(body));
}
};
channel.basicConsume("test", true, consumer);
// 5. 关闭资源
channel.close();
connection.close();
}
}
六、RabbitMQ 七种工作模式
1. Simple(简单模式)
P:生产者,也就是要发送消息的程序
C:消费者,消息的接收者
Queue:消息队列,生产者向其中投递消息,消费者从其中取出消息
特点:一个生产者P,一个消费者C,消息只能被消费一次,也称为点对点(Point-to-Point)模式
适用场景:消息只能被单个消费者处理
2. Work Queue(工作队列)
一个生产者P,多个消费者 C1,C2;
在多个消息的情况下,Work Queue 会将消息分派给不同的消费者,每个消费者都会接收到不同消息,若 P 向队列中发送 10 条消息,则 C1 消费 + C2 消费 = 10
特点:消息不会重复,分配给不同的消费者
适用场景:集群环境中做异步处理
3. Publish/Subscribe(发布/订阅)
图中 X 表示交换机,作用:生产者将消息发送到 Exchange,由交换机将消息按一定规则路由到一个或多个队列中;
RabbitMQ 交换机有四种类型: fanout,direct,topic,headers,不同类型有着不同的路由策略,AMQP协议里还有另外两种类型:System 和自定义
Fanout:广播,将消息交给所有绑定到交换机的队列(Publish/Subscribe模式)
Direct:定向,把消息交给符合指定 routing key 的队列(Routing模式)
Topic:通配符,把消息交给符合 routing pattern(路由模式)的队列(Topics模式)
Headers 类型的交换器不依赖于路由键的匹配规则来路由消息,而是根据发送的消息内容中的headers 属性进行匹配,headers 类型的交换器性能会很差,而且也不实用,基本上不会看到它的存在
Exchange 只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与 Exchange绑定,或者没有符合路由规则的队列,那么消息就会丢失;
RoutingKey:路由键,生产者将消息发给交换器时,指定的一个字符串,用来告诉交换机应该如何处理这个消息.
Binding Key:绑定.RabbitMQ中通过Binding(绑定)将交换器与队列关联起来,在绑定的时候一般会指定一个Binding Key,这样RabbitMO就知道如何正确地将消息路由到队列了
4. Routing(路由模式)
路由模式是发布订阅模式的变种,在发布订阅基础上,增加路由 key 发布订阅模式是无条件的将所有消息分发给所有消费者,路由模式是 Exchange 根据 RoutingKey 的规则将数据筛选后发给对应的消费者队列
适合场景:需要根据特定规则分发消息的场景;比如系统打印日志,日志等级分为error,warning,info,debug,就可以通过这种模式,把不同的日志发送到不同的队列,最终输出到不同的文件
5. Topics(通配符模式)
路由模式的升级版,在 routingKey 的基础上,增加了通配符的功能,使之更加灵活 Topics 和 Routing 的基本原理相同,即:生产者将消息发给交换机,交换机根据 RoutingKey 将消息转发给与 Routing Key 匹配的队列,类似于正则表达式的方式来定义 Routingkey 的模式
不同之处是:routingKey 的匹配方式不同,Routing 模式是相等匹配,topics 模式是通配符匹配
适合场景:需要灵活匹配和过滤消息的场景
6. RPC(RPC通信)
在RPC通信的过程中,没有生产者和消费者,是通过两个队列实现了一个可回调的过程
- 客户端发送消息到一个指定的队列,并在消息属性中设置 replyTo 字段,这个字段指定了一个回调队列,用于接收服务端的响应
- 服务端接收到请求后,处理请求并发送响应消息到 replyTo 指定的回调队列
- 客户端在回调队列上等待响应消息,一旦收到响应,客户端会检查消息的correlationld属性,以确保它是所期望的响应
7. Publisher Confirms(发布确认)
Publisher Confirms 模式是 RabbitMQ 提供的一种确保消息可靠发送到 RabbitMQ 服务器的机制,在这种模式下,生产者可以等待 RabbitMQ 服务器的确认,以确保消息已经被服务器接收并处理
生产者将 Channel 设置为 confirm 模式(通过调用channel.confirmSelect()完成)后,发布的每一条消息都会获得一个唯一的 ID,生产者可以将这些序列号与消息关联起来,以便跟踪消息的状态
当消息被 RabbitMQ 服务器接收并处理后,服务器会异步地向生产者发送一个确认(ACK)给生产者(包含消息的唯一ID),表明消息已经送达
通过 Publisher Confirms 模式,生产者可以确保消息被 RabbitMQ 服务器成功接收,从而避免消息丢失的问题
适用场景:对数据安全性要求较高的场景,比如金融交易,订单处理
版权归原作者 Rcnhtin 所有, 如有侵权,请联系我们删除。