C#集成RabbitMQ队列,支持工作队列模式和发布订阅模式
RabbitMQ是一套开源(MPL)的消息队列服务软件,它由生产者和消费者来对消息(数据)进行管理和处理,本文通过C#来集成RabbitMQ,并封装成库dll文件,后期通过调用封装的dll库,来方便使用RabbitMQ的工作队列模式和发布订阅模式。
一、准备工作
安装RabbitMQ 3.11.3版本,可以去官网下载:https://www.rabbitmq.com/也可以直接跳转页面下载:https://download.csdn.net/download/weixin_44239774/87230983
安装过程中要装Erlang语言环境,它会自动提示安装。
安装完后,在浏览器中打开http://localhost:15672/,这是RabbitMQ服务端的一个UI界面,初始账号密码都为guest。
在Admin中,创建Virtual Hosts,命名为/test_virtual,Virtual Hosts就像是数据库里的一个个独立的库,彼此是隔离的,可以在里面创建多个队列。后面样例的队列都会创建在这个Virtual Hosts中。
二、创建类库项目
打开VS,选择class library类库项目,命名为RabbitMQService。
在项目中通过Nuget,安装依赖库RabbitMQ.Client
三、创建结构体RabbitMQ_Factory
在项目中创建工具结构体RabbitMQ_Factory,用来保存连接RabbitMQ服务器的信息,因为后面要经常连接服务器,写数据进队列,所以将配置信息放在结构体里保存,方便调用。
namespaceRabbitMQService{internalstructRabbitMQ_Factory{publicstaticstring HostName {get;set;}publicstaticint Port {get;set;}publicstaticstring UserName {get;set;}publicstaticstring Password {get;set;}publicstaticstring VirtualHost {get;set;}}}
四、创建外部配置文件RabbitMQ_Config.xml
为了能通用各个项目,RabbitMQ服务器的配置信息,应该放在外部配置文件中,做成可配置的。这里是将其配置在xml文件中,新建一个xml文件,命名为RabbitMQ_Config.xml,里面写入配置信息,VirtualHost这一栏写的是上面创建的/test_virtual。
<?xml version="1.0" encoding="utf-8"?><RabbitMQ><Factory><HostName>localhost</HostName><Port>5672</Port><UserName>guest</UserName><Password>guest</Password><VirtualHost>/test_virtual</VirtualHost></Factory></RabbitMQ>
五、创建xml处理类Xml_ConfigAccess
创建xml处理类Xml_ConfigAccess用来在项目中读取上步三中建立的xml信息,默认是读取运行项目目录中的RabbitMQ_Config.xml文件,也有重载方法,读取指定目录下的RabbitMQ_Config.xml文件。
namespaceRabbitMQService{internalclassXml_ConfigAccess{privateXmlDocument _doc;privatestring _filePath;publicXml_ConfigAccess(){if(!File.Exists(Environment.CurrentDirectory +"\\RabbitMQ_Config.xml")){thrownewException($"RabbitMQ_Config.xml doesn't exsit in {Environment.CurrentDirectory +"\\RabbitMQ_Config.xml"}!");}else{
_filePath= Environment.CurrentDirectory +"\\RabbitMQ_Config.xml";
_doc =newXmlDocument();}}publicXml_ConfigAccess(string path){if(!File.Exists(path)){thrownewException($"RabbitMQ_Config.xml doesn't exsit in {path}!");}else{
_filePath = path;
_doc =newXmlDocument();}}publicstringReadFactoryXml(string node){string result ="";//加载要读取的XML
_doc.Load(_filePath);//获得根节点XmlElement AFAC = _doc.DocumentElement;//获得某一类特定的子节点XmlNodeList xnl = AFAC.SelectNodes($"/RabbitMQ/Factory/{node}");foreach(XmlNode item in xnl){
result = item.InnerText;}return result;}}}
六、创建触发事件类RabbitTriggerEvent
创建类RabbitTriggerEvent,用来将消费者收到的数据通过事件触发的形式传出去。
namespaceRabbitMQService{publicclassRabbitTriggerEvent{publicdelegatevoidGetInfoEventHandler(object sender,EventArgs e);publiceventGetInfoEventHandler infoEvent;//存储信息变量publicstring Message ="";//编写引发事件的函数(在程序任意域使用)publicvoidOnMessage(){if(infoEvent !=null){//发送信息infoEvent(this,newEventArgs());}}}}
七、创建主类RabbitMQ
以上几步把需要的工具类都创建完了,下面就是创建dll的主调用类RabbitMQ。
1.创建Init方法
创建Init()方法,用来将RabbitMQ_Config.xml中的信息读取出来放到结构体RabbitMQ_Factory中。
publicstaticvoidInit(){try{Xml_ConfigAccess config =newXml_ConfigAccess();
RabbitMQ_Factory.HostName = config.ReadFactoryXml("HostName");
RabbitMQ_Factory.Port = Convert.ToInt32(config.ReadFactoryXml("Port"));
RabbitMQ_Factory.UserName = config.ReadFactoryXml("UserName");
RabbitMQ_Factory.Password = config.ReadFactoryXml("Password");
RabbitMQ_Factory.VirtualHost = config.ReadFactoryXml("VirtualHost");}catch(Exception){throw;}}
默认情况下配置文件应该在dll调用文件夹同目录下,为了以防不同路径,读取不到配置文件,写个重载Init方法,用来传入配置文件的绝对路径。
publicstaticvoidInit(string path){try{Xml_ConfigAccess config =newXml_ConfigAccess(path);
RabbitMQ_Factory.HostName = config.ReadFactoryXml("HostName");
RabbitMQ_Factory.Port = Convert.ToInt32(config.ReadFactoryXml("Port"));
RabbitMQ_Factory.UserName = config.ReadFactoryXml("UserName");
RabbitMQ_Factory.Password = config.ReadFactoryXml("Password");
RabbitMQ_Factory.VirtualHost = config.ReadFactoryXml("VirtualHost");}catch(Exception){throw;}}
2.创建连接工厂方法GetFactory
创建方法GetFactory(),用来将结构体RabbitMQ_Factory中信息,创建服务器的RabbitMQ连接工厂,输出ConnectionFactory
privatestaticConnectionFactoryGetFactory(){ConnectionFactory factory =newConnectionFactory();
factory.HostName = RabbitMQ_Factory.HostName;
factory.Port = RabbitMQ_Factory.Port;
factory.UserName = RabbitMQ_Factory.UserName;
factory.Password = RabbitMQ_Factory.Password;
factory.VirtualHost = RabbitMQ_Factory.VirtualHost;return factory;}
3.创建Publish_WorkQueues方法
创建Publish_WorkQueues方法,用以在工作队列模式下,作为生产者,发布消息到队列。
/// <summary>/// 工作队列模式下发布消息/// </summary>/// <param name="queueName">写入消息的队列名称</param>/// <param name="message">要发布的消息</param>/// <exception cref="Exception"></exception>publicstaticvoidPublish_WorkQueues(string queueName,string message){ConnectionFactory factory =GetFactory();if(queueName ==""){thrownewException("RabbitMQ procuder find no queue name!");}if(message ==""){thrownewException("RabbitMQ procuder find no message into the queue!");}using(var connection = factory.CreateConnection()){using(var channel = connection.CreateModel()){
channel.QueueDeclare(queueName,true,false,false,null);var body = Encoding.UTF8.GetBytes(message);//消息持久化var props = channel.CreateBasicProperties();
props.Persistent =true;
channel.BasicPublish("", queueName, props, body);}}}
4.创建Create_Cosumer_WorkQueues方法
创建Create_Cosumer_WorkQueues()方法,用以在工作队列模式下,创建一至多个消费者。
/// <summary>/// 工作队列模式下创建消费者/// </summary>/// <param name="queueName">取消息的队列名称</param>/// <param name="triggerEvent">消费者绑定的触发事件</param>publicstaticvoidCreate_Cosumer_WorkQueues(string queueName,RabbitTriggerEvent triggerEvent){if(queueName ==""){thrownewException("RabbitMQ consumer found no queue name!");}if(triggerEvent ==null){thrownewException("RabbitMQ consumer found no binding triggerEvent!");}ConnectionFactory factory =GetFactory();var connection = factory.CreateConnection();var channel = connection.CreateModel();
channel.QueueDeclare(queueName,true,false,false,null);
channel.BasicQos(0,1,false);EventingBasicConsumer consumers =newEventingBasicConsumer(channel);
consumers.Received +=(model, ea)=>{var body = ea.Body.ToArray();
triggerEvent.Message = Encoding.UTF8.GetString(body);//触发事件,由外部方法执行
triggerEvent.OnMessage();
channel.BasicAck(ea.DeliveryTag,false);};
channel.BasicConsume(queueName,false, consumers);}
5.创建Publish_PubSub方法
创建Publish_PubSub()方法,用以在发布订阅模式下,作为生产者,发送消息到服务器,需要输入处理消息交换机的名字,同时输入消息带的routingKey。
/// <summary>/// 订阅模式下发布消息/// </summary>/// <param name="exchangeName">写到交换机的名字</param>/// <param name="routingKey">写到哪个队列的路由key,不写为广播模式</param>/// <param name="message">要发布的消息</param>publicstaticvoidPublish_PubSub(string exchangeName,string routingKey,string message){if(exchangeName ==""){thrownewException("RabbitMQ procuder found no exchangeName name!");}if(message ==""){thrownewException("RabbitMQ procuder found no message!");}ConnectionFactory factory =GetFactory();using(var connection = factory.CreateConnection()){using(var channel = connection.CreateModel()){var body = Encoding.UTF8.GetBytes(message);//消息持久化var props = channel.CreateBasicProperties();
props.Persistent =true;;//将消息和routingkey发布到指定交换机,由交换机根据队列绑定的routingKey来分配消息
channel.BasicPublish(exchangeName, routingKey, props, body);}}}
通过路由key(绑定队列),交换机决定将消息发送到那个队列,消费者再通过绑定队列来获取自己需要的消息。
6.创建Create_Consumer_PubSub方法
创建Create_Consumer_PubSub()方法,在发布订阅模式下,创建一至多个消费者。
/// <summary>/// 订阅模式下创建消费者/// </summary>/// <param name="exchangeName">连接交换机的名字</param>/// <param name="routingKeys">消费者拥有的routingKey</param>/// <param name="queueName">消费者绑定队列的名字</param>/// <param name="triggerEvent">消费者绑定的触发事件</param>/// <exception cref="Exception"></exception>publicstaticvoidCreate_Consumer_PubSub(string exchangeName,List<string> routingKeys,string queueName,RabbitTriggerEvent triggerEvent){if(queueName ==""){thrownewException("RabbitMQ consumer found no queue name!");}if(triggerEvent ==null){thrownewException("RabbitMQ consumer found no binding triggerEvent!");}if(exchangeName ==""){thrownewException("RabbitMQ consumer found no exchangeName name!");}ConnectionFactory factory =GetFactory();var connection = factory.CreateConnection();var channel = connection.CreateModel();//当routingKey没有时,连接广播式的交换机,当有时,连接对点对的交换机if(routingKeys.Count ==0){
channel.ExchangeDeclare(exchangeName, ExchangeType.Fanout);
channel.QueueDeclare(queueName,true,false,false,null);
channel.QueueBind(queueName, exchangeName,"");}else{
channel.ExchangeDeclare(exchangeName, ExchangeType.Direct);
channel.QueueDeclare(queueName,true,false,false,null);//给消费者队列绑定多个routingKeyforeach(var routingKey in routingKeys){
channel.QueueBind(queueName, exchangeName, routingKey);}}
channel.BasicQos(0,1,false);EventingBasicConsumer consumers =newEventingBasicConsumer(channel);
consumers.Received +=(model, ea)=>{var body = ea.Body.ToArray();
triggerEvent.Message = Encoding.UTF8.GetString(body);
triggerEvent.OnMessage();
channel.BasicAck(ea.DeliveryTag,false);};
channel.BasicConsume(queueName,false, consumers);}
调用一次此方法就创建一个消费者,消费者绑定传入queueName的队列,同时该队列可以绑定多个routingKey,当传进的routingKey的个数为0时,则将消息发送到fanout类型的交换机,交换机会以广播的形式将消息发送到每个队列。当routingKey的个数不为0时,则将消息发送到Direct类型的交换机,交换机会以routingKey,将消息分别投递到不同的队列中去。
八、编译项目,获得dll类库
通过以上几步,dll项目就创建完成,获得项目结构如下:
编译项目,获得dll文件。
九、创建winform项目,集成RabbitMQService.dll
创建winform项目,画出如下界面。左边为工作队列模式,右边为发布订阅模式。
首先引用RabbitMQService.dll类库。
在窗体初始化中,调用RabbitMQ类中Init()方法
publicForm1(){InitializeComponent();
RabbitMQ.Init();}
先调试工作队列模式,在publish1按钮下,写如下调用逻辑,创建一个Demo1的队列,在里面写入textbox1的内容。
privatevoidpublish1_Click(object sender,EventArgs e){try{
RabbitMQ.Publish_WorkQueues("Demo1", textBox1.Text);}catch(Exception ex){
MessageBox.Show(ex.Message);}}
创建3个消费者,consumer1~consumer3按钮的逻辑都相同,每个消费者都绑定一个单独的消息处理事件和触发方法,写如下代码:
//开启工作队列模式下第一个消费者privatevoidcnsumer1_button_Click(object sender,EventArgs e){try{RabbitTriggerEvent infoEvent =newRabbitTriggerEvent();
infoEvent.infoEvent += GetCosumer1Info;
RabbitMQ.Create_Cosumer_WorkQueues("Demo1", infoEvent);}catch(Exception ex){
MessageBox.Show(ex.Message);}}privatevoidGetCosumer1Info(object sender,EventArgs e){try{RabbitTriggerEvent a =(RabbitTriggerEvent)sender;Action actionDelegate =()=>{this.consumer1_textBox.Text += a.Message +"\r\n";};this.consumer1_textBox.Invoke(actionDelegate);
Thread.Sleep(1000);}catch(Exception ex){
MessageBox.Show(ex.Message);}}//开启工作队列模式下第二个消费者privatevoidcosumer2_button_Click(object sender,EventArgs e){try{RabbitTriggerEvent infoEvent =newRabbitTriggerEvent();
infoEvent.infoEvent += GetCosumer2Info;
RabbitMQ.Create_Cosumer_WorkQueues("Demo1", infoEvent);}catch(Exception ex){
MessageBox.Show(ex.Message);}}privatevoidGetCosumer2Info(object sender,EventArgs e){try{RabbitTriggerEvent a =(RabbitTriggerEvent)sender;Action actionDelegate =()=>{this.consumer2_textBox.Text += a.Message +"\r\n";};this.consumer2_textBox.Invoke(actionDelegate);
Thread.Sleep(2000);}catch(Exception ex){
MessageBox.Show(ex.Message);}}//开启工作队列模式下第三个消费者privatevoidconsumer3_button_Click(object sender,EventArgs e){try{RabbitTriggerEvent infoEvent =newRabbitTriggerEvent();
infoEvent.infoEvent += GetCosumer3Info;
RabbitMQ.Create_Cosumer_WorkQueues("Demo1", infoEvent);}catch(Exception ex){
MessageBox.Show(ex.Message);}}privatevoidGetCosumer3Info(object sender,EventArgs e){try{RabbitTriggerEvent a =(RabbitTriggerEvent)sender;Action actionDelegate =()=>{this.consumer3_textBox.Text += a.Message +"\r\n";};this.consumer3_textBox.Invoke(actionDelegate);
Thread.Sleep(3000);}catch(Exception ex){
MessageBox.Show(ex.Message);}}
启动winform,发送消息给队列,开启cousmer1~3,发现3个消费者能正常接收处理消息,且能按照自己的消费能力强弱去接收消息的多少。
再调试发布订阅模式,在publishSub1按钮下,写如下逻辑,需要输入交换机名称,路由key和发布的内容:
privatevoidpublishSub_button_Click(object sender,EventArgs e){try{
RabbitMQ.Publish_PubSub(exchange_textBox.Text,routingKey_textBox.Text,textBox2.Text);}catch(Exception ex){
MessageBox.Show(ex.Message);}}
在consumerSub1~consumerSub3按钮中,写如下逻辑,逻辑和工作队列模式消费者基本相同,要绑定单独的消息处理事件,还得多绑定一个队列的routingKey数组,用来route消息到数组:
privatevoidconsumerSub1_button_Click(object sender,EventArgs e){try{List<string> list =newList<string>();
list.Add("key1");RabbitTriggerEvent infoEvent =newRabbitTriggerEvent();
infoEvent.infoEvent += GetCosumerSub1Info;
RabbitMQ.Create_Consumer_PubSub(exchange_textBox.Text,list,"TestQueue1",infoEvent);}catch(Exception ex){
MessageBox.Show(ex.Message);}}privatevoidGetCosumerSub1Info(object sender,EventArgs e){try{RabbitTriggerEvent a =(RabbitTriggerEvent)sender;Action actionDelegate =()=>{this.consumerSub1_textBox.Text += a.Message +"\r\n";};this.consumerSub1_textBox.Invoke(actionDelegate);}catch(Exception ex){
MessageBox.Show(ex.Message);}}privatevoidconsumerSub2_button_Click(object sender,EventArgs e){try{List<string> list =newList<string>();
list.Add("key2");RabbitTriggerEvent infoEvent =newRabbitTriggerEvent();
infoEvent.infoEvent += GetCosumerSub2Info;
RabbitMQ.Create_Consumer_PubSub(exchange_textBox.Text, list,"TestQueue2", infoEvent);}catch(Exception ex){
MessageBox.Show(ex.Message);}}privatevoidGetCosumerSub2Info(object sender,EventArgs e){try{RabbitTriggerEvent a =(RabbitTriggerEvent)sender;Action actionDelegate =()=>{this.consumerSub2_textBox.Text += a.Message +"\r\n";};this.consumerSub2_textBox.Invoke(actionDelegate);}catch(Exception ex){
MessageBox.Show(ex.Message);}}privatevoidconssumerSub3_button_Click(object sender,EventArgs e){try{List<string> list =newList<string>();
list.Add("key3");
list.Add("key4");RabbitTriggerEvent infoEvent =newRabbitTriggerEvent();
infoEvent.infoEvent += GetCosumerSub3Info;
RabbitMQ.Create_Consumer_PubSub(exchange_textBox.Text, list,"TestQueue3", infoEvent);}catch(Exception ex){
MessageBox.Show(ex.Message);}}privatevoidGetCosumerSub3Info(object sender,EventArgs e){try{RabbitTriggerEvent a =(RabbitTriggerEvent)sender;Action actionDelegate =()=>{this.consumerSub3_textBox.Text += a.Message +"\r\n";};this.consumerSub3_textBox.Invoke(actionDelegate);}catch(Exception ex){
MessageBox.Show(ex.Message);}}
打开winform,输入exchange名字,routingKey和消息内容,注意代码中的逻辑,routingKey中key1,是将消息路由到consumerSub1,key2,是将消息路由到consumerSub2,key3、key4是将消息路由到consumerSub3
十、小结
以上就是编写RabbitMQService.dll的全过程,包括引用和使用的方法。引用该库后,基本只要使用RabbitMQ类中的几个方法就能快速的创建和使用生产者和消费者,消费者也能有单独的事件来处理各自的消息。
个人也在很多项目上使用了这个类库,基本没啥问题,RabbitMQ当然不止这两种模式,但是在工业领域中,这两种模式基本够用。需要的可以通过链接来下载此类库:https://download.csdn.net/download/weixin_44239774/87222016
书写不易,引用请标明出处,谢谢~
版权归原作者 Fan Felix 所有, 如有侵权,请联系我们删除。