RabbitMQ Streams是一种持久复制数据结构,可以完成与队列相同的任务:它们缓冲来自生产者的消息,这些消息由消费者读取。然而,流与队列的区别在于两个重要方面:消息的存储和消费方式。
Streams为仅追加的消息日志建模,这些消息可以重复读取,直到过期。流始终是持久的和复制的。这种流行为的更技术性的描述是“非破坏性消费者语义”。
要从RabbitMQ中的流中读取消息,一个或多个使用者订阅它,并根据需要多次读取相同的消息。
流中的数据可以通过RabbitMQ客户端库或通过专用的二进制协议插件和关联的客户端使用。强烈建议使用后一个选项,因为它提供对所有流特定功能的访问,并提供尽可能最好的吞吐量(性能)。
现在,您可能会问以下问题:
- 那么流会取代队列吗?
- 我应该放弃使用队列吗?
为了回答这些问题,引入流不是为了取代队列,而是为了补充队列。Streams为新的RabbitMQ用例开辟了许多机会,这些用例在使用Streams的用例中进行了描述。
以下信息详细说明流的使用以及流的管理和维护操作。
您还应该查看流插件信息,以了解有关使用二进制RabbitMQ stream协议的流的更多信息,以及功能矩阵的流核心和流插件比较页面。
一、使用流的用例
开发流最初是为了涵盖现有队列类型无法提供或具有缺点的4个消息传递用例:
1、大型扇形分叉
当想要将相同的消息传递给多个订阅者时,用户当前必须为每个消费者绑定一个专用队列。如果使用者数量很大,这可能会变得效率低下,特别是在需要持久性和/或复制时。流将允许任意数量的使用者以非破坏性的方式消费来自同一队列的相同消息,从而不需要绑定多个队列。流消费者还可以从副本中读取数据,从而允许读取负载在集群中分布。
2、重放(时间旅行)
由于所有当前的RabbitMQ队列类型都具有破坏性消费行为,即当消费者用完消息时,将从队列中删除消息,因此不可能重新读取已消费的消息。流将允许消费者在日志中的任何点连接并从那里读取。
3、吞吐量性能
没有任何持久队列类型能够提供可以与任何现有的基于日志的消息传递系统竞争的吞吐量。Streams的设计以性能为主要目标。
4、大量积压工作
大多数RabbitMQ队列被设计为收敛于空状态,并因此进行了优化,当给定队列上有数百万条消息时,性能可能会更差。流旨在以有效的方式存储大量数据,并将内存开销降至最低。
二、如何使用RabbitMQ Streams
可以指定可选队列和使用者参数的AMQP 0.9.1客户端库将能够将流用作常规AMQP 09.1队列。
就像队列一样,必须首先声明流。
1、声明RabbitMQ Stream
要声明流,请将x-queue-type队列参数设置为stream(默认值为classic)。此参数必须由客户端在声明时提供;不能使用策略设置或更改它。这是因为策略定义或适用的策略可以动态更改,但队列类型不能更改。必须在声明时指定。
下面的片段显示了如何使用AMQP 0.9.1 Java客户端创建流:
ConnectionFactory factory = new ConnectionFactory();
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(
"my-stream",
true, // durable
false, false, // not exclusive, not auto-delete
Collections.singletonMap("x-queue-type", "stream")
);
使用设置为stream的x-queue-type参数声明队列将在每个配置的RabbitMQ节点上创建一个具有副本的流。流是仲裁系统,因此强烈建议使用不均匀的集群大小。
流仍然是AMQP 0.9.1队列,因此它可以在创建后绑定到任何交换,就像任何其他RabbitMQ队列一样。
如果使用管理UI声明,则必须使用队列类型下拉菜单指定流类型。
流支持3个额外的队列参数,这些参数最好使用策略配置:
- x-max-length-bytes
设置流的最大大小(以字节为单位)。默认值:未设置。
- x-max-age
设置流的最长期限。默认值:未设置。
- x-stream-max-segment-size-bytes
单位:字节。流在磁盘上被划分为固定大小的段文件。 此设置控制它们的大小。 默认值:(500000000 字节)。
以下代码片段演示如何将流的最大大小设置为 20 GB,并使用 100 MB 的段文件:
Map<String, Object> arguments = new HashMap<>();
arguments.put("x-queue-type", "stream");
arguments.put("x-max-length-bytes", 20_000_000_000); // maximum stream size: 20 GB
arguments.put("x-stream-max-segment-size-bytes", 100_000_000); // size of segment files: 100 MB
channel.queueDeclare(
"my-stream",
true, // durable
false, false, // not exclusive, not auto-delete
arguments
);
三、客户端操作
1、Consuming
由于流永远不会删除任何消息,因此任何消费者都可以开始读取/消费 从日志中的任何一点。这由 x-stream-offset consumer 参数控制。 如果未指定,消费者将从写入的下一个偏移量开始读取 添加到使用者启动后的日志中。支持以下值:
- first - 从日志中的第一条可用消息开始
- last - 从最后写入的消息“块”(块 是流中使用的储运单位,简单来说就是批次 由几到几千条消息组成的消息,具体取决于入口)
- next - 与不指定任何偏移量相同
- 偏移量 - 一个数值,指定要附加到日志的确切偏移量。 如果此偏移量不存在,它将分别钳制到日志的开始或结束。
- 时间戳 - 一个时间戳值,指定要附加到日志的时间点。 它将限制到最接近的偏移量,如果时间戳超出流的范围,它将分别限制日志的开始或结束。 在 AMQP 0.9.1 中,使用的时间戳是精度为 00 秒的 POSIX 时间,即自 00-00-1970 01:01:<> UTC 以来的秒数。 请注意,使用者可以接收在指定时间戳之前发布的消息。
- 间隔 - 一个字符串值,指定相对于当前时间附加日志的时间间隔。使用与 x-max-age 相同的规范
以下代码片段演示如何使用第一个偏移量规范:
channel.basicQos(100); // QoS must be specified
channel.basicConsume(
"my-stream",
false,
Collections.singletonMap("x-stream-offset", "first"), // "first" offset specification
(consumerTag, message) -> {
// message processing
// ...
channel.basicAck(message.getEnvelope().getDeliveryTag(), false); // ack is required
},
consumerTag -> { });
以下代码片段演示如何指定要从中消费的特定偏移量:
channel.basicQos(100); // QoS must be specified
channel.basicConsume(
"my-stream",
false,
Collections.singletonMap("x-stream-offset", 5000), // offset value
(consumerTag, message) -> {
// message processing
// ...
channel.basicAck(message.getEnvelope().getDeliveryTag(), false); // ack is required
},
consumerTag -> { });
以下代码片段演示如何指定要从中使用的特定时间戳:
Date timestamp = new Date(System.currentTimeMillis() - 60 * 60 * 1_000)
channel.basicQos(100); // QoS must be specified
channel.basicConsume(
"my-stream",
false,
Collections.singletonMap("x-stream-offset", timestamp), // timestamp offset
(consumerTag, message) -> {
// message processing
// ...
channel.basicAck(message.getEnvelope().getDeliveryTag(), false); // ack is required
},
consumerTag -> { });
2、其他流操作
以下操作的使用方式与经典队列和仲裁队列类似 但有些具有一些特定于队列的行为。
- 声明
- 队列删除
- 发布者确认
- 消费(订阅):消费需要 QoS 要设置的预取。acks 作为一种信用机制来推进当前 消费者的偏移量。
- 为使用者设置 QoS 预取
- 消费者确认(牢记 QoS 预取限制)
- 取消消费者
四、流的单个活动消费者功能
流的单个活动使用者是 RabbitMQ 3.11 及更高版本中提供的一项功能。 它在流上提供独占消费和消费连续性。 当共享同一流和名称的多个使用者实例启用单个活动使用者时,这些实例中只有一个实例将同时处于活动状态,因此将接收消息。 其他实例将处于空闲状态。
单一活跃使用者功能提供 2 个好处:
- 消息按顺序处理:一次只有一个使用者。
- 保持消费连续性:如果活动消费者停止或崩溃,则该组中的消费者将接管。
五、超级流
超级流是一种通过将大型流划分为较小的流来横向扩展的方法。 它们与单个活动使用者集成,以保留分区内的消息顺序。 超级流从 RabbitMQ 3.11 开始可用。
超级流是由单个常规流组成的逻辑流。 这是一种使用 RabbitMQ 流横向扩展发布和使用的方法:将大型逻辑流划分为分区流,将存储和流量拆分到多个集群节点上。
超级流仍然是一个逻辑实体:由于客户端库的智能性,应用程序将其视为一个“大”流。 超级流的拓扑基于 AMQP 0.9.1 模型,即它们之间的交换、队列和绑定。
可以使用任何 AMQP 0.9.1 库或管理插件创建超级流的拓扑,它需要创建直接交换,即“分区”流,并将它们绑定在一起。 不过,使用 rabbitmq-streams add_super_stream 命令可能更容易。 以下是如何使用它来创建具有 3 个分区的发票超级流:
rabbitmq-streams add_super_stream invoices --partitions 3
使用 rabbitmq-streams add_super_stream --help 了解有关该命令的更多信息。
与单个流相比,超级流增加了复杂性,因此不应将其视为涉及流的所有用例的默认解决方案。 仅当您确定已达到单个流的限制时,才考虑使用超级流。
六、功能比较:常规队列与流
流不是传统意义上的真正队列,因此不是 与 AMQP 0.9.1 队列语义非常一致。其他队列类型的许多功能 不支持支持,并且永远不会由于队列类型的性质而支持。
可以使用常规队列的 AMQP 0.9.1 客户端库将能够使用流 只要它使用消费者确认。
由于流中许多功能是非破坏性的,因此它们永远不会被流支持 读取语义。
1、功能矩阵
特征
经典
流
非持久性队列
是的
不
排他性
是的
不
每条消息的持久性
每条消息
总是
成员资格变更
自动
手动
TTL的
是的
否(但请参阅保留期)
队列长度限制
是的
否(但请参阅保留期)
懒惰行为
是的
固有
消息优先级
是的
不
消费者至上
是的
不
死信交换
是的
不
遵守政策
是的
(请参阅保留期)
对内存警报做出反应
是的
否(使用最少的 RAM)
病毒邮件处理
不
不
全局 QoS 预取
是的
不
2、非持久性队列
根据其假定的用例,流始终是持久的, 它们不能像常规队列那样是非持久的。
3、排他性
根据其假定的用例,流始终是持久的,它们不能像常规队列那样是排他性的。 它们不应用作临时队列。
4、懒惰模式
写入消息后,流将所有数据直接存储在磁盘上 在读取之前,它不会使用任何内存。可以这么说,流本质上是懒惰的。
5、全球服务质量
流不支持全局 QoS 预取,其中通道设置单个 使用该通道的所有使用者的预取限制。如果尝试 从启用了全局 QoS 的通道中的流中使用 将返回通道错误。
使用每使用者 QoS 预取,这是几个常用客户端中的默认设置。
版权归原作者 Doker 技术人的数码品牌 所有, 如有侵权,请联系我们删除。