0


RabbitMQ c++

Windows RabbitMQ-c 消息处理流程

RabbitMQ-c Demo

  1. 项目需求: 多个app同时发送消息被多个app同时接收。生产者-消费者模式。
  2. 开发环境: Windows 11、Visual Studio 2017、CMake 3.27.0、Erlang 24.0、rabbitmq-server 3.9.7
  3. RabbitMQ 的安装: RabbitMQ 依赖 Erlang,因此安装 RabbitMQ 之前需要先安装 Erlang。Erlang 与 RabbitMQ 的版本并非任意组合都兼容,所以安装时要注意两者的版本必须匹配(安装前请先查看版本匹配表)。Erlang 下载地址、RabbitMQ 下载地址:[图片]
  4. 配置设置: RabbitMQ 安装完成后需要开启插件节点。运行下图所示的命令行工具来设置节点:[图片] 输入开启节点的指令: rabbitmqctl start_app [图片] 开启 RabbitMQ 管理模块插件,并配置到 RabbitMQ 节点上: rabbitmq-plugins enable rabbitmq_management [图片] 然后通过浏览器访问 RabbitMQ Web 管理界面: localhost:15672,用户名: guest,密码: guest。[图片][图片]
  5. RabbitMQ-c 库的编译: 从 GitHub 下载源码,用 CMake 生成 .sln,再编译出 lib。[图片][图片][图片] 备注: 将生成的库引入 demo 工程。
  6. 编写 Client demo: 分为 publish demo 和 consumer demo 两个程序,两者共用封装好的访问类 CRabbitmqClient。
#pragmaonce#include<string>#include<vector>#include<amqp.h>#include<amqp_tcp_socket.h>#include<thread>using  RecvCallBack = std::function<void(const std::string&)>;classCRabbitmqClient{public:CRabbitmqClient();~CRabbitmqClient();/*
    * 连接RabbitMQ Server
    * @param [in] strExchange:交换器名称
    * @param [in] strType:交换器类型 ,常见的如 fanout(广播) direct(点对点) topic(订阅)
    * @param [in] isPassive:检测exchange是否存在。false:不存在会创建,true:不存在不会创建
    * @param [in] isDurable:是否永久化。永久化:将队列信息写入磁盘,RabbitMQ Server重启后,队列不会丢失
    * @param [in] isAutoDelete:当没有队列和交换器绑定时,交换器是否自动删除
    * @param [in] internal: 设置是否内置的, true表示是内置的交换器, 客户端程序无法直接发送消息到这个交换器中, 只能通过交换器路由到交换器这个方式
    * @returns 设置结果

   */intConnect(const std::string &strHostname,int iPort,const std::string &strUser,const std::string &strPasswd);intDisconnect();/*
     * 声明交换器
     * @param [in] strExchange:交换器名称
     * @param [in] strType:交换器类型 ,常见的如 fanout(广播) direct(点对点) topic(订阅)
     * @param [in] isPassive:检测exchange是否存在。false:不存在会创建,true:不存在不会创建
     * @param [in] isDurable:是否永久化。永久化:将队列信息写入磁盘,RabbitMQ Server重启后,队列不会丢失
     * @param [in] isAutoDelete:当没有队列和交换器绑定时,交换器是否自动删除
     * @param [in] internal: 设置是否内置的, true表示是内置的交换器, 客户端程序无法直接发送消息到这个交换器中, 只能通过交换器路由到交换器这个方式
     * @returns 声明结果

    */intExchangeDeclare(const std::string &strExchange,const std::string& strType,bool isPassive =false,bool isDurable =false,bool isAutoDelete =false,int internal =0);/*
    * 声明队列
    * @param [in] strQueueName:队列名称
    * @param [in] isPassive:检测exchange是否存在。false:不存在会创建,true:不存在不会创建
    * @param [in] isDurable:是否永久化。永久化:将队列信息写入磁盘,RabbitMQ Server重启后,队列不会丢失
    * @param [in] isExclusive:只有自己的用户对该队列可见
    * @param [in] isAutoDelete:当没有队列和交换器绑定时,交换器是否自动删除
    * @returns 声明结果
   */intQueueDelare(const std::string& strQueueName,bool isPassive =false,bool isDurable =false,bool isExclusive =false,bool isAutoDelete =false);/*
    * 将队列绑定到交换器
    * @param [in] strQueueName:队列名称
    * @param [in] strExchange:交换器名称
    * @param [in] strBindKey:路由键
    * @returns 绑定结果
    */intQueueBind(const std::string &strQueueName,const std::string &strExchange,const std::string &strBindKey);intQueueUnbind(const std::string &strQueueName,const std::string &strExchange,const std::string &strBindKey);intQueueDelete(const std::string &strQueueName,int iIfUnused);/*
      * 发布消息
      * @param [in] strMessage:需要发送的消息
      * @param [in] strExchange:交换器名称
      * @param [in] strBindKey:路由键
      * @returns 发布结果
      */intPublish(const std::string &strMessage,const std::string &strExchange,const std::string &strRoutekey);intConsumer(const std::string &strQueueName,  RecvCallBack func);private:intErrorMsg(amqp_rpc_reply_t x,charconst*context);voidStartRecvThread();voidrun();private:
    std::string                 m_strHostname{"127.0.0.1"};// amqp主机int                         m_iPort{5672};// amqp端口
    std::string                    m_strUser{"guest"};
    std::string                    m_strPasswd{"guest"};int                         m_iChannel=2;

    amqp_socket_t               *m_pSock{nullptr};
    amqp_connection_state_t     m_pConn{nullptr};

    std::unique_ptr<std::thread> m_pRecvThread{nullptr};bool m_isRun{false};

    RecvCallBack m_recvFunc{nullptr};};
#include"CRabbitmqClient.h"CRabbitmqClient::CRabbitmqClient(){}CRabbitmqClient::~CRabbitmqClient(){}intCRabbitmqClient::Connect(const std::string &strHostname,int iPort,const std::string &strUser,const std::string &strPasswd){
    m_strHostname = strHostname;
    m_iPort = iPort;
    m_strUser = strUser;
    m_strPasswd = strPasswd;

    m_pConn =amqp_new_connection();if(m_pConn ==nullptr){return-1;}
    m_pSock =amqp_tcp_socket_new(m_pConn);if(m_pSock ==nullptr){return-2;}int status =amqp_socket_open(m_pSock, m_strHostname.c_str(), m_iPort);if(status<0){return-3;}if(0!=ErrorMsg(amqp_login(m_pConn,"guest",0,131072,0, AMQP_SASL_METHOD_PLAIN, m_strUser.c_str(), m_strPasswd.c_str()),"Logging in")){return-4;}return0;}intCRabbitmqClient::Disconnect(){if(m_pConn !=nullptr){if(0!=ErrorMsg(amqp_connection_close(m_pConn, AMQP_REPLY_SUCCESS),"Closeing connectiong")){return-1;}if(amqp_destroy_connection(m_pConn)<0){return-2;}
        m_pConn =nullptr;}return0;}intCRabbitmqClient::ExchangeDeclare(const std::string &strExchange,const std::string& strType,bool isPassive,bool isDurable ,bool isAutoDelete,int internal ){amqp_channel_open(m_pConn, m_iChannel);if(0!=ErrorMsg(amqp_get_rpc_reply(m_pConn),"open channel")){amqp_channel_close(m_pConn, m_iChannel, AMQP_REPLY_SUCCESS);return-1;}
    amqp_bytes_t exchange =amqp_cstring_bytes(strExchange.c_str());
    amqp_bytes_t type =amqp_cstring_bytes(strType.c_str());amqp_exchange_declare(m_pConn,m_iChannel,exchange,type, isPassive, isDurable, isAutoDelete, internal,amqp_empty_table);if(0!=ErrorMsg(amqp_get_rpc_reply(m_pConn),"exchange_declare")){amqp_channel_close(m_pConn, m_iChannel, AMQP_REPLY_SUCCESS);return-2;}amqp_channel_close(m_pConn, m_iChannel, AMQP_REPLY_SUCCESS);return0;}intCRabbitmqClient::QueueDelare(const std::string& strQueueName,bool isPassive ,bool isDurable ,bool isExclusive,bool isAutoDelete ){if(m_pConn ==nullptr){return-1;}amqp_channel_open(m_pConn, m_iChannel);if(0!=ErrorMsg(amqp_get_rpc_reply(m_pConn),"open channel")){amqp_channel_close(m_pConn, m_iChannel, AMQP_REPLY_SUCCESS);return-2;}
    amqp_bytes_t queue =amqp_cstring_bytes(strQueueName.c_str());amqp_queue_declare(m_pConn, m_iChannel,queue, isPassive, isDurable, isExclusive, isAutoDelete, amqp_empty_table);if(0!=ErrorMsg(amqp_get_rpc_reply(m_pConn),"queue_declare")){amqp_channel_close(m_pConn, m_iChannel, AMQP_REPLY_SUCCESS);return-3;}amqp_channel_close(m_pConn, m_iChannel, AMQP_REPLY_SUCCESS);return0;}intCRabbitmqClient::QueueBind(const std::string &strQueueName,const std::string &strExchange,const std::string &strBindKey){if(m_pConn ==nullptr){return-1;}amqp_channel_open(m_pConn, m_iChannel);if(0!=ErrorMsg(amqp_get_rpc_reply(m_pConn),"open channel")){amqp_channel_close(m_pConn, m_iChannel, AMQP_REPLY_SUCCESS);return-2;}
    amqp_bytes_t queue =amqp_cstring_bytes(strQueueName.c_str());
    amqp_bytes_t exchange  =amqp_cstring_bytes(strExchange.c_str());
    amqp_bytes_t routkey =amqp_cstring_bytes(strBindKey.c_str());amqp_queue_bind(m_pConn, m_iChannel, queue, exchange, routkey, amqp_empty_table);if(0!=ErrorMsg(amqp_get_rpc_reply(m_pConn),"queue_bind")){amqp_channel_close(m_pConn, m_iChannel, AMQP_REPLY_SUCCESS);return-3;}amqp_channel_close(m_pConn, m_iChannel, AMQP_REPLY_SUCCESS);return0;}intCRabbitmqClient::QueueUnbind(const std::string &strQueueName,const std::string &strExchange,const std::string &strBindKey){if(m_pConn ==nullptr){return-1;}amqp_channel_open(m_pConn, m_iChannel);if(0!=ErrorMsg(amqp_get_rpc_reply(m_pConn),"open channel")){amqp_channel_close(m_pConn, m_iChannel, AMQP_REPLY_SUCCESS);return-2;}
    amqp_bytes_t queue =amqp_cstring_bytes(strQueueName.c_str());
    amqp_bytes_t exchange =amqp_cstring_bytes(strExchange.c_str());
    amqp_bytes_t routkey =amqp_cstring_bytes(strBindKey.c_str());amqp_queue_unbind(m_pConn, m_iChannel, queue, exchange, routkey, amqp_empty_table);if(0!=ErrorMsg(amqp_get_rpc_reply(m_pConn),"queue_unbind")){amqp_channel_close(m_pConn, m_iChannel, AMQP_REPLY_SUCCESS);return-3;}amqp_channel_close(m_pConn, m_iChannel, AMQP_REPLY_SUCCESS);return0;}intCRabbitmqClient::QueueDelete(const std::string &strQueueName,int iIfUnused){if(m_pConn ==nullptr){return-1;}amqp_channel_open(m_pConn, m_iChannel);if(0!=ErrorMsg(amqp_get_rpc_reply(m_pConn),"open channel")){amqp_channel_close(m_pConn, m_iChannel, AMQP_REPLY_SUCCESS);return-2;}if(0!=ErrorMsg(amqp_get_rpc_reply(m_pConn),"delete queue")){amqp_channel_close(m_pConn, m_iChannel, AMQP_REPLY_SUCCESS);return-3;}amqp_channel_close(m_pConn, m_iChannel, AMQP_REPLY_SUCCESS);return0;}intCRabbitmqClient::Publish(const std::string &strMessage,const std::string &strExchange,const std::string &strRoutekey){if(m_pConn ==nullptr){return-1;}amqp_channel_open(m_pConn, m_iChannel);if(0!=ErrorMsg(amqp_get_rpc_reply(m_pConn),"open channel")){amqp_channel_close(m_pConn, m_iChannel, AMQP_REPLY_SUCCESS);return-2;}

    amqp_bytes_t message_bytes;
    message_bytes.len = strMessage.length();
    message_bytes.bytes =(void*)(strMessage.c_str());

    amqp_bytes_t exchange =amqp_cstring_bytes(strExchange.c_str());
    amqp_bytes_t routekey =amqp_cstring_bytes(strRoutekey.c_str());//if (0 != amqp_basic_publish(m_pConn, m_iChannel, exchange, routekey, 0, 0, &props, message_bytes)) {if(0!=amqp_basic_publish(m_pConn, m_iChannel, exchange, routekey,0,0,NULL, message_bytes)){fprintf(stderr,"publish amqp_basic_publish failed\n");if(0!=ErrorMsg(amqp_get_rpc_reply(m_pConn),"amqp_basic_publish")){amqp_channel_close(m_pConn, m_iChannel, AMQP_REPLY_SUCCESS);return-3;}}amqp_channel_close(m_pConn, m_iChannel, AMQP_REPLY_SUCCESS);}intCRabbitmqClient::Consumer(const std::string &strQueueName, RecvCallBack func){if(m_pConn ==nullptr){return-1;}if(m_recvFunc ==nullptr){
        m_recvFunc = func;}amqp_channel_open(m_pConn, m_iChannel);if(0!=ErrorMsg(amqp_get_rpc_reply(m_pConn),"open channel")){amqp_channel_close(m_pConn, m_iChannel, AMQP_REPLY_SUCCESS);return-2;}amqp_basic_qos(m_pConn, m_iChannel,0,1,0);int ack =1;// no_ack    是否需要确认消息后再从队列中删除消息
    amqp_bytes_t queuename =amqp_cstring_bytes(strQueueName.c_str());amqp_basic_consume(m_pConn, m_iChannel, queuename, amqp_empty_bytes,0, ack,0, amqp_empty_table);if(0!=ErrorMsg(amqp_get_rpc_reply(m_pConn),"Consuming")){amqp_channel_close(m_pConn, m_iChannel, AMQP_REPLY_SUCCESS);return-3;}StartRecvThread();return0;}intCRabbitmqClient::ErrorMsg(amqp_rpc_reply_t x,charconst*context){switch(x.reply_type){case AMQP_RESPONSE_NORMAL:return0;case AMQP_RESPONSE_NONE:fprintf(stderr,"%s: missing RPC reply type!\n", context);break;case AMQP_RESPONSE_LIBRARY_EXCEPTION:fprintf(stderr,"%s: %s\n", context,amqp_error_string2(x.library_error));break;case AMQP_RESPONSE_SERVER_EXCEPTION:switch(x.reply.id){case AMQP_CONNECTION_CLOSE_METHOD:{
            amqp_connection_close_t *m =(amqp_connection_close_t *)x.reply.decoded;fprintf(stderr,"%s: server connection error %uh, message: %.*s\n",
                context, m->reply_code,(int)m->reply_text.len,(char*)m->reply_text.bytes);break;}case AMQP_CHANNEL_CLOSE_METHOD:{
            amqp_channel_close_t *m =(amqp_channel_close_t *)x.reply.decoded;fprintf(stderr,"%s: server channel error %uh, message: %.*s\n",
                context, m->reply_code,(int)m->reply_text.len,(char*)m->reply_text.bytes);break;}default:fprintf(stderr,"%s: unknown server error, method id 0x%08X\n",
                context, x.reply.id);break;}break;}return-1;}voidCRabbitmqClient::StartRecvThread(){if(m_pRecvThread ==nullptr){
        m_pRecvThread = std::make_unique<std::thread>(&CRabbitmqClient::run,this);
        m_isRun =true;}}voidCRabbitmqClient::run(){
    std::chrono::milliseconds interval{1000};
    amqp_rpc_reply_t res;
    amqp_envelope_t envelope;while(true){amqp_maybe_release_buffers(m_pConn);
        res =amqp_consume_message(m_pConn,&envelope,nullptr,0);if(AMQP_RESPONSE_NORMAL != res.reply_type){fprintf(stderr,"Consumer amqp_channel_close failed\n");amqp_channel_close(m_pConn, m_iChannel, AMQP_REPLY_SUCCESS);break;}

        std::string str((char*)envelope.message.body.bytes,(char*)envelope.message.body.bytes + envelope.message.body.len);if(m_recvFunc){m_recvFunc(str);}amqp_destroy_envelope(&envelope);

        std::this_thread::sleep_for(interval);}amqp_channel_close(m_pConn,1, AMQP_REPLY_SUCCESS);amqp_connection_close(m_pConn, AMQP_REPLY_SUCCESS);amqp_destroy_connection(m_pConn);}

publish

// RabbitMQPublish.cpp : 此文件包含 "main" 函数。程序执行将在此处开始并结束。//#include<iostream>#include"CRabbitmqClient.h"intmain(){
    CRabbitmqClient objRabbitmq;

    std::string strIP ="127.0.0.1";int iPort =5672;
    std::string strUser ="guest";
    std::string strPasswd ="guest";int iRet = objRabbitmq.Connect(strIP, iPort, strUser, strPasswd);printf("Rabbitmq Connect Ret: %d\n", iRet);

    std::string strExchange ="ExchangeTest1";
    std::string strRoutekey ="routekeyTest1";
    std::string strQueuename1 ="queueTest1";
    std::string strQueuename2 ="queueTest2";

    iRet = objRabbitmq.ExchangeDeclare(strExchange,"fanout");printf("Rabbitmq ExchangeDeclare Ret: %d\n", iRet);

    iRet = objRabbitmq.QueueDelare(strQueuename1);printf("Rabbitmq QueueDeclare1 Ret: %d\n", iRet);

    iRet = objRabbitmq.QueueDelare(strQueuename2);printf("Rabbitmq QueueDeclare2 Ret: %d\n", iRet);

    iRet = objRabbitmq.QueueBind(strQueuename1, strExchange, strRoutekey);printf("Rabbitmq QueueBind1 Ret: %d\n", iRet);

    iRet = objRabbitmq.QueueBind(strQueuename2, strExchange, strRoutekey);printf("Rabbitmq QueueBind2 Ret: %d\n", iRet);// Send Msgwhile(true){
        std::string temp;
        std::cin >> temp;
        std::string strSendMsg = temp;
        iRet = objRabbitmq.Publish(strSendMsg, strExchange, strRoutekey);printf("Rabbitmq Publish 1 Ret: %d\n", iRet);}
    objRabbitmq.Disconnect();return0;}

consumer

// RabbitMQConsumer.cpp : 此文件包含 "main" 函数。程序执行将在此处开始并结束。//#include<iostream>#include"CRabbitmqClient.h"voidRecvFunc(const std::string& message){
    std::cout << message<<std::endl;}intmain(){
    CRabbitmqClient objRabbitmq;

    std::string strIP ="127.0.0.1";int iPort =5672;
    std::string strUser ="guest";
    std::string strPasswd ="guest";int iRet = objRabbitmq.Connect(strIP, iPort, strUser, strPasswd);printf("Rabbitmq Connect Ret: %d\n", iRet);

    std::string strExchange ="ExchangeTest1";
    std::string strRoutekey ="routekeyTest1";
    std::string strQueuename ="queueTest1";

    iRet = objRabbitmq.Consumer(strQueuename, RecvFunc);printf("Rabbitmq Consumer Ret: %d\n", iRet);
    std::chrono::milliseconds interval{1000};while(true){
        std::this_thread::sleep_for(interval);}return0;}
标签: c++ visualstudio

本文转载自: https://blog.csdn.net/u010549750/article/details/132105475
版权归原作者 snow190639442 所有, 如有侵权,请联系我们删除。

“RabbitMQ c++”的评论:

还没有评论