0


C#使用RabbitMQ发送和接收消息工具类

下面是一个简单的 C# RabbitMQ 发送和接收消息的封装工具类的示例代码:

工具类

通过NuGet安装

RabbitMQ.Client
usingNewtonsoft.Json;usingRabbitMQ.Client;usingRabbitMQ.Client.Events;usingSystem;usingSystem.Collections.Generic;usingSystem.Linq;usingSystem.Text;usingSystem.Threading.Channels;usingSystem.Threading.Tasks;namespaceWorkerService1{publicclassRabbitMQHelper:IDisposable{privatereadonlyConnectionFactory _factory;privateIConnection _connection;privateIModel _channel;publicRabbitMQHelper(){// 设置连接参数
            _factory =newConnectionFactory(){ HostName ="localhost", Port =5672, UserName ="guest", Password ="guest"};}/// <summary>/// 发送消息/// </summary>/// <typeparam name="T"></typeparam>/// <param name="queueName"></param>/// <param name="message"></param>publicvoidSendMessage<T>(string queueName,T message){try{InitConnection();// 声明队列
                _channel.QueueDeclare(queue: queueName,durable:true,// 设置为true表示队列是持久化的
                    exclusive:false,autoDelete:false,arguments:null);var body = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(message));

                _channel.BasicPublish(exchange:"",routingKey: queueName,basicProperties:null,body: body);}catch(Exception ex){
                Console.WriteLine("Failed to send message: {0}", ex.Message);}}/// <summary>/// 接收消息/// </summary>/// <typeparam name="T"></typeparam>/// <param name="queueName"></param>/// <param name="messageHandler"></param>publicvoidReceiveMessage<T>(string queueName,Action<T> messageHandler){try{InitConnection();// 声明队列(接收需声明队列,否则队列不存在时,无法接收消息)
                _channel.QueueDeclare(queue: queueName,durable:true,// 设置为true表示队列是持久化的
                    exclusive:false,autoDelete:false,arguments:null);//设置消费者数量(并发度),每个消费者每次只能处理一条消息
                _channel.BasicQos(prefetchSize:0,prefetchCount:1,global:false);// 创建消费者var consumer =newEventingBasicConsumer(_channel);
                consumer.Received +=(model, ea)=>{try{var message = Encoding.UTF8.GetString(ea.Body.ToArray());var convertedMessage = JsonConvert.DeserializeObject<T>(message);//委托方法
                        messageHandler.Invoke(convertedMessage);// 消息处理成功,确认消息
                        _channel.BasicAck(ea.DeliveryTag,false);}catch(Exception ex){// 消息处理异常,确认消息
                        _channel.BasicAck(ea.DeliveryTag,false);}};

                _channel.BasicConsume(queue: queueName,autoAck:false,// 设置为true表示自动确认消息
                    consumer: consumer);}catch(Exception ex){
                Console.WriteLine("Failed to receive message: {0}", ex.Message);}}/// <summary>/// 初始化链接/// </summary>privatevoidInitConnection(){if(_connection ==null||!_connection.IsOpen){
                _connection = _factory.CreateConnection();
                _channel = _connection.CreateModel();}}/// <summary>/// 释放资源/// </summary>publicvoidDispose(){
            _channel?.Close();
            _channel?.Dispose();
            _connection?.Close();
            _connection?.Dispose();}}}

使用示例

usingSystem;usingSystem.Text;usingSystem.Threading.Tasks;usingWorkerService1;publicclassProgram{privatestaticstring QueueName ="myqueue_key";publicstaticvoidMain(){var rabbitMQHelper =newRabbitMQHelper();for(long i =0; i <30; i++){
            rabbitMQHelper.SendMessage(QueueName, i);}

        rabbitMQHelper.ReceiveMessage<long>(QueueName, ReceivedHandle);

        Console.ReadLine();}/// <summary>/// 接收处理/// </summary>/// <param name="index"></param>privatestaticvoidReceivedHandle(long index){try{
            Console.WriteLine($"第{index}次开始{DateTime.Now}");
            Thread.Sleep(2000);
            Console.WriteLine($"第{index}次结束{DateTime.Now}");}catch(Exception ex){
            Console.WriteLine(ex.Message);}}}
标签: c# rabbitmq

本文转载自: https://blog.csdn.net/qq_21275565/article/details/131459622
版权归原作者 让梦想疯狂 所有, 如有侵权,请联系我们删除。

“C#使用RabbitMQ发送和接收消息工具类”的评论:

还没有评论