以下是接收客户端代码:
#region RabbitMQ接收客户端
private ConnectionFactory factory;
private IConnection connection;
private IModel channel;
private EventingBasicConsumer consumer;
///
/// 开始创建连接对象
///
public void StartReceiving(string ListenIp,string queueName, string QueueUserName,string QueueUserPassword, int Port = 5672)
{
try
{
//创建RabbitMQ服务器连接对象
factory = new ConnectionFactory() { HostName = ListenIp, Port = 5672, UserName = QueueUserName, Password = QueueUserPassword };
//生成RabbitMQ服务器连接对象
connection = factory.CreateConnection();
//创建RabbitMQ通道对象
channel = connection.CreateModel();
//声明交换机
channel.ExchangeDeclare(“myExchange”, type: “topic”,durable:true);
//声明队列
channel.QueueDeclare(queue: queueName, durable: true, exclusive: false, autoDelete: false, arguments: null);
//绑定队列和交换机
channel.QueueBind(queue: “myqueue”, exchange: “myExchange”, routingKey: “myRoutingKey”);
//创建接收队列对象
consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
//数据处理
bool re = ProcessMessage(message);
if (re)
{
// 处理完消息后,手动确认消息已被消费
channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
}
};
//订阅消息队列
channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);
// 创建一个新线程来保持接收消息的循环
Thread receiveThread = new Thread(ReceiveMessages);
receiveThread.Start(queueName);
}
catch (Exception ex)
{
throw new Exception("创建连接异常!",ex);
}
}
/// <summary>
///创建循环检查是否有新消息
/// </summary>
private void ReceiveMessages(object parameter)
{
while (true)
{
// 等待0.5秒,避免过于频繁地检查是否有新消息
Thread.Sleep(500);
// 检查是否有新消息
if (consumer.IsRunning)
{
// 处理新消息
channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
channel.BasicConsume(queue: parameter.ToString(), autoAck: false, consumer: consumer);
}
}
}
/// <summary>
/// 停止接收消息
/// </summary>
public void StopReveiceQueue()
{
// 停止接收消息并关闭连接
channel.BasicCancel(consumer.ConsumerTag);
channel.Close();
connection.Close();
}
#endregion
以下是发送端代码:
#region RabbitMQ发送端
private ConnectionFactory factory;
private IConnection connection;
private IModel channel;
private EventingBasicConsumer consumer;
private IBasicProperties properties;
///
/// 开始创建连接对象
///
public void StartReceiving(string ListenIp, string queueName, string QueueUserName, string QueueUserPassword, int Port = 5672)
{
try
{
//创建RabbitMQ服务器连接对象
factory = new ConnectionFactory() { HostName = ListenIp, Port = 5672, UserName = QueueUserName, Password = QueueUserPassword };
//生成RabbitMQ服务器连接对象
connection = factory.CreateConnection();
//创建RabbitMQ通道对象
channel = connection.CreateModel();
//声明交换机(第一个参数是交换机名称,一个参数是交换机类型,第三个参数是交换机是否持久化)
channel.ExchangeDeclare(“myExchange”, type: “topic”, durable: true);
//声明队列
channel.QueueDeclare(queue: queueName, durable: true, exclusive: false, autoDelete: false, arguments: null);
//绑定队列和交换机
channel.QueueBind(queue: “myqueue”, exchange: “myExchange”, routingKey: “myRoutingKey”);
//创建消息的持久化、过期时间和优先级对象
properties = channel.CreateBasicProperties();
//设置消息持久化
properties.Persistent = true;
//设置消息过期时间(毫秒)
//properties.Expiration = “10000”;
//设置消息的优先级
//properties.Priority = 1;
//在这里执行发送数据的逻辑
var message = "------你要发送的数据------";
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: "myExchange", routingKey: "myRoutingKey", basicProperties: properties, body: body);
}
catch (Exception ex)
{
throw new Exception("创建连接异常!", ex);
}
}
/// <summary>
/// 停止发送消息
/// </summary>
public void StopSendQueue()
{
//止发送消息并关闭连接
channel.Close();
connection.Close();
}
版权归原作者 微笑&星空 所有, 如有侵权,请联系我们删除。