概述
MQ 顾名思义,是消息队列。
RabbitMQ 是一个消息队列系统,用于实现异步通信。基于 AMQP。AMQP(高级消息队列协议) 实现了对于消息的排序,点对点通讯,和发布订阅,保持可靠性、保证安全性。
在 Node.js 的微服务架构中,RabbitMQ 可以作为服务之间的消息传递中介,帮助解耦系统组件并提升系统的扩展性和可靠性。支持主流操作系统和多种语言。
通过 RabbitMQ,你可以让一个服务发送消息到队列,另一个服务从队列中消费消息。这种机制使服务之间可以异步通信,特别适合那些不需要立即响应的场景,例如订单处理、日志记录、通知推送等。
下面通过一个电商平台的例子讲解如何使用 Node.js 和 RabbitMQ 来实现异步的订单处理。
RabbitMQ 概念
- Producer(生产者):发送消息到 RabbitMQ 的服务(到交换器中)。
- Consumer(消费者):从 RabbitMQ 队列中接收和处理消息的服务。
- Message(消息):在RabbitMQ中,消息是传递的基本单元。它由消息体和可选的属性组成。
- Queue(队列):消息存储的地方,生产者把消息发送到队列,消费者从队列中接收消息。
- Exchange(交换器):RabbitMQ 用于接收生产者的消息,并根据某种规则将其路由到一个或多个队列。
- Routing Key(路由键):用于帮助交换器将消息路由到正确的队列。
上图来源:小满zs
RabbitMQ 的安装
erlang 的安装
Rabbit MQ的依赖环境 erlang ,MQ是基于这个语言开发的。
官网下载erlang:erlang
然后一直选择默认即可。
安装完成之后,配置一个环境变量。
power shell 测试:
RabbitMQ 的安装
RabbitMQ官网
然后一直默认即可。
之后还是需要配置下环境变量。
还是类似的操作:
安装 RabbitMQ 的可视化面板
直接在终端 安装MQ插件拥有可视化面板:
rabbitmq-plugins enable rabbitmq_management
启动 MQ ,MQ 默认使用端口为 5672:
rabbitmq-server.bat start
这里有部分同学可能启动失败,是因为在安装 RabbitMQ 的时候,最后一步默认安装后启动 RabbitMQ 被自动勾选了,所以我们需要打开服务,手动停止 RabbitMQ 服务,然后使用 MQ 的插件手动启动 服务就可以了。
启动好后该终端不要关闭,访问 http://localhost:15672/#/ 账号密码都是 guest:
就进入了 MQ 的可视化界面。
Node.js 使用 RabbitMQ
这边它的官网也是由很多语言的使用教程 JavaScript使用 MQ:
安装必要的依赖
在 Node.js 中使用 RabbitMQ,通常会用到
amqplib
这个库来与 RabbitMQ 进行交互。
npminstall amqplib express
express 帮助我们快速启动服务。
RabbitMQ 示例:异步订单处理
我们以订单服务为例来说明如何通过 RabbitMQ 实现异步订单处理。假设系统中有两个服务:
- 订单服务(Order Service):负责接收用户订单,并将订单消息发送到 RabbitMQ 队列。
- 支付服务(Payment Service):从 RabbitMQ 队列中接收订单信息并处理支付。
订单服务(生产者)
订单服务接收到用户创建订单的请求后,会将订单信息发送到 RabbitMQ 的队列中,而不需要立即处理支付流程。
// order-service.js 生产者const express =require('express');const amqp =require('amqplib/callback_api');const app =express();
app.use(express.json());let channel =null;// 连接 RabbitMQ 并创建一个频道
amqp.connect('amqp://localhost:5672',(error, connection)=>{if(error){throw error;}
connection.createChannel((err, ch)=>{if(err){throw err;}
channel = ch;// 创建队列const queue ='orderQueue';
channel.assertQueue(queue,{durable:false});});});// 订单服务 API
app.post('/order',(req, res)=>{const order ={userId: req.body.userId,productId: req.body.productId };// 将订单发送到 RabbitMQ 队列
channel.sendToQueue('orderQueue', Buffer.from(JSON.stringify(order)),{persistent:true// 持久化消息(避免服务器宕机) 底层原理是 存储在磁盘});
console.log("Order sent to queue:", order);
res.status(201).json({message:'Order placed successfully!'});});// 启动订单服务
app.listen(3001,()=>{
console.log('Order service is running on port 3001');});
在这里,订单服务通过
sendToQueue()
方法将订单信息发送到名为
orderQueue
的队列中。
支付服务(消费者)
支付服务从
orderQueue
中获取订单,并处理订单支付。
// payment-service.js 消费者const amqp =require('amqplib/callback_api');// 连接 RabbitMQ 并创建一个频道
amqp.connect('amqp://localhost:5672',(error, connection)=>{if(error){throw error;}
connection.createChannel((err, channel)=>{if(err){throw err;}const queue ='orderQueue';// 确保队列存在
channel.assertQueue(queue,{durable:false// 队列和交换机的持久化});// 从队列中消费消息
console.log('Waiting for messages in %s. To exit press CTRL+C', queue);
channel.consume(queue,(msg)=>{const order =JSON.parse(msg.content.toString());
console.log('Received order:', order);// 假设支付处理逻辑setTimeout(()=>{
console.log('Payment processed for order:', order);
channel.ack(msg);// 确认消息处理完成},1000);},{noAck:false// 确保消息处理完成后才确认});});});
支付服务通过
consume()
方法从
orderQueue
中获取订单消息,并处理支付逻辑。每次支付处理完成后,支付服务会调用
ack()
方法确认消息已经成功处理。
开启一个 http 请求,进行测试:
POST http://localhost:3001/order HTTP/1.1
Content-Type: application/json
{
"userId": 1,
"productId": 1
}
响应:
{"message":"Order placed successfully!"}
RabbitMQ 的工作流程
- 用户通过订单服务创建订单。
- 订单服务将订单信息发送到 RabbitMQ 队列中。
- 支付服务从队列中接收订单消息,并异步处理订单支付。
这个流程使订单服务和支付服务解耦。订单服务只需要把消息发送到队列,支付服务则在后台处理支付逻辑。这样的架构具有以下优势:
- 异步处理:订单服务不需要等待支付完成,响应用户请求变得更加快速。
- 解耦:订单服务和支付服务可以独立扩展和维护,互不依赖。
- 负载均衡:可以轻松地扩展多个支付服务实例,从同一个队列中消费消息,提升系统的处理能力。
- 而且服务之间使用不同的语言也是完全可以的。
版权归原作者 秀秀_heo 所有, 如有侵权,请联系我们删除。