虽然RabbitMQ.Client 库有心跳机制,有断线重连机制,但是在网络断掉的时候并不能重连,下面的代码就是解决这个问题,经本人测试有效,适合作为挂机程序
using Newtonsoft.Json;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using RabbitMQ.Client.Exceptions;
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace HenryMes.RabbitMQ.Consumer
{
class Program
{
/// <summary>
/// RabbitMQ 服务器IP
/// </summary>
private static string ServerIp => string.IsNullOrEmpty(Com.DotNet.Utilities.IO.Config.Select("RabbitMQInfo", "HostIp")) ? "127.0.0.1" :
Com.DotNet.Utilities.IO.Config.Select("RabbitMQInfo", "HostIp");
/// <summary>
/// RabbitMQ 服务器端口, 默认 5672, 网页监控页面是 15672
/// </summary>
private static string ServerPort => string.IsNullOrEmpty(Com.DotNet.Utilities.IO.Config.Select("RabbitMQInfo", "HostPort")) ? "5672" :
Com.DotNet.Utilities.IO.Config.Select("RabbitMQInfo", "HostPort");
/// <summary>
/// RabbitMQ 用户名, 默认 guest
/// </summary>
private static string UserName => string.IsNullOrEmpty(Com.DotNet.Utilities.IO.Config.Select("RabbitMQInfo", "UserName")) ? "guest" :
Com.DotNet.Utilities.IO.Config.Select("RabbitMQInfo", "UserName");
/// <summary>
/// RabbitMQ 密码, 默认 guest
/// </summary>
private static string Password => string.IsNullOrEmpty(Com.DotNet.Utilities.IO.Config.Select("RabbitMQInfo", "Password")) ? "guest" :
Com.DotNet.Utilities.IO.Config.Select("RabbitMQInfo", "Password");
public static string ChannelName => string.IsNullOrEmpty(Com.DotNet.Utilities.IO.Config.Select("RabbitMQInfo", "Channel")) ? "tags_queue" :
Com.DotNet.Utilities.IO.Config.Select("RabbitMQInfo", "Channel");
/// <summary>
/// Main entry point to the RabbitMQ .NET AMQP client API. Constructs RabbitMQ.Client.IConnection instances.
/// </summary>
private static ConnectionFactory _factory;
private static readonly object Sync = new object();
/// <summary>
/// Main interface to an AMQP connection.
/// </summary>
private static IConnection _conn;
public static IModel _model;
static void Connect()
{
try
{
//连接工厂
_factory = new ConnectionFactory();
//连接工厂信息
_factory.HostName = ServerIp;// "localhost";
int rabbitmq_port = 5672; // 默认是5672端口
int.TryParse(ServerPort, out rabbitmq_port);
_factory.Port = rabbitmq_port;// "5672"
_factory.UserName = UserName;
_factory.Password = Password;
_factory.VirtualHost = "/";
_factory.RequestedHeartbeat = TimeSpan.FromSeconds(2);//心跳包
_factory.AutomaticRecoveryEnabled = true;//自动重连
_factory.TopologyRecoveryEnabled = true;//拓扑重连
_factory.NetworkRecoveryInterval = TimeSpan.FromSeconds(10);
//创建连接
_conn = _factory.CreateConnection();
//断开连接时,调用方法自动重连
_conn.ConnectionShutdown += Connection_ConnectionShutdown;
//创建接收频道
_model = _conn.CreateModel();
// 监控消息
RabbitmqMessageConsume();
Console.WriteLine("尝试连接至RabbitMQ服务器:" + ServerIp);
}
catch (BrokerUnreachableException e)
{
throw e;
}
catch (Exception ex)
{
throw ex;
}
}
private static void Connection_ConnectionShutdown(object sender, ShutdownEventArgs e)
{
Console.WriteLine("RabbitMQ已经断开连接,正在尝试重新连接至RabbitMQ服务器");
Reconnect();
}
private static void Reconnect()
{
try
{
//清除连接及频道
Cleanup();
var mres = new ManualResetEventSlim(false); // state is initially false
while (!mres.Wait(3000)) // loop until state is true, checking every 3s
{
try
{
//连接
Connect();
mres.Set(); // state set to true - breaks out of loop
}
catch (Exception ex)
{
Console.WriteLine("RabbitMQ尝试连接RabbitMQ服务器出现错误:" + ex.Message, ex);
}
}
}
catch (Exception ex)
{
Console.WriteLine("RabbitMQ尝试重新连接RabbitMQ服务器出现错误:" + ex.Message, ex);
}
}
static void Cleanup()
{
try
{
if (_model != null && _model.IsOpen)
{
try
{
_model.Close();
}
catch (Exception ex)
{
Console.WriteLine("RabbitMQ重新连接,正在尝试关闭之前的Channel[接收],但遇到错误", ex);
}
_model = null;
}
if (_conn != null && _conn.IsOpen)
{
try
{
_conn.Close();
}
catch (Exception ex)
{
Console.WriteLine("RabbitMQ重新连接,正在尝试关闭之前的连接,但遇到错误", ex);
}
_conn = null;
}
// 重置OPC连接
if (OpcManager.GetInstance.Instance != null)
{
OpcManager.GetInstance.Instance = null;
}
}
catch (IOException ex)
{
throw ex;
}
}
private static void RabbitmqMessageConsume()
{
try
{
if (_conn == null || !_conn.IsOpen) throw new Exception("连接为空或连接已经关闭");
if (_model == null || !_model.IsOpen) throw new Exception("通道为空或通道已经关闭");
bool queueDurable = true;
//在MQ上定义一个持久化队列,如果名称相同不会重复创建
_model.QueueDeclare(ChannelName, queueDurable, false, false, null);
//输入1,那如果接收一个消息,但是没有应答,则客户端不会收到下一个消息
_model.BasicQos(0, 1, true);
//创建基于该队列的消费者,绑定事件
var consumer = new EventingBasicConsumer(_model);
//回应消息监控
consumer.Received += SyncData_Received;
//绑定消费者
_model.BasicConsume(ChannelName, //队列名
false, //false:手动应答;true:自动应答
consumer);
Console.WriteLine("开始监控RabbitMQ服务器,队列" + ChannelName);
}
catch (AggregateException ae)
{
//错误信息去重
var errorList = (from error in ae.InnerExceptions select error.Message).Distinct().ToList();
//打印所有错误信息
foreach (string error in errorList)
{
Console.WriteLine(error);
}
}
catch (Exception ex)
{
throw ex;
}
}
private static void SyncData_Received(object sender, BasicDeliverEventArgs e)
{
var watch = new Stopwatch();
watch.Start();
var body = e.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
Console.Write("尝试读取OPC变量: ");
//var obj = JsonConvert.DeserializeObject<string[]>(message);
//var tags = HenryMes.OpcManager.GetInstance.Instance.Read<string>(obj);
//每页条数
var nodes = JsonConvert.DeserializeObject<string[]>(message).ToList();
const int pageSize = 500;
//页码 0也就是第一条
int pageNum = 0;
var tasks = new List<Task>();
while (pageNum * pageSize < nodes.Count)
{
var pageNodes = nodes.Skip(pageNum * pageSize).Take(pageSize).Select(t => t).ToArray();
tasks.Add(Task.Factory.StartNew(() =>
{
var tags = OpcManager.GetInstance.Instance.Read<string>(pageNodes);
}));
pageNum++;
}
Task.WaitAll(tasks.ToArray());
// 确认收到
_model.BasicAck(e.DeliveryTag, false);
watch.Stop();
TimeSpan timeSpan = watch.Elapsed;
Console.WriteLine("处理耗时{0}ms.", watch.ElapsedMilliseconds);
}
static void Main(string[] args)
{
Reconnect();
Console.ReadKey();
}
}
}
本文转载自: https://blog.csdn.net/lee576/article/details/125003079
版权归原作者 lee576 所有, 如有侵权,请联系我们删除。
版权归原作者 lee576 所有, 如有侵权,请联系我们删除。