0


C#实现集成RabbitMQ队列,支持工作队列模式和发布订阅模式

C#集成RabbitMQ队列,支持工作队列模式和发布订阅模式

RabbitMQ是一套开源(MPL)的消息队列服务软件,它由生产者和消费者来对消息(数据)进行管理和处理,本文通过C#来集成RabbitMQ,并封装成库dll文件,后期通过调用封装的dll库,来方便使用RabbitMQ的工作队列模式和发布订阅模式。

一、准备工作

安装RabbitMQ 3.11.3版本,可以去官网下载:https://www.rabbitmq.com/也可以直接跳转页面下载:https://download.csdn.net/download/weixin_44239774/87230983
安装过程中要装Erlang语言环境,它会自动提示安装。
安装完后,在浏览器中打开http://localhost:15672/,这是RabbitMQ服务端的一个UI界面,初始账号密码都为guest。
登录界面
在Admin中,创建Virtual Hosts,命名为/test_virtual,Virtual Hosts就像是数据库里的一个个独立的库,彼此是隔离的,可以在里面创建多个队列。后面样例的队列都会创建在这个Virtual Hosts中。

virtual host

二、创建类库项目

打开VS,选择class library类库项目,命名为RabbitMQService。
创建项目
在项目中通过Nuget,安装依赖库RabbitMQ.Client
在这里插入图片描述

三、创建结构体RabbitMQ_Factory

在项目中创建工具结构体RabbitMQ_Factory,用来保存连接RabbitMQ服务器的信息,因为后面要经常连接服务器,写数据进队列,所以将配置信息放在结构体里保存,方便调用。

namespaceRabbitMQService{internalstructRabbitMQ_Factory{publicstaticstring HostName {get;set;}publicstaticint Port {get;set;}publicstaticstring UserName {get;set;}publicstaticstring Password {get;set;}publicstaticstring VirtualHost {get;set;}}}

四、创建外部配置文件RabbitMQ_Config.xml

为了能通用各个项目,RabbitMQ服务器的配置信息,应该放在外部配置文件中,做成可配置的。这里是将其配置在xml文件中,新建一个xml文件,命名为RabbitMQ_Config.xml,里面写入配置信息,VirtualHost这一栏写的是上面创建的/test_virtual。

<?xml version="1.0" encoding="utf-8"?><RabbitMQ><Factory><HostName>localhost</HostName><Port>5672</Port><UserName>guest</UserName><Password>guest</Password><VirtualHost>/test_virtual</VirtualHost></Factory></RabbitMQ>

五、创建xml处理类Xml_ConfigAccess

创建xml处理类Xml_ConfigAccess用来在项目中读取上步三中建立的xml信息,默认是读取运行项目目录中的RabbitMQ_Config.xml文件,也有重载方法,读取指定目录下的RabbitMQ_Config.xml文件。

namespaceRabbitMQService{internalclassXml_ConfigAccess{privateXmlDocument _doc;privatestring _filePath;publicXml_ConfigAccess(){if(!File.Exists(Environment.CurrentDirectory +"\\RabbitMQ_Config.xml")){thrownewException($"RabbitMQ_Config.xml doesn't exsit in {Environment.CurrentDirectory +"\\RabbitMQ_Config.xml"}!");}else{
                _filePath= Environment.CurrentDirectory +"\\RabbitMQ_Config.xml";
                _doc =newXmlDocument();}}publicXml_ConfigAccess(string path){if(!File.Exists(path)){thrownewException($"RabbitMQ_Config.xml doesn't exsit in {path}!");}else{
                _filePath = path;
                _doc =newXmlDocument();}}publicstringReadFactoryXml(string node){string result ="";//加载要读取的XML
            _doc.Load(_filePath);//获得根节点XmlElement AFAC = _doc.DocumentElement;//获得某一类特定的子节点XmlNodeList xnl = AFAC.SelectNodes($"/RabbitMQ/Factory/{node}");foreach(XmlNode item in xnl){
                result = item.InnerText;}return result;}}}

六、创建触发事件类RabbitTriggerEvent

创建类RabbitTriggerEvent,用来将消费者收到的数据通过事件触发的形式传出去。

namespaceRabbitMQService{publicclassRabbitTriggerEvent{publicdelegatevoidGetInfoEventHandler(object sender,EventArgs e);publiceventGetInfoEventHandler infoEvent;//存储信息变量publicstring Message ="";//编写引发事件的函数(在程序任意域使用)publicvoidOnMessage(){if(infoEvent !=null){//发送信息infoEvent(this,newEventArgs());}}}}

七、创建主类RabbitMQ

以上几步把需要的工具类都创建完了,下面就是创建dll的主调用类RabbitMQ。

1.创建Init方法

创建Init()方法,用来将RabbitMQ_Config.xml中的信息读取出来放到结构体RabbitMQ_Factory中。

publicstaticvoidInit(){try{Xml_ConfigAccess config =newXml_ConfigAccess();
                RabbitMQ_Factory.HostName = config.ReadFactoryXml("HostName");
                RabbitMQ_Factory.Port = Convert.ToInt32(config.ReadFactoryXml("Port"));
                RabbitMQ_Factory.UserName = config.ReadFactoryXml("UserName");
                RabbitMQ_Factory.Password = config.ReadFactoryXml("Password");
                RabbitMQ_Factory.VirtualHost = config.ReadFactoryXml("VirtualHost");}catch(Exception){throw;}}

默认情况下配置文件应该在dll调用文件夹同目录下,为了以防不同路径,读取不到配置文件,写个重载Init方法,用来传入配置文件的绝对路径。

publicstaticvoidInit(string path){try{Xml_ConfigAccess config =newXml_ConfigAccess(path);
                RabbitMQ_Factory.HostName = config.ReadFactoryXml("HostName");
                RabbitMQ_Factory.Port = Convert.ToInt32(config.ReadFactoryXml("Port"));
                RabbitMQ_Factory.UserName = config.ReadFactoryXml("UserName");
                RabbitMQ_Factory.Password = config.ReadFactoryXml("Password");
                RabbitMQ_Factory.VirtualHost = config.ReadFactoryXml("VirtualHost");}catch(Exception){throw;}}

2.创建连接工厂方法GetFactory

创建方法GetFactory(),用来将结构体RabbitMQ_Factory中信息,创建服务器的RabbitMQ连接工厂,输出ConnectionFactory

privatestaticConnectionFactoryGetFactory(){ConnectionFactory factory =newConnectionFactory();
            factory.HostName = RabbitMQ_Factory.HostName;
            factory.Port = RabbitMQ_Factory.Port;
            factory.UserName = RabbitMQ_Factory.UserName;
            factory.Password = RabbitMQ_Factory.Password;
            factory.VirtualHost = RabbitMQ_Factory.VirtualHost;return factory;}

3.创建Publish_WorkQueues方法

创建Publish_WorkQueues方法,用以在工作队列模式下,作为生产者,发布消息到队列。
工作队列模式

/// <summary>/// 工作队列模式下发布消息/// </summary>/// <param name="queueName">写入消息的队列名称</param>/// <param name="message">要发布的消息</param>/// <exception cref="Exception"></exception>publicstaticvoidPublish_WorkQueues(string queueName,string message){ConnectionFactory factory =GetFactory();if(queueName ==""){thrownewException("RabbitMQ procuder find no queue name!");}if(message ==""){thrownewException("RabbitMQ procuder find no message into the queue!");}using(var connection = factory.CreateConnection()){using(var channel = connection.CreateModel()){
                    channel.QueueDeclare(queueName,true,false,false,null);var body = Encoding.UTF8.GetBytes(message);//消息持久化var props = channel.CreateBasicProperties();
                    props.Persistent =true;
                    channel.BasicPublish("", queueName, props, body);}}}

4.创建Create_Cosumer_WorkQueues方法

创建Create_Cosumer_WorkQueues()方法,用以在工作队列模式下,创建一至多个消费者。

/// <summary>/// 工作队列模式下创建消费者/// </summary>/// <param name="queueName">取消息的队列名称</param>/// <param name="triggerEvent">消费者绑定的触发事件</param>publicstaticvoidCreate_Cosumer_WorkQueues(string queueName,RabbitTriggerEvent triggerEvent){if(queueName ==""){thrownewException("RabbitMQ consumer found no queue name!");}if(triggerEvent ==null){thrownewException("RabbitMQ consumer found no binding triggerEvent!");}ConnectionFactory factory =GetFactory();var connection = factory.CreateConnection();var channel = connection.CreateModel();
            channel.QueueDeclare(queueName,true,false,false,null);
            channel.BasicQos(0,1,false);EventingBasicConsumer consumers =newEventingBasicConsumer(channel);
            consumers.Received +=(model, ea)=>{var body = ea.Body.ToArray();
                triggerEvent.Message = Encoding.UTF8.GetString(body);//触发事件,由外部方法执行
                triggerEvent.OnMessage();
                channel.BasicAck(ea.DeliveryTag,false);};
            channel.BasicConsume(queueName,false, consumers);}

5.创建Publish_PubSub方法

创建Publish_PubSub()方法,用以在发布订阅模式下,作为生产者,发送消息到服务器,需要输入处理消息交换机的名字,同时输入消息带的routingKey。
routing模式

/// <summary>/// 订阅模式下发布消息/// </summary>/// <param name="exchangeName">写到交换机的名字</param>/// <param name="routingKey">写到哪个队列的路由key,不写为广播模式</param>/// <param name="message">要发布的消息</param>publicstaticvoidPublish_PubSub(string exchangeName,string routingKey,string message){if(exchangeName ==""){thrownewException("RabbitMQ procuder found no exchangeName name!");}if(message ==""){thrownewException("RabbitMQ procuder found no message!");}ConnectionFactory factory =GetFactory();using(var connection = factory.CreateConnection()){using(var channel = connection.CreateModel()){var body = Encoding.UTF8.GetBytes(message);//消息持久化var props = channel.CreateBasicProperties();
                    props.Persistent =true;;//将消息和routingkey发布到指定交换机,由交换机根据队列绑定的routingKey来分配消息
                    channel.BasicPublish(exchangeName, routingKey, props, body);}}}

通过路由key(绑定队列),交换机决定将消息发送到那个队列,消费者再通过绑定队列来获取自己需要的消息。

6.创建Create_Consumer_PubSub方法

创建Create_Consumer_PubSub()方法,在发布订阅模式下,创建一至多个消费者。

/// <summary>/// 订阅模式下创建消费者/// </summary>/// <param name="exchangeName">连接交换机的名字</param>/// <param name="routingKeys">消费者拥有的routingKey</param>/// <param name="queueName">消费者绑定队列的名字</param>/// <param name="triggerEvent">消费者绑定的触发事件</param>/// <exception cref="Exception"></exception>publicstaticvoidCreate_Consumer_PubSub(string exchangeName,List<string> routingKeys,string queueName,RabbitTriggerEvent triggerEvent){if(queueName ==""){thrownewException("RabbitMQ consumer found no queue name!");}if(triggerEvent ==null){thrownewException("RabbitMQ consumer found no binding triggerEvent!");}if(exchangeName ==""){thrownewException("RabbitMQ consumer  found no exchangeName name!");}ConnectionFactory factory =GetFactory();var connection = factory.CreateConnection();var channel = connection.CreateModel();//当routingKey没有时,连接广播式的交换机,当有时,连接对点对的交换机if(routingKeys.Count ==0){
                channel.ExchangeDeclare(exchangeName, ExchangeType.Fanout);
                channel.QueueDeclare(queueName,true,false,false,null);

                channel.QueueBind(queueName, exchangeName,"");}else{
                channel.ExchangeDeclare(exchangeName, ExchangeType.Direct);
                channel.QueueDeclare(queueName,true,false,false,null);//给消费者队列绑定多个routingKeyforeach(var routingKey in routingKeys){
                    channel.QueueBind(queueName, exchangeName, routingKey);}}

            channel.BasicQos(0,1,false);EventingBasicConsumer consumers =newEventingBasicConsumer(channel);

            consumers.Received +=(model, ea)=>{var body = ea.Body.ToArray();
                triggerEvent.Message = Encoding.UTF8.GetString(body);
                triggerEvent.OnMessage();
                channel.BasicAck(ea.DeliveryTag,false);};

            channel.BasicConsume(queueName,false, consumers);}

调用一次此方法就创建一个消费者,消费者绑定传入queueName的队列,同时该队列可以绑定多个routingKey,当传进的routingKey的个数为0时,则将消息发送到fanout类型的交换机,交换机会以广播的形式将消息发送到每个队列。当routingKey的个数不为0时,则将消息发送到Direct类型的交换机,交换机会以routingKey,将消息分别投递到不同的队列中去。

八、编译项目,获得dll类库

通过以上几步,dll项目就创建完成,获得项目结构如下:
项目结构
编译项目,获得dll文件。
dll文件

九、创建winform项目,集成RabbitMQService.dll

创建winform项目,画出如下界面。左边为工作队列模式,右边为发布订阅模式。
界面
首先引用RabbitMQService.dll类库。
引用
在窗体初始化中,调用RabbitMQ类中Init()方法

publicForm1(){InitializeComponent();
            RabbitMQ.Init();}

先调试工作队列模式,在publish1按钮下,写如下调用逻辑,创建一个Demo1的队列,在里面写入textbox1的内容。

privatevoidpublish1_Click(object sender,EventArgs e){try{
                RabbitMQ.Publish_WorkQueues("Demo1", textBox1.Text);}catch(Exception ex){
                MessageBox.Show(ex.Message);}}

创建3个消费者,consumer1~consumer3按钮的逻辑都相同,每个消费者都绑定一个单独的消息处理事件和触发方法,写如下代码:

//开启工作队列模式下第一个消费者privatevoidcnsumer1_button_Click(object sender,EventArgs e){try{RabbitTriggerEvent infoEvent =newRabbitTriggerEvent();
                infoEvent.infoEvent += GetCosumer1Info;
                RabbitMQ.Create_Cosumer_WorkQueues("Demo1", infoEvent);}catch(Exception ex){
                MessageBox.Show(ex.Message);}}privatevoidGetCosumer1Info(object sender,EventArgs e){try{RabbitTriggerEvent a =(RabbitTriggerEvent)sender;Action actionDelegate =()=>{this.consumer1_textBox.Text += a.Message +"\r\n";};this.consumer1_textBox.Invoke(actionDelegate);
                Thread.Sleep(1000);}catch(Exception ex){
                MessageBox.Show(ex.Message);}}//开启工作队列模式下第二个消费者privatevoidcosumer2_button_Click(object sender,EventArgs e){try{RabbitTriggerEvent infoEvent =newRabbitTriggerEvent();
                infoEvent.infoEvent += GetCosumer2Info;
                RabbitMQ.Create_Cosumer_WorkQueues("Demo1", infoEvent);}catch(Exception ex){
                MessageBox.Show(ex.Message);}}privatevoidGetCosumer2Info(object sender,EventArgs e){try{RabbitTriggerEvent a =(RabbitTriggerEvent)sender;Action actionDelegate =()=>{this.consumer2_textBox.Text += a.Message +"\r\n";};this.consumer2_textBox.Invoke(actionDelegate);
                Thread.Sleep(2000);}catch(Exception ex){
                MessageBox.Show(ex.Message);}}//开启工作队列模式下第三个消费者privatevoidconsumer3_button_Click(object sender,EventArgs e){try{RabbitTriggerEvent infoEvent =newRabbitTriggerEvent();
                infoEvent.infoEvent += GetCosumer3Info;
                RabbitMQ.Create_Cosumer_WorkQueues("Demo1", infoEvent);}catch(Exception ex){
                MessageBox.Show(ex.Message);}}privatevoidGetCosumer3Info(object sender,EventArgs e){try{RabbitTriggerEvent a =(RabbitTriggerEvent)sender;Action actionDelegate =()=>{this.consumer3_textBox.Text += a.Message +"\r\n";};this.consumer3_textBox.Invoke(actionDelegate);
                Thread.Sleep(3000);}catch(Exception ex){
                MessageBox.Show(ex.Message);}}

启动winform,发送消息给队列,开启cousmer1~3,发现3个消费者能正常接收处理消息,且能按照自己的消费能力强弱去接收消息的多少。
在这里插入图片描述
再调试发布订阅模式,在publishSub1按钮下,写如下逻辑,需要输入交换机名称,路由key和发布的内容:

privatevoidpublishSub_button_Click(object sender,EventArgs e){try{
                RabbitMQ.Publish_PubSub(exchange_textBox.Text,routingKey_textBox.Text,textBox2.Text);}catch(Exception ex){
                MessageBox.Show(ex.Message);}}

在consumerSub1~consumerSub3按钮中,写如下逻辑,逻辑和工作队列模式消费者基本相同,要绑定单独的消息处理事件,还得多绑定一个队列的routingKey数组,用来route消息到数组:

privatevoidconsumerSub1_button_Click(object sender,EventArgs e){try{List<string> list =newList<string>();
                list.Add("key1");RabbitTriggerEvent infoEvent =newRabbitTriggerEvent();
                infoEvent.infoEvent += GetCosumerSub1Info;
                RabbitMQ.Create_Consumer_PubSub(exchange_textBox.Text,list,"TestQueue1",infoEvent);}catch(Exception ex){
                MessageBox.Show(ex.Message);}}privatevoidGetCosumerSub1Info(object sender,EventArgs e){try{RabbitTriggerEvent a =(RabbitTriggerEvent)sender;Action actionDelegate =()=>{this.consumerSub1_textBox.Text += a.Message +"\r\n";};this.consumerSub1_textBox.Invoke(actionDelegate);}catch(Exception ex){
                MessageBox.Show(ex.Message);}}privatevoidconsumerSub2_button_Click(object sender,EventArgs e){try{List<string> list =newList<string>();
                list.Add("key2");RabbitTriggerEvent infoEvent =newRabbitTriggerEvent();
                infoEvent.infoEvent += GetCosumerSub2Info;
                RabbitMQ.Create_Consumer_PubSub(exchange_textBox.Text, list,"TestQueue2", infoEvent);}catch(Exception ex){
                MessageBox.Show(ex.Message);}}privatevoidGetCosumerSub2Info(object sender,EventArgs e){try{RabbitTriggerEvent a =(RabbitTriggerEvent)sender;Action actionDelegate =()=>{this.consumerSub2_textBox.Text += a.Message +"\r\n";};this.consumerSub2_textBox.Invoke(actionDelegate);}catch(Exception ex){
                MessageBox.Show(ex.Message);}}privatevoidconssumerSub3_button_Click(object sender,EventArgs e){try{List<string> list =newList<string>();
                list.Add("key3");
                list.Add("key4");RabbitTriggerEvent infoEvent =newRabbitTriggerEvent();
                infoEvent.infoEvent += GetCosumerSub3Info;
                RabbitMQ.Create_Consumer_PubSub(exchange_textBox.Text, list,"TestQueue3", infoEvent);}catch(Exception ex){
                MessageBox.Show(ex.Message);}}privatevoidGetCosumerSub3Info(object sender,EventArgs e){try{RabbitTriggerEvent a =(RabbitTriggerEvent)sender;Action actionDelegate =()=>{this.consumerSub3_textBox.Text += a.Message +"\r\n";};this.consumerSub3_textBox.Invoke(actionDelegate);}catch(Exception ex){
                MessageBox.Show(ex.Message);}}

打开winform,输入exchange名字,routingKey和消息内容,注意代码中的逻辑,routingKey中key1,是将消息路由到consumerSub1,key2,是将消息路由到consumerSub2,key3、key4是将消息路由到consumerSub3
winform

十、小结

以上就是编写RabbitMQService.dll的全过程,包括引用和使用的方法。引用该库后,基本只要使用RabbitMQ类中的几个方法就能快速的创建和使用生产者和消费者,消费者也能有单独的事件来处理各自的消息。
个人也在很多项目上使用了这个类库,基本没啥问题,RabbitMQ当然不止这两种模式,但是在工业领域中,这两种模式基本够用。需要的可以通过链接来下载此类库:https://download.csdn.net/download/weixin_44239774/87222016

书写不易,引用请标明出处,谢谢~

标签: rabbitmq c# 分布式

本文转载自: https://blog.csdn.net/weixin_44239774/article/details/128123385
版权归原作者 Fan Felix 所有, 如有侵权,请联系我们删除。

“C#实现集成RabbitMQ队列,支持工作队列模式和发布订阅模式”的评论:

还没有评论