前端监控系统概述
前端监控系统主要负责采集用户端的异常、性能、业务埋点等数据,并将这些数据上报到服务端进行存储与可视化分析。
随着用户量的增加,数据量也会相应增大,这会对服务端产生较大的并发压力。直接将大量数据存入数据库可能会导致数据库服务崩溃。
解决方案:使用消息队列
为了应对高并发情况,可以采用消息队列(如 RabbitMQ)来优化数据处理流程。
具体流程如下:
- 数据接收与缓存:一个 Web 服务接收前端发送的数据请求,将数据存入 RabbitMQ,而不是直接写入数据库。
- 数据处理与存储:另一个 Web 服务从 RabbitMQ 中取出数据,然后存入数据库。
这样做的好处是,消息队列的并发处理能力远高于数据库,可以快速响应并缓存大量请求,从而减轻数据库的直接压力。
消息队列的并发控制
通过设置消息队列的消费速率来控制数据流向数据库的速度,例如设置每次从队列中取出 1w 条消息进行处理。
这种方法可以有效避免数据库因突发大流量而崩溃,实现了流量削峰。
消息队列的扩展性
RabbitMQ 支持水平扩展,可以增加多个 Web 服务同时消费队列中的消息,进一步增强系统的处理能力。
运行 RabbitMQ 服务
我们可以通过 docker 来跑 RabbitMQ。
搜索 rabbitmq 的镜像,选择 3.11-management 的版本:
这个版本是有 web 管理界面的。
点击 run:
run 运行后,可以通过浏览器访问 http://localhost:15672 来管理消息队列,这就是它的 web 管理界面:
账号密码全输入 guest ,进入管理页面:
可以看到 connection、channel、exchange、queue 的分别的管理页面。
Node.js 实现 RabbitMQ
环境准备
首先,创建一个新的项目目录,并初始化 Node.js 项目:
bash
复制代码
mkdir rabbitmq-test cd rabbitmq-test npm init -y
安装依赖
安装 amqplib 包:
bash
复制代码
npm install amqplib
配置项目
由于我们将使用 ES module 语法和顶层 await,您需要在 package.json 中添加以下配置:
json
复制代码
"type": "module"
生产者设置
创建 src/producer.js 文件,并编写以下代码:
javascript
复制代码
// 引入 amqplib 库,用于操作 AMQP 协议(高级消息队列协议) import * as amqp from 'amqplib'; // 连接到本地的 RabbitMQ 服务,端口默认为 5672 const connect = await amqp.connect('amqp://localhost:5672'); // 创建一个通道,大部分的 API 操作都在通道中进行 const channel = await connect.createChannel(); // 声明一个队列,如果队列不存在则创建,名为 'test1' await channel.assertQueue('test1'); // 向名为 'test1' 的队列发送消息,消息内容为 'hello yun' await channel.sendToQueue('test1', Buffer.from('hello yun')); // 控制台打印消息,表示消息已被发送 console.log('消息已发送');
运行生产者脚本:
bash
复制代码
node ./src/producer.js
消费者设置
创建 src/consumer.js 文件,并编写以下代码:
javascript
复制代码
import * as amqp from 'amqplib'; // 连接到本地 RabbitMQ 服务器,端口为 5672 const connect = await amqp.connect('amqp://localhost:5672'); // 在连接上创建一个通道 const channel = await connect.createChannel(); // 确保 'test1' 队列存在,如果不存在则创建 const { queue } = await channel.assertQueue('test1'); // 消费 'test1' 队列中的消息 channel.consume( queue, // 指定要消费的队列名 msg => { // 当收到消息时,执行的回调函数 console.log('收到消息:', msg.content.toString()); }, { noAck: true } // 消费配置选项,noAck 为 true 表示不需要发送确认 );
运行消费者脚本:
bash
复制代码
node ./src/consumer.js
控制消息并发
生产者并发控制
修改 src/producer.js,以每 0.5 秒发送一条消息:
javascript
复制代码
import * as amqp from 'amqplib'; // 连接到 RabbitMQ 服务器 const connect = await amqp.connect('amqp://localhost:5672'); // 创建一个通道 const channel = await connect.createChannel(); // 声明一个名为 'test2' 的队列,并设置其持久化 await channel.assertQueue('test2', { durable: true }); let i = 1; // 每隔 500 毫秒发送一条消息到 'test2' 队列 setInterval(async () => { const msg = 'hello' + i; // 构建消息内容 console.log('发送消息:', msg); // 打印发送的消息 await channel.sendToQueue('test2', Buffer.from(msg)); // 发送消息到队列 i++; // 递增消息计数器 }, 500);
消费者并发控制
修改 src/consumer.js,设置 prefetch 为 3,并每秒确认一次消息:
javascript
复制代码
import * as amqp from 'amqplib'; async function startConsumer() { try { const connect = await amqp.connect('amqp://localhost:5672'); // 创建一个通道,用于消息的发送和接收 const channel = await connect.createChannel(); // 声明(如果不存在则创建)一个名为 'test2' 的队列,并获取队列的信息 const { queue } = await channel.assertQueue('test2'); // 设置该通道一次只接收并处理 3 条消息 channel.prefetch(3); // 用于存储当前接收到但还未处理的任务 const currentTask = []; // 监听名为 'test2' 的队列,接收到消息后执行回调函数 channel.consume( queue, msg => { // 将接收到的消息添加到 currentTask 数组中 currentTask.push(msg); // 控制台打印接收到的消息内容 console.log('收到消息:', msg.content.toString()); }, // 设置不自动确认消息,需要手动调用 ack 方法确认 { noAck: false } ); // 每隔 1000 毫秒(1 秒)检查 currentTask 数组,如果有任务则从数组中取出并确认消息 setInterval(() => { if (currentTask.length > 0) { const curMsg = currentTask.shift(); // 使用 shift 方法从数组头部取出消息 if (curMsg) { // 手动确认消息,告诉 RabbitMQ 该消息已被正确处理 channel.ack(curMsg); } } }, 1000); } catch (error) { console.error('Error in consumer:', error); } } // 启动消费者 startConsumer();
RabbitMQ 架构图解
- Producer 和 Consumer:分别代表消息的生产者和消费者。
- Connection 和 Channel:Connection 是连接,但并不是每次使用 RabbitMQ 都创建一个单独的 Connection。相反,我们在一个 Connection 中创建多个 Channel,每个 Channel 负责自己的任务。
- Queue:消息存取的地方。
- Broker:整个接收和转发消息的服务。
- Exchange:用于将消息路由到不同的队列中。
Exchange 类型
Exchange 主要有四种类型:
- direct:将消息路由到指定的队列。
- topic:支持模糊匹配,根据模式将消息路由到相应的队列。
- fanout:将消息广播到所有绑定的队列中。
- headers:根据消息的 headers 属性进行路由。
使用 direct 类型的 Exchange
生产者端
src/direct.js
javascript
复制代码
import * as amqp from 'amqplib'; const connect = await amqp.connect('amqp://localhost:5672'); // 创建一个通道 const channel = await connect.createChannel(); // 声明一个名为 'direct-test-exchange' 的直连交换机 await channel.assertExchange('direct-test-exchange', 'direct'); // 发送消息 'hello1' 到交换机 'direct-test-exchange',路由键为 'test1' channel.publish('direct-test-exchange', 'test1', Buffer.from('hello1')); // 发送消息 'hello2' 到交换机 'direct-test-exchange',路由键为 'test2' channel.publish('direct-test-exchange', 'test2', Buffer.from('hello2')); // 发送消息 'hello3' 到交换机 'direct-test-exchange',路由键为 'test3' channel.publish('direct-test-exchange', 'test3', Buffer.from('hello3'));
在这里,我们创建了一个 direct 类型的 Exchange,然后发布三条消息,指定不同的 routing key。
消费者端
javascript
复制代码
// src/direct-consumer1.js import * as amqp from 'amqplib'; const connect = await amqp.connect('amqp://localhost:5672'); // 创建一个通道 const channel = await connect.createChannel(); // 声明一个名为 'queue1' 的队列,如果该队列不存在则会被创建 const { queue } = await channel.assertQueue('queue1'); // 将队列绑定到名为 'direct-test-exchange' 的交换机,并指定路由键为 'test1' await channel.bindQueue(queue, 'direct-test-exchange', 'test1'); // 消费队列中的消息,并在收到消息时打印消息内容 channel.consume( queue, msg => { console.log(msg.content.toString()); // 打印消息内容 }, { noAck: true } // 设置为自动确认消息 );
javascript
复制代码
// src/direct-consumer2.js import * as amqp from 'amqplib'; const connect = await amqp.connect('amqp://localhost:5672'); const channel = await connect.createChannel(); const { queue } = await channel.assertQueue('queue2'); await channel.bindQueue(queue, 'direct-test-exchange', 'test2'); channel.consume( queue, msg => { console.log(msg.content.toString()); }, { noAck: true } );
两个消费者分别监听不同的队列,并绑定到相应的 routing key。
在管理页面上也可以看到这个交换机的信息:
包括 exchange 下的两个 queue 以及各自的 routing key:
这里还能发送消息:
使用 topic 类型的 Exchange
生产者端
javascript
复制代码
// src/topic.js import * as amqp from 'amqplib'; const connect = await amqp.connect('amqp://localhost:5672'); const channel = await connect.createChannel(); await channel.assertExchange('direct-test-exchange2', 'topic'); channel.publish('direct-test-exchange2', 'test1.111', Buffer.from('hello1')); channel.publish('direct-test-exchange2', 'test1.222', Buffer.from('hello2')); channel.publish('direct-test-exchange2', 'test2.1111', Buffer.from('hello3'));
消费者端
javascript
复制代码
// src/topic-consumer1.js import * as amqp from 'amqplib'; const connect = await amqp.connect('amqp://localhost:5672'); const channel = await connect.createChannel(); await channel.assertExchange('direct-test-exchange2', 'topic'); const { queue } = await channel.assertQueue('queue1'); await channel.bindQueue(queue, 'direct-test-exchange2', 'test1.*'); channel.consume( queue, msg => { console.log(msg.content.toString()); }, { noAck: true } );
javascript
复制代码
// src/topic-consumer2.js import * as amqp from 'amqplib'; const connect = await amqp.connect('amqp://localhost:5672'); const channel = await connect.createChannel(); await channel.assertExchange('direct-test-exchange2', 'topic'); const { queue } = await channel.assertQueue('queue2'); await channel.bindQueue(queue, 'direct-test-exchange2', 'test2.*'); channel.consume( queue, msg => { console.log(msg.content.toString()); }, { noAck: true } );
两个消费者监听不同的模糊匹配模式的 routing key。
使用 fanout 类型的 Exchange
生产者端
typescript
复制代码
import * as amqp from 'amqplib'; const connect = await amqp.connect('amqp://localhost:5672'); const channel = await connect.createChannel(); // 声明一个名为 'direct-test-exchange3' 的交换机,类型为 'fanout' // 'fanout' 类型的交换机会将消息广播给所有绑定到该交换机的队列 await channel.assertExchange('direct-test-exchange3', 'fanout'); // 向交换机 'direct-test-exchange3' 发布消息 'hello1' // 因为是 'fanout' 类型的交换机,不需要指定 routingKey,routingKey 为空字符串 channel.publish('direct-test-exchange3', '', Buffer.from('hello1')); // 向交换机 'direct-test-exchange3' 发布消息 'hello2' channel.publish('direct-test-exchange3', '', Buffer.from('hello2')); // 向交换机 'direct-test-exchange3' 发布消息 'hello3' channel.publish('direct-test-exchange3', '', Buffer.from('hello3'));
消费者端
typescript
复制代码
// src/fanout-consumer1.js import * as amqp from 'amqplib'; const connect = await amqp.connect('amqp://localhost:5672'); const channel = await connect.createChannel(); await channel.assertExchange('direct-test-exchange3', 'fanout'); const { queue } = await channel.assertQueue('queue1'); await channel.bindQueue(queue, 'direct-test-exchange3'); channel.consume( queue, msg => { console.log(msg.content.toString()); }, { noAck: true } );
typescript
复制代码
// src/fanout-consumer2.js import * as amqp from 'amqplib'; const connect = await amqp.connect('amqp://localhost:5672'); const channel = await connect.createChannel(); await channel.assertExchange('direct-test-exchange3', 'fanout'); const { queue } = await channel.assertQueue('queue2'); await channel.bindQueue(queue, 'direct-test-exchange3'); channel.consume( queue, msg => { console.log(msg.content.toString()); }, { noAck: true } );
两个消费者监听同一个 fanout 类型的 Exchange,都会接收到消息。
使用 headers 类型的 Exchange
生产者端
javascript
复制代码
// src/headers.js import * as amqp from 'amqplib'; const connect = await amqp.connect('amqp://localhost:5672'); const channel = await connect.createChannel(); await channel.assertExchange('direct-test-exchange4', 'headers'); channel.publish('direct-test-exchange4', '', Buffer.from('hello1'), { headers: { name: 'yun', // 设置消息头部属性 name 为 'yun' }, }); channel.publish('direct-test-exchange4', '', Buffer.from('hello2'), { headers: { name: 'yun', // 设置消息头部属性 name 为 'yun' }, }); channel.publish('direct-test-exchange4', '', Buffer.from('hello3'), { headers: { name: 'mu', // 设置消息头部属性 name 为 'mu' }, });
消费者端
javascript
复制代码
// src/headers-consumer1.js import * as amqp from 'amqplib'; const connect = await amqp.connect('amqp://localhost:5672'); const channel = await connect.createChannel(); await channel.assertExchange('direct-test-exchange4', 'headers'); const { queue } = await channel.assertQueue('queue1'); await channel.bindQueue(queue, 'direct-test-exchange4', '', { name: 'yun' }); channel.consume( queue, msg => { console.log(msg.content.toString()); }, { noAck: true } );
javascript
复制代码
// src/headers-consumer2.js import * as amqp from 'amqplib'; const connect = await amqp.connect('amqp://localhost:5672'); const channel = await connect.createChannel(); await channel.assertExchange('direct-test-exchange4', 'headers'); const { queue } = await channel.assertQueue('queue2'); await channel.bindQueue(queue, 'direct-test-exchange4', '', { name: 'mu' }); channel.consume( queue, msg => { console.log(msg.content.toString()); }, { noAck: true } );
两个消费者分别根据消息头的属性接收不同的消息。
总结
RabbitMQ 通过以下几种方式解决了常见的问题:
- 流量削峰:可以将大量流量放入消息队列中,然后慢慢处理,避免系统崩溃。
- 应用解耦:应用程序之间不直接依赖,即使某个应用程序挂掉,也可以在恢复后继续从消息队列中消费消息,不会影响到其他应用。
版权归原作者 Java-之父 所有, 如有侵权,请联系我们删除。