0


【RabbitMQ 项目】服务端:服务器模块

文章目录

一.编写思路

成员变量:

  1. muduo 库中的 TCP 服务器
  2. EventLoop 对象:用于主线程循环监控连接事件
  3. 协议处理句柄
  4. 分发器:用于初始化协议处理器,便于把不同请求派发给不同的业务处理函数
  5. 连接管理句柄
  6. 虚拟机句柄
  7. 消费者管理句柄
  8. 线程池管理句柄成员方法: 向外提供的只有 2 个方法:
  9. start:启动服务
  10. 构造函数:
  • 完成各项成员的初始化,
  • 注册 TCP 服务器的两个回调函数: OnMessage:从接收缓冲区把数据读到用户缓冲区后的回调函数 OnConnection:Tcp 连接建立或断开时的回调函数
  • 给分发器注册业务处理函数(私有成员方法,共 12 个) 信道打开与与关闭;交换机,队列,绑定添加与删除,订阅与取消订阅,发布与确认消息私有成员(业务处理函数): 如果是创建或关闭信道,直接用连接管理句柄新增或删除信道,然后构建响应返回 如果是其他请求,先用连接管理句柄找到信道(请求中携带了信道 id),再使用信道提供的服务

二.代码实践

BrokerServer.hpp:

#pragmaonce#include"muduo/protobuf/codec.h"#include"muduo/protobuf/dispatcher.h"#include"muduo/base/Logging.h"#include"muduo/base/Mutex.h"#include"muduo/net/EventLoop.h"#include"muduo/net/TcpServer.h"#include"VirtualHost.hpp"#include"Connection.hpp"#include"Consumer.hpp"#include<functional>#include<stdio.h>#include<unistd.h>namespace ns_server
{using ConnectionManagerPtr = std::shared_ptr<ns_connection::ConnectionManager>;using VirtualHostPtr = std::shared_ptr<ns_data::VirtualHost>;using ConsumerManagerPtr = std::shared_ptr<ns_consumer::ConsumerManager>;using ThreadPoolPtr = std::shared_ptr<ns_tp::ThreadPool>;using ProtobufCodecPtr = std::shared_ptr<ProtobufCodec>;/************
     * 定义协议的结构化数据的智能指针(在分发器中注册时需要的格式)
     * *************/using OpenChannelRequestPtr = std::shared_ptr<ns_protocol::OpenChannelRequest>;using CloseChannelRequestPtr = std::shared_ptr<ns_protocol::CloseChannelRequest>;using DeclareExchangeRequestPtr = std::shared_ptr<ns_protocol::DeclareExchangeRequest>;using DeleteExchangeRequestPtr = std::shared_ptr<ns_protocol::DeleteExchangeRequest>;using DeclareMsgQueueRequestPtr = std::shared_ptr<ns_protocol::DeclareMsgQueueRequest>;using DeleteMsgQueueRequestPtr = std::shared_ptr<ns_protocol::DeleteMsgQueueRequest>;using BindRequestPtr = std::shared_ptr<ns_protocol::BindRequest>;using UnbindRequestPtr = std::shared_ptr<ns_protocol::UnbindRequest>;using PublishMessageRequestPtr = std::shared_ptr<ns_protocol::PublishMessageRequest>;using SubscribeQueueRequestPtr = std::shared_ptr<ns_protocol::SubscribeQueueRequest>;using CancelSubscribeRequestPtr = std::shared_ptr<ns_protocol::CancelSubscribeRequest>;using AckRequestPtr = std::shared_ptr<ns_protocol::AckRequest>;classBrokerServer{public:private:
        muduo::net::EventLoop _baseLoop;
        muduo::net::TcpServer _server;
        ProtobufDispatcher _dispatcher;
        ProtobufCodecPtr _codecPtr;
        VirtualHostPtr _vhPtr;
        ConsumerManagerPtr _consumerManagerPtr;
        ConnectionManagerPtr _connManagerPtr;
        ThreadPoolPtr _threadPoolPtr;public:BrokerServer(int serverPort,const std::string &dbName,const std::string &msgDir):_baseLoop(),_server(&_baseLoop, muduo::net::InetAddress("0.0.0.0", serverPort),"TcpServer", muduo::net::TcpServer::kReusePort),_dispatcher(std::bind(&BrokerServer::onUnknownMessage,this, std::placeholders::_1,
                                    std::placeholders::_2, std::placeholders::_3)){// 初始化成员
            _codecPtr = std::make_shared<ProtobufCodec>(std::bind(&ProtobufDispatcher::onProtobufMessage,&_dispatcher, std::placeholders::_1,
                                                                  std::placeholders::_2, std::placeholders::_3));
            _vhPtr = std::make_shared<ns_data::VirtualHost>(dbName, msgDir);

            _threadPoolPtr = std::make_shared<ns_tp::ThreadPool>();
            _threadPoolPtr->start();

            std::vector<std::string> qnames;
            _vhPtr->getAllQueueName(&qnames);
            _consumerManagerPtr = std::make_shared<ns_consumer::ConsumerManager>(qnames);

            _connManagerPtr = std::make_shared<ns_connection::ConnectionManager>();// 给_server注册两个回调函数
            _server.setConnectionCallback(std::bind(&BrokerServer::onConnection,this, std::placeholders::_1));
            _server.setMessageCallback(std::bind(&ProtobufCodec::onMessage, _codecPtr.get(), std::placeholders::_1,
                                                 std::placeholders::_2, std::placeholders::_3));// 给分发器注册业务处理函数
            _dispatcher.registerMessageCallback<ns_protocol::OpenChannelRequest>(std::bind(&BrokerServer::onOpenChannel,this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));

            _dispatcher.registerMessageCallback<ns_protocol::CloseChannelRequest>(std::bind(&BrokerServer::onCloseChannel,this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));

            _dispatcher.registerMessageCallback<ns_protocol::DeclareExchangeRequest>(std::bind(&BrokerServer::onDeclareExchange,this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));

            _dispatcher.registerMessageCallback<ns_protocol::DeleteExchangeRequest>(std::bind(&BrokerServer::onDeleteExchange,this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));

            _dispatcher.registerMessageCallback<ns_protocol::DeclareMsgQueueRequest>(std::bind(&BrokerServer::onDeclareMsgQueue,this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));

            _dispatcher.registerMessageCallback<ns_protocol::DeleteMsgQueueRequest>(std::bind(&BrokerServer::onDeleteMsgQueue,this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));

            _dispatcher.registerMessageCallback<ns_protocol::BindRequest>(std::bind(&BrokerServer::onBind,this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));

            _dispatcher.registerMessageCallback<ns_protocol::UnbindRequest>(std::bind(&BrokerServer::onUnbind,this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));

            _dispatcher.registerMessageCallback<ns_protocol::SubscribeQueueRequest>(std::bind(&BrokerServer::onSubScribe,this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));

            _dispatcher.registerMessageCallback<ns_protocol::CancelSubscribeRequest>(std::bind(&BrokerServer::onCancelSubScribe,this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));

            _dispatcher.registerMessageCallback<ns_protocol::PublishMessageRequest>(std::bind(&BrokerServer::onPublishMessage,this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));

            _dispatcher.registerMessageCallback<ns_protocol::AckRequest>(std::bind(&BrokerServer::onAck,this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));}voidstart(){// 开启监听状态
            _server.start();// 开始循环监控事件
            _baseLoop.loop();}private:// 给TcpServer设置的回调函数voidonConnection(const muduo::net::TcpConnectionPtr &connPtr){if(connPtr->connected()){
                _connManagerPtr->newConnection(connPtr, _codecPtr, _vhPtr, _consumerManagerPtr, _threadPoolPtr);}else{
                _connManagerPtr->deleteConnection(connPtr);}}// 业务处理函数voidonUnknownMessage(const muduo::net::TcpConnectionPtr &connPtr, MessagePtr msgPtr, muduo::Timestamp time){
            cout <<"未知消息"<< endl;
            connPtr->shutdown();}/************
         * 信道创建与删除
         * ***************/voidonOpenChannel(const muduo::net::TcpConnectionPtr &connPtr,const OpenChannelRequestPtr &reqPtr, muduo::Timestamp time){auto myConnPtr = _connManagerPtr->getConnection(connPtr);if(myConnPtr ==nullptr){LOG(WARNING)<<"打开信道时, 未找到Connection"<< endl;return;}
            myConnPtr->openChannel(*reqPtr);LOG(DEBUG)<<"create new channel, channelId: "<< reqPtr->channel_id()<< endl;}voidonCloseChannel(const muduo::net::TcpConnectionPtr &connPtr,const CloseChannelRequestPtr &reqPtr, muduo::Timestamp time){auto myConnPtr = _connManagerPtr->getConnection(connPtr);if(myConnPtr ==nullptr){LOG(WARNING)<<"关闭信道时, 未找到Connection"<< endl;return;}
            myConnPtr->closeChannel(*reqPtr);LOG(DEBUG)<<"close channel, channelId: "<< reqPtr->channel_id()<< endl;}/*********
         * 交换机声明与删除
         * ********/voidonDeclareExchange(const muduo::net::TcpConnectionPtr &connPtr,const DeclareExchangeRequestPtr &reqPtr, muduo::Timestamp time){auto myConnPtr = _connManagerPtr->getConnection(connPtr);if(myConnPtr ==nullptr){LOG(WARNING)<<"声明交换机时, 未找到Connection"<< endl;return;}auto channelPtr = myConnPtr->getChannel(reqPtr->channel_id());if(channelPtr ==nullptr){LOG(WARNING)<<"没有找到信道"<< endl;return;}

            channelPtr->declareExchange(*reqPtr);LOG(DEBUG)<<"声明交换机, exchangeName: "<< reqPtr->exchange_name()<< endl;}voidonDeleteExchange(const muduo::net::TcpConnectionPtr &connPtr,const DeleteExchangeRequestPtr &reqPtr, muduo::Timestamp time){auto myConnPtr = _connManagerPtr->getConnection(connPtr);if(myConnPtr ==nullptr){LOG(WARNING)<<"删除交换机时, 未找到Connection"<< endl;return;}auto channelPtr = myConnPtr->getChannel(reqPtr->channel_id());if(channelPtr ==nullptr){LOG(WARNING)<<"没有找到信道"<< endl;return;}

            channelPtr->deleteExchange(*reqPtr);LOG(DEBUG)<<"删除信道, exchangeName: "<< reqPtr->exchange_name()<< endl;}/************
         * 队列声明与删除
         * ***************/voidonDeclareMsgQueue(const muduo::net::TcpConnectionPtr &connPtr,const DeclareMsgQueueRequestPtr &reqPtr, muduo::Timestamp time){auto myConnPtr = _connManagerPtr->getConnection(connPtr);if(myConnPtr ==nullptr){LOG(WARNING)<<"声明队列时, 未找到Connection"<< endl;return;}auto channelPtr = myConnPtr->getChannel(reqPtr->channel_id());if(channelPtr ==nullptr){LOG(WARNING)<<"没有找到信道"<< endl;return;}

            channelPtr->declareMsgQueue(*reqPtr);LOG(DEBUG)<<"声明队列, queueName: "<< reqPtr->queue_name()<< endl;}voidonDeleteMsgQueue(const muduo::net::TcpConnectionPtr &connPtr,const DeleteMsgQueueRequestPtr &reqPtr, muduo::Timestamp time){auto myConnPtr = _connManagerPtr->getConnection(connPtr);if(myConnPtr ==nullptr){LOG(WARNING)<<"删除队列时, 未找到Connection"<< endl;return;}auto channelPtr = myConnPtr->getChannel(reqPtr->channel_id());if(channelPtr ==nullptr){LOG(WARNING)<<"没有找到信道"<< endl;return;}

            channelPtr->deleteMsgQueue(*reqPtr);LOG(DEBUG)<<"删除队列, queueName: "<< reqPtr->queue_name()<< endl;}/**********
         * 绑定与解绑
         * ***********/voidonBind(const muduo::net::TcpConnectionPtr &connPtr,const BindRequestPtr &reqPtr, muduo::Timestamp time){auto myConnPtr = _connManagerPtr->getConnection(connPtr);if(myConnPtr ==nullptr){LOG(WARNING)<<"添加绑定时, 未找到Connection"<< endl;return;}auto channelPtr = myConnPtr->getChannel(reqPtr->channel_id());if(channelPtr ==nullptr){LOG(WARNING)<<"没有找到信道"<< endl;return;}

            channelPtr->bind(*reqPtr);LOG(DEBUG)<<"绑定: "<< reqPtr->ename()<<"->"<< reqPtr->qname()<<": "<< reqPtr->binding_key()<< endl;}voidonUnbind(const muduo::net::TcpConnectionPtr &connPtr,const UnbindRequestPtr &reqPtr, muduo::Timestamp time){auto myConnPtr = _connManagerPtr->getConnection(connPtr);if(myConnPtr ==nullptr){LOG(WARNING)<<"删除绑定时, 未找到Connection"<< endl;return;}auto channelPtr = myConnPtr->getChannel(reqPtr->channel_id());if(channelPtr ==nullptr){LOG(WARNING)<<"没有找到信道"<< endl;return;}

            channelPtr->unbind(*reqPtr);LOG(DEBUG)<<"解绑: "<< reqPtr->ename()<<"->"<< reqPtr->qname()<< endl;}/*************
         * 订阅与取消订阅
         * ************/voidonSubScribe(const muduo::net::TcpConnectionPtr &connPtr,const SubscribeQueueRequestPtr &reqPtr, muduo::Timestamp time){auto myConnPtr = _connManagerPtr->getConnection(connPtr);if(myConnPtr ==nullptr){LOG(WARNING)<<"订阅队列时, 未找到Connection"<< endl;return;}auto channelPtr = myConnPtr->getChannel(reqPtr->channel_id());if(channelPtr ==nullptr){LOG(WARNING)<<"没有找到信道"<< endl;return;}

            channelPtr->subscribeQueue(*reqPtr);LOG(DEBUG)<<"订阅队列"<<", qname: "<< reqPtr->qname()<< endl;}voidonCancelSubScribe(const muduo::net::TcpConnectionPtr &connPtr,const CancelSubscribeRequestPtr &reqPtr, muduo::Timestamp time){auto myConnPtr = _connManagerPtr->getConnection(connPtr);if(myConnPtr ==nullptr){LOG(WARNING)<<"取消订阅队列时, 未找到Connection"<< endl;return;}auto channelPtr = myConnPtr->getChannel(reqPtr->channel_id());if(channelPtr ==nullptr){LOG(WARNING)<<"没有找到信道"<< endl;return;}

            channelPtr->cancelSubscribe(*reqPtr);LOG(DEBUG)<<"取消订阅队列"<<", qname: "<< reqPtr->qname()<< endl;}/********
         * 发布与应答
         * **************/voidonPublishMessage(const muduo::net::TcpConnectionPtr &connPtr,const PublishMessageRequestPtr &reqPtr, muduo::Timestamp time){auto myConnPtr = _connManagerPtr->getConnection(connPtr);if(myConnPtr ==nullptr){LOG(WARNING)<<"发布消息时, 未找到Connection"<< endl;return;}auto channelPtr = myConnPtr->getChannel(reqPtr->channel_id());if(channelPtr ==nullptr){LOG(WARNING)<<"没有找到信道"<< endl;return;}

            channelPtr->publishMessage(*reqPtr);LOG(DEBUG)<<"publish message: "<< reqPtr->msg().saved_info().body()<< endl;}voidonAck(const muduo::net::TcpConnectionPtr &connPtr,const AckRequestPtr &reqPtr, muduo::Timestamp time){auto myConnPtr = _connManagerPtr->getConnection(connPtr);if(myConnPtr ==nullptr){LOG(WARNING)<<"确认消息时, 未找到Connection"<< endl;return;}auto channelPtr = myConnPtr->getChannel(reqPtr->channel_id());if(channelPtr ==nullptr){LOG(WARNING)<<"没有找到信道"<< endl;return;}

            channelPtr->ackMessage(*reqPtr);LOG(DEBUG)<<"应答消息, msgId: "<< reqPtr->msg_id()<< endl;}voidsendCommonResponse(const muduo::net::TcpConnectionPtr &connPtr,const std::string &channelId,const std::string &responseId,bool ok){
            ns_protocol::CommomResponse resp;
            resp.set_channel_id(channelId);
            resp.set_response_id(responseId);
            resp.set_ok(ok);
            _codecPtr->send(connPtr, resp);}};}

三.服务端模块关系总结

在这里插入图片描述


本文转载自: https://blog.csdn.net/weixin_74113106/article/details/142577303
版权归原作者 月夜星辉雪 所有, 如有侵权,请联系我们删除。

“【RabbitMQ 项目】服务端:服务器模块”的评论:

还没有评论