0、引言
先决条件
本教程假设 RabbitMQ 已安装并且正在
本地主机
的标准端口(
5672
)上运行。如果您使用了不同的主机、端口或凭证,则要求调整连接设置。
获取帮助
如果您在阅读本教程时遇到问题,可以通过邮件列表或者 RabbitMQ 社区 Slack 与 RabbitMQ 官方取得联系。
在上一篇教程中我们改进了日志系统。我们没有使用只能进行虚拟广播的
fanout
交换机,而是使用了
direct
直连交换机,从而获得了选择性接收日志的可能性。
尽管使用了
direct
直连交换机改进了我们的系统,但它仍然有局限性 —— 它不能基于多个标准进行路由。
在我们的日志系统中,我们可能不仅希望订阅根据严重程度区分的日志,还希望订阅根据来源区分的日志。您可能从 syslog unix 工具中了解过这个概念,它根据严重程度(info/warn/crit…)和设备(auth/cron/kern…)路由日志。
这可以给我们很大的灵活性 —— 我们也许希望仅了解来自于 ‘corn’ 的关键错误,但同样也希望了解来自于 ‘kern’ 的所有日志。
为了在我们的日志系统中实现这一点,我们需要学习一种更复杂的
topic
主题交换机。
原文链接:https://www.rabbitmq.com/tutorials/tutorial-five-dotnet.html
1、主题交换机
发送给
topic
主题交换机的消息不能有任意的路由键 —— 它只能是一个由点分隔的单词列表。单词可以是任何东西,但通常它们指定与消息相关的一些特征。下面是一些有效的路由键示例:“
stock.usd.nyse
”、“
nyse.vmw
”、“
quick.orange.rabbit
”。路由键中可以有任意多的单词,最多不超过 255 字节。
绑定键也必须是同样的形式。
topic
主题交换机背后的逻辑与
direct
直连交换机类似 —— 使用特定
routing key
路由键发送的消息将被传递给使用匹配
binding key
绑定键绑定的所有队列。但是,绑定键有两种重要的特殊情况:
*
(星号)只能匹配一个单词。#
(井号)可以匹配零或者多个单词。
用一个例子来解释这一点最简单:
在这个例子当中,我们将会发送一些均用来描述动物的消息。我们将会使用包含三个单词(两个点)的
routing key
路由键发送这些消息。路由键中的第一个单词用于描述速度,第二个描述颜色以及第三个描述物种:“
<speed>.<colour>.<species>
”。
我们创建三个绑定:
Q1
使用 “
*.orange.*
” 绑定键绑定;
Q2
使用 “
*.*.rabbit
” 和 “
lazy.#
” 绑定。
这些绑定可以被总结为:
Q1
对所有的橙色动物感兴趣。Q2
想知道关于兔子以及懒惰动物的一切信息。
- 带有 “
quick.orange.rabbit
” 路由键的消息将被同时传递给两个队列。“lazy.orange.elephant
” 消息同样也会同时前往两个队列。- 另一方面,“
quick.orange.fox
” 只会前往Q1
,而 “lazy.brown.fox
” 只会前往Q2
。- “
lazy.pink.rabbit
” 只会被传递给Q2
一次,即便它与两个绑定匹配。- “
quick.brown.fox
” 不匹配任何绑定,因此将被丢弃。
如果我们违反约定,发送带有一个或者四个单词的消息(比如 “
orange
” 或者 “
quick.orange.new.rabbit
”),会发生什么呢?好吧,这些消息将不会匹配任何绑定并且丢失。
另一方面,即便 “
lazy.orange.new.rabbit
” 有四个单词,但仍然匹配最后一个绑定,所以将会被传递给
Q2
队列。
主题交换机
主题交换机十分强大并且能够模拟其他交换机。
当一个队列使用 “
#
”(井号)绑定键绑定时 —— 它将会忽略路由键接收所有消息 —— 就像
fanout
扇出交换机那样。
当绑定中不使用 “
*
”(星号)与 “
#
”(井号)特殊字符时,
topic
主题交换机将会表现得像是
direct
直连交换机。
2、将所有的东西放到一起
我们将在日志系统中使用
topic
主题交换机。我们将从一个工作假设开始:即日志的路由键将有两个单词:“
<facility>.<severity>
”。
代码几乎与上一篇教程中一致。
EmitLogTopic.cs
的代码:
usingSystem.Text;usingRabbitMQ.Client;var factory =newConnectionFactory{ HostName ="localhost"};usingvar connection = factory.CreateConnection();usingvar channel = connection.CreateModel();
channel.ExchangeDeclare(exchange:"topic_logs",type: ExchangeType.Topic);var routingKey =(args.Length >0)? args[0]:"anonymous.info";var message =(args.Length >1)?string.Join(" ", args.Skip(1).ToArray()):"Hello World!";var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange:"topic_logs",routingKey: routingKey,basicProperties:null,body: body);
Console.WriteLine($" [x] Sent '{routingKey}':'{message}'");
ReceiveLogsTopic.cs
的代码:
usingSystem.Text;usingRabbitMQ.Client;usingRabbitMQ.Client.Events;var factory =newConnectionFactory{ HostName ="localhost"};usingvar connection = factory.CreateConnection();usingvar channel = connection.CreateModel();
channel.ExchangeDeclare(exchange:"topic_logs",type: ExchangeType.Topic);// declare a server-named queuevar queueName = channel.QueueDeclare().QueueName;if(args.Length <1){
Console.Error.WriteLine("Usage: {0} [binding_key...]",
Environment.GetCommandLineArgs()[0]);
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
Environment.ExitCode =1;return;}foreach(var bindingKey in args){
channel.QueueBind(queue: queueName,exchange:"topic_logs",routingKey: bindingKey);}
Console.WriteLine(" [*] Waiting for messages. To exit press CTRL+C");var consumer =newEventingBasicConsumer(channel);
consumer.Received +=(model, ea)=>{var body = ea.Body.ToArray();var message = Encoding.UTF8.GetString(body);var routingKey = ea.RoutingKey;
Console.WriteLine($" [x] Received '{routingKey}':'{message}'");};
channel.BasicConsume(queue: queueName,autoAck:true,consumer: consumer);
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
运行如下示例:
接收所有日志:
cd ReceiveLogsTopic
dotnet run "#"
接收来自 “
kern
” 的所有日志:
cd ReceiveLogsTopic
dotnet run "kern.*"
或者您只想知道 “
critical
” 日志:
cd ReceiveLogsTopic
dotnet run "*.critical"
您可以创建多个绑定:
cd ReceiveLogsTopic
dotnet run "kern.*""*.critical"
并发出带有 “
kern.critical
” 类型路由键的日志:
cd EmitLogTopic
dotnet run "kern.critical""A critical kernel error"
运行效果:
😀 玩儿的开心!注意,代码没有对路由键或者绑定键做任何假设,您可能希望试试使用两个以上的路由键参数。
(EmitLogTopic.cs 和 ReceiveLogsTopic.cs 的完整源码)
接下来,在教程六中了解如何将往返消息作为远程过程调用。
5、生产[非]适用性免责声明
请记住,本教程和其他教程都是教程。他们一次展示一个新概念,可能会有意地过度简化一些东西,而忽略其他东西。例如,为了简洁起见,连接管理、错误处理、连接恢复、并发性和指标收集等主题在很大程度上被省略了。这种简化的代码不应该被认为可以用于生产。
在发布您的应用之前,请先查看其他文档。我们特别推荐以下指南:发布者确认和消费者确认,生产清单和监控。
版权归原作者 YMGogre 所有, 如有侵权,请联系我们删除。