一、分布式消息队列
中间件
连接多个系统,帮助多个系统紧密协作的技术(或者组件)。
比如:Redis、消息队列、分布式存储Etcd
谈到中间人,我们必须引入一个概念,那就是中间件。
什么是中间件?
可以将其视为在开发系统或多个应用时,用于连接多个系统或使多个系统紧密协作的工具。常用的中间件包括Redis,消息队列,以及分布式存储如ETCD等。
事实上,如果非要定义,数据库也可以被视为一种中间件。
例如,假设我们有两个系统,系统A和系统B,他们需要共享同一份数据。这时,我们可以将数据存储在数据库中,这样,数据库就像是连接两个系统的“中间人”,即满足中间件的定义。
在NodeJS中,中间件的含义稍有不同,它指的是在不影响业务逻辑的前提下,增强系统功能的工具,连接通用功能和系统。
问:如何让两个系统紧密协作呢?
答:这时,我们就可以使用中间件。
例如,我们可以使用Redis来存储共享数据。主系统和只能分析服务都可以读取Redis中的数据。这样,Redis就充当了一个“中间人”的角色,连接了两个系统,这就是中间件的概念。
消息队列
什么是消息队列?
从字面上理解,消息队列就是存储消息的队列。队列有一个显著特点,那就是“FIFO”。就像我们在食堂排队打饭一样,排在前面的人先打饭,这是队列最常见的特点。
假设这是一个消息队列,它可以存储消息。那么这就引出了我们的核心概念,即消息的存储。其中涉及到三个关键词:存储、队列和消息。
问:什么是存储?
答:存储意味着它可以保存数据或消息。
问:消息又是什么呢?
答:消息可能是某种数据结构,例如字符串、对象、二进制数据或Json数据等。
问:什么是队列?
答:一种FIFO的数据结构。
消息队列总结
消息队列:用于存储信息的队列。
此处的关键词有三个:存储、消息、队列
存储:对数据的存储能力
消息:指任何形式的数据结构,例如字符串、对象、二进制数据、JSON等
队列:具有先进先出特性的数据结构问:消息队列是特殊的数据库吗?
答:可以这么认为,但是消息队列的核心作用不只是存储数据。消息队列的应用场景(作用):在多个不同的系统、应用之间实现消息的传输(也可以存储)。不需要考虑传输应用的编程语言、系统、框架等。
例如:可以让java开发的应用发消息,让php开发的应用收消息,这样就不用吧所有代码写到同一个项目里(应用解耦)。
消息队列的模型
消息队列主要由四部分组成:消息生产者(Producer)、消息消费者(Consumer)、消息(Message)和消息队列(Queue)。
示例场景:
消息队列的模型总结
生产者:Producer,类比为快递员,发送消息的人(客户端)
消费者:Consumer,类比取快递的人,接受读取消息的人(客户端)
消息:Message,类比为快递,就是生产者要传输给消费者的数据
消息队列:Queue问:为什么不直接传输,要用消息队列?
答:生产者不用关心你的消费者要不要消费、什么时候消费,我只需要把东西给消息队列,我的工作就算完成了。生产者和消费者实现了解耦,互不影响。
消息队列的优势
首先是异步处理,所谓的异步处理,意味着生产者在发送完消息后可以立即转而进行其他任务,而无需等待消费者处理消息。这样生产者就无需等待消费者接受消息才能进行下一步操作,避免了阻塞。这与我们之前讨论的异步化处理非常类似,消息队列使我们的系统具备了这种能力。
其次,消息队列还有削峰填谷的能力。削峰填谷是指当消费者的处理能力有限时,而用户的请求量又很大,我们可以先将用户的请求存储在消息队列中,然后消费者或实际执行应用可以按照自身的处理能力逐步从队列中取出请求。
消息队列的优势总结
- 异步处理:一旦生产者发送完消息,便可以立即转向其他任务,而消费者则可以在任何时候开始处理消息。这样一来,生产者和消费者之间就不会发送阻塞。
- 削峰填谷:消息队列允许我们先将用户请求存储起来,然后消费者可以根据自身的处理能力和需求,逐步从消息队列取出并处理请求。
- 原本:12 点时来了 10 万个请求,原本情况下,10 万个请求都在系统内部立刻处理,很快系统压力过大就宕机了2. 现在::把这 10 万个请求放到消息队列中,处理系统以自己的恒定速率(比如每秒 1个)慢慢执行,从而保护系统、稳定处理。
分布式消息队列的优势
首先,分布式消息队列支持消息持久化,也就是数据持久化。它能将我们的消息集中存储到硬盘里,因此服务器重启后,数据不会丢失。就如同快递一样,即使丢失,也有一定的机制能帮你找回,这是分布式消息队列的首要优势。
其次,分布式消息队列具有可扩展性,这是分布式与单机最大的区别。如果一个服务器只能处理1000 个用户的请求,超出这个数量的请求,服务器可能就无法承受,甚至会宕机。然而,可扩展生意味着无论你的用户数量再多,通过增加机器,我们都能自动地承受新增的用户。分布式的特点就是可以根据需求随时增加或减少节点,以保持服务稳定。
再者,分布式消息队列能够实现应用解耦。这是在分布式场景下才能实现的功能,它允许各个使用不同语言框架开发的系统之间进行灵活的数据传输与读取。
此外,让我们来讨论一下应用解耦和分布式消息队列的另一个优势,这也是面试中常被问到的问题。假设我们有一个订单系统,库存系统和发货系统。以往,我们将所有这些系统全都放到同-个大项目中,那会带来哪些问题呢?
以上面示例的订单系统为例,它需要调用库存系统进行减库存,然后又要调用发货系统进行发货。如果库存系统调用成功,但发货系统突然调用失败,整个系统就会出现问题。如果发货系统崩溃,库存系统可能也会受到影响。但如果我们实施应用解耦,订单系统下订单时只需向消息队列发送一个消息,然后立即返回。库存系统和发货系统可以从消息队列中取出消息进行处理。如果发货系统突然宕机,也不会影响库存系统。当发货系统恢复后,可以从消息队列中找到订单消息,然后继续执行业务逻辑。
最后一个优点是,通过消息队列,订单系统不需要同步调用所有系统,只需发送消息到队列就可以立即返回,这样性能更高,响应时间更短。这就是为什么我们在构建秒杀系统时,必须要用消息队列,你可以将所有耗时的操作全部推给消息队列,让库存和发货系统自己去处理订单,它们可以自己去保证订单不被重复执行等。当然,实际情况要复杂一些。
在大公司中,消息队列还有一个可能被忽视的应用场景,这就是发布订阅模式,可以说这是消息队列的第四个优势。我们常在公司中遇到这样的场景:假设你在一家大公司,以腾讯为例,我们都知道,腾讯有许多产品,许多产品都需要用 QQ 号登录。如果 QQ 对其账号体系进行了改革,那么所有使用 QQ 号登录的下游系统都必须知道这个变化。
现在设想这样一个场景,腾讯有非常多的子系统,例如微信、王者荣耀等,假设有 100 个这样的系统。如果 QQ 进行了一些改革,我们如何将这些改变通知给这些系统?最简单,但也最费时的方法,就是直接一个个通知。例如,QQ 发布了新的数据或进行了一些调整,需要让其他系统知道,或者 QQ 发布了新的公告,其他系统也需要同步这个公告。如果 QQ 直接调用其他系统,这会有什么问题呢?
首先,系统越多,这个操作的时间就会越长,而且可能会遇到失败的情况,这无疑是一项繁琐的工作。其次,假设现在有100 个系统,如果有新的项目或团队加入,他们可能不知道要接收 QQ 的这些变动,而且 QQ 也无法得知新加入的项目,因此可能会有信息漏洞,这是致命的。
那我们应该怎么解决这个问题呢?解决方案就是,大的核心系统(如 QQ)向一个地方,也就是我们的消息队列发送消息,然后其他的系统都去订阅这个消息队列,读取这个消息队列中的消息就可以了。这样,QQ 就不需要关心背后有多少系统,只需要向消息队列发送消息,其他系统如果需要这些消息,就可以从消息队列中获取,这样就解决了上述的问题。
分布式消息队列的优势总结
- 数据持久化:它可以把消息集中存储到硬盘里,服务器重启就不会丢失。
- 可扩展性:可以根据需求,随时增加(或减少)节点,继续保持稳定的服务。
- 应用解耦:可以连接各个不同语言、框架开发的系统,让这些系统能够灵活传输读取数据。 1. 应用解耦的优点 1. 一个系统挂了,不影响另一个系统2. 系统挂了并恢复后,仍然可以取出消息,继续执行业务逻辑3. 只要发送消息到队列,就可以立刻返回,不用同步调用所有系统,性能更高
- 发布订阅:大的核心系统始终往一个地方(消息队列)去发消息,其他的系统都去订阅这个消息队列(读取这个消息队列中的消息)
消息队列的应用场景
- 耗时的场景(异步)
- 高并发场景(异步、削峰填谷)
- 分布式系统协作(尤其是跨团队、跨业务协作、应用解耦)
- 强稳定性的场景(比如金融业务,持久化、可靠性、削峰填谷)
说明:
消息队列的优势包括数据持久化、可扩展性、应用解耦,以及发布订阅模式。现在,我们来探讨一下在何时我们需要使用消息队列。
第一,当遇到耗时任务的场景,我们可以考虑使用消息队列。原因是消息队列能提供异步处理的能力。1793307484859105281_0.8689215618690482
第二,在高并发场景下,消息队列也是非常有用的工具。它不仅提供异步处理的能力,还有助于实现流量的削峰填谷,这是消息队列的一个显著优点。
第三,在分布式系统协作的环境下,消息队列同样非常适用。例如,QQ 团队和微信团队各自拥有自己的系统,这两个系统可能使用不同的语言,基于不同的技术栈开发。你不能像处理传统的单体应用那样,把所有东西都写在一个 Spring Boot 项目中,你需要通过某种方式,如消息队列,来实现系统之间的解耦。QQ 的消息能够通过消息队列传输给微信,实现跨团队、跨系统、跨业务的协作。这正是消息队列的应用解耦优势的体现。
最后,如果需要保证系统的高稳定性或者强稳定性,消息队列也是非常重要的工具。例如,在金融支付转账的场景下,对系统稳定性的要求非常高,不允许有任何错误。在这种情况下,我们可以利用消息队列的持久化和可靠性保证特性,以及削峰填谷的能力,来满足高稳定性的需求。1793307484859105281_0.05188528090536093
因此,在开发过程中遇到以上这些场景,就可以考虑使用消息队列了。
- 17933
消息队列的缺点
首先,最直接的问题是使用消息队列意味着你需要学习和掌握一个新的工具,并在你的系统中引入一个额外的中间件。这将会使你的系统变得更复杂,并需要更多的维护工作。若你在公司实施此类解决方案,我们通常会选择由第三方大公司提供的稳定中间件,而这会产生额外的成本。即使你自己部署和维护,也需要额外的资源投入。
另外,一旦你开始使用消息队列,你就需要承担由此带来的各种可能问题,例如消息丢失。并非消息队列就可以保证消息不会丢失,比如在发送消息的过程中,可能就因为某些原因而失败。
再者,需要保证消息的顺序性,即消息需要按照特定的顺序被消费。此外,还需要防止消息的重复消费,避免同一个系统多次处理同一条消息。同时,还需要保证数据的一致性,这并非是消息队列的特定问题,而是任何分布式系统都需要考虑的问题。例如,分布式锁就是为了解决分布式系统中多个服务器之间的一致性问题。
主流分布式消息队列选型
技术选型指标:
- 吞吐量:IO、并发
- 时效性:类似延迟,消息的发送、到达时间1793307484859105281_0.2263001767791472
- 可用性:系统可用的比率(比如 1 年 365 天宕机 1s,可用率大概 X 个 9)
- 可靠性:消息不丢失(比如不丢失订单)、功能正常完成859105281_0.2263001767791472 | ** | ** | ** | ** | ** | ** | ** | | — | — | — | — | — | — | — | | activemq | 万级 | 高5423 | 高 | 高 | 简单易学1 | 中小型企业、项目 | | rabbitmq | 万级 | 极高(微秒)1 | 高 | 高 | 生态好(基本什么语言都支持)、时效性高、易学 | 适合绝大多数分布式的应用,这也是先学他的原因 | | kafka | 十万级 | 高(毫秒以内) | 极高 | 极高 | 吞吐量大、可靠性、可用性,强大的数据流处理能力 | 适用于大规模处理数据的场景,比如构建日志收集系统、实时数据流传输、事件流收集传输 | | rocketmq | 十万级 | 高(ms) | 极高 | 极高 | 吞吐量大、可靠性、可用性,可扩展性 | 适用于金融 、电商等对可靠性要求较高的场景,适合大规模的消息处理。 | | pulsar | 十万级 | 高(ms) | 极高0664 | 极高 | 可靠性、可用性很高,基于发布订阅模型,新兴(技术架构先进) | 适合大规模、高并发的分布式系统(云原生)。适合实时分析、事件流处理、IoT 数据处理等。 |
二、RabbtiMQ介绍
基本概念
首先,我们要介绍一个基本概念,也就是 RabbitMQ 中的 AMQP 协议。
问:什么是 AMQP 呢?
答:AMQP 的全称是 Advanced Message Queue Protocol,即高级消息队列协议。RabbitMQ 就是根据这个协议开发的。AMQP 是一个标准的协议,不仅 RabbitMQ,如果你想自己实现一个消息队列,也可以按照这个协议来设计。
- 生产者(Publisher):发消息到某个交换机。
- 消费者(Consumer):从某个队列中取消息。
- 交换机(Exchange):像路由器一样,负责将消息从生产者发送到对应的队列。
- 队列(Queue):存储消息的地方。
- 路由(Routes):转发,就是怎么把消息从一个地方转到另一个地方(比如从生产者转发到某个队列)。
安装
官方文档:Installing RabbitMQ | RabbitMQ
在安装完后RabbitMQ的sbin目录下都是RabbitMQ的执行脚本,其中:
- rabbitmq-server.bat:操作 RabbitMQ 服务器相关的命令。
- rabbitmq-plugins.bat:用来安装 RabbitMQ 的插件。
rabbitmq-plugins.bat enable rabbitmq_management
安装完插件后重新启动
# 关闭服务
net stop rabbitmq
# 开启服务
net start rabbitmq
访问:http://localhost:15672 ,默认用户名密码都是 guest
三、RabbitMQ入门
在pom.xml添加以下依赖
<!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client --><dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>rabbitmq-version</version></dependency>
Channel-频道:理解为操作消息队列的client(比如jdbcClient、redisClient),提供了和消息队列server建立通信的传输方法。
创建消息队列:
参数:
queueName:消息队列名称
durabale:消息队列重启后,消息是否丢失
exclusive:是否只允许当前这个创建消息队列的连接操作消息队列
autoDelete:没有人用队列后,是否要删除队列
简单模式(单一消息)
官方文档:RabbitMQ tutorial - “Hello World!” | RabbitMQ
场景:一个生产者给一个队列发消息,一个消费者从这个队列取消息。一对一
importcom.rabbitmq.client.Channel;importcom.rabbitmq.client.Connection;importcom.rabbitmq.client.ConnectionFactory;importjava.nio.charset.StandardCharsets;// 定义一个名为SingleProducer的公开类,用于实现消息发送功能publicclassSingleProducer{
// 定义一个静态常量字符串QUEUE_NAME,它的值为"hello",表示我们要向名为"hello"的队列发送消息privatefinalstaticStringQUEUE_NAME="hello";// 定义程序的入口点:一个公开的静态main方法,它抛出Exception异常publicstaticvoidmain(String[] argv)throwsException{
// 创建一个ConnectionFactory对象,这个对象可以用于创建到RabbitMQ服务器的连接ConnectionFactory factory =newConnectionFactory();// 设置ConnectionFactory的主机名为"localhost",这表示我们将连接到本地运行的RabbitMQ服务器
factory.setHost("localhost");// 如果你改了本地的用户名和密码,你可能要指定userName、userPassword,// 如果改了本地的端口,还要改Port。// 那我们这里不需要,我们这里就用默认的localhost,默认的用户名和密码,就是guest// factory.setUsername();// factory.setPassword();// factory.setPort();// 使用ConnectionFactory创建一个新的连接,这个连接用于和RabbitMQ服务器进行交互try(Connection connection = factory.newConnection();// 通过已建立的连接创建一个新的频道Channel channel = connection.createChannel()){
// 在通道上声明一个队列,我们在此指定的队列名为"hello"
channel.queueDeclare(QUEUE_NAME,false,false,false,null);// 创建要发送的消息,这里我们将要发送的消息内容设置为"Hello World!"String message ="Hello World!";// 使用channel.basicPublish方法将消息发布到指定的队列中。这里我们指定的队列名为"hello"
channel.basicPublish("",QUEUE_NAME,null, message.getBytes(StandardCharsets.UTF_8));// 使用channel.basicPublish方法将消息发布到指定的队列中。这里我们指定的队列名为"hello"System.out.println(" [x] Sent '"+ message +"'");}}}
importcom.rabbitmq.client.Channel;importcom.rabbitmq.client.Connection;importcom.rabbitmq.client.ConnectionFactory;importcom.rabbitmq.client.DeliverCallback;importjava.nio.charset.StandardCharsets;publicclassSingleConsumer{
// 定义我们正在监听的队列名称privatefinalstaticStringQUEUE_NAME="hello";publicstaticvoidmain(String[] argv)throwsException{
// 创建连接,创建连接工厂ConnectionFactory factory =newConnectionFactory();// 设置连接工厂的主机名,这里我们连接的是本地的RabbitMQ服务器
factory.setHost("localhost");// 从工厂获取一个新的连接Connection connection = factory.newConnection();// 从连接中创建一个新的频道Channel channel = connection.createChannel();// 创建队列,在该频道上声明我们正在监听的队列
channel.queueDeclare(QUEUE_NAME,false,false,false,null);// 在控制台打印等待接收消息的信息System.out.println(" [*] Waiting for messages. To exit press CTRL+C");// 定义了如何处理消息,创建一个新的DeliverCallback来处理接收到的消息DeliverCallback deliverCallback =(consumerTag, delivery)->{
// 将消息体转换为字符串String message =newString(delivery.getBody(),StandardCharsets.UTF_8);// 在控制台打印已接收消息的信息System.out.println(" [x] Received '"+ message +"'");};// 在频道上开始消费队列中的消息,接收到的消息会传递给deliverCallback来处理,会持续阻塞
channel.basicConsume(QUEUE_NAME,true, deliverCallback, consumerTag ->{
});}}
工作队列模式(多消费者)
官方文档:RabbitMQ tutorial - Work Queues | RabbitMQ
场景:多个机器同时去接受并处理任务(尤其是每个机器的处理能力有限),也就是一个生产者给一个队列发消息,多个消费者从这个队列取消息。一对多
公平分配
简单来说就是消费者一人分一条消息。
官方解读:
您可能已经注意到,调度仍然没有完全按照我们想要的方式工作。例如,在有两个工人的情况下,当所有奇数消息都很重而偶数消息都很轻时,一个工人将一直很忙,而另一个工人几乎不会做任何工作。嗯,RabbitMQ对此一无所知,并且仍然会均匀地分发消息。
发生这种情况是因为RabbitMQ在消息进入队列时只调度消息。它不考虑消费者未确认的消息的数量。它只是盲目地将每第n条消息发送给第n个消费者。
为了克服这个问题,我们可以使用basicQos方法并设置prefetchCount = 1。这告诉RabbitMQ不要同时向工作人员发送多条消息。或者,换句话说,在工作人员处理并确认前一条消息之前,不要向工作人员发送新消息。相反,它会将其发送给下一个不忙的工作人员。
int prefetchCount =1;
channel.basicQos(prefetchCount);
importcom.rabbitmq.client.Channel;importcom.rabbitmq.client.Connection;importcom.rabbitmq.client.ConnectionFactory;importcom.rabbitmq.client.MessageProperties;importjava.util.Scanner;publicclassMultiProducer{
// 定义要使用的队列名称,这次的队列就改叫multi_queueprivatestaticfinalStringTASK_QUEUE_NAME="multi_queue";publicstaticvoidmain(String[] argv)throwsException{
// 创建一个连接工厂ConnectionFactory factory =newConnectionFactory();// 设置RabbitMQ服务的主机名
factory.setHost("localhost");// 创建一个新的连接try(Connection connection = factory.newConnection();// 创建一个新的频道Channel channel = connection.createChannel()){
// 声明队列参数,包括队列名称、是否持久化等
channel.queueDeclare(TASK_QUEUE_NAME,true,false,false,null);// 创建一个输入扫描器,用于读取控制台输入Scanner scanner =newScanner(System.in);// 使用循环,每当用户在控制台输入一行文本,就将其作为消息发送while(scanner.hasNext()){
// 读取用户在控制台输入的下一行文本String message = scanner.nextLine();// 发布消息到队列,设置消息持久化
channel.basicPublish("",TASK_QUEUE_NAME,MessageProperties.PERSISTENT_TEXT_PLAIN,
message.getBytes("UTF-8"));// 输出到控制台,表示消息已发送System.out.println(" [x] Sent '"+ message +"'");}}}}
importcom.rabbitmq.client.Channel;importcom.rabbitmq.client.Connection;importcom.rabbitmq.client.C
版权归原作者 Cammy_OnceKim 所有, 如有侵权,请联系我们删除。