0


[C++][第三方库][RabbitMq]详细讲解

目录


1.介绍

  • RabbitMQ消息队列组件,实现两个客户端主机之间消息传输的功能(发布&订阅)
  • 核心概念:交换机、队列、绑定、消息
  • 交换机类型: - 广播交换:当交换机收到消息,则将消息发布到所有绑定的队列中- 直接交换:根据消息中的bkey与绑定的rkey对比,一致则放入队列- 主题交换:使用bkey与绑定的rkey进行规则匹配,成功则放入队列

2.安装

1.RabbitMq

  • 安装sudo apt install rabbitmq-server
  • 简单使用# 安装完成的时候默认有个用户guest,但是权限不够,要创建一个administrator用户,才可以做为远程登录和发表订阅消息#添加用户 sudo rabbitmqctl add_user root <PASSWORD>#设置用户tag sudo rabbitmqctl set_user_tags root administrator #设置用户权限 sudo rabbitmqctl set_permissions -p / root ".""."".*"# RabbitMQ自带了web管理界面, 执行下面命令开启, 默认端口15672sudo rabbitmq-plugins enable rabbitmq_management

2.客户端库

  • C语言库
  • C++库sudoaptinstall libev-dev #libev 网络库组件git clone https://github.com/CopernicaMarketingSoftware/AMQP-CPP.gitcd AMQP-CPP/makemakeinstall
  • 如果安装时出现以下报错,则表示ssl版本出现问题/usr/include/openssl/macros.h:147:4: error: #error "OPENSSL_API_COMPAT expresses an impossible API compatibility level" 147 | # error "OPENSSL_API_COMPAT expresses an impossible API compatibility level" | ^~~~~ In file included from /usr/include/openssl/ssl.h:18, from linux_tcp/openssl.h:20, from linux_tcp/openssl.cpp:12: /usr/include/openssl/bio.h:687:1: error: expected constructor, destructor, or type conversion before ‘DEPRECATEDIN_1_1_0’ 687 | DEPRECATEDIN_1_1_0(int BIO_get_port(const char *str, unsigned short *port_ptr))
  • 解决方案:卸载当前的ssl库,重新进行修复安装dpkg -l |grep sslsudo dpkg -P --force-all libevent-openssl-2.1-7sudo dpkg -P --force-all opensslsudo dpkg -P --force-all libssl-devsudoapt --fix-broken install

3.AMQP-CPP 简单使用

1.介绍

  • AMQP-CPP是用于与RabbitMq消息中间件通信的C++库 - 它能解析从RabbitMq服务发送来的数据,也可以生成发向RabbitMq的数据包- AMQP-CPP库不会向RabbitMq建立网络连接,所有的网络IO由用户完成
  • AMQP-CPP提供了可选的网络层接口,它预定义了TCP模块,用户就不用自己实现网络IO, - 也可以选择libevent、libev、libuv、asio等异步通信组件, 需要手动安装对应的组件
  • AMQP-CPP完全异步,没有阻塞式的系统调用,不使用线程就能够应用在高性能应用中
  • 注意:它需要C++17的支持

2.使用

  • AMQP-CPP的使用有两种模式: - 使用默认的TCP模块进行网络通信- 使用扩展的libevent、libev、libuv、asio异步通信组件进行通信
  • 此处以libev为例,不需要自己实现monitor函数,可以直接使用AMQP::LibEvHandler

4.类与接口

1.Channel

  • channel是一个虚拟连接,一个连接上可以建立多个通道 - 并且所有的RabbitMq指令都是通过channel传输 - 所以连接建立后的第一步,就是建立channel- 因为所有操作是异步的,所以在channel上执行指令的返回值并不能作为操作执行结果 - 实际上它返回的是Deferred类,可以使用它安装处理函数
namespace AMQP 
{/** 
     *  Generic callbacks that are used by many deferred objects 
     */using SuccessCallback = std::function<void()>;using ErrorCallback = std::function<void(constchar*message)>;using FinalizeCallback = std::function<void()>;/** 
     *  Declaring and deleting a queue 
     */using QueueCallback = std::function<void(const std::string &name,uint32_t messagecount,uint32_t consumercount)>;using DeleteCallback = std::function<void(uint32_t deletedmessages)>;using MessageCallback = std::function<void(const Message &message,uint64_t deliveryTag,bool redelivered)>;// 当使用发布者确认时,当服务器确认消息已被接收和处理时,将调用AckCallback using AckCallback = std::function<void(uint64_t deliveryTag,bool multiple)>;// 使用确认包裹通道时,当消息被ack/nacked时,会调用这些回调 using PublishAckCallback = std::function<void()>;using PublishNackCallback = std::function<void()>;using PublishLostCallback = std::function<void()>;// 信道类classChannel{Channel(Connection *connection);boolconnected();/** 
            *声明交换机 
            *如果提供了一个空名称,则服务器将分配一个名称。 
            *以下flags可用于交换机: 
            * 
            *-durable     持久化,重启后交换机依然有效 
            *-autodelete  删除所有连接的队列后,自动删除交换 
            *-passive     仅被动检查交换机是否存在 
            *-internal    创建内部交换 
            * 
            *@param name    交换机的名称 
            *@param-type    交换类型 
                enum ExchangeType 
                { 
                    fanout,  广播交换,绑定的队列都能拿到消息 
                    direct,  直接交换,只将消息交给routingkey一致的队列 
                    topic,   主题交换,将消息交给符合bindingkey规则的队列 
                    headers, 
                    consistent_hash, 
                    message_deduplication 
                }; 
            *@param flags    交换机标志 
            *@param arguments其他参数 
            * 
            *此函数返回一个延迟处理程序。可以安装回调 
            using onSuccess(), onError() and onFinalize() methods. 
        */ 
        Deferred &declareExchange(const std::string_view &name,
                                  ExchangeType type,int flags,const Table &arguments);/** 
            *声明队列 
            *如果不提供名称,服务器将分配一个名称。 
            *flags可以是以下值的组合: 
            * 
            *-durable 持久队列在代理重新启动后仍然有效 
            *-autodelete 当所有连接的使用者都离开时,自动删除队列 
            *-passive 仅被动检查队列是否存在 
            *-exclusive 队列仅存在于此连接,并且在连接断开时自动删除 
            * 
            *@param name        队列的名称 
            *@param flags       标志组合 
            *@param arguments  可选参数 
            * 
            *此函数返回一个延迟处理程序。可以安装回调 
            *使用onSuccess()、onError()和onFinalize()方法。 
            * 
            Deferred &onError(const char *message) 
            * 
            *可以安装的onSuccess()回调应该具有以下签名: 
            void myCallback(const std::string &name,  
                uint32_t messageCount,  
                uint32_t consumerCount); 
            例如: 
            channel.declareQueue("myqueue").onSuccess( 
                [](const std::string &name,  
                    uint32_t messageCount, 
                    uint32_t consumerCount) { 
                       std::cout << "Queue '" << name << "' "; 
                       std::cout << "has been declared with "; 
                       std::cout << messageCount; 
                       std::cout << " messages and "; 
                       std::cout << consumerCount; 
                       std::cout << " consumers" << std::endl; 
         *  }); 
        */ 
        DeferredQueue &declareQueue(const std::string_view &name,int flags,const Table &arguments);/** 
            *将队列绑定到交换机 
            * 
            *@param exchange     源交换机 
            *@param queue        目标队列 
            *@param routingkey   路由密钥 
            *@param arguments    其他绑定参数 
            * 
            *此函数返回一个延迟处理程序。可以安装回调 
            *使用onSuccess()、onError()和onFinalize()方法。 
        */ 
        Deferred &bindQueue(const std::string_view &exchange,const std::string_view &queue,const std::string_view &routingkey,const Table &arguments);/** 
            *将消息发布到exchange
            *您必须提供交换机的名称和路由密钥。 
            然后,RabbitMQ将尝试将消息发送到一个或多个队列。 
            使用可选的flags参数,可以指定如果消息无法路由到队列时应该发生的情况。
            默认情况下,不可更改的消息将被静默地丢弃。 
            *  
            *如果设置了'mandatory'或'immediate'标志, 
            则无法处理的消息将返回到应用程序。 
            在开始发布之前,请确保您已经调用了recall()-方法, 
            并设置了所有适当的处理程序来处理这些返回的消息。 
            *  
            *可以提供以下flags: 
            *  
            *-mandatory 如果设置,服务器将返回未发送到队列的消息 
            *-immediate 如果设置,服务器将返回无法立即转发给使用者的消息。 

            *@param exchange要发布到的交易所 
            *@param routingkey路由密钥 
            *@param envelope要发送的完整信封 
            *@param message要发送的消息 
            *@param size消息的大小 
            *@param flags可选标志 
        */boolpublish(const std::string_view &exchange,const std::string_view &routingKey,const std::string &message,int flags =0);/** 
            *告诉RabbitMQ服务器已准备好使用消息-也就是 订阅队列消息 
            * 
            *调用此方法后,RabbitMQ开始向客户端应用程序传递消息。 
            consumer tag是一个字符串标识符, 
            如果您以后想通过channel::cancel()调用停止它, 
            可以使用它来标识使用者。 
            *如果您没有指定使用者tag,服务器将为您分配一个。 
            * 
            *支持以下flags: 
            * 
            *-nolocal    如果设置了,则不会同时消耗在此通道上发布的消息 
            *-noack      如果设置了,则不必对已消费的消息进行确认 
            *-exclusive  请求独占访问,只有此使用者可以访问队列 
            * 
            *@param queue    您要使用的队列 
            *@param tag      将与此消费操作关联的消费者标记 
            *@param flags    其他标记 
            *@param arguments其他参数 
            * 
            *此函数返回一个延迟处理程序。 
            可以使用onSuccess()、onError()和onFinalize()方法安装回调

            可以安装的onSuccess()回调应该具有以下格式: 
                void myCallback(const std::string_view&tag); 
            样例: 
            channel.consume("myqueue").onSuccess( 
                [](const std::string_view& tag) { 
                    std::cout << "Started consuming under tag "; 
                    std::cout << tag << std::endl; 
            }); 
        */ 
        DeferredConsumer &consume(const std::string_view &queue,const std::string_view &tag,int flags,const Table &arguments);/** 
            *确认接收到的消息 
            *
            *消费者客户端对收到的消息进行确认应答
            *
            *当在DeferredConsumer::onReceived()方法中接收到消息时, 
            必须确认该消息, 
            以便RabbitMQ将其从队列中删除(除非使用noack选项消费)
            * 
            *支持以下标志: 
            * 
            *-多条确认多条消息:之前传递的所有未确认消息也会得到确认 
            * 
            *@param deliveryTag    消息的唯一delivery标签 
            *@param flags          可选标志 
            *@return bool 
        */boolack(uint64_t deliveryTag,int flags=0);};classDeferredConsumer{/* 
            注册一个回调函数,该函数在消费者启动时被调用
            void onSuccess(const std::string &consumertag) 
        */ 
        DeferredConsumer &onSuccess(const ConsumeCallback& callback);/* 
            注册回调函数,用于接收到一个完整消息的时候被调用 
            void MessageCallback(const AMQP::Message &message,  
                uint64_t deliveryTag, bool redelivered) 
        */ 
        DeferredConsumer &onReceived(const MessageCallback& callback);/* Alias for onReceived() */ 
        DeferredConsumer &onMessage(const MessageCallback& callback);/* 
            注册要在服务器取消消费者时调用的函数 
            void CancelCallback(const std::string &tag) 
        */ 
        DeferredConsumer &onCancelled(const CancelCallback& callback);};classMessage:publicEnvelope{const std::string &exchange();const std::string &routingkey();};classEnvelope:publicMetaData{constchar*body();// 获取消息正文uint64_tbodySize();// 获取消息正文大小};}

2.ev

typedefstructev_async{EV_WATCHER(ev_async);
    EV_ATOMIC_T sent;/* private */}ev_async;//break type enum{ 
    EVBREAK_CANCEL =0,/* undo unloop */ 
    EVBREAK_ONE    =1,/* unloop once */ 
    EVBREAK_ALL    =2/* unloop all loops */};// 实例化并获取IO事件监控接口句柄structev_loop*ev_default_loop(unsignedint flags EV_CPP(=0));#defineEV_DEFAULTev_default_loop(0)// 开始运行IO事件监控, 这是一个阻塞接口intev_run(structev_loop*loop);/* break out of the loop */// 结束IO监控// 如果在主线程进行ev_run(), 则可以直接调用,// 如果在其他线程中进行ev_run(), 需要通过异步通知进行voidev_break(structev_loop*loop,int32_t break_type);void(*callback)(structev_loop*loop, ev_async *watcher,int32_t revents);// 初始化异步事件结构, 并设置回调函数voidev_async_init(ev_async *w, callback cb);// 启动事件监控循环中的异步任务处理voidev_async_start(structev_loop*loop, ev_async *w);// 发送当前异步事件到异步线程中执行voidev_async_send(structev_loop*loop, ev_async *w);

5.使用

1.publish.cc

#include<ev.h>#include<amqpcpp.h>#include<amqpcpp/libev.h>#include<openssl/ssl.h>#include<openssl/opensslv.h>intmain(){// 1.实例化底层网络通信框架的IO事件监控句柄auto*loop = EV_DEFAULT;// 2.实例化libEvHandler句柄 -> 将AMQP框架与事件监控关联起来
    AMQP::LibEvHandler handler(loop);// 3.实例化连接对象
    AMQP::Address address("amqp://root:[email protected]:5672/");
    AMQP::TcpConnection connection(&handler, address);// 4.实例化信道对象
    AMQP::TcpChannel channel(&connection);// 5.声明交换机
    channel.declareExchange("test-exchange", AMQP::ExchangeType::direct).onError([](constchar*message){ std::cout <<"声明交换机失败: "<< message << std::endl;}).onSuccess([](){ std::cout <<"test-exchange 交换机创建成功"<< std::endl;});// 6.声明队列
    channel.declareQueue("test-queue").onError([](constchar*message){ std::cout <<"声明队列失败: "<< message << std::endl;}).onSuccess([](){ std::cout <<"test-queue 队列创建成功"<< std::endl;});// 7.针对交换机和队列进行绑定
    channel.bindQueue("test-exchange","test-queue","test-queue-key").onError([](constchar*message){ std::cout <<"test-exchange - test-queue 绑定失败: " \
                 << message << std::endl;}).onSuccess([](){ std::cout <<"test-exchange - test-queue 绑定成功"<< std::endl;});// 8.向交换机发布消息for(int i =0; i <5;++i){
        std::string msg ="Hello SnowK-"+ std::to_string(i);if(channel.publish("test-exchange","test-queue-key", msg)==false){
            std::cout <<"publish 失败"<< std::endl;}}// 9.启动底层网络通信框架 -> 开启IOev_run(loop,0);return0;}

2.consume.cc

#include<ev.h>#include<amqpcpp.h>#include<amqpcpp/libev.h>#include<openssl/ssl.h>#include<openssl/opensslv.h>voidMessageCB(AMQP::TcpChannel* channel,const AMQP::Message& message,uint64_t deliveryTag,bool redelivered){
    std::string msg;
    msg.assign(message.body(), message.bodySize());// 不能这样使用, AMQP::Message后面没有存'\0'// std::cout << message << std::endl 
    
    std::cout << msg << std::endl;
    channel->ack(deliveryTag);}intmain(){// 1.实例化底层网络通信框架的IO事件监控句柄auto*loop = EV_DEFAULT;// 2.实例化libEvHandler句柄 -> 将AMQP框架与事件监控关联起来
    AMQP::LibEvHandler handler(loop);// 3.实例化连接对象
    AMQP::Address address("amqp://root:[email protected]:5672/");
    AMQP::TcpConnection connection(&handler, address);// 4.实例化信道对象
    AMQP::TcpChannel channel(&connection);// 5.声明交换机
    channel.declareExchange("test-exchange", AMQP::ExchangeType::direct).onError([](constchar*message){ std::cout <<"声明交换机失败: "<< message << std::endl;}).onSuccess([](){ std::cout <<"test-exchange 交换机创建成功"<< std::endl;});// 6.声明队列
    channel.declareQueue("test-queue").onError([](constchar*message){ std::cout <<"声明队列失败: "<< message << std::endl;}).onSuccess([](){ std::cout <<"test-queue 队列创建成功"<< std::endl;});// 7.针对交换机和队列进行绑定
    channel.bindQueue("test-exchange","test-queue","test-queue-key").onError([](constchar*message){ std::cout <<"test-exchange - test-queue 绑定失败: " \
                 << message << std::endl;}).onSuccess([](){ std::cout <<"test-exchange - test-queue 绑定成功";});// 8.订阅消息对垒 -> 设置消息处理回调函数auto callback = std::bind(MessageCB,&channel, std::placeholders::_1, 
                              std::placeholders::_2, std::placeholders::_3);
    channel.consume("test-queue","consume-tag").onReceived(callback).onError([](constchar*message){ 
            std::cout <<"订阅 test-queue 队列消息失败: "<< message << std::endl;exit(0);});// 9.启动底层网络通信框架 -> 开启IOev_run(loop,0);return0;}

3.makefile

all: publish consume
publish: publish.cc
    g++ -o $@ $^ -lamqpcpp -lev -std=c++17
consume: consume.cc
    g++ -o $@ $^ -lamqpcpp -lev -std=c++17

.PHONY:clean
clean:
    rm publish consume


本文转载自: https://blog.csdn.net/qq_37281656/article/details/142712555
版权归原作者 DieSnowK 所有, 如有侵权,请联系我们删除。

“[C++][第三方库][RabbitMq]详细讲解”的评论:

还没有评论