0


【RabbitMQ 项目】客户端:信道模块

文章目录

信道是提供服务的窗口,服务端的信道给用户提供服务

一.实现要点

服务接口返回和收到响应的同步

信道提供的服务接口主要任务就是,根据用户传入的参数,构建请求,然后发送,并等接收到响应后再返回。
关键在于 muduo 库中的接口都是非阻塞的,send 一调用就立刻返回,而我们希望接收到对应的响应后再返回,所以这里的同步需要我们自己实现。
我的做法是,信道内维护一个哈希表,key 是响应的 id,value 是响应。这个哈希表哪些地方会访问呢?第一,发送完请求后,会访问哈希表判断是否存在对应的响应(用请求的 id 来判断,因为请求和对应的响应,它们的 id 是相同的),如果存在就直接返回响应,如果不存在就去一个条件变量上等待。第二,收到来自服务端的推送消息的响应时,会将响应添加到哈希表,然后唤醒在条件变量上等待的线程。

二.代码实现

#pragma once
#include "Consumer.hpp"
#include "muduo/net/TcpConnection.h"
#include "muduo/protobuf/codec.h"
#include "../common/ThreadPool.hpp"
#include "../common/protocol.pb.h"
#include "../common/Util.hpp"
#include "../common/message.pb.h"
#include <condition_variable>
#include <memory>
#include <mutex>
#include <string>
#include <unordered_map>
#include <utility>

namespace ns_channel
{
    struct Channel;
    using ChannelPtr = std::shared_ptr<Channel>;
    using ProtobufCodecPtr = std::shared_ptr<ProtobufCodec>;
    using CommonResponsePtr = std::shared_ptr<ns_protocol::CommomResponse>;

    /**
     * Client-side channel: the object through which user code issues requests.
     *
     * Every service method builds a protobuf request, sends it through the
     * codec, and then blocks until a response carrying the same id has been
     * handed to putCommonResponse(). The codec's send() is treated as
     * non-blocking, so the request/response rendezvous is implemented here
     * with `_resps` + `_cond`.
     *
     * Locking: `_mtxForResps` guards `_resps`; `_mtxForConsumers` guards
     * `_consumers`. Neither mutex is held while waiting for a response or
     * while running a consumer callback.
     */
    struct Channel
    {
        std::string _id;
        std::unordered_map<std::string, ns_consumer::ConsumerPtr> _consumers; // <queue name, consumer>
        muduo::net::TcpConnectionPtr _connPtr;
        ProtobufCodecPtr _codecPtr; // serializes and sends requests over _connPtr
        std::mutex _mtxForResps;
        std::mutex _mtxForConsumers;
        std::condition_variable _cond;
        std::unordered_map<std::string, CommonResponsePtr> _resps; // <response id, response>

        /**
         * @param id        identifier of this channel, sent with every request
         * @param connPtr   connection the requests are written to
         * @param codecPtr  codec used to send the requests
         */
        Channel(const std::string &id,
                const muduo::net::TcpConnectionPtr &connPtr,
                const ProtobufCodecPtr &codecPtr)
            : _id(id), _connPtr(connPtr), _codecPtr(codecPtr)
        {
        }

        // The next two methods send the open/close requests to the server.
        // They live here (instead of in the Connection module) so that the
        // request/response synchronization is implemented only once.

        /** Asks the server to open this channel. @return the response's ok flag. */
        bool openChannel()
        {
            ns_protocol::OpenChannelRequest req;
            req.set_channel_id(_id);
            req.set_request_id(ns_util::UUIDUtil::uuid());
            const bool ok = sendAndWait(req);
            LOG(DEBUG) << "打开信道:channelId: " << _id << endl;
            return ok;
        }

        /** Asks the server to close this channel. @return the response's ok flag. */
        bool closeChannel()
        {
            ns_protocol::CloseChannelRequest req;
            req.set_channel_id(_id);
            req.set_request_id(ns_util::UUIDUtil::uuid());
            const bool ok = sendAndWait(req);
            LOG(DEBUG) << "关闭信道:channelId: " << _id << endl;
            return ok;
        }

        /************
         * Interfaces used by producer clients
         * ***********/

        /**
         * Declares an exchange on the server.
         * @param exchangeName  name of the exchange
         * @param type          exchange type
         * @param isDurable     durability flag forwarded to the server
         * @return the response's ok flag
         */
        bool declareExchange(const std::string &exchangeName, ns_protocol::ExchangeType type, bool isDurable)
        {
            ns_protocol::DeclareExchangeRequest req;
            req.set_channel_id(_id);
            req.set_request_id(ns_util::UUIDUtil::uuid());
            req.set_exchange_name(exchangeName);
            req.set_exchange_type(type);
            req.set_is_durable(isDurable);
            const bool ok = sendAndWait(req);
            LOG(DEBUG) << "声明交换机, exchangeName: " << exchangeName << endl;
            return ok;
        }

        /**
         * Deletes an exchange on the server.
         * @param exchangeName  name of the exchange to delete
         * @return the response's ok flag
         */
        bool deleteExchange(const std::string &exchangeName)
        {
            ns_protocol::DeleteExchangeRequest req;
            req.set_channel_id(_id);
            req.set_request_id(ns_util::UUIDUtil::uuid());
            // Previously the name was never put into the request, so the
            // server could not know which exchange to delete.
            // NOTE(review): assumes DeleteExchangeRequest exposes `exchange_name`
            // like DeclareExchangeRequest does - confirm against protocol.proto.
            req.set_exchange_name(exchangeName);
            const bool ok = sendAndWait(req);
            LOG(DEBUG) << "删除交换机, exchangeName: " << exchangeName << endl;
            return ok;
        }

        /**
         * Declares a message queue on the server.
         * @param qname      queue name
         * @param isDurable  durability flag forwarded to the server
         * @return the response's ok flag
         */
        bool declareMsgQueue(const std::string &qname, bool isDurable)
        {
            ns_protocol::DeclareMsgQueueRequest req;
            req.set_channel_id(_id);
            req.set_request_id(ns_util::UUIDUtil::uuid());
            req.set_queue_name(qname);
            req.set_is_durable(isDurable);
            const bool ok = sendAndWait(req);
            LOG(DEBUG) << "声明队列, queueName: " << qname << endl;
            return ok;
        }

        /**
         * Deletes a message queue on the server.
         * @param qname  queue name
         * @return the response's ok flag
         */
        bool deleteMsgQueue(const std::string &qname)
        {
            ns_protocol::DeleteMsgQueueRequest req;
            req.set_channel_id(_id);
            req.set_request_id(ns_util::UUIDUtil::uuid());
            req.set_queue_name(qname);
            const bool ok = sendAndWait(req);
            LOG(DEBUG) << "删除队列, queueName: " << qname << endl;
            return ok;
        }

        /**********
         * Bind / unbind
         * ************/

        /**
         * Binds a queue to an exchange.
         * @param ename       exchange name
         * @param qname       queue name
         * @param bindingKey  binding key forwarded to the server
         * @return the response's ok flag
         */
        bool bind(const std::string &ename, const std::string &qname, const std::string &bindingKey)
        {
            ns_protocol::BindRequest req;
            req.set_channel_id(_id);
            req.set_request_id(ns_util::UUIDUtil::uuid());
            req.set_qname(qname);
            req.set_ename(ename);
            req.set_binding_key(bindingKey);
            const bool ok = sendAndWait(req);
            LOG(DEBUG) << "绑定: " << ename << "->" << qname << endl;
            return ok;
        }

        /**
         * Removes the binding between an exchange and a queue.
         * @param ename  exchange name
         * @param qname  queue name
         * @return the response's ok flag
         */
        bool unbind(const std::string &ename, const std::string &qname)
        {
            ns_protocol::UnbindRequest req;
            req.set_channel_id(_id);
            req.set_request_id(ns_util::UUIDUtil::uuid());
            req.set_qname(qname);
            req.set_ename(ename);
            const bool ok = sendAndWait(req);
            LOG(DEBUG) << "解绑: " << ename << "->" << qname << endl;
            return ok;
        }

        /**
         * Publishes a message to an exchange.
         * @param ename       target exchange name
         * @param routingKey  routing key stored in the message
         * @param mode        delivery mode stored in the message
         * @param body        message payload
         * @return the response's ok flag
         */
        bool publishMessage(const std::string &ename, const std::string &routingKey,
                            ns_data::DeliveryMode mode, const std::string &body)
        {
            ns_protocol::PublishMessageRequest req;
            req.set_channel_id(_id);
            req.set_request_id(ns_util::UUIDUtil::uuid());
            req.set_exchange_name(ename);

            // Fill the embedded message in place; each message gets a fresh id.
            auto *savedInfo = req.mutable_msg()->mutable_saved_info();
            savedInfo->set_id(ns_util::UUIDUtil::uuid());
            savedInfo->set_routing_key(routingKey);
            savedInfo->set_delivery_mode(mode);
            savedInfo->set_body(body);

            const bool ok = sendAndWait(req);
            LOG(DEBUG) << "publish message: " << body << endl;
            return ok;
        }

        /***********
         * Interfaces used by consumer clients
         * **************/

        /**
         * Subscribes to a queue; at most one consumer per queue on this channel.
         *
         * The consumer is registered locally before the request is sent, so a
         * message for it can be dispatched as soon as it arrives. If the server
         * answers with a failure the registration is rolled back, so that a
         * later retry actually sends a new request.
         *
         * @param qname     queue to subscribe to
         * @param autoAck   auto-acknowledge flag forwarded to the server
         * @param callback  invoked by consumeMessage() for each message
         * @return true if the queue was already subscribed on this channel,
         *         otherwise the response's ok flag
         */
        bool subscribeQueue(const std::string &qname, bool autoAck, ns_consumer::ConsumerCallback_t callback)
        {
            const std::string consumerId = ns_util::UUIDUtil::uuid();
            {
                std::unique_lock<std::mutex> lck(_mtxForConsumers);
                if (_consumers.count(qname))
                {
                    LOG(INFO) << "this queue has been subscribed, qname: " << qname;
                    return true;
                }
                _consumers[qname] = std::make_shared<ns_consumer::Consumer>(consumerId, qname,
                                                                            std::move(callback), autoAck);
            } // lock released: never wait for the network while holding it

            ns_protocol::SubscribeQueueRequest req;
            req.set_channel_id(_id);
            req.set_request_id(ns_util::UUIDUtil::uuid());
            req.set_qname(qname);
            req.set_consumer_id(consumerId);
            req.set_auto_ack(autoAck);

            const bool ok = sendAndWait(req);
            if (!ok)
            {
                // Roll back only our own registration.
                std::unique_lock<std::mutex> lck(_mtxForConsumers);
                auto it = _consumers.find(qname);
                if (it != _consumers.end() && it->second->_id == consumerId)
                {
                    _consumers.erase(it);
                }
            }
            LOG(DEBUG) << "订阅队列" << ", qname: " << qname << endl;
            return ok;
        }

        /**
         * Cancels the subscription of a queue.
         *
         * The local consumer is removed before the request is sent; the lock is
         * not held while waiting for the server's answer.
         *
         * @param qname  queue to unsubscribe from
         * @return true if there was no subscription, otherwise the response's ok flag
         */
        bool cancelSubscribe(const std::string &qname)
        {
            std::string consumerId;
            {
                std::unique_lock<std::mutex> lck(_mtxForConsumers);
                auto it = _consumers.find(qname);
                if (it == _consumers.end())
                {
                    return true;
                }
                consumerId = it->second->_id;
                _consumers.erase(it);
            }

            ns_protocol::CancelSubscribeRequest req;
            req.set_channel_id(_id);
            req.set_request_id(ns_util::UUIDUtil::uuid());
            req.set_qname(qname);
            req.set_consumer_id(consumerId);

            const bool ok = sendAndWait(req);
            LOG(DEBUG) << "取消订阅队列" << ", qname: " << qname << endl;
            return ok;
        }

        /**
         * Acknowledges a message.
         * @param qname  queue the message came from
         * @param msgId  id of the message being acknowledged
         * @return the response's ok flag
         */
        bool ackMessage(const std::string &qname, const std::string &msgId)
        {
            ns_protocol::AckRequest req;
            req.set_channel_id(_id);
            req.set_request_id(ns_util::UUIDUtil::uuid());
            req.set_qname(qname);
            req.set_msg_id(msgId);

            const bool ok = sendAndWait(req);
            LOG(DEBUG) << "应答消息, msgId: " << msgId << endl;
            return ok;
        }

        /**
         * Stores a response under its response id and wakes every thread
         * blocked in a service method, so the one waiting for this id can return.
         */
        void putCommonResponse(const CommonResponsePtr &respPtr)
        {
            std::unique_lock<std::mutex> lck(_mtxForResps);
            _resps[respPtr->response_id()] = respPtr;
            _cond.notify_all();
        }

        /**
         * Hands a message to the consumer subscribed to `qname`.
         *
         * The consumer pointer is copied while the lock is held and the
         * callback runs after the lock is released, so the map is never
         * touched unlocked and a concurrent cancelSubscribe() cannot
         * invalidate the consumer mid-call.
         */
        void consumeMessage(const std::string qname, const ns_data::Message &msg)
        {
            ns_consumer::ConsumerPtr consumerPtr;
            {
                std::unique_lock<std::mutex> lck(_mtxForConsumers);
                auto it = _consumers.find(qname);
                if (it == _consumers.end())
                {
                    LOG(WARNING) << "该消费者不存在" << endl;
                    return;
                }
                consumerPtr = it->second;
            }

            consumerPtr->_callback(msg);
        }

    private:
        /**
         * Sends `req` and blocks until the response with the same id arrives.
         * @return the response's ok flag
         */
        template <typename Request>
        bool sendAndWait(const Request &req)
        {
            _codecPtr->send(_connPtr, req);
            return waitCommonResponse(req.request_id())->ok();
        }

        /**
         * Blocks until a response with id `reqId` is present in `_resps`, then
         * removes and returns it. There is no timeout: the call does not
         * return until putCommonResponse() delivers a matching response.
         */
        CommonResponsePtr waitCommonResponse(const std::string &reqId)
        {
            std::unique_lock<std::mutex> lck(_mtxForResps);
            _cond.wait(lck, [&] { return _resps.count(reqId) != 0; });
            auto it = _resps.find(reqId);
            CommonResponsePtr commonRespPtr = std::move(it->second);
            _resps.erase(it);
            return commonRespPtr;
        }
    };

    /******************************
     * 信道管理句柄,注意以Connection为单元进行管理
     * ***************************/classChannelManager{private:
        std::mutex _mtx;
        std::unordered_map<std::string, ChannelPtr> _channels;public:
        ChannelPtr openChannel(const muduo::net::TcpConnectionPtr &connPtr,const ProtobufCodecPtr &codecPtr){
            std::unique_lock<std::mutex>lck(_mtx);
            std::string channelId = ns_util::UUIDUtil::uuid();auto channelPtr = std::make_shared<Channel>(channelId, connPtr, codecPtr);
            _channels[channelId]= channelPtr;return channelPtr;}voidcloseChannel(const std::string &id){
            std::unique_lock<std::mutex>lck(_mtx);
            _channels.erase(id);}

        ChannelPtr getChannel(const std::string &id){
            std::unique_lock<std::mutex>lck(_mtx);if(_channels.count(id)==0){LOG(WARNING)<<"信道不存在, channelId: "<< id;returnnullptr;}return _channels[id];}};}
标签: rabbitmq 分布式

本文转载自: https://blog.csdn.net/weixin_74113106/article/details/142577367
版权归原作者 月夜星辉雪 所有, 如有侵权,请联系我们删除。

“【RabbitMQ 项目】客户端:信道模块”的评论:

还没有评论