0


如何在Window系统中安装RabbitMQ以及在.NET平台上实现收发消息功能

以下是接收客户端代码:
#region RabbitMQ接收客户端
private ConnectionFactory factory;
private IConnection connection;
private IModel channel;
private EventingBasicConsumer consumer;
///
/// 开始创建连接对象
///
public void StartReceiving(string ListenIp,string queueName, string QueueUserName,string QueueUserPassword, int Port = 5672)
{
try
{
//创建RabbitMQ服务器连接对象
factory = new ConnectionFactory() { HostName = ListenIp, Port = 5672, UserName = QueueUserName, Password = QueueUserPassword };
//生成RabbitMQ服务器连接对象
connection = factory.CreateConnection();
//创建RabbitMQ通道对象
channel = connection.CreateModel();
//声明交换机
channel.ExchangeDeclare(“myExchange”, type: “topic”,durable:true);
//声明队列
channel.QueueDeclare(queue: queueName, durable: true, exclusive: false, autoDelete: false, arguments: null);
//绑定队列和交换机
channel.QueueBind(queue: “myqueue”, exchange: “myExchange”, routingKey: “myRoutingKey”);

            //创建接收队列对象
            consumer = new EventingBasicConsumer(channel);
            consumer.Received += (model, ea) =>
            {
                var body = ea.Body.ToArray();
                var message = Encoding.UTF8.GetString(body);
                //数据处理
                bool re = ProcessMessage(message);
                if (re)
                {
                    // 处理完消息后,手动确认消息已被消费
                    channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
                }
            };
            //订阅消息队列
            channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);

            // 创建一个新线程来保持接收消息的循环
            Thread receiveThread = new Thread(ReceiveMessages);
            receiveThread.Start(queueName);
        }
        catch (Exception ex)
        {
            throw new Exception("创建连接异常!",ex);
        }
        
    }
    /// <summary>
    ///创建循环检查是否有新消息
    /// </summary>
    private void ReceiveMessages(object parameter)
    {
        while (true)
        {
            // 等待0.5秒,避免过于频繁地检查是否有新消息
            Thread.Sleep(500);
            // 检查是否有新消息
            if (consumer.IsRunning)
            {
                // 处理新消息
                channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
                channel.BasicConsume(queue: parameter.ToString(), autoAck: false, consumer: consumer);
            }
        }
    }
    /// <summary>
    /// 停止接收消息
    /// </summary>
    public void StopReveiceQueue()
    {
        // 停止接收消息并关闭连接
        channel.BasicCancel(consumer.ConsumerTag);
        channel.Close();
        connection.Close();
    }
    #endregion

以下是发送端代码:
#region RabbitMQ发送端
private ConnectionFactory factory;
private IConnection connection;
private IModel channel;
private EventingBasicConsumer consumer;
private IBasicProperties properties;
///
/// 开始创建连接对象
///
public void StartReceiving(string ListenIp, string queueName, string QueueUserName, string QueueUserPassword, int Port = 5672)
{
try
{
//创建RabbitMQ服务器连接对象
factory = new ConnectionFactory() { HostName = ListenIp, Port = 5672, UserName = QueueUserName, Password = QueueUserPassword };
//生成RabbitMQ服务器连接对象
connection = factory.CreateConnection();
//创建RabbitMQ通道对象
channel = connection.CreateModel();
//声明交换机(第一个参数是交换机名称,一个参数是交换机类型,第三个参数是交换机是否持久化)
channel.ExchangeDeclare(“myExchange”, type: “topic”, durable: true);
//声明队列
channel.QueueDeclare(queue: queueName, durable: true, exclusive: false, autoDelete: false, arguments: null);
//绑定队列和交换机
channel.QueueBind(queue: “myqueue”, exchange: “myExchange”, routingKey: “myRoutingKey”);
//创建消息的持久化、过期时间和优先级对象
properties = channel.CreateBasicProperties();
//设置消息持久化
properties.Persistent = true;
//设置消息过期时间(毫秒)
//properties.Expiration = “10000”;
//设置消息的优先级
//properties.Priority = 1;

            //在这里执行发送数据的逻辑
            var message = "------你要发送的数据------";
            var body = Encoding.UTF8.GetBytes(message);
            channel.BasicPublish(exchange: "myExchange", routingKey: "myRoutingKey", basicProperties: properties, body: body);
        }
        catch (Exception ex)
        {
            throw new Exception("创建连接异常!", ex);
        }

    }
    /// <summary>
    /// 停止发送消息
    /// </summary>
    public void StopSendQueue()
    {
        //止发送消息并关闭连接
        channel.Close();
        connection.Close();
    }
标签: rabbitmq .net c#

本文转载自: https://blog.csdn.net/weixin_44548405/article/details/131421434
版权归原作者 微笑&星空 所有, 如有侵权,请联系我们删除。

“如何在Window系统中安装RabbitMQ以及在.NET平台上实现收发消息功能”的评论:

还没有评论