文章目录
信道是提供服务的窗口,客户端的信道负责给用户提供服务(把用户的操作封装成请求发送给服务端)
一.实现要点
服务接口返回和收到响应的同步
信道提供的服务接口主要任务就是,根据用户传入的参数,构建请求,然后发送,并等接收到响应后再返回。
关键在于 muduo 库中的接口都是非阻塞的,send 调用后会立即返回,而我们希望接收到对应的响应后再返回,所以这里的同步需要我们自己实现。
我的做法是,信道内维护一个哈希表,key 是响应的 id,value 是响应。这个哈希表哪些地方会访问呢?第一,发送完请求后,会访问哈希表判断是否存在对应的响应(用请求的 id 来判断,因为请求和对应的响应,它们的 id 是相同的),如果存在就直接返回响应,如果不存在就去一个条件变量上等待。第二,收到来自服务端的响应时,会将响应添加到哈希表,然后唤醒在条件变量上等待的线程。
二.代码实现
#pragmaonce#include"Consumer.hpp"#include"muduo/net/TcpConnection.h"#include"muduo/protobuf/codec.h"#include"../common/ThreadPool.hpp"#include"../common/protocol.pb.h"#include"../common/Util.hpp"#include"../common/message.pb.h"#include<mutex>#include<condition_variable>namespace ns_channel
{classChannel;using ChannelPtr = std::shared_ptr<Channel>;using ProtobufCodecPtr = std::shared_ptr<ProtobufCodec>;using CommonResponsePtr = std::shared_ptr<ns_protocol::CommomResponse>;structChannel{
std::string _id;
std::unordered_map<std::string, ns_consumer::ConsumerPtr> _consumers;//<队列名,消费者>
muduo::net::TcpConnectionPtr _connPtr;
ProtobufCodecPtr _codecPtr;// 构建响应后要添加协议数据
std::mutex _mtxForResps;
std::mutex _mtxForConsumers;
std::condition_variable _cond;
std::unordered_map<std::string, CommonResponsePtr> _resps;Channel(const std::string &id,const muduo::net::TcpConnectionPtr &connPtr,const ProtobufCodecPtr &codecPtr):_id(id),_consumers(),_connPtr(connPtr),_codecPtr(codecPtr),_mtxForResps(),_mtxForConsumers(),_cond(),_resps(){}// 以下两个接口是为了给服务端发送请求,因为Connection模块不想再重复设计同步机制,所以发送响应的动作由信道模块完成boolopenChannel(){
ns_protocol::OpenChannelRequest req;
req.set_channel_id(_id);
req.set_request_id(ns_util::UUIDUtil::uuid());
_codecPtr->send(_connPtr, req);auto commonRespPtr =waitCommonResponse(req.request_id());LOG(DEBUG)<<"打开信道:channelId: "<< _id << endl;return commonRespPtr->ok();}boolcloseChannel(){
ns_protocol::CloseChannelRequest req;
req.set_channel_id(_id);
req.set_request_id(ns_util::UUIDUtil::uuid());
_codecPtr->send(_connPtr, req);auto commonRespPtr =waitCommonResponse(req.request_id());LOG(DEBUG)<<"关闭信道:channelId: "<< _id << endl;return commonRespPtr->ok();}/************
* 以下用于处理生产客户端的请求
* ***********/booldeclareExchange(const std::string &exchangeName, ns_protocol::ExchangeType type,bool isDurable){
ns_protocol::DeclareExchangeRequest req;
req.set_channel_id(_id);
req.set_request_id(ns_util::UUIDUtil::uuid());
req.set_exchange_name(exchangeName);
req.set_exchange_type(type);
req.set_is_durable(isDurable);
_codecPtr->send(_connPtr, req);auto commonRespPtr =waitCommonResponse(req.request_id());LOG(DEBUG)<<"声明交换机, exchangeName: "<< exchangeName << endl;return commonRespPtr->ok();}booldeleteExchange(const std::string &exchangeName){
ns_protocol::DeleteExchangeRequest req;
req.set_channel_id(_id);
req.set_request_id(ns_util::UUIDUtil::uuid());
_codecPtr->send(_connPtr, req);auto commonRespPtr =waitCommonResponse(req.request_id());LOG(DEBUG)<<"删除信道, exchangeName: "<< exchangeName << endl;return commonRespPtr->ok();}/*************
* 声明队列
* 记得要初始化队列消费者管理句柄
* ***********/booldeclareMsgQueue(const std::string &qname,bool isDurable){
ns_protocol::DeclareMsgQueueRequest req;
req.set_channel_id(_id);
req.set_request_id(ns_util::UUIDUtil::uuid());
req.set_queue_name(qname);
req.set_is_durable(isDurable);
_codecPtr->send(_connPtr, req);auto commonRespPtr =waitCommonResponse(req.request_id());LOG(DEBUG)<<"声明队列, queueName: "<< qname << endl;return commonRespPtr->ok();}/***************
* 删除队列
* 记得要删除队列关联的消费者
* *************/booldeleteMsgQueue(const std::string &qname){
ns_protocol::DeleteMsgQueueRequest req;
req.set_channel_id(_id);
req.set_request_id(ns_util::UUIDUtil::uuid());
req.set_queue_name(qname);
_codecPtr->send(_connPtr, req);auto commonRespPtr =waitCommonResponse(req.request_id());LOG(DEBUG)<<"删除队列, queueName: "<< qname << endl;return commonRespPtr->ok();}/**********
* 绑定与解绑
* ************/boolbind(const std::string &ename,const std::string &qname,const std::string &bindingKey){
ns_protocol::BindRequest req;
req.set_channel_id(_id);
req.set_request_id(ns_util::UUIDUtil::uuid());
req.set_qname(qname);
req.set_ename(ename);
req.set_binding_key(bindingKey);
_codecPtr->send(_connPtr, req);auto commonRespPtr =waitCommonResponse(req.request_id());LOG(DEBUG)<<"绑定: "<< ename <<"->"<< qname << endl;return commonRespPtr->ok();}boolunbind(const std::string &ename,const std::string &qname){
ns_protocol::UnbindRequest req;
req.set_channel_id(_id);
req.set_request_id(ns_util::UUIDUtil::uuid());
req.set_qname(qname);
req.set_ename(ename);
_codecPtr->send(_connPtr, req);auto commonRespPtr =waitCommonResponse(req.request_id());LOG(DEBUG)<<"解绑: "<< ename <<"->"<< qname << endl;return commonRespPtr->ok();}boolpublishMessage(const std::string &ename,const std::string &routingKey,
ns_data::DeliveryMode mode,const std::string &body){
ns_protocol::PublishMessageRequest req;
req.set_channel_id(_id);
req.set_request_id(ns_util::UUIDUtil::uuid());
req.set_exchange_name(ename);
ns_data::Message msg;
msg.mutable_saved_info()->set_id(ns_util::UUIDUtil::uuid());
msg.mutable_saved_info()->set_routing_key(routingKey);
msg.mutable_saved_info()->set_delivery_mode(mode);
msg.mutable_saved_info()->set_body(body);
req.mutable_msg()->Swap(&msg);
_codecPtr->send(_connPtr, req);auto commonRespPtr =waitCommonResponse(req.request_id());LOG(DEBUG)<<"publish message: "<< body << endl;return commonRespPtr->ok();}/***********
* 以下用于处理消费客户端请求
* **************/boolsubscribeQueue(const std::string &qname,bool autoAck, ns_consumer::ConsumerCallback_t callback){
std::unique_lock<std::mutex>lck(_mtxForConsumers);if(_consumers.count(qname)){LOG(INFO)<<"this queue has been subscribed, qname: "<< qname;returntrue;}
ns_protocol::SubscribeQueueRequest req;
req.set_channel_id(_id);
req.set_request_id(ns_util::UUIDUtil::uuid());
req.set_qname(qname);
req.set_consumer_id(ns_util::UUIDUtil::uuid());
req.set_auto_ack(autoAck);
_consumers[req.qname()]= std::make_shared<ns_consumer::Consumer>(req.consumer_id(),
qname, callback, autoAck);
_codecPtr->send(_connPtr, req);auto commonRespPtr =waitCommonResponse(req.request_id());LOG(DEBUG)<<"订阅队列"<<", qname: "<< qname << endl;return commonRespPtr->ok();}boolcancelSubscribe(const std::string &qname){
std::unique_lock<std::mutex>lck(_mtxForConsumers);if(_consumers.count(qname)==0){returntrue;}auto consumerPtr = _consumers[qname];
ns_protocol::CancelSubscribeRequest req;
req.set_channel_id(_id);
req.set_request_id(ns_util::UUIDUtil::uuid());
req.set_qname(qname);
req.set_consumer_id(consumerPtr->_id);
_codecPtr->send(_connPtr, req);
_consumers.erase(qname);auto commonRespPtr =waitCommonResponse(req.request_id());LOG(DEBUG)<<"取消订阅队列"<<", qname: "<< qname << endl;return commonRespPtr->ok();}boolackMessage(const std::string &qname,const std::string &msgId){
ns_protocol::AckRequest req;
req.set_channel_id(_id);
req.set_request_id(ns_util::UUIDUtil::uuid());
req.set_qname(qname);
req.set_msg_id(msgId);
_codecPtr->send(_connPtr, req);auto commonRespPtr =waitCommonResponse(req.request_id());LOG(DEBUG)<<"应答消息, msgId: "<< msgId << endl;return commonRespPtr->ok();}// 我们想要收到响应后这些给用户提供服务的接口才返回,所以需要同步策略voidputCommonResponse(const CommonResponsePtr &respPtr){
std::unique_lock<std::mutex>lck(_mtxForResps);
_resps[respPtr->response_id()]= respPtr;
_cond.notify_all();}// 让消费者处理消息voidconsumeMessage(const std::string qname,const ns_data::Message &msg){{
std::unique_lock<std::mutex>lck(_mtxForConsumers);if(_consumers.count(qname)==0){LOG(WARNING)<<"该消费者不存在"<< endl;return;}}
_consumers[qname]->_callback(msg);}private:
CommonResponsePtr waitCommonResponse(const std::string &reqId){
std::unique_lock<std::mutex>lck(_mtxForResps);while(_resps.count(reqId)==0){
_cond.wait(lck);}auto commonRespPtr = _resps[reqId];
_resps.erase(reqId);return commonRespPtr;}};/******************************
* 信道管理句柄,注意以Connection为单元进行管理
* ***************************/classChannelManager{private:
std::mutex _mtx;
std::unordered_map<std::string, ChannelPtr> _channels;public:
ChannelPtr openChannel(const muduo::net::TcpConnectionPtr &connPtr,const ProtobufCodecPtr &codecPtr){
std::unique_lock<std::mutex>lck(_mtx);
std::string channelId = ns_util::UUIDUtil::uuid();auto channelPtr = std::make_shared<Channel>(channelId, connPtr, codecPtr);
_channels[channelId]= channelPtr;return channelPtr;}voidcloseChannel(const std::string &id){
std::unique_lock<std::mutex>lck(_mtx);
_channels.erase(id);}
ChannelPtr getChannel(const std::string &id){
std::unique_lock<std::mutex>lck(_mtx);if(_channels.count(id)==0){LOG(WARNING)<<"信道不存在, channelId: "<< id;returnnullptr;}return _channels[id];}};}
版权归原作者 月夜星辉雪 所有, 如有侵权,请联系我们删除。