0


仿RabbitMQ实现消息队列客户端

文章目录

客⼾端模块实现

在RabbitMQ中,提供服务的是信道,因此在客⼾端的实现中,弱化了Client客⼾端的概念,也就是说在RabbitMQ中并不会向⽤⼾展⽰⽹络通信的概念出来,⽽是以⼀种提供服务的形式来体现。其实现思想类似于普通的功能接⼝封装,⼀个接⼝实现⼀个功能,接⼝内部完成向客⼾端请求的过程,但是对外并不需要体现出客⼾端与服务端通信的概念,⽤⼾需要什么服务就调⽤什么接⼝就⾏。

基于以上的思想,客⼾端的实现共分为四⼤模块:

在这里插入图片描述
基于以上模块,实现⼀个客⼾端的流程也就⽐较简单了

  1. 实例化异步线程对象
  2. 实例化连接对象
  3. 通过连接对象,创建信道
  4. 根据信道获取⾃⼰所需服务
  5. 关闭信道
  6. 关闭连接

订阅者模块

与服务端,并⽆太⼤差别,客⼾端这边虽然订阅者的存在感微弱了很多,但是还是有的,当进⾏队列消息订阅的时候,会伴随着⼀个订阅者对象的创建,⽽这个订阅者对象有以下⼏个作⽤:

  • 描述当前信道订阅了哪个队列的消息。
  • 描述了收到消息后该如何对这条消息进⾏处理
  • 描述收到消息后是否需要进⾏确认回复在这里插入图片描述
  1. 订阅者信息:
  • 订阅者标识
  • 订阅队列名
  • 是否⾃动确认标志
  • 回调处理函数(收到消息后该如何处理的回调函数对象)
using ConsumerCallback = std::function<void(const std::string,const BasicProperties *bp,const std::string)>;structConsumer{using ptr = std::shared_ptr<Consumer>;
        std::string tag;//消费者标识
        std::string qname;//消费者订阅的队列名称bool auto_ack;//自动确认标志
        ConsumerCallback callback;Consumer(){DLOG("new Consumer: %p",this);}Consumer(const std::string &ctag,const std::string &queue_name,bool ack_flag,const ConsumerCallback &cb):tag(ctag),qname(queue_name),auto_ack(ack_flag),callback(std::move(cb)){DLOG("new Consumer: %p",this);}~Consumer(){DLOG("del Consumer: %p",this);}};

信道管理模块

同样的,客⼾端也有信道,其功能与服务端⼏乎⼀致,或者说不管是客⼾端的channel还是服务端的channel都是为了⽤⼾提供具体服务⽽存在的,只不过服务端是为客⼾端的对应请求提供服务,⽽客⼾端的接⼝服务是为了⽤⼾具体需要服务,也可以理解是⽤⼾通过客⼾端channel的接⼝调⽤来向服务端发送对应请求,获取请求的服务。
在这里插入图片描述

  1. 信道信息:
  • a. 信道ID
  • b. 信道关联的⽹络通信连接对象
  • c. protobuf协议处理对象
  • d. 信道关联的消费者
  • e. 请求对应的响应信息队列(这⾥队列使⽤<请求ID,响应>hash表,以便于查找指定的响应)
  • f. 互斥锁&条件变量(⼤部分的请求都是阻塞操作,发送请求后需要等到响应才能继续,但是muduo库的通信是异步的,因此需要我们⾃⼰在收到响应后,通过判断是否是等待的指定响应来进⾏同步)
  1. 信道操作:
  • a. 提供创建信道操作
  • b. 提供删除信道操作
  • c. 提供声明交换机操作(强断⾔-有则OK,没有则创建)
  • d. 提供删除交换机
  • e. 提供创建队列操作(强断⾔-有则OK,没有则创建)
  • f. 提供删除队列操作
  • g. 提供交换机-队列绑定操作
  • h. 提供交换机-队列解除绑定操作
  • i . 提供添加订阅操作
  • j. 提供取消订阅操作
  • k. 提供发布消息操作
  • l. 提供确认消息操作
typedef std::shared_ptr<google::protobuf::Message> MessagePtr;using ProtobufCodecPtr = std::shared_ptr<ProtobufCodec>;using basicConsumeResponsePtr = std::shared_ptr<basicConsumeResponse>;using basicCommonResponsePtr = std::shared_ptr<basicCommonResponse>;classChannel{public:using ptr = std::shared_ptr<Channel>;Channel(const muduo::net::TcpConnectionPtr& conn,const ProtobufCodecPtr& codec):_cid(UUIDHelper::uuid()),_conn(conn),_codec(codec){}~Channel(){basicCancel();}
            std::string cid(){return _cid;}boolopenChannel(){
                std::string rid =UUIDHelper::uuid();
                openChannelRequest req; 
                req.set_rid(rid);
                req.set_cid(_cid);
                _codec->send(_conn, req);
                basicCommonResponsePtr resp =waitResponse(rid);return resp->ok();}voidcloseChannel(){
                std::string rid =UUIDHelper::uuid();
                closeChannelRequest req; 
                req.set_rid(rid);
                req.set_cid(_cid);
                _codec->send(_conn, req);waitResponse(rid);return;}booldeclareExchange(const std::string &name,
                ExchangeType type,bool durable,bool auto_delete,
                google::protobuf::Map<std::string, std::string>&args){//构造一个声明虚拟机的请求对象,
                std::string rid =UUIDHelper::uuid();
                declareExchangeRequest req;
                req.set_rid(rid);
                req.set_cid(_cid);
                req.set_exchange_name(name);
                req.set_exchange_type(type);
                req.set_durable(durable);
                req.set_auto_delete(auto_delete);
                req.mutable_args()->swap(args);//然后向服务器发送请求
                _codec->send(_conn, req);//等待服务器的响应
                basicCommonResponsePtr resp =waitResponse(rid);//返回return resp->ok();}voiddeleteExchange(const std::string &name){
                std::string rid =UUIDHelper::uuid();
                deleteExchangeRequest req;
                req.set_rid(rid);
                req.set_cid(_cid);
                req.set_exchange_name(name);
                _codec->send(_conn, req);waitResponse(rid);return;}booldeclareQueue(const std::string &qname,bool qdurable,bool qexclusive,bool qauto_delete,
                google::protobuf::Map<std::string, std::string>&qargs){
                std::string rid =UUIDHelper::uuid();
                declareQueueRequest req;
                req.set_rid(rid);
                req.set_cid(_cid);
                req.set_queue_name(qname);
                req.set_durable(qdurable);
                req.set_auto_delete(qauto_delete);
                req.set_exclusive(qexclusive);
                req.mutable_args()->swap(qargs);
                _codec->send(_conn, req);
                basicCommonResponsePtr resp =waitResponse(rid);return resp->ok();}voiddeleteQueue(const std::string &qname){
                std::string rid =UUIDHelper::uuid();
                deleteQueueRequest req;
                req.set_rid(rid);
                req.set_cid(_cid);
                req.set_queue_name(qname);
                _codec->send(_conn, req);waitResponse(rid);return;}boolqueueBind(const std::string &ename,const std::string &qname,const std::string &key){
                std::string rid =UUIDHelper::uuid();
                queueBindRequest req;
                req.set_rid(rid);
                req.set_cid(_cid);
                req.set_exchange_name(ename);
                req.set_queue_name(qname);
                req.set_binding_key(key);
                _codec->send(_conn, req);
                basicCommonResponsePtr resp =waitResponse(rid);return resp->ok();}voidqueueUnBind(const std::string &ename,const std::string &qname){
                std::string rid =UUIDHelper::uuid();
                queueUnBindRequest req;
                req.set_rid(rid);
                req.set_cid(_cid);
                req.set_exchange_name(ename);
                req.set_queue_name(qname);
                _codec->send(_conn, req);waitResponse(rid);return;}voidbasicPublish(const std::string &ename,const BasicProperties *bp,const std::string &body){
                std::string rid =UUIDHelper::uuid();
                basicPublishRequest req;
                req.set_rid(rid);
                req.set_cid(_cid);
                req.set_body(body);
                req.set_exchange_name(ename);if(bp !=nullptr){
                    req.mutable_properties()->set_id(bp->id());
                    req.mutable_properties()->set_delivery_mode(bp->delivery_mode());
                    req.mutable_properties()->set_routing_key(bp->routing_key());}
                _codec->send(_conn, req);waitResponse(rid);return;}voidbasicAck(const std::string &msgid){if(_consumer.get()==nullptr){DLOG("消息确认时,找不到消费者信息!");return;}
                std::string rid =UUIDHelper::uuid();
                basicAckRequest req;
                req.set_rid(rid);
                req.set_cid(_cid);
                req.set_queue_name(_consumer->qname);
                req.set_message_id(msgid);
                _codec->send(_conn, req);waitResponse(rid);return;}voidbasicCancel(){if(_consumer.get()==nullptr){return;}
                std::string rid =UUIDHelper::uuid();
                basicCancelRequest req;
                req.set_rid(rid);
                req.set_cid(_cid);
                req.set_queue_name(_consumer->qname);
                req.set_consumer_tag(_consumer->tag);
                _codec->send(_conn, req);waitResponse(rid);
                _consumer.reset();return;}boolbasicConsume(const std::string &consumer_tag,const std::string &queue_name,bool auto_ack,const ConsumerCallback &cb){if(_consumer.get()!=nullptr){DLOG("当前信道已订阅其他队列消息!");returnfalse;}
                std::string rid =UUIDHelper::uuid();
                basicConsumeRequest req;
                req.set_rid(rid);
                req.set_cid(_cid);
                req.set_queue_name(queue_name);
                req.set_consumer_tag(consumer_tag);
                req.set_auto_ack(auto_ack);
                _codec->send(_conn, req);
                basicCommonResponsePtr resp =waitResponse(rid);if(resp->ok()==false){DLOG("添加订阅失败!");returnfalse;}
                _consumer = std::make_shared<Consumer>(consumer_tag, queue_name, auto_ack, cb);returntrue;}public://连接收到基础响应后,向hash_map中添加响应voidputBasicResponse(const basicCommonResponsePtr& resp){
                std::unique_lock<std::mutex>lock(_mutex);
                _basic_resp.insert(std::make_pair(resp->rid(), resp));
                _cv.notify_all();}//连接收到消息推送后,需要通过信道找到对应的消费者对象,通过回调函数进行消息处理voidconsume(const basicConsumeResponsePtr& resp){if(_consumer.get()==nullptr){DLOG("消息处理时,未找到订阅者信息!");return;}if(_consumer->tag != resp->consumer_tag()){DLOG("收到的推送消息中的消费者标识,与当前信道消费者标识不一致!");return;}
                _consumer->callback(resp->consumer_tag(), resp->mutable_properties(), resp->body());}private:
            basicCommonResponsePtr waitResponse(const std::string &rid){
                std::unique_lock<std::mutex>lock(_mutex);
                _cv.wait(lock,[&rid,this](){return _basic_resp.find(rid)!= _basic_resp.end();});//while(condition()) _cv.wait();
                basicCommonResponsePtr basic_resp = _basic_resp[rid];
                _basic_resp.erase(rid);return basic_resp;}private:
            std::string _cid;
            muduo::net::TcpConnectionPtr _conn;
            ProtobufCodecPtr _codec;
            Consumer::ptr _consumer;
            std::mutex _mutex;
            std::condition_variable _cv;
            std::unordered_map<std::string, basicCommonResponsePtr> _basic_resp;};
  1. 信道管理:
  • a. 创建信道
  • b. 查询信道
  • c. 删除信道
classChannelManager{public:using ptr = std::shared_ptr<ChannelManager>;ChannelManager(){}
            Channel::ptr create(const muduo::net::TcpConnectionPtr &conn,const ProtobufCodecPtr &codec){
                std::unique_lock<std::mutex>lock(_mutex);auto channel = std::make_shared<Channel>(conn, codec);
                _channels.insert(std::make_pair(channel->cid(), channel));return channel;}voidremove(const std::string &cid){
                std::unique_lock<std::mutex>lock(_mutex);
                _channels.erase(cid);}
            Channel::ptr get(const std::string &cid){
                std::unique_lock<std::mutex>lock(_mutex);auto it = _channels.find(cid);if(it == _channels.end()){returnChannel::ptr();}return it->second;}private:
            std::mutex _mutex;
            std::unordered_map<std::string, Channel::ptr> _channels;};

异步⼯作线程实现

客⼾端这边存在两个异步⼯作线程,

  • ⼀个是muduo库中客⼾端连接的异步循环线程EventLoopThread
  • ⼀个是当收到消息后进⾏异步处理的⼯作线程池。

这两项都不是以连接为单元进⾏创建的,⽽是创建后,可以⽤以多个连接中,因此单独进⾏封装。

classAsyncWorker{public:using ptr = std::shared_ptr<AsyncWorker>;
            muduo::net::EventLoopThread loopthread;
            threadpool pool;};

连接管理模块

在客⼾端这边,RabbitMQ弱化了客⼾端的概念,因为⽤⼾所需的服务都是通过信道来提供的,因此操作思想转换为先创建连接,通过连接创建信道,通过信道提供服务这⼀流程。

这个模块同样是针对muduo库客⼾端连接的⼆次封装,向⽤⼾提供创建channel信道的接⼝,创建信道后,可以通过信道来获取指定服务。

在这里插入图片描述

classConnection{public:using ptr = std::shared_ptr<Connection>;Connection(const std::string &sip,int sport,const AsyncWorker::ptr &worker):_latch(1),_client(worker->loopthread.startLoop(), muduo::net::InetAddress(sip, sport),"Client"),_dispatcher(std::bind(&Connection::onUnknownMessage,this, std::placeholders::_1,
                                    std::placeholders::_2, std::placeholders::_3)),_codec(std::make_shared<ProtobufCodec>(std::bind(&ProtobufDispatcher::onProtobufMessage,&_dispatcher,
                                                               std::placeholders::_1, std::placeholders::_2, std::placeholders::_3))),_worker(worker),_channel_manager(std::make_shared<ChannelManager>()){

            _dispatcher.registerMessageCallback<basicCommonResponse>(std::bind(&Connection::basicResponse,this,
                std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));

            _dispatcher.registerMessageCallback<basicConsumeResponse>(std::bind(&Connection::consumeResponse,this,
                std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));

            _client.setMessageCallback(std::bind(&ProtobufCodec::onMessage, _codec.get(),
                std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
            _client.setConnectionCallback(std::bind(&Connection::onConnection,this, std::placeholders::_1));

            _client.connect();
            _latch.wait();// 阻塞等待,直到连接建立成功}
        Channel::ptr openChannel(){
            Channel::ptr channel = _channel_manager->create(_conn, _codec);bool ret = channel->openChannel();if(ret ==false){DLOG("打开信道失败!");returnChannel::ptr();}return channel;}voidcloseChannel(const Channel::ptr &channel){
            channel->closeChannel();
            _channel_manager->remove(channel->cid());}private:voidbasicResponse(const muduo::net::TcpConnectionPtr &conn,const basicCommonResponsePtr &message, muduo::Timestamp){// 1. 找到信道
            Channel::ptr channel = _channel_manager->get(message->cid());if(channel.get()==nullptr){DLOG("未找到信道信息!");return;}// 2. 将得到的响应对象,添加到信道的基础响应hash_map中
            channel->putBasicResponse(message);}voidconsumeResponse(const muduo::net::TcpConnectionPtr &conn,const basicConsumeResponsePtr &message, muduo::Timestamp){// 1. 找到信道
            Channel::ptr channel = _channel_manager->get(message->cid());if(channel.get()==nullptr){DLOG("未找到信道信息!");return;}// 2. 封装异步任务(消息处理任务),抛入线程池
            _worker->pool.push([channel, message](){ channel->consume(message);});}voidonUnknownMessage(const muduo::net::TcpConnectionPtr &conn,const MessagePtr &message, muduo::Timestamp){
            LOG_INFO <<"onUnknownMessage: "<< message->GetTypeName();
            conn->shutdown();}voidonConnection(const muduo::net::TcpConnectionPtr &conn){if(conn->connected()){
                _latch.countDown();// 唤醒主线程中的阻塞
                _conn = conn;}else{// 连接关闭时的操作
                _conn.reset();}}private:
        muduo::CountDownLatch _latch;// 实现同步的
        muduo::net::TcpConnectionPtr _conn;// 客户端对应的连接
        muduo::net::TcpClient _client;// 客户端
        ProtobufDispatcher _dispatcher;// 请求分发器
        ProtobufCodecPtr _codec;// 协议处理器

        AsyncWorker::ptr _worker;
        ChannelManager::ptr _channel_manager;};

生产者客户端

publish_client.cc

  1. 实例化异步工作线程对象
  2. 实例化连接对象
  3. 通过连接创建信道
  4. 通过信道提供的服务完成所需
  5. 循环向交换机发布消息
  6. 关闭信道
#include"connection.hpp"intmain(){//1. 实例化异步工作线程对象
    nzq::AsyncWorker::ptr awp = std::make_shared<nzq::AsyncWorker>();//2. 实例化连接对象
    nzq::Connection::ptr conn = std::make_shared<nzq::Connection>("127.0.0.1",8085,awp);//3. 通过连接创建信道
    nzq::Channel::ptr channel = conn->openChannel();//4. 通过信道提供的服务完成所需//  1. 声明一个交换机exchange1, 交换机类型为广播模式
    google::protobuf::Map<std::string,std::string> tmp_map;
    channel->declareExchange("exchange1", nzq::ExchangeType::TOPIC,true,false, tmp_map);//  2. 声明一个队列queue1
    channel->declareQueue("queue1",true,false,false, tmp_map);//  3. 声明一个队列queue2
    channel->declareQueue("queue2",true,false,false, tmp_map);//  4. 绑定queue1-exchange1,且binding_key设置为queue1
    channel->queueBind("exchange1","queue1","queue1");//  5. 绑定queue2-exchange1,且binding_key设置为news.music.#
    channel->queueBind("exchange1","queue2","news.music.#");//5. 循环向交换机发布消息for(int i =0; i <10; i++){
        nzq::BasicProperties bp;
        bp.set_id(nzq::UUIDHelper::uuid());
        bp.set_delivery_mode(nzq::DeliveryMode::DURABLE);
        bp.set_routing_key("news.music.pop");
        channel->basicPublish("exchange1",&bp,"Hello World-"+ std::to_string(i));}
    nzq::BasicProperties bp;
    bp.set_id(nzq::UUIDHelper::uuid());
    bp.set_delivery_mode(nzq::DeliveryMode::DURABLE);
    bp.set_routing_key("news.music.sport");
    channel->basicPublish("exchange1",&bp,"Hello linux");

    bp.set_routing_key("news.sport");
    channel->basicPublish("exchange1",&bp,"Hello chileme?");//6. 关闭信道
    conn->closeChannel(channel);return0;return0;}

消费者客户端

同生产者客户端,如何消费需要自己设置,下面只是其中一种

voidcb(nzq::Channel::ptr &channel,const std::string consumer_tag,const nzq::BasicProperties *bp,const std::string &body){
    std::cout << consumer_tag <<"消费了消息:"<< body << std::endl;
    channel->basicAck(bp->id());}intmain(int argc,char*argv[]){if(argc !=2){
        std::cout <<"usage: ./consume_client queue1\n";return-1;}//1. 实例化异步工作线程对象
    nzq::AsyncWorker::ptr awp = std::make_shared<nzq::AsyncWorker>();//2. 实例化连接对象
    nzq::Connection::ptr conn = std::make_shared<nzq::Connection>("127.0.0.1",8085, awp);//3. 通过连接创建信道
    nzq::Channel::ptr channel = conn->openChannel();//4. 通过信道提供的服务完成所需//  1. 声明一个交换机exchange1, 交换机类型为广播模式
    google::protobuf::Map<std::string, std::string> tmp_map;
    channel->declareExchange("exchange1", nzq::ExchangeType::TOPIC,true,false, tmp_map);//  2. 声明一个队列queue1
    channel->declareQueue("queue1",true,false,false, tmp_map);//  3. 声明一个队列queue2
    channel->declareQueue("queue2",true,false,false, tmp_map);//  4. 绑定queue1-exchange1,且binding_key设置为queue1
    channel->queueBind("exchange1","queue1","queue1");//  5. 绑定queue2-exchange1,且binding_key设置为news.music.#
    channel->queueBind("exchange1","queue2","news.music.#");auto functor = std::bind(cb, channel, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3);
    channel->basicConsume("consumer1", argv[1],false, functor);while(1) std::this_thread::sleep_for(std::chrono::seconds(3));
    conn->closeChannel(channel);return0;}
标签: rabbitmq ruby 分布式

本文转载自: https://blog.csdn.net/weixin_74461263/article/details/142704852
版权归原作者 niu_sama 所有, 如有侵权,请联系我们删除。

“仿RabbitMQ实现消息队列客户端”的评论:

还没有评论