一.什么是RabbitMQ?
RabbitMQ 是一种流行的消息队列(Message Queue)实现,基于 AMQP 协议(Advanced Message Queuing Protocol)。它支持异步通信,使多个系统之间以非阻塞的方式交换数据。
在我们使用微服务的时候,微服务一旦拆分,必然涉及到服务之间的相互调用,目前我们服务之间调用采用的都是基于 OpenFeign 的调用。这种调用中,调用者发起请求后需要等待服务提供者执行业务返回结果后,才能继续执行后面的业务。也就是说调用者在调用过程中处于阻塞状态,因此我们称这种调用方式为同步调用,也可以叫同步通****讯。
如果我们的业务需要实时得到服务提供方的响应,则应该选择同步通讯(同步调用)。而如果我们追求更高的效率,并且不需要实时响应,则应该选择异步通讯(异步调用)。
二.异步调用处理逻辑:
异步调用方式其实就是基于消息通知的方式,一般包含三个角色:
- 消息发送者:投递消息的人,就是原来的调用方
- 消息Broker:管理、暂存、转发消息,你可以把它理解成微信服务器
- 消息接收者:接收和处理消息的人,就是原来的服务提供方
在异步调用中,发送者不再直接同步调用接收者的业务接口,而是发送一条消息投递给消息Broker。然后接收者根据自己的需求从消息Broker那里订阅消息。每当发送方发送消息后,接受者都能获取消息并处理。这样,发送消息的人和接收消息的人就完全解耦了。
异步调用的优势包括:
- 耦合度更低
- 性能更好
- 业务拓展性强
- 故障隔离,避免级联失败
当然,异步通信也并非完美无缺,它存在下列缺点:
- 完全依赖于Broker的可靠性、安全性和性能
- 架构复杂,后期维护和调试麻烦
三.RabbitMQ的基本使用:
下面是RabbitMQ的官网:https://www.rabbitmq.com/
1.安装:
首先将RabbitMQ的镜像拉取下来,然后运行下面命令:
docker run \
-e RABBITMQ_DEFAULT_USER=itheima \
-e RABBITMQ_DEFAULT_PASS=123321 \
-v mq-plugins:/plugins \
--name mq \
--hostname mq \
-p 15672:15672 \
-p 5672:5672 \
--network hm-net\
-d \
rabbitmq:3.8-management
随后我们访问http://虚拟机IP地址:15672来打开RabbitMQ的控制台。
在控制台上主要可以关注三个信息:Exchanges(交换机),Queues(队列),Admin(用户管理)。
2.架构图:
其中包含几个概念:
- **
publisher
**:生产者,也就是发送消息的一方- **
consumer
**:消费者,也就是消费消息的一方- **
queue
**:队列,存储消息。生产者投递的消息会暂存在消息队列中,等待消费者处理- **
exchange
**:交换机,负责消息路由。生产者发送的消息由交换机决定投递到哪个队列。- **
virtual host
**:虚拟主机,起到数据隔离的作用。每个虚拟主机相互独立,有各自的exchange、queue
3.RabbitMQ控制台的使用:
在 RabbitMQ 中,交换机(Exchange) 和 队列(Queue) 是核心概念。它们之间的关系决定了消息的路由和存储方式。
(1)Exchanges 交换机:
- 交换机是 消息的路由器,负责决定消息应该被发送到哪个队列。
- 生产者将消息发送给交换机,而不是直接发送到队列。
- 交换机根据路由规则 决定消息的走向(即发往哪些队列)。
Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!
RabbitMQ 提供了四种常用的交换机类型,每种类型的路由规则不同:
- Fanout:广播,将消息交给所有绑定到交换机的队列。我们最早在控制台使用的正是Fanout交换机
- Direct:订阅,基于RoutingKey(路由key)发送给订阅了消息的队列
- Topic:通配符订阅,与Direct类似,只不过RoutingKey可以使用通配符
- Headers:头匹配,基于MQ的消息头匹配,用的较少。
我们可以再这里创建交换机,Name表示创建的交换机的名字,Type表示可以选择交换机的四种类型。创建成功后就可以在上面看到创建的交换机名字:
比如我们点击amq.fanout查看交换机数据并且可以发送消息给消费者。
注意!!!如果我们不将交换机指定队列的话,由于没有消费者存在,最终消息丢失了,这样说明交换机没有存储消息的能力。
所以下面我们要先创建队列,然后让生产者推送的消息经过交换机的传递后,到达消息队列,然后再给消费者。所以生产者无需知道队列的存在以此来达到解耦的效果。
(2)Queues 队列:
- 队列用于 存储消息,直到消费者消费它们。
- 队列与消费者一一对应,即一个消费者从一个队列读取消息。
- 队列按 FIFO(First In, First Out)的顺序存储消息。
在这里我们填写队列名字即可,其他暂时可以不用填写。
随后我们向交换机进行绑定(bind)队列,随后通过队列传输给消费者。
这里的Routing key的出现是为了让 Direct (交换机的类型)能够选择队列而存在的。
我们在绑定队列完成后会出现下面这样,这样证明我们成功为交换机绑定好两个队列:
随后我们在下面窗口推送消息:
(3)Admin :
①Users(用户管理):
管理 RabbitMQ 中的用户账号,在这里 添加、删除用户,并设置每个用户的权限。
每个用户可分配不同的 角色:
- administrator:管理员,具有所有权限。
- monitoring:可以监控和查看信息,但不能管理。
- policymaker:可以设置策略和参数。
- management:可以访问管理界面但没有策略权限。
Name
:itheima
,也就是用户名Tags
:administrator
,说明itheima
用户是超级管理员,拥有所有权限Can access virtual host
:/
,可以访问的virtual host
,这里的/
是默认的virtual host
②Virtual Hosts(虚拟主机):
将 RabbitMQ 服务器划分为多个 虚拟主机(vhost),类似于一个独立的命名空间。
- 不同的应用可以使用不同的虚拟主机,彼此隔离。
- 每个虚拟主机都有自己的 交换机、队列和用户权限。
四.SpringAMOP的使用:
Spring AMQP(Spring for Advanced Message Queuing Protocol)是 Spring 提供的一个消息队列集成模块,主要用于简化与 RabbitMQ 的集成。它通过 AMQP 协议来实现消息的生产和消费。
- Publisher:生产者,不再发送消息到队列中,而是发给交换机
- Exchange:交换机,一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。
- Queue:消息队列也与以前一样,接收消息、缓存消息。不过队列一定要与交换机绑定。
- Consumer:消费者,与以前一样,订阅队列,没有变化
1.导入依赖:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2.添加配置:
在
publisher以及consumer
服务的
application.yml
中添加配置:
spring:
rabbitmq:
host: 192.168.150.101 # 你的虚拟机IP
port: 5672 # 端口
virtual-host: /hmall # 虚拟主机
username: hmall # 用户名
password: 123 # 密码
listener:
simple:
prefetch: 1 # (能者多劳)每次只能获取一条消息,处理完成才能获取下一个消息
3.在publisher服务中利用RabbitTemplate实现消息发送:
@SpringBootTest
public class SpringAmqpTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testSimpleQueue() {
// 队列名称
String queueName = "simple.queue";
// 消息
String message = "hello, spring amqp!";
// 发送消息
rabbitTemplate.convertAndSend(queueName, message);
}
}
4.定义消费者实现异步调用:
@Component
@RequiredArgsConstructor
public class PayStatusListener {
private final IOrderService orderService;
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "trade.pay.success.queue", durable = "true"),
exchange = @Exchange(name = "pay.topic"),
key = "pay.success"
))
public void listenPaySuccess(Long orderId){
//调用方法
orderService.markOrderPaySuccess(orderId);
}
}
①@RequiredArgsConstructor:
这是 Lombok 提供的注解,自动为类中所有
final
修饰的字段生成一个包含这些字段的构造函数。使用这个注解可以免去手动编写构造函数的麻烦,尤其是在使用依赖注入时(例如注入
IOrderService
)。
②@RabbitListener:
@RabbitListener
注解用于监听来自 RabbitMQ 队列的消息。它会自动监听指定的队列,当有消息到达时,会触发
listenPaySuccess
方法进行处理。
③QueueBinding(队列绑定):
通过
@QueueBinding
注解,绑定了 队列 和 交换机,并指定了 路由键。
- @Queue-
name = "trade.pay.success.queue"
:指定队列的名称为trade.pay.success.queue
。-durable = "true"
:表示队列是 持久化 的,即 RabbitMQ 重启后队列依然存在。- 队列的作用:队列是消息的临时存储地,消费者会从队列中拉取消息并处理。- @Exchange-
name = "pay.topic"
:指定交换机的名称为pay.topic
,这是一个 Topic Exchange(主题交换机)。- 交换机的作用:交换机决定消息如何路由到队列。Topic Exchange 可以根据路由键的匹配规则将消息路由到合适的队列。- key = "pay.success"- 路由键:指定了路由键为
pay.success
。这意味着当生产者发送的消息路由键是pay.success
时,消息将被路由到trade.pay.success.queue
队列。
5.总流程处理过程:
- 生产者:在支付成功后,生产者会发送一条消息到
pay.topic
交换机,消息的路由键为pay.success
。 - 交换机:
pay.topic
交换机会根据路由键pay.success
将消息路由到trade.pay.success.queue
队列。 - 消费者:
PayStatusListener
作为消费者监听trade.pay.success.queue
,当有消息到达队列时,它会接收到订单 ID 并调用订单服务更新订单状态。
五.使用配置类管理定义交换机,队列及两者关系:
在 Spring AMQP 中,交换机(Exchange)、队列(Queue)、以及绑定(Binding)可以通过配置类来定义和管理。配置类可以帮助你灵活地创建和绑定交换机与队列,并且可以根据业务需求自定义各种参数。
创建配置类效果展示:
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitMQConfig {
// 创建队列
@Bean
public Queue queue() {
return new Queue("trade.pay.success.queue", true); // durable=true 表示队列持久化
}
// 创建交换机
@Bean
public TopicExchange exchange() {
return new TopicExchange("pay.topic"); // 创建主题交换机
}
// 创建绑定关系(队列与交换机通过 routing key 绑定)
@Bean
public Binding binding(Queue queue, TopicExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with("pay.success"); // 路由键是 pay.success
}
}
1.创建队列:Queue:
@Bean
public Queue queue() {
return new Queue("trade.pay.success.queue", true);
}
- 作用:通过
@Bean
注解定义了一个队列 Bean。Spring 容器会自动管理这个队列,并在 RabbitMQ 上创建该队列。 - 参数: -
"trade.pay.success.queue"
:这是队列的名称。每个队列在 RabbitMQ 中必须有唯一的名称。-true
:表示这个队列是 持久化 的。持久化的队列在 RabbitMQ 服务重启后依然存在。
2.创建交换机:Exchange:
@Bean
public TopicExchange exchange() {
return new TopicExchange("pay.topic");
}
- 作用:通过
@Bean
注解定义了一个 Topic Exchange 类型的交换机。 - 参数: -
"pay.topic"
:这是交换机的名称。同样,交换机在 RabbitMQ 中也必须有唯一的名称。
Topic Exchange 是一种交换机类型,它允许使用通配符来进行路由。例如,路由键可以是
"pay.*"
,可以匹配
"pay.success"
或
"pay.failure"
。在这里可以使用四种交换机类型来定义交换机,具体场景具体分析使用。
3.创建绑定关系:Binding:
@Bean
public Binding binding(Queue queue, TopicExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with("pay.success");
}
- 作用:通过
@Bean
注解定义了队列和交换机的绑定关系。这个绑定决定了消息在何种条件下会从交换机路由到队列。 - 参数: -
queue
:我们定义的trade.pay.success.queue
队列。-exchange
:我们定义的pay.topic
交换机。-"pay.success"
:这是路由键(Routing Key)。它用于告诉交换机:只有当消息的路由键是pay.success
时,消息才会被路由到trade.pay.success.queue
队列。
4.多个队列绑定到同一个交换机:
我们可以将多个队列绑定到同一个交换机,并使用不同的路由键。这样可以实现根据不同的路由键来发送不同类型的消息到各自的队列。
@Bean
public Queue paySuccessQueue() {
return new Queue("pay.success.queue", true);
}
@Bean
public Queue payFailureQueue() {
return new Queue("pay.failure.queue", true);
}
@Bean
public Binding paySuccessBinding(Queue paySuccessQueue, TopicExchange exchange) {
return BindingBuilder.bind(paySuccessQueue).to(exchange).with("pay.success");
}
@Bean
public Binding payFailureBinding(Queue payFailureQueue, TopicExchange exchange) {
return BindingBuilder.bind(payFailureQueue).to(exchange).with("pay.failure");
}
- 在这个例子中,
pay.success.queue
和pay.failure.queue
都绑定到同一个交换机pay.topic
,但使用不同的路由键。 - 消息路由逻辑: - 当生产者发送路由键为
pay.success
的消息时,消息会路由到pay.success.queue
队列。- 当生产者发送路由键为pay.failure
的消息时,消息会路由到pay.failure.queue
队列。
5.配置不同类型的交换机:
除了 Topic Exchange,RabbitMQ 还支持其他几种常见的交换机类型。这里分别演示如何创建 Direct Exchange、Fanout Exchange 和 Headers Exchange。
(1)Direct Exchange:
@Bean
public DirectExchange directExchange() {
return new DirectExchange("direct.exchange");
}
@Bean
public Binding directBinding(Queue queue, DirectExchange directExchange) {
return BindingBuilder.bind(queue).to(directExchange).with("direct.routing.key");
}
Direct Exchange:直接交换机会根据 完全匹配的路由键 将消息发送到队列。只有当消息的路由键和绑定的路由键 完全一致 时,消息才会被路由到指定队列。
(2)Fanout Exchange:
@Bean
public FanoutExchange fanoutExchange() {
return new FanoutExchange("fanout.exchange");
}
@Bean
public Binding fanoutBinding(Queue queue, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(queue).to(fanoutExchange); // 不需要路由键
}
Fanout Exchange:扇出交换机会将消息发送到所有绑定的队列,不需要考虑路由键。这个交换机通常用于广播消息。
(3)Headers Exchange:
@Bean
public HeadersExchange headersExchange() {
return new HeadersExchange("headers.exchange");
}
@Bean
public Binding headersBinding(Queue queue, HeadersExchange headersExchange) {
return BindingBuilder.bind(queue).to(headersExchange).where("header-key").matches("header-value");
}
Headers Exchange:头交换机根据消息头的内容进行路由,而不是依赖路由键。适用于按消息的元数据进行路由的场景。
总结:
通过 Spring AMQP 的配置类,你可以非常灵活地定义 RabbitMQ 的 交换机、队列 和 绑定关系,并通过不同的路由键和交换机类型实现复杂的消息路由逻辑。以下是一些关键要点:
- 队列(Queue):消息的临时存储地,可以是持久化的。
- 交换机(Exchange):控制消息如何分发到不同的队列。 - Direct Exchange:严格匹配路由键。- Topic Exchange:支持通配符匹配路由键。- Fanout Exchange:广播消息到所有绑定的队列。- Headers Exchange:根据消息头的内容进行路由。
- 绑定(Binding):将队列与交换机连接起来,使用路由键来决定消息的流向。
通过配置类来定义这些组件,能够简化 RabbitMQ 与 Spring 应用的集成,并且通过灵活的路由规则支持复杂的消息传递需求。
版权归原作者 记得开心一点嘛 所有, 如有侵权,请联系我们删除。