0


RabbitMQ介绍+使用手册

文章目录

一、rabbitmq介绍

​ RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件),RabbitMQ服务器是用Erlang语言编写的,而集群和故障转移是构建在开放电信平台框架上的。所有主要的编程语言均有与代理接口通讯的客户端库。

RabbitMQ优势

  • 可靠性(Reliablity):使用了一些机制来保证可靠性,比如持久化、传输确认、发布确认。
  • 灵活的路由(Flexible Routing):在消息进入队列之前,通过Exchange来路由消息。对于典型的路由功能,Rabbit已经提供了一些内置的Exchange来实现。针对更复杂的路由功能,可以将多个Exchange绑定在一起,也通过插件机制实现自己的Exchange。
  • 消息集群(Clustering):多个RabbitMQ服务器可以组成一个集群,形成一个逻辑Broker。
  • 高可用(Highly Avaliable Queues):队列可以在集群中的机器上进行镜像,使得在部分节点出问题的情况下队列仍然可用。
  • 多种协议(Multi-protocol):支持多种消息队列协议,如STOMP、MQTT等。
  • 多种语言客户端(Many Clients):几乎支持所有常用语言,比如Java、.NET、Ruby等。
  • 管理界面(Management UI):提供了易用的用户界面,使得用户可以监控和管理消息Broker的许多方面。
  • 跟踪机制(Tracing):如果消息异常,RabbitMQ提供了消息的跟踪机制,使用者可以找出发生了什么。
  • 插件机制(Plugin System):提供了许多插件,来从多方面进行扩展,也可以编辑自己的插件。

二、rabbitmq服务器安装及环境配置

1.下载并安装rabbitmq服务器

​ 首先进入rabbitmq官网(rabbitmq.com)

在这里插入图片描述

​ 向下滑动滚轮找到Download+Installation,点击进入

在这里插入图片描述

​ 点击Install Windows安装windows版本

在这里插入图片描述

​ 向下滑动滚轮找到下图框住的两个下载链接

在这里插入图片描述

​ 下载的软件位置,先安装otp.exe,鼠标右键以管理员方式运行。接着选取要安装的路径,然后一路傻瓜式安装 next 下一步,安装即可。不要安装在中文或带空格的文件路径下。

​ 配置系统环境变量:右键此电脑 - 属性 - 高级系统设置 - 环境变量。

在这里插入图片描述

​ 接着打开 - 此电脑(文件资源管理器) 找到刚刚我们安装的文件 bin 目录下,复制路径 ctrl+c 切换窗口到环境变量,找到系统变量 path - 编辑

在这里插入图片描述

​ 新建 - ctrl + v 粘贴刚才我们复制的路径,然后三次确定,关闭环境变量窗口

在这里插入图片描述

​ 安装 RabbitMQ:右键管理员运行,然后选择安装路径,接着一路 next 下一步,遇到弹窗点允许,没有弹窗则无视(不要安装在中文或带空格的文件路径下)。安装完成后也要配置系统环境变量:配置的方法和上面一致,路径是安装的文件下的sbin下。

在这里插入图片描述

安装完成后找到安装文件路径,找到 sbin 目录下,全选路径 输入 cmd,打开cmd命令窗口,运行下面命令(这个指令是为了启动web界面)。

rabbitmq-plugins enable rabbitmq_management

在这里插入图片描述

​ 然后再通过管理员权限打开一个cmd,输入rabbitmq-service stop,再输入rabbitmq-service start
在这里插入图片描述

(如果这里运行这两个命令不成功可能是rabbitmq服务器安装未成功,需要在管理员权限下的cmd执行rabbitmq-service install)

​ 打开浏览器,访问 127.0.0.1:15672,出现管理页面就安装成功了。账户:guest,密码:guest。

在这里插入图片描述

2.下载并编译rabbitmq-c静态库

第一步:在https://github.com/alanxz链接下载rabbitmq-c静态库。

第二步:使用cmake编译生成适合自己编译环境的工程

​ (1)填写源代码路径

​ (2)建立后的路径,build的文件夹一般建立在源代码路径里,也可以放到其他位置

​ (3)点击Configure按钮

​ (4)点击Generate按钮

在这里插入图片描述

​ (如出现如下报错,可以去掉ENABLE_SSL_SUPPORT括号里的勾。)

在这里插入图片描述

​ 然后再将工程放在vs中编译生成即可在工程的build/librabbitmq/Debug文件夹下找到生成的rabbitmq.4.lib和rabbimq.4.dll了,将dll和lib放在自己所需的工程下即可用AMQP库的接口函数了。

三、rabbitmq使用

1.rabbitmq架构及各组件功能

在这里插入图片描述

​ virtual Host:虚拟主机。标识一批交换机、消息队列和相关对象。虚拟主机是共享相同的身份认证和加密环境的独立服务器域。每个vhost本质上就是一个mini版的RabbitMQ服务器,拥有自己的队列、交换器、绑定和权限机制。vhost是AMQP概念的基础,必须在链接时指定,RabbitMQ默认的vhost是 /。

​ channel:通道,amqp支持一个tcp连接上启用多个mq通信通道,每个通道都可以被作为通信流。

​ publisher:生产者,是消息产生的源头。

​ exchange:交换机,可以理解为具有路由表的路由规则。

​ queues:队列,装载消息的缓存容器。

​ consumer:消费者,连接到队列并取走消息的客户端。

核心思想:在RabbitMQ中,生产者从不直接将消息发送给队列。

​ 事实上,有些生产者甚至不知道消息是否被送到某个队列中去了。生产者只负责将消息送给交换机,而交换机确切地知道什么消息应该送到哪。

​ bind:绑定,实际上可以理解为交换机的路由规则。每个消息都有一个称为路由键的属性(routing key),就是一个简单的字符串。一个绑定将【交换机,路由键,消息送达队列】三者绑定在一起,形成一条路由规则。

​ exchange type:交换机类型:

​ fanout:不处理路由键,转发到所有绑定的队列上

​ direct:处理路由键,必须完全匹配,即路由键字符串相同才会转发

​ topic:路由键模式匹配,此时队列需要绑定要一个模式上。符号“#”匹配一个或多个词,符号“*”只能匹配一个词。因此“audit.#”能够匹配到“audit.irs.corporate”,但是“audit.*” 只会匹配到“audit.irs”

注意:这里用户的创建是在服务器的Web界面上创建的!并且用户所拥有的权限还决定能接收的信息。

​ 创建用户方法:

​ 第一步:打开浏览器,访问 127.0.0.1:15672,输入账户:guest,密码:guest。
​ 第二步:点击导航栏里的Admin

在这里插入图片描述

​ 第三步:点击Add a user左边的小三角,在username后面输入创建用户的名字,在Password里输出密码,下一行的(confirm)是确认密码,然后在最后一个输入的右侧选择你要创建用户的角色,这个决定用户拥有的权限,在目前我的探究里,如果发送方通信的方式选的是topic且接收方权限是Admin,那么Routingkey的设置将毫无意义,不管设置什么接收方都会接收到这个交换机里的所有信息,设置其他的权限则可以通过Routingkey来选择想要接收到的消息。

在这里插入图片描述

​ 第四步:点击Add user,即可成功创建一个用户。

在这里插入图片描述

​ 第五步:点击你创建的用户

在这里插入图片描述

​ 第六步:设置用户的虚拟主机权限以及TOPIC权限,直接用默认的就行,点击上下两个Set permission就ok了

在这里插入图片描述

在这里插入图片描述

2.rabbitmq通信方式
2.1扇出模式(fanout)

在Fanout模式下,消息发送流程是这样的:

  • 可以有多个消费者
  • 每个消费者有自己的queue(队列)
  • 每个队列都要绑定到Exchange(交换机)
  • 生产者发送的消息,只能发送到交换机,交换机来决定要发给哪个队列,生产者无法决定。
  • 交换机把消息发送给绑定过的所有队列
  • 队列的消费者都能拿到消息。实现一条消息被多个消费者消费在这里插入图片描述
2.2直连模式(direct)

在Direct模型下:

​ 队列与交换机的绑定,不是任意绑定,而是要指定一个RoutingKey(路由key)。消息的发送方在向 Exchange发送消息时,也必须指定消息的 RoutingKey。Exchange会判断每一个绑定的队列,只有队列的Routingkey与消息的 Routing key完全一致,才会接收到消息。
在这里插入图片描述

2.3主题模式(topic)

在Topic模式下:

​ Topic模式的Exchange与Direct相比,可以根据RoutingKey把消息路由到不同的队列。只不过Topic类型Exchange可以让队列在绑定Routing key的时候使用通配符!这种模型Routingkey 一般都是由一个或多个单词组成,多个单词之间以”.”分割,例如: item.insert

在这里插入图片描述

3.接口函数介绍
3.1通用函数
  //创建一个新的AMQP连接,他将被用于RabbitMQ服务器进行通信
  amqp_new_connection();
   //这行代码创建了一个新的TCP套接字,并将其与之前的AMQP关联起来
  amqp_tcp_socket_new(conn);
  /**
  *        @brief  amqp_socket_open()        用来打开一个与AMQP服务器的网络套接字连接。
  *        @param  socket                    用于存储套接字连接信息的指针。
  *        @param  host                    AMQP服务器的主机名或IP地址
  *        @param  port                    AMQP服务器主机的端口号,默认端口号是5672
  */
  amqp_socket_open(socket, "127.0.0.1", 5672);
  /**
  *        @brief  amqp_login()            用来来进行与AMQP服务器的身份验证和登录操作。
  *        @param  conn                    与AMQP服务器建立的连接。
  *        @param  vhost                    虚拟主机的名称,用于指定要连接的AMQP虚拟主机,"/"表示连接到默认的虚拟主机。
  *        @param  channel_max                通道的最大数量。在这里,设置为0表示使用服务器的默认设置。
  *        @param  frame_max                每个AMQP帧的最大字节大小。AMQP_DEFAULT_FRAME_SIZE表示使用默认的帧大小。
  *        @param  heartbeat                心跳超时时间,用于检测与服务器之间的连接状态。在这里,设置为0表示禁用心跳检测。
  *        @param  sasl_method                身份验证机制。AMQP_SASL_METHOD_PLAIN表示使用PLAIN身份验证机制。
  *        @param    username                登录用户名。使用"guest"表示使用默认的用户名。
  *        @param  password                登录密码。使用"guest"表示使用默认的密码。
  */
  amqp_login(conn, "/", 0, AMQP_DEFAULT_FRAME_SIZE, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest");
  /**
  *        amqp_bytes_t是AMQP库中的一种数据类型,用于表示字节数据。
  *        具体而言,amqp_bytes_t是一个结构体类型,包含两个字段:
  *        size_t len:表示字节数据的长度(大小)。
  *        void* bytes:指向字节数据的指针。
  *        通过使用amqp_bytes_t类型,可以方便地处理和传递字节数据,如消息内容、队列名称等。在你的代码片段中,amqp_bytes_t用于作为参数传递给amqp_queue_declare()函数的queue参数。通过使用amqp_bytes_t类型的对象,可以指定要声明的队列名称。在你的例子中,使用了amqp_empty_bytes,表示不指定特定的队列名称,而是让服务器自动生成一个唯一的队列名称。总结起来,amqp_bytes_t是AMQP库中用于表示字节数据的数据类型。它具有表示字节数据长度和指向字节数据的指针的能力,方便在处理和传递字节数据时使用。
  */
  amqp_bytes_t t;
  //打开一个通道(channel),编号为1,所有的消息都是通过通道传输的。
  const amqp_channel_t KChannel = 1;
  amqp_channel_open(conn, KChannel);
  //首先关闭通道,然后关闭连接,最后销毁连接。
  amqp_channel_close(conn, KChannel, AMQP_REPLY_SUCCESS);
  amqp_connection_close(conn, AMQP_REPLY_SUCCESS);
  amqp_destroy_connection(conn);
3.2接收端函数
  /**
  *        @brief  amqp_queue_declare() 向AMQP服务器发送一个队列声明请求,以创建一个新的消息队列或获取现有队列的信息。
  *        @param  conn                 与AMQP服务器建立的连接。
  *        @param  channel              通道编号,用于指定操作的特定通道。
  *        @param  queue                 要声明的队列名称。可以是一个字符串,表示要创建或获取信息的队列名称。
  *        @param  passive                 表示是否以被动模式进行声明。如果设置为非零值(例如1),表示以被动模式进行声明,仅返回现有队列的信息而不创建新队列。如果设置为零值,则会创建新队列或返回现有队列的信息。
  *        @param  durable                 表示队列是否持久化。如果设置为非零值(例如1),表示队列是持久化的,即在服务器重启后仍然存在。如果设置为零值,则队列是非持久化的,不会存储到磁盘上。
  *        @param  exclusive             表示队列是否为独占队列。如果设置为非零值(例如1),表示队列是独占的,只有声明队列的连接可以访问。如果设置为零值,则队列可以被多个连接访问。
  *        @param  auto_delete             表示队列是否在不再使用时自动删除。如果设置为非零值(例如1),表示队列在没有连接使用时会被自动删除。如果设置为零值,则队列不会自动删除。
  *        @param  arguments             可选参数,用于传递额外的队列参数。
  */
  amqp_queue_declare(m_pConn,m_iChannel_Receive,_queue,_passive,_durable,_exclusive,_auto_delete,amqp_empty_table);  
  /**
  *        @brief  amqp_queue_bind()     用于将队列与交换机进行绑定。
  *        @param  conn                 与AMQP服务器建立的连接。
  *        @param  channel              通道编号,用于指定操作的特定通道。
  *        @param  queue                 要声明的队列名称。可以是一个字符串,表示要创建或获取信息的队列名称。
  *        @param  exchange              要声明的队列名称。可以是一个字符串,表示要创建或获取信息的队列名称。
  *        @param  routing_key             绑定的路由键,用于指定消息从交换机路由到队列的条件。消息的路由键和绑定的路由键匹配时,消息会被路由到该队列。
  *        @param  arguments             可选参数,用于传递额外的绑定参数。
  */
  amqp_queue_bind(m_pConn,m_iChannel_Receive,_queue,_exchange,_routekey,amqp_empty_table);
  /**
  *        @brief  amqp_basic_qos()  用来设置AMQP通道的QoS(Quality of Service)配置。
  *        @param  m_pConn                与AMQP服务器建立的连接。
  *        @param  m_iChannel            通道编号,用于指定操作的特定通道。
  *        @param  prefetch_size        预取大小,用于限制服务器一次性发送给消费者的消息的总大小。在这里,设置为0表示不限制消息的总大小。
  *        @param  prefetch_count        预取计数,用于限制服务器一次性发送给消费者的消息的数量。在这里,设置为1表示每次只获取一条消息。
  *        @param  global                用于指定是否将预取限制应用到整个通道(global=true)还是仅应用于特定消费者(global=false)。在这里,设置为0表示仅应用于特定消费者。
  */
  amqp_basic_qos(conn, KChannel, 0, /*prefetch_count*/1, 0);
  /**
  *        @brief  amqp_basic_ack()    用来确认(acknowledge)已经成功处理的消息。
  *        @param  m_pConn                与AMQP服务器建立的连接。
  *        @param  m_iChannel            通道编号,用于指定操作的特定通道。
  *        @param  delivery_tag  交付标识(delivery tag),它是一个标识特定消息的整数值。该标识用于唯一标识一条消息。
  *        @param  multiple            用于指示是否同时确认多条消息。设置为false表示只确认当前的一条消息,true为确认多条消息
  */
  amqp_basic_ack(m_pConn, m_iChannel, envelope.delivery_tag, true);
  /**
  *        @brief                        用于启动消费者并订阅消息队列中的消息。
  *        @param  conn                与AMQP服务器建立的连接。
  *        @param  channel             通道编号,用于指定操作的特定通道。
  *        @param  queue                要声明的队列名称。可以是一个字符串,表示要创建或获取信息的队列名称。
  *        @param  consumer_tag        消费者标识,用于标识特定的消费者。可以为消费者指定一个唯一的标识,也可以使用空字符 (amqp_empty_bytes) 自动分配一个标识。
  *        @param  no_local            表示是否不接收自己发布的消息。如果设置为非零值(例如 1),表示不接收自己发布的消息。如果设置为零值,则接收自己发布的消息。
  *        @param  no_ack                表示是否不需要发送确认消息。如果设置为非零值(例如 1),表示不需要发送确认消息,即消息不需要进行显式的确认。如果设置为零值,则需要发送确认消息,以确保消息的可靠传递和处理。
  *        @param  exclusive            表示是否创建一个独占的消费者。如果设置为非零值(例如 1),表示创建一个独占的消费者,只有当前连接的消费者可以订阅指定队列。如果设置为零值,则可以有多个消费者订阅同一个队列。
  *        @param  arguments            可选参数,用于传递额外的订阅参数。
  */
  amqp_basic_consume(m_pConn,m_iChannel_Receive,queuename,amqp_empty_bytes,false,false,false,amqp_empty_table);
  /**
  *        @brief  用于从 AMQP 服务器接收订阅的消息,并将消息存储在 amqp_message_t 类型的结构体中,而不设置接收超时时间。
  *        @param  conn                与 AMQP 服务器建立的连接。
  *        @param  &envelope            指向 amqp_envelope_t 类型的指针,用于存储接收到的消息的包装信息(如交付标识、交换机名称等)。该结构体中的字段将被填充为接收到的消息的相关信息。
  *        @param  timeval*            如果输入nullptr,表示没有设置接收超时时间,即函数将一直阻塞等待接收到消息。
  *        @param  flags                0:表示没有设置特定的接收标志。
  */
  amqp_consume_message(m_pConn,&envelope,timeout,0);
3.3发送端函数
  /**
  *        @brief  amqp_exchange_declare() 用于向 AMQP 服务器发送一个交换机声明请求,以创建一个新的交换机或获取现有交换机的信息。
  *        @param  conn                 与AMQP服务器建立的连接。
  *        @param  channel              通道编号,用于指定操作的特定通道。
  *        @param  exchange              要声明的队列名称。可以是一个字符串,表示要创建或获取信息的队列名称。
  *        @param  type                 交换机的类型,可以是 "direct"、"fanout"、"topic"。
  *        @param  passive                 表示是否以被动模式进行声明。如果设置为非零值(例如1),表示以被动模式进行声明,仅返回现有队列的信息而不创建新队列。如果设置为零值,则会创建新队列或返回现有队列的信息。
  *        @param  durable                 表示队列是否持久化。如果设置为非零值(例如1),表示队列是持久化的,即在服务器重启后仍然存在。如果设置为零值,则队列是非持久化的,不会存储到磁盘上。
  *        @param  auto_delete             表示队列是否在不再使用时自动删除。如果设置为非零值(例如1),表示队列在没有连接使用时会被自动删除。如果设置为零值,则队列不会自动删除。
  *        @param  internal             表示交换机是否是内部的。如果设置为非零值(例如 1),表示交换机是内部的,只能通过其他交换机进行路由。如果设置为零值,则交换机可以直接接收消息。
  *        @param  arguments             可选参数,用于传递额外的队列参数。
  */
  amqp_exchange_declare(m_pConn,m_iChannel_Send,_exchange,_type,_passive,_durable,0,0,amqp_empty_table);
  /**
  *        @brief    用于将C风格字符串转换为AMQP字节序列的函数调用,函数的返回值是一个amqp_bytes_t类型的对象,其中包含转换后的字节序列的长度和指向数据的指针,这个的用途只是进行了一个转换,应该amqp库里的函数输入都不是char*以及string类型,都需要转成amqp_bytes_t类型
  *        @param  cstr                 const char*类型字符串
  */
  amqp_cstring_bytes("direct");
  /**
  *        @brief  将一个消息发布到 RabbitMQ 服务器.
  *        @param  conn                 已经打开的与 RabbitMQ 服务器的连接。
  *        @param  replyChannel         消息应该发布到的通道,这通常与消息原始接收的通道相同。  
  *        @param  exchange             交换器名字。
  *        @param  routekey             路由键。
  *        @param  mandatory             如果设置为 true,当消息不能被路由到任何队列时,服务器会将消息返回给生产者。如果是false,表示如果消息不能被路由,服务器会丢弃它。
  *        @param  immediate             如果设置为 true,当队列中没有消费者能立即接收到消息时,服务器会将消息返回给生产者。如果是false,表示服务器会将消息存储,直到有消费者能接收它。
  *        @param  properties*             消息的属性。
  *        @param  response_             要发布的消息体。
  */
  amqp_basic_publish(m_pConn,m_iChannel_Send,_exchange,_routekey,0,0,NULL,message_bytes);

四、参考网站

原文链接:https://blog.csdn.net/qq_43410878/article/details/123656765

原文链接:https://zhuanlan.zhihu.com/p/640374684

标签: rabbitmq 分布式

本文转载自: https://blog.csdn.net/qq_43617638/article/details/136623964
版权归原作者 小码农想变强 所有, 如有侵权,请联系我们删除。

“RabbitMQ介绍+使用手册”的评论:

还没有评论