0


.Net中RabbitMQ的使用详情

一、什么是RabbitMQ

RabbitMQ是一个开源的消息中间件,它实现了AMQP标准,并且可以在分布式系统中存储、转发和接收消息,可以将消息从一个应用程序发送到另一个应用程序,即使这些应用程序不同时运行,也可以在消息队列中存储消息,确保消息的可靠传递。也就相当于快递,你发快递,你的朋友收快递,RabbitMQ就是快递公司。RabbitMQ可以用于解决各种问题,如解耦系统组件、异步处理任务、实现事件驱动架构等。

NuGet包安装:NuGet\Install-Package RabbitMQ.Client -Version 6.4.0

二、发送

CreateModel是在连接上创建一个信道,信道是RabbitMQ中用于发送和接收消息的主要路径,几乎所有的操作都是通过信道完成的,它是建立在已经建立的TCP连接之上的虚拟连接,信道的创建和销毁相对于TCP连接来说开销较小,这使得客户端可以创建多个信道来并发处理多个任务。

QueueDeclare声明一个消息队列,在 RabbitMQ 中存储消息时,首先就要创建队列。

BasicPublish用于将消息发送到队列中。

var factory = new ConnectionFactory
{
    HostName = "localhost", // RabbitMQ 服务器的主机名或 IP 地址,我这里为本地
    //Port = 5672, // RabbitMQ 服务器的端口号  
    //UserName = "guest", // 用于身份验证的用户名  
    //Password = "guest", // 用于身份验证的密码  
    //VirtualHost = "/"// 虚拟主机名称
};
using var connection = factory.CreateConnection();//建立与 RabbitMQ 服务器的连接
using var channel = connection.CreateModel();//在已经建立的连接上创建一个新的信道(channel)
// 声明一个队列
channel.QueueDeclare(queue: "NewRabbitMQ",//队列的名称。如果此参数为空字符串,服务器将生成一个唯一的队列名称
                     durable: false,//true为开启队列持久化
                     exclusive: false,//队列是否只能由创建它的连接使用。当连接关闭时,队列将被自动删除。
                     autoDelete: false,//当队列中的所有消息都被消费者消费后,是否应该自动删除队列。
                     arguments: null//用于声明队列时的其他属性设置
                     );
const string message = "你好";
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: string.Empty,//交换机名称。
                     routingKey: "NewRabbitMQ",//路由键。它决定了消息应该被发送到哪个队列。
                     basicProperties: null,//消息的属性,如消息的持久性、优先级、内容类型等。
                     body: body//消息的主体,通常是一个字节数组。
                     );
Console.WriteLine("OK");

三、接收

EventingBasicConsumer(T) 是 RabbitMQ 中的一个消费者类,用于接收来自 RabbitMQ 服务器中T队列的消息。

Received事件会在队列中有新消息到达时被触发。

//前面的和发送基本一致
var factory = new ConnectionFactory { HostName = "localhost" };
using var connection = factory.CreateConnection();
using var channel = connection.CreateModel();

channel.QueueDeclare(queue: "NewRabbitMQ",
                     durable: false,//true为开启队列持久化
                     exclusive: false,
                     autoDelete: false,
                     arguments: null);

var consumer = new EventingBasicConsumer(channel);//接收来自 RabbitMQ 服务器的消息
//Received在队列中有新消息到达时被触发
consumer.Received += (model, ea) =>//第一个参数通常为创建的EventingBasicConsumer实例,第二个参数包含了与接收到的消息相关的信息
{
    var body = ea.Body.ToArray();
    var message = Encoding.UTF8.GetString(body);
    Console.WriteLine($" [x] Received {message}");
};
channel.BasicConsume(queue: "NewRabbitMQ",
                     autoAck: true,//是否自动确认消息。如果设置为 true,则每当消息被消费者接收时,RabbitMQ 会自动认为该消息已被成功处理,并将其从队列中移除。如果设置为 false,则消费者需要显式地发送一个确认(通过 BasicAck 方法)来告诉 RabbitMQ 该消息已被成功处理。
                     consumer: consumer);
Console.ReadLine();

四、消息确认

消息确认机制是为了确保消费者成功消费了队列中的消息,并避免因消费者处理失败而导致消息丢失或重复处理的问题。即就是消费者在对一个任务处理时,任务还没有处理完成,而消费者却意外挂了,消息确认就是处理方案。

在使用BasicAck时,消费者需要提供一个参数,即deliveryTag,这个参数用于标识要确认的消息。RabbitMQ服务器在接收到BasicAck请求后,会将该消息从队列中移除,以确保消息不会被重复处理。

consumer.Received += (model, ea) =>//第一个参数通常为创建的EventingBasicConsumer实例,第二个参数包含了与接收到的消息相关的信息
{
    var body = ea.Body.ToArray();
    var message = Encoding.UTF8.GetString(body);
    Console.WriteLine($" [x] Received {message}");
    channel.BasicAck(deliveryTag:ea.DeliveryTag,multiple:false);//ea.DeliveryTag队列编号
};
channel.BasicConsume(queue: "NewRabbitMQ",
                     autoAck: false,//是否自动确认消息。如果设置为 true,则每当消息被消费者接收时,RabbitMQ 会自动认为该消息已被成功处理,并将其从队列中移除。如果设置为 false,则消费者需要显式地发送一个确认(通过 BasicAck 方法)来告诉 RabbitMQ 该消息已被成功处理。
                     consumer: consumer);
Console.ReadLine();

注:如果忘记消息确认,会导致RabbitMQ服务器一直投送重复任务,导致内存占用严重。

五、消息持久化

消费者挂掉的解决办法有了,那如果RabbitMQ服务器整个都挂掉了呢,那会使我们原本不富裕的头顶更加贫瘠。为了避免这种事情的发生,需要使用消息持久化。

消息持久化是指将消息和队列都保存在磁盘上,以确保在RabbitMQ服务重启或其他异常情况下,消息不会丢失。首先,需要在创建队列时(生产者和消费者都要)将durable设置为true,在发送消息时,需要将消息的DeliveryMode属性设置为2。

channel.QueueDeclare(queue: "NewRabbitMQ",//队列的名称。如果此参数为空字符串,服务器将生成一个唯一的队列名称
                     durable: true,//true为开启队列持久化
                     exclusive: false,//队列是否只能由创建它的连接使用。当连接关闭时,队列将被自动删除。
                     autoDelete: false,//当队列中的所有消息都被消费者消费后,是否应该自动删除队列。
                     arguments: null//用于声明队列时的其他属性设置
                     );
channel.CreateBasicProperties().DeliveryMode = 2;//设置消息持久化

此外需要注意的是: 队列和消息都被设置为持久化,也不能百分之一百保证消息不丢失(剩下的交给天意吧),因为RabbitMQ在接收到消息后,不会立即将其保存到磁盘上,而是会先将其存储在内存中,然后再异步地将其写入磁盘,也因为消息会写入到磁盘中,所以会对性能有一定的影响。

六、限流

如果有多个消费者,就会出现一些消费者繁忙,一些消费者清闲,为了避免某些消费者压力过大,会需要使用流量控制的限制,通过BasicQos可以控制消费者从RabbitMQ队列中拉取的消息的数量和大小。

prefetchSize整数值,可以接收但尚未确认的消息的最大字节数,达到限制时,RabbitMQ将停止向该消费者发送更多消息。

prefetchCount整数值,可以接收但尚未确认的消息的最大数量。

global布尔值,是否应用于整个连接中的所有信道,true连接所有信道,false连接当前信道。

//在消费者中设置
channel.BasicQos(
    prefetchSize: 0,
    prefetchCount: 1,
    global: false
    );

var consumer = new EventingBasicConsumer(channel);//接收来自 RabbitMQ 服务器的消息

注意:限流只有在autoAck为false的情况下才有用(即手动消息确认模式下),自动消息确认下不起作用。

七、交换机

RabbitMQ中生产者不会直接将消息发送到队列,而是将消息发送到交换机,交换机再将消息路由到一个或多个队列中。

直连交换机

将消息路由到与其绑定的队列中,要求消息的路由键(routingKey)与队列的绑定键完全匹配。

发送

声明交换机使用ExchangeDeclare,包含五个参数:

exchange交换机的名称。

type交换机的类型。

durable交换机是否应该在RabbitMQ服务器重启后依然存在。

autoDelete最后一个队列与交换机解绑后,是否自动删除交换机。

arguments传递特定于交换机的参数。

var factory = new ConnectionFactory() { HostName = "localhost" };
using var connection = factory.CreateConnection();
using (var channel = connection.CreateModel())
{
    // 声明一个直连交换机  
    channel.ExchangeDeclare(
        exchange: "RabbitMQ_direct",//交换机名称
        type: "direct",//交换机类型
        durable:true
        );

    // 声明一个队列  
    channel.QueueDeclare(
        queue: "NewRabbitMQ",
        durable: false,
        exclusive: false,
        autoDelete: false,
        arguments: null
        );

    // 绑定队列到直连交换机,指定路由键  
    channel.QueueBind(
        queue: "NewRabbitMQ",//要绑定的队列名称。
        exchange: "RabbitMQ_direct",//要绑定的交换机名称。
        routingKey: "direct.key"//是否将消息路由到这个队列
        );

    // 发送消息到直连交换机  
    string message = "你好";
    var body = Encoding.UTF8.GetBytes(message);
    channel.BasicPublish(
        exchange: "RabbitMQ_direct",//交换机名称
        routingKey: "direct.key",
        basicProperties: null,
        body: body
        );
    Console.WriteLine("OK" + message);
}

接收

var factory = new ConnectionFactory() { HostName = "localhost" };
using var connection = factory.CreateConnection();
using (var channel = connection.CreateModel())
{
    //接收消息  
    var consumer = new EventingBasicConsumer(channel);
    consumer.Received += (model, ea) =>
    {
        var body = ea.Body.ToArray();
        var message = Encoding.UTF8.GetString(body);
        Console.WriteLine(message);
    };
    channel.BasicConsume(
        queue:"NewRabbitMQ",
        autoAck: true,
        consumer: consumer
        );

    Console.ReadLine();
}

主题交换机

根据消息的路由键和队列的绑定模式进行匹配,将消息路由到符合模式的队列中。主题交换机支持通配符匹配,如“”表示匹配一个单词,“#”表示匹配多个单词,例如:".news" 会匹配所有以 "news" 结尾的路由键。

发送

var factory = new ConnectionFactory() { HostName = "localhost" };
using var connection = factory.CreateConnection();
using (var channel = connection.CreateModel())
{
    // 声明一个直连交换机  
    channel.ExchangeDeclare(
        exchange: "RabbitMQ_topic",//交换机名称
        type: "topic"//交换机类型
        );

    // 声明一个队列  
    channel.QueueDeclare(
        queue: "NewRabbitMQ",
        durable: false,
        exclusive: false,
        autoDelete: false,
        arguments: null
        );

    // 绑定队列到直连交换机,指定路由键  
    channel.QueueBind(
        queue: "NewRabbitMQ",//要绑定的队列名称。
        exchange: "RabbitMQ_topic",//要绑定的交换机名称。
        routingKey: "user.*.info"//是否将消息路由到这个队列,这里表示匹配所有以 "user." 开头,并以 "info" 结尾的路由键
        );

    // 发送消息到直连交换机  
    string message = "你好";
    var body = Encoding.UTF8.GetBytes(message);
    channel.BasicPublish(
        exchange: "RabbitMQ_topic",//交换机名称
        routingKey: "user.abc.info",
        basicProperties: null,
        body: body
        );
    Console.WriteLine("OK" + message);
}

接收

与上面一致

扇形交换机

将消息路由到所有与其绑定的队列中,不关心消息的路由键和队列的绑定键。

发送

var factory = new ConnectionFactory() { HostName = "localhost" };
using var connection = factory.CreateConnection();
using (var channel = connection.CreateModel())
{
    // 声明一个直连交换机  
    channel.ExchangeDeclare(
        exchange: "RabbitMQ_fanout",//交换机名称
        type: "fanout"//交换机类型
        );

    // 声明一个队列  
    channel.QueueDeclare(
        queue: "NewRabbitMQ",
        durable: false,
        exclusive: false,
        autoDelete: false,
        arguments: null
        );

    // 绑定队列到直连交换机,指定路由键  
    channel.QueueBind(
        queue: "NewRabbitMQ",//要绑定的队列名称。
        exchange: "RabbitMQ_fanout",//要绑定的交换机名称。
        routingKey: ""//扇形交换机不需要路由键,因为所有消息都会被路由到所有绑定的队列
        );

    // 发送消息到直连交换机  
    string message = "你好";
    var body = Encoding.UTF8.GetBytes(message);
    channel.BasicPublish(
        exchange: "RabbitMQ_fanout",//交换机名称
        routingKey: "",
        basicProperties: null,
        body: body
        );
    Console.WriteLine("OK" + message);
}

接收

与上面一致

标签: rabbitmq c# asp.net

本文转载自: https://blog.csdn.net/qq_58159494/article/details/136263157
版权归原作者 吾乃猪儿虫 所有, 如有侵权,请联系我们删除。

“.Net中RabbitMQ的使用详情”的评论:

还没有评论