0


RabbitMQ

什么是MQ?

MQ也就是 message queue,俗称消息队列。

消息队列(MQ)是一种应用程序对应用程序的通信方法。应用程序通过写和检索出入列队的针对应用程序的数据(消息)来通信,而无需专用连接来链接它们。消息传递指的是程序之间通过在消息中发送数据进行通信,而不是通过直接调用彼此来通信,直接调用通常是用于诸如远程过程调用的技术。排队指的是应用程序通过队列来通信。队列的使用除去了接收和发送应用程序同时执行的要求。

为什么要用MQ?

1.可以用作流量削峰

应用场景在订单系统时,若订单系统一次最高处理的订单为10k条,此时一下子若有20k条用户数据系统显然时无法处理,则使用MQ可以将剩下的10k条数据分散成一段时间来处理,用户在接来的时间陆续收到成功下单的操作通知,而不使用MQ则系统可能会直接宕机。

2.可以用作应用解耦

耦合度一直以来在开发场景中是一个需要减少的操作,以电商应用为例,应用中有订单系统、库存系统、物流系统、支付系统。用户创建订单后,如果耦合 调用库存系统、物流系统、支付系统,任何一个子系统出了故障,都会造成下单操作异常。当转变成基于 消息队列的方式后,系统间调用的问题会减少很多,比如物流系统因为发生故障,需要几分钟来修复。在 这几分钟的时间里,物流系统要处理的内存被缓存在消息队列中,用户的下单操作可以正常完成。当物流 系统恢复后,继续处理订单信息即可,中单用户感受不到物流系统的故障,提升系统的可用性。

3.可以进行异步处理

有些服务间调用是异步的,例如 A 调用 B,B 需要花费很长时间执行,但是 A 需要知道 B 什么时候可 以执行完,以前一般有两种方式,A 过一段时间去调用 B 的查询 api 查询。或者 A 提供一个 callback api, B 执行完之后调用 api 通知 A 服务。这两种方式都不是很优雅,使用消息总线,可以很方便解决这个问题, A 调用 B 服务后,只需要监听 B 处理完成的消息,当 B 处理完成后,会发送一条消息给 MQ,MQ 会将此消 息转发给 A 服务。这样 A 服务既不用循环调用 B 的查询 api,也不用提供 callback api。同样B 服务也不用 做这些操作。A 服务还能及时的得到异步处理成功的消息。

而rabbitMQ就是一种典型的MQ,结合 erlang 语言本身的并发优势,性能好时效性微秒级,社区活跃度也比较高,管理界面用起来十分 方便,适合数据量不是巨大,中小型公司优先选择功能比较完备的 RabbitMQ。

RabbitMQ安装

rabbitMQ安装参考rabbitMQ官方文档Installing on RPM-based Linux (RedHat Enterprise Linux, CentOS, Fedora, openSUSE) — RabbitMQ

也可以参考另外一个博主,写的很详细

RabbitMQ 安装教程(CentOS版)_m0_67392811的博客-CSDN博客_centos rabbitmq安装

** RabbitMQ模型架构**

Broker:rabbitmq的服务节点

Queue:队列,是RabbitMQ的内部对象,用于存储消息。RabbitMQ中消息只能存储在队列中。生产 者投递消息到队列,消费者从队列中获取消息并消费。多个消费者可以订阅同一个队列,这时队列中的 消息会被平均分摊(轮询)给多个消费者进行消费,而不是每个消费者都收到所有的消息进行消费。(注 意:RabbitMQ不支持队列层面的广播消费,如果需要广播消费,可以采用一个交换器通过路由Key绑 定多个队列,由多个消费者来订阅这些队列的方式。

Exchange:交换器。生产者将消息发送到Exchange,由交换器将消息路由到一个或多个队列中。如果 路由不到,或返回给生产者,或直接丢弃,或做其它处理。

RoutingKey:路由Key。生产者将消息发送给交换器的时候,一般会指定一个RoutingKey,用来指定 这个消息的路由规则。这个路由Key需要与交换器类型和绑定键(BindingKey)联合使用才能最终生效。 在交换器类型和绑定键固定的情况下,生产者可以在发送消息给交换器时通过指定RoutingKey来决定消 息流向哪里。

Binding:通过绑定将交换器和队列关联起来,在绑定的时候一般会指定一个绑定键,这样RabbitMQ 就可以指定如何正确的路由到队列了。 交换器和队列实际上是多对多关系。就像关系数据库中的两张表。他们通过BindingKey做关联(多对多 关系表)。在投递消息时,可以通过Exchange和RoutingKey(对应BindingKey)就可以找到相对应的队 列。

信道:信道是建立在Connection 之上的虚拟连接。当应用程序与Rabbit Broker建立TCP连接的时候, 客户端紧接着可以创建一个AMQP 信道(Channel) ,每个信道都会被指派一个唯一的D。RabbitMQ 处 理的每条AMQP 指令都是通过信道完成的。信道就像电缆里的光纤束。一条电缆内含有许多光纤束,允许所有的连接通过多条光线束进行传输和接收。

RabbitMQ消息的顺序?

生产者消息的顺序

生产者发送消息的顺序通常不做要求,若要指定,可以加锁操作,而代价就是程序并发性下降。

队列中的消息顺序

RabbitMQ中,消息最终会保存在队列中,在同一个队列中,消息是顺序的,先进先出原则,这个由Rabbitmq保证,不同队列中的消息顺序,是没有保证的,例如:进地铁站的时候,排了三个队伍,不同队伍之间的,不能确保谁先进站。

消费者处理消息顺序

在默认情况下时轮序分发,但是这种分发方式对消费者来说不太友好。比方说有两个消费者在处理任务,其中有个消费者 1 处理任务的速度非常快,而另外一个消费者 2 处理速度却很慢,这个时候我们还是采用轮训分发的化就会到这处理速度快的这个消费者很大一部分时间 处于空闲状态,而处理慢的那个消费者一直在干活。设置channel.basicQos(1);进行不公平分发,能者多劳 。

RabbitMQ如何保证消息是否成功发送无丢失?

使用应答机制和发布确认机制

为了保证消息在发送过程中不丢失,rabbitmq 引入消息应答机制,消息应答就是:消费者在接收到消息并且处理该消息之后,告诉 rabbitmq 它已经处理,rabbitmq 可以把该消息删除。

应答分为自动应答和手动应答

自动应答 :消息发送后立即被认为已经传送成功。这种模式如果消息在接收到之前,消费者出现连接或者 channel 关闭,那么消息就会丢失。另一方面这种模式消费者那边可以传递过载的消息,没有对传递的消息数量进行限制,但是消息积压会导致内存耗尽,最终 这些 线程被操作系统kill。所以这种模式仅适用在消费者可以高效并以某种速率能够处理这些消息的情况下使用。

手动应答:消费者处理完业务逻辑,⼿动返回ack(通知)告诉队列处理完了,队列进⽽删除消息。手动应答相比自动应答更加可靠。

另外一个防止消息丢失重要机制,发布确认

发送方确认机制: 信道需要设置为 confirm 模式,则所有在信道上发布的消息都会分配一个唯一 ID。 一旦消息被投递到queue(可持久化的消息需要写入磁盘),信道会发送一个确认给生产者(包含消息唯一 ID)。 如果 RabbitMQ 发生内部错误从而导致消息丢失,会发送一条 nack(未确认)消息给生产者。 所有被发送的消息都将被 confirm(即 ack) 或者被nack一次。但是没有对消息被 confirm 的快慢做 任何保证,并且同一条消息不会既被 confirm又被nack 发送方确认模式是异步的,生产者应用程序在等待确认的同时,可以继续发送消息。当确认消息到达生产者, 生产者的回调方法会被触发。 ConfirmCallback接口:只确认是否正确到达 Exchange 中,成功到达则回调 ReturnCallback接口:消息失败返回时回调。

接收方确认机制: 消费者在声明队列时,可以指定noAck参数,当noAck=false时,RabbitMQ会等待消费者显式发回ack信号 后才从内存(或者磁盘,持久化消息)中移去消息。否则,消息被消费后会被立即删除。 消费者接收每一条消息后都必须进行确认(消息接收和消息确认是两个不同操作)。只有消费者确认了消息, RabbitMQ 才能安全地把消息从队列中删除。 RabbitMQ不会为未ack的消息设置超时时间,它判断此消息是否需要重新投递给消费者的唯一依据是消费该 消息的消费者连接是否已经断开。这么设计的原因是RabbitMQ允许消费者消费一条消息的时间可以很长。保 证数据的最终一致性; 如果消费者返回ack之前断开了链接,RabbitMQ 会重新分发给下一个订阅的消费者。(可能存在消息重复消 费的隐患,需要去重)

单个确认发布 这是一种简单的确认方式,它是一种同步确认发布的方式,也就是发布一个消息之后只有它 被确认发布,后续的消息才能继续发布,waitForConfirmsOrDie(long)这个方法只有在消息被确认 的时候才返回,如果在指定时间范围内这个消息没有被确认那么它将抛出异常。 这种确认方式有一个最大的缺点就是:发布速度特别的慢,因为如果没有确认发布的消息就会 阻塞所有后续消息的发布,这种方式最多提供每秒不超过数百条发布消息的吞吐量。当然对于某 些应用程序来说这可能已经足够了。

**批量确认发布 **上面那种方式非常慢,与单个等待确认消息相比,先发布一批消息然后一起确认可以极大地 提高吞吐量,当然这种方式的缺点就是:当发生故障导致发布出现问题时,不知道是哪个消息出现 问题了,我们必须将整个批处理保存在内存中,以记录重要的信息而后重新发布消息。当然这种 方案仍然是同步的,也一样阻塞消息的发布。

异步确认发布

其中异步确认的可靠性和效率远远高于单个确认发布和批量确认发布

若消息丢失了怎么办 ?

如果消费者由于某些原因失去连接(其通道已关闭,连接已关闭或 TCP 连接丢失),导致消息 未发送 ACK 确认,RabbitMQ 将了解到消息未完全处理,并将对其重新排队。如果此时其他消费者可以处理,它将很快将其重新分发给另一个消费者。这样即使某个消费者偶尔死亡,也可以确保不会丢失任何消息

交换机的分类

Fanout exchange:扇出交换机。这种类型非常简单。正如从名称中猜到的那样,它是将接收到的所有消息广播到它知道的所有队列中。

Direct exchange:直接交换机。交换机通过绑定不同的Routing key从而实现直接将某个消息发送到队列上。

Topics exchange:主题交换机。比扇出和直接交换机更加的灵活。通过匹配通配符的方式实现发送。*(星号)可以代替一个单词,#(井号)可以替代零个或多个单词。像Routing key的名称为yello.blue.red 可以写成的方式就有很多种了,比如 ***.. yello. *.red ..red *.blue.# **

死信队列和延迟队列

什么是死信队列?

producer 将消息投递到 broker 或者直接到queue 里,consumer 从 queue 取出消息 进行消费,但某些时候由于特定的原因导致 queue 中的某些消息无法被消费,这样的消息如果没有 后续的处理,就变成了死信

死信来源

消息 TTL(一条消息或者该队列中的所有消息的最大存活时间) 过期

队列达到最大长度(队列满了,无法再添加数据到 mq 中)

消息被拒绝(basic.reject 或 basic.nack)并且 requeue=false

处理死信队列方法

“死信”消息会被RabbitMQ进行特殊处理,如果配置了死信队列信息,那么该消息将会被丢进死信队列中,如果没有配置,则该消息将会被丢弃

可以为每个需要使用死信的业务队列配置一个死信交换机,这里同一个项目的死信交换机可以共用一个,然 后为每个业务队列分配一个单独的路由Routing key,死信队列只不过是绑定在死信交换机上的队列,死信交换 机也不是什么特殊的交换机,只不过是用来接受死信的交换机,所以可以为任何类型【Direct、 Fanout、Topic】

什么是延迟队列?

延迟队列其实是死信队列的一种,为其中消息TTL过期,延迟队列里面存放的就是延迟队列消息,但是由于消费者长时间未作出响应,超出TTL时间,则转化成了死信队列。

RabbitMQ事务消息

通过对信道的设置实现

  1. channel.txSelect();通知服务器开启事务模式;服务端会返回Tx.Select-Ok

  2. channel.basicPublish;发送消息,可以是多条,可以是消费消息提交ack

  3. channel.txCommit()提交事务;

  4. channel.txRollback()回滚事务;

消费者使用事务:

  1. autoAck=false,手动提交ack,以事务提交或回滚为准;

  2. autoAck=true,不支持事务的,也就是说你即使在收到消息之后在回滚事务也是于事无补的,队列已经把消息移除了

如果其中任意一个环节出现问题,就会抛出IoException异常,用户可以拦截异常进行事务回滚或决定要不要重复消息。

事务消息会降低rabbitmq的性能

标签: rabbitmq 分布式

本文转载自: https://blog.csdn.net/weixin_52875557/article/details/126283460
版权归原作者 今天你学习了ma 所有, 如有侵权,请联系我们删除。

“RabbitMQ”的评论:

还没有评论