0


【RabbitMQ 项目】客户端:连接模块

文章目录

客户端有两种,生产客户端,消费客户端,其实连接模块就是传统意义上的的客户端,生产客户端,消费客户端都是用它来搭建的,只不过连接模块提供了很多接口。消费客户端只会使用其中的一部分,比如消费客户端会发布消息;生产客户端也只使用一部分,比如不可能订阅消息

一.实现要点

构造函数

muduo 库中的 TcpClient 成员的注册两个回调函数,一个是从接收缓冲区读取上来消息后的回调函数,注册为协议处理器的 onMessage 成员,另一个是连接建立成功后的回调,我们自己实现一个,工作就是给 Connection 内部维护的 TCP 连接赋值为这个新建的连接
此外,为分发器注册业务处理函数,即对不同响应的处理动作。第一,收到一个 CommonResponse,结合我们上节介绍的信道模块,则需要向信道的哈希表中 put 该 Response;第二,收到一个 PushMessageResponse,我们需要找到信道,调用信道的提供的处理消息的函数(工作是先找到消费者,然后调用消费者的回调函数)。但这个任务属于支线任务,Reactor 线程不想自己做,于是交给线程池来做
最后,调用 TcpClient 的 connect 接口,向服务端发起连接

二.代码实践

  1. #pragmaonce#include"muduo/protobuf/codec.h"#include"muduo/protobuf/dispatcher.h"#include"muduo/base/Logging.h"#include"muduo/base/Mutex.h"#include"muduo/net/EventLoop.h"#include"muduo/net/TcpClient.h"#include"muduo/net/EventLoopThread.h"#include"muduo/base/CountDownLatch.h"#include"../common/ThreadPool.hpp"#include"Channel.hpp"#include<functional>#include<iostream>namespace ns_connection
  2. {using ThreadPoolPtr = std::shared_ptr<ns_tp::ThreadPool>;using ProtobufCodecPtr = std::shared_ptr<ProtobufCodec>;using ProtobufDispatcherPtr = std::shared_ptr<ProtobufDispatcher>;using ChannelPtr = std::shared_ptr<ns_channel::Channel>;using ThreadPoolPtr = std::shared_ptr<ns_tp::ThreadPool>;using CommonResponsePtr = std::shared_ptr<ns_protocol::CommomResponse>;using PushMessageResonsePtr = std::shared_ptr<ns_protocol::PushMessageResponse>;/***********
  3. * Connection是对底层用于通信的TCP套接字封装(muduo库中的TcpConnectionPtr)
  4. * 一个Connection中包含多个信道,当Connection关闭,信道也会销毁
  5. * ******************/classConnection{private:
  6. muduo::net::EventLoopThread _loopThread;
  7. muduo::CountDownLatch _latch;
  8. muduo::net::TcpClient _client;
  9. muduo::net::TcpConnectionPtr _connPtr;
  10. ProtobufDispatcherPtr _distpatcherPtr;
  11. ProtobufCodecPtr _codecPtr;
  12. ns_channel::ChannelManager _channelManager;
  13. ThreadPoolPtr _threadPoolPtr;public:Connection(const std::string &serverIp,int serverPort,const ThreadPoolPtr &threadPoolPtr):_loopThread(),_latch(1),_client(_loopThread.startLoop(), muduo::net::InetAddress(serverIp, serverPort),"client"),_connPtr(),_channelManager(),_threadPoolPtr(threadPoolPtr){// 构造成员
  14. _distpatcherPtr = std::make_shared<ProtobufDispatcher>((std::bind(&Connection::onUnknownMessage,this,
  15. std::placeholders::_1,
  16. std::placeholders::_2,
  17. std::placeholders::_3)));
  18. _codecPtr = std::make_shared<ProtobufCodec>((std::bind(&ProtobufDispatcher::onProtobufMessage,
  19. _distpatcherPtr.get(),
  20. std::placeholders::_1,
  21. std::placeholders::_2,
  22. std::placeholders::_3)));// 给Client注册两个回调函数
  23. _client.setConnectionCallback(std::bind(&Connection::onConnection,this, std::placeholders::_1));
  24. _client.setMessageCallback(std::bind(&ProtobufCodec::onMessage, _codecPtr.get(), std::placeholders::_1,
  25. std::placeholders::_2, std::placeholders::_3));
  26. _distpatcherPtr->registerMessageCallback<ns_protocol::CommomResponse>(std::bind(&Connection::onCommonResponse,this, std::placeholders::_1,
  27. std::placeholders::_2,
  28. std::placeholders::_3));
  29. _distpatcherPtr->registerMessageCallback<ns_protocol::PushMessageResponse>(std::bind(&Connection::onRecvMessage,this, std::placeholders::_1,
  30. std::placeholders::_2,
  31. std::placeholders::_3));connect();}voidconnect(){
  32. _client.connect();// 非阻塞
  33. _latch.wait();}
  34. ChannelPtr openChannel(){// 只是在本地建立了信道auto channelPtr = _channelManager.openChannel(_connPtr, _codecPtr);// 通过该信道发送建立信道的请求,要服务端也建立对应的信道if(!channelPtr->openChannel()){LOG(WARNING)<<"打开信道失败"<< endl;// 关闭本地的信道,防止内存泄漏
  35. _channelManager.closeChannel(channelPtr->_id);}return channelPtr;}voidcloseChannel(const ChannelPtr& channelPtr){// 发送关闭信道的请求,让服务端关闭信道
  36. channelPtr->closeChannel();// 把本地信道关掉
  37. _channelManager.closeChannel(channelPtr->_id);}private:// 给_client设置的回调voidonConnection(muduo::net::TcpConnectionPtr connPtr){if(connPtr->connected()){
  38. _connPtr = connPtr;
  39. _latch.countDown();}else{
  40. _connPtr.reset();}}voidonUnknownMessage(const muduo::net::TcpConnectionPtr &connPtr,const MessagePtr &resp, muduo::Timestamp time){LOG(WARNING)<<"未知响应"<< endl;}// 业务处理函数voidonCommonResponse(const muduo::net::TcpConnectionPtr &connPtr,const CommonResponsePtr &respPtr, muduo::Timestamp time){//LOG(DEBUG) << "收到CommonResponse, respId: " << respPtr->response_id() << endl;
  41. std::string channeId = respPtr->channel_id();auto channelPtr = _channelManager.getChannel(channeId);
  42. channelPtr->putCommonResponse(respPtr);}voidonRecvMessage(const muduo::net::TcpConnectionPtr &connPtr,const PushMessageResonsePtr &respPtr, muduo::Timestamp time){//LOG(DEBUG) << "收到消息, body: " << respPtr->msg().saved_info().body() << endl;
  43. std::string channeId = respPtr->channel_id();auto channelPtr = _channelManager.getChannel(channeId);// 把处理消息的任务交给线程池来做
  44. _threadPoolPtr->push(std::bind(&ns_channel::Channel::consumeMessage, channelPtr.get(),
  45. respPtr->qname(), respPtr->msg()));}};}

三.搭建消费客户端和生产客户端

思路:先创建 Connection 对象,然后用 connection 创建信道,即可使用信道提供的服务

标签: rabbitmq 分布式

本文转载自: https://blog.csdn.net/weixin_74113106/article/details/142577400
版权归原作者 月夜星辉雪 所有, 如有侵权,请联系我们删除。

“【RabbitMQ 项目】客户端:连接模块”的评论:

还没有评论