1.消息队列概述
消息队列 是指利用 高效可靠 的 消息传递机制 进行与平台无关的 数据交流,并基于 数据通信 来进行分布式系统的集成。通过提供 消息传递 和 消息排队 模型,它可以在 分布式环境 下提供 应用解耦、弹性伸缩、冗余存储、流量削峰、异步通信、数据同步 等等功能,其作为 分布式系统架构 中的一个重要组件,有着举足轻重的地位。
2.消息队列的特点及应用场景
消息发送者 可以发送一个消息而无须等待响应。消息发送者 将消息发送到一条 虚拟的通道(主题 或 队列)上,消息接收者 则 订阅 或是 监听 该通道。一条信息可能最终转发给 一个或多个 消息接收者,这些接收者都无需对 消息发送者 做出 同步回应。整个过程都是 异步的。
当你需要使用 消息队列 时,首先需要考虑它的必要性。可以使用消息队列的场景有很多,最常用的几种,是做 应用程序松耦合、异步处理模式、发布与订阅、最终一致性、错峰流控 和 日志缓冲 等。反之,如果需要 强一致性,关注业务逻辑的处理结果,则使用
RPC
显得更为合适
3.RabbitMQ
3.1 初始RabbitMQ
是一个开源的消息代理和队列服务器,用来通过普通协议在完全不同的应用之间共享数据,RabbitMQ是使用Erlang(高并发语言)语言来编写的,并且RabbitMQ是基于AMQP协议的。
3.2 Docker安装RabbitMQ
docker run -d --name rabbitmq -e RABBITMQ_DEFAULT_USER=123-e RABBITMQ_DEFAULT_PASS=123-p 15672:15672 -p 5672:5672 rabbitmq:3-management
浏览器地址栏输入RabbitMQ服务器ip,输入登录账号123密码123,进入RabbitMQ。
3.3 RabbitMQ中的六大队列模式
//连接RabbitMQ
public class RabbitMQConnection
{
public static void SendMessage()
{
// 创建工厂对象
var connectionFactory = new ConnectionFactory()
{
HostName = "192.168.157.157",
Port = 5672,
UserName = "123",
Password = "123",
VirtualHost = "/"
};
// 通过工厂对象创建连接对象
var connection = connectionFactory.CreateConnection();
// 通过连接对象获取Channel对象
var channel = connection.CreateModel();
}
}
3.3.1 简单队列模式: 最简单的工作队列,其中一个消息生产者,一个消息消费者,一个队列。也称为点对点模式。
//生产者
public static void SendMessage()
{
string queueName = "normal";
using (var connection = RabbitMQHelper.GetConnection())
{
// 创建信道
using(var channel = connection.CreateModel())
{
// 创建队列
channel.QueueDeclare(queue:queueName,durable: false, false, false, null);
// 没有绑定交换机,怎么找到路由队列的呢?
while (true)
{
string message = "Hello RabbitMQ Message";
var body = Encoding.UTF8.GetBytes(message);
// 发送消息到rabbitmq,使用rabbitmq中默认提供交换机路由,默认的路由Key和队列名称完全一致
channel.BasicPublish(exchange: "", routingKey: queueName, null, body);
Thread.Sleep(1000);
Console.WriteLine("Send Normal message");
}
}
}
}
//消费者
public class Receive
{
public static void ReceiveMessage()
{
// 消费者消费是队列中消息
string queueName = "normal";
var connection = RabbitMQHelper.GetConnection("192.168.3.215", 5672);
{
var channel = connection.CreateModel();
{
// 如果你先启动是消费端就会异常
channel.QueueDeclare(queueName, false, false, false, null);
var consumer = new EventingBasicConsumer(channel);
consumer.Received +=(model, ea) => {
var message = Encoding.UTF8.GetString(ea.Body.ToArray());
Console.WriteLine(" Normal Received => {0}", message);
};
channel.BasicConsume(queueName,true, consumer);
}
}
}
}
3.3.2 工作队列:一个消息生产者,一个交换器,一个消息队列,多个消费者。同样也称为点对点模式。
//生产者
public class WorkerSend
{
public static void SendMessage()
{
string queueName = "Worker_Queue";
using (var connection = RabbitMQHelper.GetConnection())
{
using(var channel = connection.CreateModel())
{
channel.QueueDeclare(queueName, false, false, false, null);
for (int i = 0; i < 30; i++)
{
string message = $"RabbitMQ Worker {i + 1} Message";
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish("", queueName, null, body);
Console.WriteLine("send Task {0} message",i + 1);
}
}
}
}
}
//消费者
public class WorkerReceive
{
public static void ReceiveMessage()
{
string queueName = "Worker_Queue";
var connection = RabbitMQHelper.GetConnection();
{
var channel = connection.CreateModel();
{
channel.QueueDeclare(queueName, false, false, false, null);
var consumer = new EventingBasicConsumer(channel);
//设置prefetchCount : 1来告知RabbitMQ,在未收到消费端的消息确认时,不再分发消息,也就确保了当消费端处于忙碌状态时,不再分配任务。
channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false); //能者多劳
consumer.Received +=(model, ea) => {
var message = Encoding.UTF8.GetString(ea.Body.ToArray());
Console.WriteLine(" Worker Queue Received => {0}", message);
};
channel.BasicConsume(queueName,true, consumer);
}
}
}
}
3.3.3 发布订阅:一个消息生产者,一个交换器,多个消息队列,多个消费者。称为发布/订阅模式,一个发送到交换机的消息都会被转发到与该交换机绑定的所有队列上。很像子网广播,每台子网内的主机都获得了一份复制的消息。
//生产者
public class FanoutSend
{
public static void SendMessage()
{
using (var connection = RabbitMQHelper.GetClusterConnection())
{
using(var channel = connection.CreateModel())
{
// 声明交换机对象
channel.ExchangeDeclare("fanout_exchange", "fanout" ,true);
// 创建队列
string queueName1 = "fanout_queue1";
channel.QueueDeclare(queueName1, true, false, false, null);
string queueName2 = "fanout_queue2";
channel.QueueDeclare(queueName2, true, false, false, null);
string queueName3 = "fanout_queue3";
channel.QueueDeclare(queueName3, true, false, false, null);
// 绑定到交互机
// fanout_exchange 绑定了 3个队列
channel.QueueBind(queue: queueName1, exchange: "fanout_exchange", routingKey: "");
channel.QueueBind(queue: queueName2, exchange: "fanout_exchange", routingKey: "");
channel.QueueBind(queue: queueName3, exchange: "fanout_exchange", routingKey: "");
var properties = channel.CreateBasicProperties();
properties.Persistent = true;
for (int i = 0; i < 10; i++)
{
string message = $"RabbitMQ Fanout {i + 1} Message";
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish("fanout_exchange", "", properties, body);
Console.WriteLine("Send Fanout {0} message",i + 1);
}
}
}
}
}
//消费者
public class FanoutConsumer
{
public static void ConsumerMessage()
{
var connection = RabbitMQHelper.GetClusterConnection();
{
var channel = connection.CreateModel();
{
//申明exchange
channel.ExchangeDeclare(exchange: "fanout_exchange", type: "fanout");
// 创建队列
string queueName1 = "fanout_queue1";
channel.QueueDeclare(queueName1, false, false, false, null);
string queueName2 = "fanout_queue2";
channel.QueueDeclare(queueName2, false, false, false, null);
string queueName3 = "fanout_queue3";
channel.QueueDeclare(queueName3, false, false, false, null);
// 绑定到交互机
channel.QueueBind(queue: queueName1, exchange: "fanout_exchange", routingKey: "");
channel.QueueBind(queue: queueName2, exchange: "fanout_exchange", routingKey: "");
channel.QueueBind(queue: queueName3, exchange: "fanout_exchange", routingKey: "");
Console.WriteLine("[*] Waitting for fanout logs.");
//申明consumer
var consumer = new EventingBasicConsumer(channel);
//绑定消息接收后的事件委托
consumer.Received += (model, ea) => {
var body = ea.Body;
var message = Encoding.UTF8.GetString(body.ToArray());
Console.WriteLine("[x] {0}", message);
};
channel.BasicConsume(queue: queueName3, autoAck: true, consumer: consumer);
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
}
}
}
}
3.3.4 路由模式:在发布/订阅模式的基础上,有选择的接收消息,也就是通过 routing 路由进行匹配条件是否满足接收消息。
//生产者
public class DirectSend
{
public static void SendMessage()
{
using (var connection = RabbitMQHelper.GetClusterConnection())
{
using(var channel = connection.CreateModel())
{
// 声明Direct交换机
channel.ExchangeDeclare("direct_exchange", "direct");
// 创建队列
string queueName1 = "direct_queue1";
channel.QueueDeclare(queueName1, false, false, false, null);
string queueName2 = "direct_queue2";
channel.QueueDeclare(queueName2, false, false, false, null);
string queueName3 = "direct_queue3";
channel.QueueDeclare(queueName3, false, false, false, null);
// 绑定到交互机 指定routingKey
channel.QueueBind(queue: queueName1, exchange: "direct_exchange", routingKey: "red");
channel.QueueBind(queue: queueName2, exchange: "direct_exchange", routingKey: "yellow");
channel.QueueBind(queue: queueName3, exchange: "direct_exchange", routingKey: "green");
for (int i = 0; i < 10; i++)
{
string message = $"RabbitMQ Direct {i + 1} Message =>green";
var body = Encoding.UTF8.GetBytes(message);
// 发送消息的时候需要指定routingKey发送
channel.BasicPublish(exchange: "direct_exchange", routingKey: "green", null, body);
Console.WriteLine("Send Direct {0} message",i + 1);
}
}
}
}
}
//消费者
public class DirectConsumer
{
public static void ConsumerMessage()
{
var connection = RabbitMQHelper.GetConnection();
var channel = connection.CreateModel();
channel.ExchangeDeclare(exchange: "direct_exchange", type: "direct");
var queueName = "direct_queue3";
channel.QueueDeclare(queueName, false, false, false, null);
channel.QueueBind(queue: queueName,
exchange: "direct_exchange",
routingKey: "red");
channel.QueueBind(queue: queueName,
exchange: "direct_exchange",
routingKey: "yellow");
channel.QueueBind(queue: queueName,
exchange: "direct_exchange",
routingKey: "green");
Console.WriteLine(" [*] Waiting for messages.");
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body;
var message = Encoding.UTF8.GetString(body.ToArray());
var routingKey = ea.RoutingKey;
Console.WriteLine(" [x] Received '{0}':'{1}'", routingKey, message);
};
channel.BasicConsume(queue: queueName,
autoAck: false,
consumer: consumer);
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
}
}
3.3.5 主题模式:同样是在发布/订阅模式的基础上,根据主题匹配进行筛选是否接收消息,比第四类更灵活。topics 主题模式跟 routing 路由模式类似,只不过路由模式是指定固定的路由键 routingKey,而主题模式是可以模糊匹配路由键 routingKey,类似于SQL中 = 和 like 的关系。
//生产者
public class TopicProvider
{
public static void SendMessage()
{
using (var connection = RabbitMQHelper.GetConnection())
{
using (var channel = connection.CreateModel())
{
channel.ExchangeDeclare("topic_exchange", "topic");
// 创建队列
string queueName1 = "topic_queue1";
channel.QueueDeclare(queueName1, false, false, false, null);
string queueName2 = "topic_queue2";
channel.QueueDeclare(queueName2, false, false, false, null);
string queueName3 = "topic_queue3";
channel.QueueDeclare(queueName3, false, false, false, null);
// 绑定到交互机
channel.QueueBind(queue: queueName1, exchange: "topic_exchange", routingKey: "user.data.#");
channel.QueueBind(queue: queueName2, exchange: "topic_exchange", routingKey: "user.data.delete");
channel.QueueBind(queue: queueName3, exchange: "topic_exchange", routingKey: "user.data.update");
for (int i = 0; i < 10; i++)
{
string message = $"RabbitMQ Topic {i + 1} Delete Message";
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish("topic_exchange", "user.data.abc.dd.d.d", null, body);
Console.WriteLine("Send Topic {0} message", i + 1);
}
}
}
}
}
//消费者
public class TopicConsumer
{
public static void ConsumerMessage()
{
var connection = RabbitMQHelper.GetConnection();
var channel = connection.CreateModel();
var queueName = "topic_queue1";
channel.ExchangeDeclare(exchange: "topic_exchange", type: "topic");
channel.QueueDeclare(queueName, false, false, false, null);
// 有个bug
channel.QueueBind(queue: queueName,
exchange: "topic_exchange",
routingKey: "user.data.insert");
Console.WriteLine(" [*] Waiting for messages.");
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body;
var message = Encoding.UTF8.GetString(body.ToArray());
var routingKey = ea.RoutingKey;
Console.WriteLine(" [x] Received '{0}':'{1}'", routingKey, message);
};
channel.BasicConsume(queue: queueName,
autoAck: true,
consumer: consumer);
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
}
}
3.3.6 RPC模式 :与上面其他5种所不同之处,类模式是拥有请求/回复的。也就是有响应的,上面5种都没有。(消息队列事务及其ack手动/自动确认机制下文继续讲解,此处不做过多赘述)
3.4 RabbitMQ-消息确认模式
3.4.1 消息确认机制是什么?为什么需要消息确认?
RabbitMQ中的消息确认机制是指生产者发送消息后,等待消费者确认消息已经被正确接收和处理的一种机制。消息确认机制的主要目的是确保消息的可靠传递和处理,以避免消息丢失或重复处理的情况发生。
为什么需要消息确认机制呢?在分布式系统中,消息的发送和接收是异步的过程,可能会存在以下情况:
1.消息丢失:在消息发送过程中,可能由于网络故障、硬件故障或其他原因导致消息丢失。如果没有消息确认机制,生产者无法得知消息是否成功传递给消费者,从而无法保证消息的可靠性。
2.消息重复:在消息发送过程中,可能由于网络超时、消费者故障或其他原因导致消息重复发送。如果没有消息确认机制,消费者可能会多次处理同一条消息,导致重复操作和数据不一致问题。
3.4.2 生产者消息确认模式
发布确认是指生产者发送消息后,等待RabbitMQ服务器返回确认消息的过程。生产者可以通过调用channel.confirmSelect()方法启用发布确认模式,然后使用channel.waitForConfirms()方法等待服务器返回确认消息。如果服务器成功接收到消息并进行处理,会返回一个确认消息给生产者。
public class ConfirmDemo
{
public static void ConfirmModel()
{
var conn = RabbitMQHelper.GetConnection();
{
var channel = conn.CreateModel();
{
channel.ExchangeDeclare("confirm-exchange", ExchangeType.Direct, true);
channel.QueueDeclare("confirm-queue", false, false, false, null);
// channel.QueueBind("confirm-queue", "confirm-exchange", "", null);
var properties = channel.CreateBasicProperties();
//properties.DeliveryMode = 2;
// properties.Persistent = true;
byte[] message = Encoding.UTF8.GetBytes("Test");
channel.ConfirmSelect(); // 开启消息确认模式
channel.BasicPublish("confirm-exchange", ExchangeType.Fanout, properties, message);
if (!channel.WaitForConfirms())
{
Console.WriteLine("消息发送失败了。");
}
Console.WriteLine("消息发送成功!");
channel.Close();
}
}
}
}
3.4.3 生产者事务模式
事务模式类似数据库中的事务,但是这里的事务是基于信道的,在信道内提交事务,此时才会把消息传送给MQ,如果因为业务上某个消息生产失败,那么可以使用信道的事务的回滚操作,撤回上次提交之前生产的消息。
public class Transaction
{
/// <summary>
/// 使用事务方式确保数据正确到达消息服务端
/// </summary>
public static void TransactionMode()
{
string exchange = "tx-exchange";
string queue = "tx-queue";
string routeKey = "direct_key";
using (IConnection conn = RabbitMQHelper.GetConnection())
{
using (var channel = conn.CreateModel())
{
try
{
channel.TxSelect(); //用于将当前channel设置成transaction事务模式
channel.ExchangeDeclare(exchange, ExchangeType.Direct);
channel.QueueDeclare(queue, false, false, false, null);
channel.QueueBind(queue, exchange, routeKey, null);
var properties = channel.CreateBasicProperties();
// properties.Persistent = true;
// properties.DeliveryMode = 2;
Console.Write("输入发送的内容:");
var msg = Console.ReadLine();
byte[] message = Encoding.UTF8.GetBytes("发送消息:" + msg);
channel.BasicPublish(exchange, routeKey, properties, message);
// 故意抛异常
throw new Exception("出现异常");
channel.TxCommit();//txCommit用于提交事务
}
catch (Exception ex)
{
if (channel.IsOpen)
{
Console.WriteLine("触发事务回滚");
channel.TxRollback();
}
}
}
}
}
}
3.4.5 消费者应答-自动应答
自动应答模式下,MQ发送给消费者,然后就会从队列中删除消息。自动应答模式只会保证MQ到消费者,却无法保证消费者消费异常的情况。
//消费者
public class Receive
{
public static void ReceiveMessage()
{
// 消费者消费是队列中消息
string queueName = "normal";
var connection = RabbitMQHelper.GetConnection("192.168.3.215", 5672);
{
var channel = connection.CreateModel();
{
// 如果你先启动是消费端就会异常
channel.QueueDeclare(queueName, false, false, false, null);
var consumer = new EventingBasicConsumer(channel);
consumer.Received +=(model, ea) => {
var message = Encoding.UTF8.GetString(ea.Body.ToArray());
Console.WriteLine(" Normal Received => {0}", message);
};
channel.BasicConsume(queueName,true, consumer);
}
}
}
}
在这个例子中,我们创建了一个RabbitMQ消费者。当消费者接收到消息时,会调用
Received
事件处理器。在自动应答模式下(
channel.BasicConsume
的第二个参数设为
true
),不需要显式调用
channel.BasicAck()
,RabbitMQ会在消息传递到消费者后立即确认它。
3.4.6 消费者应答-手动应答
手动应答模式下,MQ发送消息给消费者,然后此消息还会在队列中存储,只有收到消费者发送的应答结果,然后才会从队列中删除消息。这种方式能够保证消息在消费者正确的消费,如果消费者消费出现异常,可以不进行应答,然后MQ会将消息发送给其他的消费者。
注意:如果一个队列只有一个消费者,这个消费者没有进行手动的应答,那么可能会造成死循环导致内存溢出。
//消费者
public class Receive
{
public static void ReceiveMessage()
{
// 消费者消费是队列中消息
string queueName = "normal";
var connection = RabbitMQHelper.GetConnection("192.168.3.215", 5672);
{
var channel = connection.CreateModel();
{
// 如果你先启动是消费端就会异常
channel.QueueDeclare(queueName, false, false, false, null);
var consumer = new EventingBasicConsumer(channel);
consumer.Received +=(model, ea) => {
var message = Encoding.UTF8.GetString(ea.Body.ToArray());
Console.WriteLine(" Normal Received => {0}", message);
channel.BasicAck(ea.DeliveryTag, false);
};
channel.BasicConsume(queueName,false, consumer);
}
}
}
}
在手动应答模式下(
channel.BasicConsume
的第二个参数设为
false
),需要显式调用
channel.BasicAck()
,RabbitMQ会在消息传递到消费者后立即确认它。然后删除队列中的数据。确保数据已成功发送至消费者。
4. Asp.Net Core中使用RabbitMQ(以上代码只是各队列的使用)
4.1 引入nuget RabbitMQ.Client 程序包。
4.2 IServiceCollection扩展方法
public static class RabbitMQ
{
/// <summary>
/// 单点连接
/// </summary>
/// <param name="services"></param>
/// <param name="connectionFactory"></param>
/// <returns></returns>
public static IServiceCollection AddRabbitMQ(this IServiceCollection services, ConnectionFactory connectionFactory)
{
return services.AddSingleton<IConnection>(provider => { return connectionFactory.CreateConnection(); });
}
/// <summary>
/// 集群连接
/// </summary>
/// <param name="services"></param>
/// <param name="connectionFactory"></param>
/// <returns></returns>
public static IServiceCollection AddRabbitMQ(this IServiceCollection services, Func<IConnection> func)
{
return services.AddSingleton<IConnection>(provider => { return func.Invoke(); });
}
}
4.3 Program注册RabbitMQ
//单节点注册RabbitMQ
builder.Services.AddRabbitMQ(
new ConnectionFactory()
{
HostName = "192.168.157.157", //ip
Port = 5672, // 端口
UserName = "123", // 账户
Password = "123", // 密码
VirtualHost = "/" // 虚拟主机
});
//集群注册RabbitMQ
builder.Services.AddRabbitMQ(
() =>
{
var factory = new ConnectionFactory
{
UserName = "123", // 账户
Password = "123", // 密码
VirtualHost = "/" // 虚拟主机
};
// 创建集群节点AmqpTcpEndpoint对象类型集合
List<AmqpTcpEndpoint> list = new List<AmqpTcpEndpoint>
{
new AmqpTcpEndpoint() { HostName = "192.168.157.157", Port = 5672 },
new AmqpTcpEndpoint() { HostName = "192.168.157.158", Port = 5672 },
new AmqpTcpEndpoint() { HostName = "192.168.157.159", Port = 5672 }
};
return factory.CreateConnection(list);
});
4.4 ApiController(可以结合以上的队列代码使用)
[Route("api/RabbitMQTest")]
[ApiController]
public class RabbitMQTestController : ControllerBase
{
private readonly ILogger<RabbitMQTestController> _logger;
private readonly IConnection _collection;
private IModel? _model;
/// <summary>
/// 构造函数注入
/// </summary>
/// <param name="collection"></param>
/// <param name="logger"></param>
public RabbitMQTestController(IConnection collection, ILogger<RabbitMQTestController> logger)
{
_collection = collection;
_logger = logger;
}
[HttpGet]
[Route("SendMessage")]
public async Task<string> SendMessage()
{
return await Task.Run(() =>
{
string queueName = "normal";
using (_model = _collection.CreateModel())
{
_model.QueueDeclare(queue: queueName, durable: false, false, false, null);
string message = "Hello RabbitMQ Message";
var body = Encoding.UTF8.GetBytes(message);
_model.BasicPublish(exchange: "", routingKey: queueName, null, body);
return "Send Normal message";
}
});
}
}
总结:
RabbitMQ是一个开源的消息代理和队列服务器,用于在分布式系统中传递消息。它支持多种消息协议,提供可靠的消息传递、灵活的路由、集群和高可用性功能。
版权归原作者 精神小伙就是猛 所有, 如有侵权,请联系我们删除。