0


消息队列10:为RabbitMq添加连接池

环境:

  • win11
  • rabbitmq-3.8.17
  • .net 6.0
  • RabbitMQ.Client 6.8.1
  • vs2022

安装RabbitMq环境参照:

  • window下安装rabbitmq
  • linux下安装rabbitmq

问题:rabbitmq的c#客户端没有自带连接池,所以需要手动实现。

简易实现如下:

usingRabbitMQ.Client;usingSystem.Collections.Concurrent;usingSystem.Text;//测试调用var channel =await ChannelPool.Default.GetChannelAsync("guid",()=>{var factory =newConnectionFactory(){
        HostName ="localhost",
        Port =5672,
        UserName ="test",
        Password ="123456",
        VirtualHost ="/",};return Task.FromResult(factory.CreateConnection());});try{var body = Encoding.UTF8.GetBytes("{\"Name\":\"tom\"}");
    channel.RawChannel.BasicPublish(exchange:"",routingKey:"test-queue",body: body);}finally{
    channel.Return();}#region 连接池/// <summary>/// rabbitmq 本身没有提供链接池, 且 IModel 的建立和释放也需要发送请求, 所以建立 connection 轮训机制和 IModel 的缓冲池机制<br/>/// 参考: <seealso href="https://www.rabbitmq.com/client-libraries/dotnet-api-guide#connection-and-channel-lifespan"/>/// </summary>publicclassChannelPool{publicstaticChannelPool Default =new(8,50);privateint connectionCount;privateint channelCountPerConnection;publicChannelPool(int connectionCount =1,int channelCountPerConnection =5){if(connectionCount >0)this.connectionCount = connectionCount;if(channelCountPerConnection >0)this.channelCountPerConnection = channelCountPerConnection;}publicclassChannelItem{publicint ConnectionIndex {get;set;}publicHostItem CacheHost {get;set;}publicIModel RawChannel {get;set;}publicvoidReturn()=> CacheHost.ChannelPools[ConnectionIndex].Return(this);}publicclassHostItem{publicSemaphoreSlim HostLocker {get;set;}publicList<IConnection> Connections {get;set;}publicint CurrentConnectionIndex {get;set;}publicList<SemaphoreSlim> ConnectionLockers {get;set;}publicList<EasyPool<ChannelItem>> ChannelPools {get;set;}}#region EasyPoolpublicsealedclassEasyPool<T>:IDisposablewhereT:class{privatereadonlyConcurrentBag<T> _pool;privatereadonlyFunc<T> _factory;privatereadonlyint _maxCount;publicEasyPool(Func<T> factory,int maxCount){
            _factory = factory;
            _maxCount = maxCount;
            _pool =newConcurrentBag<T>();}publicTGet(){if(!_pool.TryTake(outvar result))return_factory();return result;}publicboolReturn(T item){if(_pool.Count >= _maxCount){if(item isIDisposable disposable)try{ disposable.Dispose();}catch{}returnfalse;}
            _pool.Add(item);returntrue;}publicvoidDispose(){T result;while(_pool.TryTake(out result)){if(result isIDisposable disposable){try{ disposable.Dispose();}catch{}}}}}#endregionprivatereadonlyDictionary<string, HostItem> _cacheHosts =new();publicasyncTask<ChannelItem>GetChannelAsync(string key,Func<Task<IConnection>> connectionFactoty){var connectionCount =this.connectionCount;var maxChannelCountPerConnection =this.channelCountPerConnection;//获取 HostItemif(!_cacheHosts.TryGetValue(key,outvar cacheHost)){lock(_cacheHosts){if(!_cacheHosts.TryGetValue(key,out cacheHost)){
                    cacheHost =newHostItem{
                        HostLocker =new(1,1),
                        CurrentConnectionIndex =-1,
                        Connections =newList<IConnection>(connectionCount),
                        ConnectionLockers =newList<SemaphoreSlim>(connectionCount),
                        ChannelPools =newList<EasyPool<ChannelItem>>(connectionCount),};for(int i =0; i < connectionCount; i++){
                        cacheHost.Connections.Add(null);
                        cacheHost.ConnectionLockers.Add(new(1,1));var idx = i;
                        cacheHost.ChannelPools.Add(newEasyPool<ChannelItem>(()=>newChannelItem{
                            ConnectionIndex = idx,
                            RawChannel = cacheHost.Connections[idx].CreateModel(),
                            CacheHost = cacheHost
                        }, maxChannelCountPerConnection));}
                    _cacheHosts.Add(key, cacheHost);}}}//轮训得到连接索引await cacheHost.HostLocker.WaitAsync();int connectionIdx;try{
            connectionIdx =++cacheHost.CurrentConnectionIndex;if(connectionIdx >= connectionCount) cacheHost.CurrentConnectionIndex = connectionIdx = connectionIdx % connectionCount;}finally{try{ cacheHost.HostLocker.Release();}catch{}}//检查是否初始化链接var conn = cacheHost.Connections[connectionIdx];if(conn ==null){var connectionLocker = cacheHost.ConnectionLockers[connectionIdx];await connectionLocker.WaitAsync();try{
                conn = cacheHost.Connections[connectionIdx];if(conn ==null){
                    conn =awaitconnectionFactoty();
                    cacheHost.Connections[connectionIdx]= conn;}}finally{try{ connectionLocker.Release();}catch{}}}//得到 Channelreturn cacheHost.ChannelPools[connectionIdx].Get();}}#endregion
标签: rabbitmq 连接池

本文转载自: https://blog.csdn.net/u010476739/article/details/142643288
版权归原作者 jackletter 所有, 如有侵权,请联系我们删除。

“消息队列10:为RabbitMq添加连接池”的评论:

还没有评论