环境:
- win11
- rabbitmq-3.8.17
- .net 6.0
- RabbitMQ.Client 6.8.1
- vs2022
安装RabbitMq环境参照:
- window下安装rabbitmq
- linux下安装rabbitmq
问题:rabbitmq的c#客户端没有自带连接池,所以需要手动实现。
简易实现如下:
usingRabbitMQ.Client;usingSystem.Collections.Concurrent;usingSystem.Text;//测试调用var channel =await ChannelPool.Default.GetChannelAsync("guid",()=>{var factory =newConnectionFactory(){
HostName ="localhost",
Port =5672,
UserName ="test",
Password ="123456",
VirtualHost ="/",};return Task.FromResult(factory.CreateConnection());});try{var body = Encoding.UTF8.GetBytes("{\"Name\":\"tom\"}");
channel.RawChannel.BasicPublish(exchange:"",routingKey:"test-queue",body: body);}finally{
channel.Return();}#region 连接池/// <summary>/// rabbitmq 本身没有提供链接池, 且 IModel 的建立和释放也需要发送请求, 所以建立 connection 轮训机制和 IModel 的缓冲池机制<br/>/// 参考: <seealso href="https://www.rabbitmq.com/client-libraries/dotnet-api-guide#connection-and-channel-lifespan"/>/// </summary>publicclassChannelPool{publicstaticChannelPool Default =new(8,50);privateint connectionCount;privateint channelCountPerConnection;publicChannelPool(int connectionCount =1,int channelCountPerConnection =5){if(connectionCount >0)this.connectionCount = connectionCount;if(channelCountPerConnection >0)this.channelCountPerConnection = channelCountPerConnection;}publicclassChannelItem{publicint ConnectionIndex {get;set;}publicHostItem CacheHost {get;set;}publicIModel RawChannel {get;set;}publicvoidReturn()=> CacheHost.ChannelPools[ConnectionIndex].Return(this);}publicclassHostItem{publicSemaphoreSlim HostLocker {get;set;}publicList<IConnection> Connections {get;set;}publicint CurrentConnectionIndex {get;set;}publicList<SemaphoreSlim> ConnectionLockers {get;set;}publicList<EasyPool<ChannelItem>> ChannelPools {get;set;}}#region EasyPoolpublicsealedclassEasyPool<T>:IDisposablewhereT:class{privatereadonlyConcurrentBag<T> _pool;privatereadonlyFunc<T> _factory;privatereadonlyint _maxCount;publicEasyPool(Func<T> factory,int maxCount){
_factory = factory;
_maxCount = maxCount;
_pool =newConcurrentBag<T>();}publicTGet(){if(!_pool.TryTake(outvar result))return_factory();return result;}publicboolReturn(T item){if(_pool.Count >= _maxCount){if(item isIDisposable disposable)try{ disposable.Dispose();}catch{}returnfalse;}
_pool.Add(item);returntrue;}publicvoidDispose(){T result;while(_pool.TryTake(out result)){if(result isIDisposable disposable){try{ disposable.Dispose();}catch{}}}}}#endregionprivatereadonlyDictionary<string, HostItem> _cacheHosts =new();publicasyncTask<ChannelItem>GetChannelAsync(string key,Func<Task<IConnection>> connectionFactoty){var connectionCount =this.connectionCount;var maxChannelCountPerConnection =this.channelCountPerConnection;//获取 HostItemif(!_cacheHosts.TryGetValue(key,outvar cacheHost)){lock(_cacheHosts){if(!_cacheHosts.TryGetValue(key,out cacheHost)){
cacheHost =newHostItem{
HostLocker =new(1,1),
CurrentConnectionIndex =-1,
Connections =newList<IConnection>(connectionCount),
ConnectionLockers =newList<SemaphoreSlim>(connectionCount),
ChannelPools =newList<EasyPool<ChannelItem>>(connectionCount),};for(int i =0; i < connectionCount; i++){
cacheHost.Connections.Add(null);
cacheHost.ConnectionLockers.Add(new(1,1));var idx = i;
cacheHost.ChannelPools.Add(newEasyPool<ChannelItem>(()=>newChannelItem{
ConnectionIndex = idx,
RawChannel = cacheHost.Connections[idx].CreateModel(),
CacheHost = cacheHost
}, maxChannelCountPerConnection));}
_cacheHosts.Add(key, cacheHost);}}}//轮训得到连接索引await cacheHost.HostLocker.WaitAsync();int connectionIdx;try{
connectionIdx =++cacheHost.CurrentConnectionIndex;if(connectionIdx >= connectionCount) cacheHost.CurrentConnectionIndex = connectionIdx = connectionIdx % connectionCount;}finally{try{ cacheHost.HostLocker.Release();}catch{}}//检查是否初始化链接var conn = cacheHost.Connections[connectionIdx];if(conn ==null){var connectionLocker = cacheHost.ConnectionLockers[connectionIdx];await connectionLocker.WaitAsync();try{
conn = cacheHost.Connections[connectionIdx];if(conn ==null){
conn =awaitconnectionFactoty();
cacheHost.Connections[connectionIdx]= conn;}}finally{try{ connectionLocker.Release();}catch{}}}//得到 Channelreturn cacheHost.ChannelPools[connectionIdx].Get();}}#endregion
本文转载自: https://blog.csdn.net/u010476739/article/details/142643288
版权归原作者 jackletter 所有, 如有侵权,请联系我们删除。
版权归原作者 jackletter 所有, 如有侵权,请联系我们删除。