一、什么是RabbitMQ
RabbitMQ是一个开源的消息中间件,它实现了AMQP标准,并且可以在分布式系统中存储、转发和接收消息,可以将消息从一个应用程序发送到另一个应用程序,即使这些应用程序不同时运行,也可以在消息队列中存储消息,确保消息的可靠传递。也就相当于快递,你发快递,你的朋友收快递,RabbitMQ就是快递公司。RabbitMQ可以用于解决各种问题,如解耦系统组件、异步处理任务、实现事件驱动架构等。
NuGet包安装:NuGet\Install-Package RabbitMQ.Client -Version 6.4.0
二、发送
CreateModel是在连接上创建一个信道,信道是RabbitMQ中用于发送和接收消息的主要路径,几乎所有的操作都是通过信道完成的,它是建立在已经建立的TCP连接之上的虚拟连接,信道的创建和销毁相对于TCP连接来说开销较小,这使得客户端可以创建多个信道来并发处理多个任务。
QueueDeclare声明一个消息队列,在 RabbitMQ 中存储消息时,首先就要创建队列。
BasicPublish用于将消息发送到队列中。
var factory = new ConnectionFactory
{
HostName = "localhost", // RabbitMQ 服务器的主机名或 IP 地址,我这里为本地
//Port = 5672, // RabbitMQ 服务器的端口号
//UserName = "guest", // 用于身份验证的用户名
//Password = "guest", // 用于身份验证的密码
//VirtualHost = "/"// 虚拟主机名称
};
using var connection = factory.CreateConnection();//建立与 RabbitMQ 服务器的连接
using var channel = connection.CreateModel();//在已经建立的连接上创建一个新的信道(channel)
// 声明一个队列
channel.QueueDeclare(queue: "NewRabbitMQ",//队列的名称。如果此参数为空字符串,服务器将生成一个唯一的队列名称
durable: false,//true为开启队列持久化
exclusive: false,//队列是否只能由创建它的连接使用。当连接关闭时,队列将被自动删除。
autoDelete: false,//当队列中的所有消息都被消费者消费后,是否应该自动删除队列。
arguments: null//用于声明队列时的其他属性设置
);
const string message = "你好";
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: string.Empty,//交换机名称。
routingKey: "NewRabbitMQ",//路由键。它决定了消息应该被发送到哪个队列。
basicProperties: null,//消息的属性,如消息的持久性、优先级、内容类型等。
body: body//消息的主体,通常是一个字节数组。
);
Console.WriteLine("OK");
三、接收
EventingBasicConsumer(T) 是 RabbitMQ 中的一个消费者类,用于接收来自 RabbitMQ 服务器中T队列的消息。
Received事件会在队列中有新消息到达时被触发。
//前面的和发送基本一致
var factory = new ConnectionFactory { HostName = "localhost" };
using var connection = factory.CreateConnection();
using var channel = connection.CreateModel();
channel.QueueDeclare(queue: "NewRabbitMQ",
durable: false,//true为开启队列持久化
exclusive: false,
autoDelete: false,
arguments: null);
var consumer = new EventingBasicConsumer(channel);//接收来自 RabbitMQ 服务器的消息
//Received在队列中有新消息到达时被触发
consumer.Received += (model, ea) =>//第一个参数通常为创建的EventingBasicConsumer实例,第二个参数包含了与接收到的消息相关的信息
{
var body = ea.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
Console.WriteLine($" [x] Received {message}");
};
channel.BasicConsume(queue: "NewRabbitMQ",
autoAck: true,//是否自动确认消息。如果设置为 true,则每当消息被消费者接收时,RabbitMQ 会自动认为该消息已被成功处理,并将其从队列中移除。如果设置为 false,则消费者需要显式地发送一个确认(通过 BasicAck 方法)来告诉 RabbitMQ 该消息已被成功处理。
consumer: consumer);
Console.ReadLine();
四、消息确认
消息确认机制是为了确保消费者成功消费了队列中的消息,并避免因消费者处理失败而导致消息丢失或重复处理的问题。即就是消费者在对一个任务处理时,任务还没有处理完成,而消费者却意外挂了,消息确认就是处理方案。
在使用BasicAck时,消费者需要提供一个参数,即deliveryTag,这个参数用于标识要确认的消息。RabbitMQ服务器在接收到BasicAck请求后,会将该消息从队列中移除,以确保消息不会被重复处理。
consumer.Received += (model, ea) =>//第一个参数通常为创建的EventingBasicConsumer实例,第二个参数包含了与接收到的消息相关的信息
{
var body = ea.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
Console.WriteLine($" [x] Received {message}");
channel.BasicAck(deliveryTag:ea.DeliveryTag,multiple:false);//ea.DeliveryTag队列编号
};
channel.BasicConsume(queue: "NewRabbitMQ",
autoAck: false,//是否自动确认消息。如果设置为 true,则每当消息被消费者接收时,RabbitMQ 会自动认为该消息已被成功处理,并将其从队列中移除。如果设置为 false,则消费者需要显式地发送一个确认(通过 BasicAck 方法)来告诉 RabbitMQ 该消息已被成功处理。
consumer: consumer);
Console.ReadLine();
注:如果忘记消息确认,会导致RabbitMQ服务器一直投送重复任务,导致内存占用严重。
五、消息持久化
消费者挂掉的解决办法有了,那如果RabbitMQ服务器整个都挂掉了呢,那会使我们原本不富裕的头顶更加贫瘠。为了避免这种事情的发生,需要使用消息持久化。
消息持久化是指将消息和队列都保存在磁盘上,以确保在RabbitMQ服务重启或其他异常情况下,消息不会丢失。首先,需要在创建队列时(生产者和消费者都要)将durable设置为true,在发送消息时,需要将消息的DeliveryMode属性设置为2。
channel.QueueDeclare(queue: "NewRabbitMQ",//队列的名称。如果此参数为空字符串,服务器将生成一个唯一的队列名称
durable: true,//true为开启队列持久化
exclusive: false,//队列是否只能由创建它的连接使用。当连接关闭时,队列将被自动删除。
autoDelete: false,//当队列中的所有消息都被消费者消费后,是否应该自动删除队列。
arguments: null//用于声明队列时的其他属性设置
);
channel.CreateBasicProperties().DeliveryMode = 2;//设置消息持久化
此外需要注意的是: 队列和消息都被设置为持久化,也不能百分之一百保证消息不丢失(剩下的交给天意吧),因为RabbitMQ在接收到消息后,不会立即将其保存到磁盘上,而是会先将其存储在内存中,然后再异步地将其写入磁盘,也因为消息会写入到磁盘中,所以会对性能有一定的影响。
六、限流
如果有多个消费者,就会出现一些消费者繁忙,一些消费者清闲,为了避免某些消费者压力过大,会需要使用流量控制的限制,通过BasicQos可以控制消费者从RabbitMQ队列中拉取的消息的数量和大小。
prefetchSize整数值,可以接收但尚未确认的消息的最大字节数,达到限制时,RabbitMQ将停止向该消费者发送更多消息。
prefetchCount整数值,可以接收但尚未确认的消息的最大数量。
global布尔值,是否应用于整个连接中的所有信道,true连接所有信道,false连接当前信道。
//在消费者中设置
channel.BasicQos(
prefetchSize: 0,
prefetchCount: 1,
global: false
);
var consumer = new EventingBasicConsumer(channel);//接收来自 RabbitMQ 服务器的消息
注意:限流只有在autoAck为false的情况下才有用(即手动消息确认模式下),自动消息确认下不起作用。
七、交换机
RabbitMQ中生产者不会直接将消息发送到队列,而是将消息发送到交换机,交换机再将消息路由到一个或多个队列中。
直连交换机
将消息路由到与其绑定的队列中,要求消息的路由键(routingKey)与队列的绑定键完全匹配。
发送
声明交换机使用ExchangeDeclare,包含五个参数:
exchange交换机的名称。
type交换机的类型。
durable交换机是否应该在RabbitMQ服务器重启后依然存在。
autoDelete最后一个队列与交换机解绑后,是否自动删除交换机。
arguments传递特定于交换机的参数。
var factory = new ConnectionFactory() { HostName = "localhost" };
using var connection = factory.CreateConnection();
using (var channel = connection.CreateModel())
{
// 声明一个直连交换机
channel.ExchangeDeclare(
exchange: "RabbitMQ_direct",//交换机名称
type: "direct",//交换机类型
durable:true
);
// 声明一个队列
channel.QueueDeclare(
queue: "NewRabbitMQ",
durable: false,
exclusive: false,
autoDelete: false,
arguments: null
);
// 绑定队列到直连交换机,指定路由键
channel.QueueBind(
queue: "NewRabbitMQ",//要绑定的队列名称。
exchange: "RabbitMQ_direct",//要绑定的交换机名称。
routingKey: "direct.key"//是否将消息路由到这个队列
);
// 发送消息到直连交换机
string message = "你好";
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(
exchange: "RabbitMQ_direct",//交换机名称
routingKey: "direct.key",
basicProperties: null,
body: body
);
Console.WriteLine("OK" + message);
}
接收
var factory = new ConnectionFactory() { HostName = "localhost" };
using var connection = factory.CreateConnection();
using (var channel = connection.CreateModel())
{
//接收消息
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
Console.WriteLine(message);
};
channel.BasicConsume(
queue:"NewRabbitMQ",
autoAck: true,
consumer: consumer
);
Console.ReadLine();
}
主题交换机
根据消息的路由键和队列的绑定模式进行匹配,将消息路由到符合模式的队列中。主题交换机支持通配符匹配,如“”表示匹配一个单词,“#”表示匹配多个单词,例如:".news" 会匹配所有以 "news" 结尾的路由键。
发送
var factory = new ConnectionFactory() { HostName = "localhost" };
using var connection = factory.CreateConnection();
using (var channel = connection.CreateModel())
{
// 声明一个直连交换机
channel.ExchangeDeclare(
exchange: "RabbitMQ_topic",//交换机名称
type: "topic"//交换机类型
);
// 声明一个队列
channel.QueueDeclare(
queue: "NewRabbitMQ",
durable: false,
exclusive: false,
autoDelete: false,
arguments: null
);
// 绑定队列到直连交换机,指定路由键
channel.QueueBind(
queue: "NewRabbitMQ",//要绑定的队列名称。
exchange: "RabbitMQ_topic",//要绑定的交换机名称。
routingKey: "user.*.info"//是否将消息路由到这个队列,这里表示匹配所有以 "user." 开头,并以 "info" 结尾的路由键
);
// 发送消息到直连交换机
string message = "你好";
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(
exchange: "RabbitMQ_topic",//交换机名称
routingKey: "user.abc.info",
basicProperties: null,
body: body
);
Console.WriteLine("OK" + message);
}
接收
与上面一致
扇形交换机
将消息路由到所有与其绑定的队列中,不关心消息的路由键和队列的绑定键。
发送
var factory = new ConnectionFactory() { HostName = "localhost" };
using var connection = factory.CreateConnection();
using (var channel = connection.CreateModel())
{
// 声明一个直连交换机
channel.ExchangeDeclare(
exchange: "RabbitMQ_fanout",//交换机名称
type: "fanout"//交换机类型
);
// 声明一个队列
channel.QueueDeclare(
queue: "NewRabbitMQ",
durable: false,
exclusive: false,
autoDelete: false,
arguments: null
);
// 绑定队列到直连交换机,指定路由键
channel.QueueBind(
queue: "NewRabbitMQ",//要绑定的队列名称。
exchange: "RabbitMQ_fanout",//要绑定的交换机名称。
routingKey: ""//扇形交换机不需要路由键,因为所有消息都会被路由到所有绑定的队列
);
// 发送消息到直连交换机
string message = "你好";
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(
exchange: "RabbitMQ_fanout",//交换机名称
routingKey: "",
basicProperties: null,
body: body
);
Console.WriteLine("OK" + message);
}
接收
与上面一致
版权归原作者 吾乃猪儿虫 所有, 如有侵权,请联系我们删除。