老样子,咱八股文说完了,上点干货,来点实际的操作(不懂的先去把上章 (RabbitMQ(一) 啃了)
我目前在做的项目是用springboot自带的amqp创建的RabbitMQ坐标 (如果不需要的可以去查一查RabbitMQ自身坐标进行引入) :
创建步骤(new project 选择springInitializr => 它能帮我们快速创建springboot项目)
![](https://img-blog.csdnimg.cn/direct/73426c2d8ab24a9799f3bed899417b2c.png)
创建的时候选好自己要的版本(我这里选的是JDK17,用的是IDEA2023.3.1版本,为了赶紧给大家制作出来,就没有去换低版本使用JDK8了,2023.3.1还没找到在哪配8的版本)
我用的是maven,所以此处以maven为例
做完这两项后创建工程,得到maven坐标
至于搭环境啥的我就不详细说了,有兴趣的兄弟姐妹可以搜来自己搞搞,如果哪家公司一进去就要你搭这玩意儿的服务,那听兄弟的,除非你真是大佬,不然赶紧跑,这公司干不长
配置方面我只提一句yaml的配置
注意:此处的yaml配置是spring amqp的配置,RabbitMQ的端口有很多个,主要端口有四个:
4369 – erlang发现口
5672 --client端通信口
15672 – 管理界面ui端口
25672 – server间内部通信口
4369:epmd,RabbitMQ节点和CLI工具使用的对等发现服务
5672、5671:由不带TLS和带TLS的AMQP 0-9-1和1.0客户端使用
25672:用于节点间和CLI工具通信(Erlang分发服务器端口),并从动态范围分配(默认情况下限制为单个端口,计算为AMQP端口+ 20000)。
除非确实需要这些端口上的外部连接(例如,群集使用联合身份验证或在子网外部的计算机上使用CLI工具),否则这些端口不应公开。
有关详细信息,请参见网络指南。
35672-35682:由CLI工具(Erlang分发客户端端口)用于与节点进行通信,并从动态范围分配(通过服务器分发端口+ 10010计算为服务器分发端口+ 10000)。
有关详细信息,请参见网络指南。
15672:HTTP API客户端,管理UI和Rabbitmqadmin (仅在启用管理插件的情况下)
61613、61614:不带TLS和带TLS的STOMP客户端(仅在启用STOMP插件的情况下)
1883、8883 :(不带和带有TLS的MQTT客户端,如果启用了MQTT插件
15674:STOMP-over-WebSockets客户端(仅在启用了Web STOMP插件的情况下)
15675:MQTT-over-WebSockets客户端(仅在启用Web MQTT插件的情况下)
15692:Prometheus指标(仅在启用Prometheus插件的情况下)
所以,在配置完成后想查看RabbitMQ的管理界面,我们应该访问15672端口(也就是yaml中的host:15672)
进入这个页面以后,输入用户名和密码,也就是上面截图yml中配置的用户名和密码
进入这个页面,就算OK了
咱们先说说连接
Virtual host: 所属的虚拟主机。
Name: 名称。
User name: 使用的用户名。
State: 当前的状态,running:运行中;idle:空闲。
SSL/TLS: 是否使用ssl进行连接。
Protocol: 使用的协议。
Channels: 创建的channel的总数。
From client: 每秒发出的数据包。
To client: 每秒收到的数据包。
也可点击主页面的channel进入查看所有通道
channel: 名称。
Virtual host: 所属的虚拟主机。
User name: 使用的用户名。
Mode: 渠道保证模式。 可以是以下之一,或者不是:C: confirm。T:transactional(事务)。
State : 当前的状态,running:运行中;idle:空闲。
Unconfirmed: 待confirm的消息总数。
Prefetch: 设置的prefetch的个数。
Unacker: 待ack的消息总数。
publish: producter pub消息的速率。
confirm: producter confirm消息的速率。
deliver/get: consumer 获取消息的速率。
ack: consumer ack消息的速率。
点击Exchanges(交换机)
说明一下features的几个参数
D: 是 durable 的缩写,代表这个队列中的消息支持持久化
I: 是 internal 的缩写,表示这个exchange不可以被client用来推送消息,仅用来进行exchange之间的绑定。
Args: 是 arguments 的缩写。代表该队列配置了 arguments 参数。
TTL: 是 x-message-ttl 的缩写。设置队列中的所有消息的生存周期(统一为整个队列的所有消息设置生命周期), 也可以在发布消息的时候单独为某个消息指定剩余生存时间,单位毫秒。
DLX: 说明该队列配置了 x-dead-letter-exchange。当队列消息长度大于最大长度、或者过期的等,将从队列中删除的消息推送到指定的交换机中去而不是丢弃掉。
DLK: x-dead-letter-routing-key 的缩写,将删除的消息推送到指定交换机的指定路由键的队列中去。
随便点击一个交换机(这里以platform为结尾的交换机为例)
上述红框中的东西便是交换机绑定的队列
点击Queues(队列)
队列的属性
Virtual host: 所属的虚拟主机。
Name: 名称。
Features: 功能。(参数交换机参数)
State: 当前的状态,running:运行中;idle:空闲。
Ready: 当前队列中等待被消费的消息数量(这些消息已经被投递到队列中,但还没有被消费者取走。Ready 数量反映了当前队列中的消息积压情况,即有多少消息处于等待状态,等待被消费者处理)
Unacked: 未被消费者确认的消息数量(在消息队列中,当消费者接收到消息后,需要进行确认(acknowledge),表示已经处理了该消息。如果消息在一定时间内没有被确认,RabbitMQ 会将其重新放回队列,以便其他消费者重新处理)
Total: 总数 Ready+Unacked。
incoming: 消息进入的速率。
deliver/get: 消息获取的速率。
ack: 消息应答的速率。
再说一下admin界面
用户属性
Name: 名称。
Tags: 角色标签,只能选取一个。
Can access virtual hosts: 允许进入的vhost。
Has password: 设置了密码
好了,重点来了,环境有了,在项目中怎么编写代码并查看呢,咱们以platforms为例子
创建一个配置类(用于配置交换机、路由、队列信息)
参数1:队列名称(此处我是声明成了一个常量名,公司一般有一个专门存常量名的类,不知道这个类的问一问你老大哥在哪,去里面写去)
(没错,这个就是之前咱在RabbitMQ里面看到的那个platforms)
参数2:服务器重新启动后是否继续存在
参数3:用于声明一个独占队列(当
exclusive
参数设置为
true
时,表示该队列将只能被声明它的连接使用)
参数4:如果服务器应在队列不再使用时删除该队列,则为true,反之为false
参数5:用于声明队列的额外参数
设置完这些后,我们就可以开始编写业务代码了
(核心代码: convertAndSend => 发送数据)
再来到消费者这边
问题来了,执行完这些以后,去哪拿消费完的数据呢
当然是开个接口给前端取数据啦
到此为止整个mq的业务逻辑就结束了,接下来我们说几个对遇到的问题以及解决的小妙招
当队列有数据未消费、执行时间又过长,且后台没有报错时,我们无法确定数据到底是怎么个事儿
这个时候我们该怎么测出RabbitMQ消费者是否出错呢
这样代码出错时就可以让后台报错了,咱们就可以着手解决代码问题了
好了,RabbitMQ的使用暂时就先说到这儿,如果有不对的地方请各位正在给阿帕奇贡献的大佬们留言指证,对RabbitMQ还有补充的话我会单开一章继续补充,散会
版权归原作者 monkey joker 所有, 如有侵权,请联系我们删除。