文章目录
1. RabbitMQ 介绍
引用 :
RabbitMQ 是一个消息中间件:它接受并转发消息。你可以把它当做一个快递站点,当你要发送一个包裹时,
你把你的包裹放到快递站,快递员最终会把你的快递送到收件人那里,按照这种逻辑 RabbitMQ 是 一个快递站,
一个快递员帮你传递快件。RabbitMQ 与快递站的主要区别在于,它不处理快件而是接收,存储和转发消息数据。
1.1 几个重要概念
- 消息(Message):消息是 RabbitMQ 中最基本的数据单元,它由消息体和可选的属性组成。消息体是要传递的实际数据,属性可以包含一些元数据,如路由键、优先级等。
- 生产者(Producer):生产者是发送消息到 RabbitMQ 的应用程序。它将消息发布到交换器(Exchange),并指定一个路由键(Routing Key)来标识消息的目的地。 --> 可以直接认为是生产消息的人 (厂家)
- 交换器(Exchange):交换器是接收来自生产者的消息,并根据路由规则将消息路由到一个或多个队列。它定义了消息从生产者到队列的路由策略。
- 队列(Queue):队列是存储消息的地方,它是消费者从中接收消息的对象。队列具有名称,并且可以设置各种属性,如持久性、优先级等。
- 绑定(Binding):绑定是将交换器和队列关联起来的过程。它指定了一个交换器和一个队列之间的关系,通常包括交换器名称、队列名称和路由键。
- 消费者(Consumer):消费者是从队列中获取消息的应用程序。它订阅一个或多个队列,并从队列中接收、处理消息。
- 路由键(Routing Key):路由键是生产者在将消息发送到交换器时指定的一个关键字。交换器根据路由键的值将消息路由到一个或多个队列。
- 绑定键(Binding Key):绑定键是用来绑定交换器和队列之间关系的关键字。当交换器接收到消息时,会根据消息的路由键和绑定键来进行匹配,从而确定将消息发送到哪个队列。
- 消息确认(Message Acknowledgement):消息确认机制用于确保消息在消费者处理后得到正确处理。消费者在处理完消息后,通过发送确认给 RabbitMQ,告知它该消息已被成功处理。
- 可靠性(Reliability):RabbitMQ 提供了一些可靠性机制,如持久化消息、持久化队列和交换器,以保证消息不会丢失,并在发生故障时恢复数据。
1.2 RabbitMq 的工作原理
简单 看完上面的几个概念 , 这里在来简单了解一下 RabbitMq 的工作原理
图一 :
图二 :
图三 :
2 RabbitMQ 安装
环境: Linux 的 CentOS 7
RabbitMQ 安装地址
RabbitMq
和
Erlang
版本对照 : RabbitMQ Erlang Version Requirements — RabbitMQ
找到了自己要下载的 Erlang 版本 下面就来下载 : rabbitmq/erlang - Packages · packagecloud
安装包有了,下面我们就来安装 , 这里先将 这两个安装包传输到 Linux 服务器 上 .
这里我传输用到的工具是
xftp
图一 :
图二 :
安装包传输完成后 就来安装 Erlang
进入到
/usr/local/rabbitmq
目录 解压缩 Erlang
cd /usr/local/rabbitmq
rpm-ivh erlang-23.2.7-2.el7.x86_64.rpm
注意 : 这里如果使用 没有权限的用户 安装是 安装不下来的 需要 像 root 用户 申请权限 ,
使用 sudorpm-ivh erlang-23.2.7-2.el7.x86_64.rpm 即可 --> 这里会出现让你输入 管理员密码输入即可
命令解释 :
sudo: sudo 是一个用于以超级用户(root)身份执行命令的命令。在执行需要管理员权限的操作时,使用 sudo 可以提升当前用户的权限。
rpm: rpm 是 Linux 中一个常用的包管理器,用于管理安装、升级、查询和删除软件包。它用于执行与 RPM 软件包相关的操作。
-ivh: 这是 rpm 命令的选项参数。具体含义如下:
-i: 安装指定的 RPM 软件包。
-v: 在执行安装过程时显示详细的输出信息。
-h: 在显示进度条时使用哈希字符(#),提供更直观的进度显示。
结果 :[root@master rabbitmq]# rpm -ivh erlang-23.2.7-2.el7.x86_64.rpm
警告:erlang-23.2.7-2.el7.x86_64.rpm: 头V4 RSA/SHA1 Signature, 密钥 ID 6026dfca: NOKEY
准备中... ################################# [100%]
正在升级/安装...
1:erlang-23.2.7-2.el7 ################################# [100%]
安装完后 可以输入 erl -v , 如果出现 erlang 的版本好就说明安装成功了
结果 :[root@localhost rabbitmq]# erl -v
Erlang/OTP 23[erts-11.1.8][source][64-bit][smp:4:4][ds:4:4:10][async-threads:1][hipe]
Eshell V11.1.8 (abort with ^G)1>
到此
Erlang
就安装完了,下面我们来安装
RabbitMq
输入: rpm-ivh rabbitmq-server-3.8.35-1.el8.noarch.rpm
同理如果没有权限 可以 在前面 加上 sudo , 如果嫌弃每次加上 sudo 麻烦 可以直接使用 sudosu 直接切换到 超级用户(root)
过程 :[root@master rabbitmq]# rpm -ivh rabbitmq-server-3.8.35-1.el8.noarch.rpm
警告:rabbitmq-server-3.8.35-1.el8.noarch.rpm: 头V4 RSA/SHA256 Signature, 密钥 ID 6026dfca: NOKEY
准备中... ################################# [100%]
正在升级/安装...
1:rabbitmq-server-3.8.35-1.el8 ################################# [100%]
安装完 RabbitMQ 后我们就可以启动 RabbitMQ
# 启动服务
systemctl start rabbitmq-server
# 查看服务状态
systemctl status rabbitmq-server
在未启动 rabbitmq时 使用
systemctl status rabbitmq-server
命令查看服务状态
[root@localhost rabbitmq]# systemctl status rabbitmq-server
● rabbitmq-server.service - RabbitMQ broker
Loaded: loaded (/usr/lib/systemd/system/rabbitmq-server.service; disabled; vendor preset: disabled)
Active: inactive (dead)# 可以看到这里显示 dead 说明是 死的 即没有启动服务9月 2017:02:42 localhost.localdomain rabbitmq-server[3006]: /var/log/rabbitmq/rabbit@localhost_upgrade.log
9月 2017:02:42 localhost.localdomain rabbitmq-server[3006]: Config file(s): (none)9月 2017:02:44 localhost.localdomain rabbitmq-server[3006]: Starting broker... completed with 4 plugins.
9月 2017:02:44 localhost.localdomain systemd[1]: Started RabbitMQ broker.
9月 2022:34:02 localhost.localdomain systemd[1]: Stopping RabbitMQ broker...
9月 2022:34:02 localhost.localdomain rabbitmqctl[11894]: Shutting down RabbitMQ node rabbit@localhost running at PID 30069月 2022:34:02 localhost.localdomain rabbitmq-server[3006]: Gracefully halting Erlang VM
9月 2022:34:02 localhost.localdomain rabbitmqctl[11894]: Waiting for PID 3006 to terminate
9月 2022:34:07 localhost.localdomain rabbitmqctl[11894]: RabbitMQ node rabbit@localhost running at PID 3006 successfully shut down
9月 2022:34:08 localhost.localdomain systemd[1]: Stopped RabbitMQ broker.
[root@localhost rabbitmq]#
启动 rabbitmq ,后再 查看 服务状态
[root@localhost rabbitmq] systemctl start rabbitmq-server # 启动 rabbitmq 服务 [root@localhost rabbitmq][root@localhost rabbitmq][root@localhost rabbitmq] systemctl status rabbitmq-server
● rabbitmq-server.service - RabbitMQ broker
Loaded: loaded (/usr/lib/systemd/system/rabbitmq-server.service; disabled; vendor preset: disabled)
Active: active (running) since 三 2023-09-20 22:36:29 CST; 7s ago # 可以看到这里的状态显示 running 说明正在运行
Main PID: 11976(beam.smp)
Status: "Initialized"
Tasks: 28
CGroup: /system.slice/rabbitmq-server.service
├─11976 /usr/lib64/erlang/erts-11.1.8/bin/beam.smp -W w -MBas ageffcbf -MHas ageffcbf -MBlmbcs512-MHlmbcs512-MMmcs30-P1048576-t5...
├─11994 erl_child_setup 32768
├─12021 /usr/lib64/erlang/erts-11.1.8/bin/epmd -daemon
├─12046 inet_gethost 4
└─12047 inet_gethost 49月 2022:36:27 localhost.localdomain rabbitmq-server[11976]: TLS Library: OpenSSL - OpenSSL 1.0.2k-fips 26 Jan 20179月 2022:36:27 localhost.localdomain rabbitmq-server[11976]: Doc guides: https://rabbitmq.com/documentation.html
9月 2022:36:27 localhost.localdomain rabbitmq-server[11976]: Support: https://rabbitmq.com/contact.html
9月 2022:36:27 localhost.localdomain rabbitmq-server[11976]: Tutorials: https://rabbitmq.com/getstarted.html
9月 2022:36:27 localhost.localdomain rabbitmq-server[11976]: Monitoring: https://rabbitmq.com/monitoring.html
9月 2022:36:27 localhost.localdomain rabbitmq-server[11976]: Logs: /var/log/rabbitmq/[email protected]
9月 2022:36:27 localhost.localdomain rabbitmq-server[11976]: /var/log/rabbitmq/rabbit@localhost_upgrade.log
9月 2022:36:27 localhost.localdomain rabbitmq-server[11976]: Config file(s): (none)9月 2022:36:29 localhost.localdomain rabbitmq-server[11976]: Starting broker... completed with 4 plugins.
9月 2022:36:29 localhost.localdomain systemd[1]: Started RabbitMQ broker.
[root@localhost rabbitmq]
关于
RabbitMq
的其他命令
# 开机自启动
systemctl enable rabbitmq-server
# 停止服务
systemctl stop rabbitmq-server
# 重启服务
systemctl restart rabbitmq-server
这里关于 RabbitMq的几个命令看完,下面我们来安装一下 RabbitMq 为我们提供的一个管理端页面 .
安装客户端软件
rabbitmq-plugins enable rabbitmq_management
重启RabbitMq 服务
systemctl restart rabbitmq-server
注意 : 这里开放 15672 端口号 ,因为RabbitMq 默认的访客端口号为 15672
如果开放呢可以采取关闭防火墙的方式 ,如果关闭防火墙
$ sudo systemctl stop firewalld # 停止 firewalld 服务
$ sudo systemctl disable firewalld # 禁止开机启动 firewalld
这里可以通过 $ sudo firewall-cmd --state 命令查看 防火墙是否真的关闭 , 如果输出结果为 : no running 则说明 防火墙已成功停止
安装完管理界面,我们还需要配置一个管理员用户 ,要不然无法登录管理页面 , 无法进行管理操作,无法查看系统状态 ,另外 rabbitmq 有一个默认的账号密码
guest
, 用这个登录 会出现权限问题 (仅限于本机进行访问也就是rabbitmq服务器所在的机器) .
**这里想要打开rabbitmq 的管理页面在搜索框中输入 :
http://rabbitmq所在的ip地址:15672
即可**
创建管理员用户 :
# 创建账号和密码
rabbitmqctl add_user 用户名 密码
# 设置用户角色
rabbitmqctl set_user_tags 用户名 角色
四种角色 :
administrator:可以登录控制台、查看所有信息、并对rabbitmq进行管理
monToring:监控者;登录控制台,查看所有信息
policymaker:策略制定者;登录控制台指定策略
managment:普通管理员;登录控制
# 为用户添加资源权限,添加配置、写、读权限# set_permissions [-p <vhostpath>] <user> <conf> <write> <read>
rabbitmqctl set_permissions -p"/" y(将 y 替换成我们自己创建的用户)".*"".*"".*"
// Make sure to add code blocks to your code group
图:
到此关于rabbitmq 环境 安装就安装完成了,下面我们来学习一下 rabbitmq 的相关操作.
3. RabbitMQ 入门操作
这里我们通过 写一个 Hello World 的程序,来入门 RabbitMQ .
大致操作: 使用 java 创建 生产者和消费者 ,通过生产者生产者生产消息 发送到 Rabbitmq 服务器 , 然后让 消费者 去 rabbitmq 服务器拿取消息 进行消费.
特别注意 : 这里 java 连接 rabbitmq 时需要 Linux 开放 5672 端口 , 否者会超时 , 之前 web界面的 端口号是 15672.
简单看完流程 : 下面就来进行编码
3.1 添加依赖
这里先创建一个 Maven 程序 ,在 pom.xml 添加下面的依赖
<dependencies><!--rabbitmq 依赖客户端--><dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.8.0</version></dependency><!--操作文件流的一个依赖--><dependency><groupId>commons-io</groupId><artifactId>commons-io</artifactId><version>2.6</version></dependency></dependencies>
3.2 生产者代码
创建一个类代表生产者,生产消息并发送到 rabbitmq 服务器的队列中
packageorg.example.one;// 生产者importcom.rabbitmq.client.Channel;importcom.rabbitmq.client.Connection;importcom.rabbitmq.client.ConnectionFactory;importjavafx.util.Callback;importjava.io.IOException;importjava.util.concurrent.TimeoutException;publicclassProducer{// 队列名称publicstaticfinalStringQUEUE_NAME="mu";// 生产消息并发送到 rabbitmq 上publicstaticvoidmain(String[] args)throwsIOException,TimeoutException{// 1. 创建一个连接工厂ConnectionFactory factory =newConnectionFactory();// 2. 指定 rabbitmq 服务 所在的 ip 和 管理员用户名 和密码// ip
factory.setHost("192.168.80.129");// 用户名
factory.setUsername("mu");// 密码
factory.setPassword("1234");// 3. 创建连接 -- newConnection 方法需要抛出异常Connection connection = factory.newConnection();// 4. 获取信道 : 使用信道可以减少网络开销、提高吞吐量,并提供灵活的流量控制,从而改善 RabbitMQ 的性能和可扩展性。虽然每个信道都会增加一些内存和CPU的开销,// 但与为每条消息建立新连接相比,整体系统的开销会更小。因此,在大多数情况下,使用信道是更合适和有效的做法。Channel channel = connection.createChannel();/**
* 生产出一个队列 , 使用 queueDeclare 方法
* 主要参数:
* 1. queue:要创建或声明的队列名称。如果没有指定任何名称,则系统将随机生成一个唯一的名称并返回。
* 如果已有同名队列存在,则使用该队列的其他参数(如持久化、排他性等)。
* 2. durable:如果为 True,则表示该队列是持久化的。这意味着在 RabbitMQ Broker 重启后,该队列和其中的消息仍将存在。
* 3. exclusive:如果为 True,则表示该队列是排他的。这意味着该队列只能由申明它的连接访问,其他连接不能访问。当连接关闭后,该队列也将被删除。
* 4. auto_delete:如果为 True,则表示该队列是自动删除的。这意味着当该队列没有任何消费者时,自动删除该队列。
* 5. arguments:可选参数列表,用于设置队列的特殊参数。
*///5. 这里设置了 队列名称 , 不持久化 , 消息不共享,只让一个消费者消费 , 无其他参数
channel.queueDeclare(QUEUE_NAME,false,false,false,null);//6. 创建消息String message ="Hello World";/**
* 7. 发送消息 需要使用到 basicPublish方法
* 1.发送到哪个交换机
* 2.路由的key值是哪个本次是队列的名称
* 3.其他参数信息
* 4.发送消息的消息体
*/
channel.basicPublish("",QUEUE_NAME,null,message.getBytes());// 到此 消息发送完成System.out.println("生产者生产完消息并发送");}}
执行生产者代码后我们可以在 rabbitmq 管理页面看到 消息存放的 队列
3.3 消费者代码
创建一个类来表示消费者 ,从 rabbitmq 的队列中读取消息 ,进行消费
这里先来列举出即将要使用到的方法 :
通过 basicConsume 方法接受消息 , 这里 basicConsume 方法 第三个和第四个参数都是接口,所以需要实现该接口的方法
channel.basicConsume(队列名字/String, 是否自动签收/boolean, 消费时的回调/接口类, 无法消费的回调/接口类);
简单介绍一下参数:1.消费哪个队列
2.消费成功之后是否要自动应答true:代表自动应答false:代表手动应答
3.消费者未成功消费的回调
4.消费者取消消费的回调
了解完要用到的 api 后 ,我们来写 消费者代码
packageorg.example.one;// 消费者importcom.rabbitmq.client.*;importjava.io.IOException;importjava.nio.charset.StandardCharsets;importjava.util.concurrent.TimeoutException;publicclassConsumer{publicstaticfinalStringQUEUE_NAME="mu";publicstaticvoidmain(String[] args)throwsIOException,TimeoutException{// 1. 创建一个连接工厂ConnectionFactory factory =newConnectionFactory();// 2. 指定 rabbitmq 服务 所在的 ip 和 管理员用户名 和密码// ip
factory.setHost("192.168.80.129");// 用户名
factory.setUsername("mu");// 密码
factory.setPassword("1234");// 3. 创建连接 -- newConnection 方法需要抛出异常Connection connection = factory.newConnection();Channel channel = connection.createChannel();// 4. 声明接收消息的回调DeliverCallback deliverCallback =(consumerTag, message)->{System.out.println("接收到消息, 消息为: "+newString(message.getBody(),StandardCharsets.UTF_8));};//50 声明 取消消费消息的回调CancelCallback cancelCallback = consumerTag ->{System.out.println("消息消费被中断");};/**
* 消费者消费消息
* 1.消费哪个队列
* 2.消费成功之后是否要自动应答true:代表自动应答false:代表手动应答
* 3.消费者未成功消费的回调
* 4.消费者取消消费的回调
*/
channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);}}
到此消费者的代码就写完了,运行一下看看效果
入门案例看完我们就来学一下 Work Queues (任务队列)
4. Work Queues
引用:
Work Queues 是工作队列(又称任务队列)的主要思想是避免立即执行资源密集型任务,而不得不等待它完成。相反我们安排任务在之后执行。我们把任务封装为消息并将其发送到队列。在后台运行的工作进程将弹出任务并最终执行作业。当有多个工作线程时,这些工作线程将一起处理这些任务。
这里抛出一个 Work Queues 是为了说明 rabbitmq 是通过轮询消费的方式 来处理消息的 , 当生产者生产了很多消息,并发送到工作队列中的消息队列中 ,多个消费者 会按顺序的方式依次去队列中取消息进行消费 .
图:
代码案例 :
上面我们写 消费者和生产者的代码 会发现有一些代码是重复的,我们可以把这些重复的代码抽出来封装成一个类
重复的代码:// 1. 创建一个连接工厂ConnectionFactory factory =newConnectionFactory();// 2. 指定 rabbitmq 服务 所在的 ip 和 管理员用户名 和密码// ip
factory.setHost("192.168.80.129");// 用户名
factory.setUsername("mu");// 密码
factory.setPassword("1234");// 3. 创建连接 -- newConnection 方法需要抛出异常Connection connection = factory.newConnection();Channel channel = connection.createChannel();
封装到一个类中, 下次写道一样的代码直接调用方法即可
publicclassRabbitMQUtils{publicstaticChannelgetChannel()throwsIOException,TimeoutException{// 1. 创建一个连接工厂ConnectionFactory factory =newConnectionFactory();// 2. 指定 rabbitmq 服务 所在的 ip 和 管理员用户名 和密码// ip
factory.setHost("192.168.80.129");// 用户名
factory.setUsername("mu");// 密码
factory.setPassword("1234");// 3. 创建连接 -- newConnection 方法需要抛出异常Connection connection = factory.newConnection();Channel channel = connection.createChannel();return channel;}}
封装完 RabbitMQUtils ,下面就来完成 轮询的代码案例 , 这里先来创建两个消费者
这里的代码与之前的 消费者差不多,差别就在几个 打印语句
packageorg.example.two;importcom.rabbitmq.client.CancelCallback;importcom.rabbitmq.client.Channel;importcom.rabbitmq.client.DeliverCallback;importorg.example.utils.RabbitMQUtils;importjava.io.IOException;importjava.nio.charset.StandardCharsets;importjava.util.concurrent.TimeoutException;publicclassWork1{// 队列名publicstaticfinalStringQUEUE_NAME="mu";publicstaticvoidmain(String[] args)throwsIOException,TimeoutException{Channel channel =RabbitMQUtils.getChannel();/**
* DeliverCallback 的两个参数
* 1. deliveryTag 参数:deliveryTag 是一个唯一标识符,用于标识和跟踪消息的传递状态。
* 每个消息都会被分配一个唯一的 deliveryTag。在消息确认时,可以使用 deliveryTag 来指定确认的是哪条消息。
*
* 2. message 参数:message 是一个 AMQP 消息对象,包含从队列中获取的消息内容以及相应的属性信息。
* 通过 message 参数,可以访问消息的内容、标签、路由 key 等相关属性。可以根据需要对消息进行解析和处理。
*/// 接受消息的回调DeliverCallback deliverCallback =(consumerTag, message)->{System.out.println("消费者1 接收到消息: "+newString(message.getBody(),StandardCharsets.UTF_8));};// 取消消费消息CancelCallback cancelCallback = consumerTag ->{System.out.println(consumerTag +"消费者1 取消消费消息");};// 消息的接受
channel.basicConsume(QUEUE_NAME,true, deliverCallback, cancelCallback);}}
这里消费2 和消费者1 类似 ,区别在于 接受消息和取消接受消息的 打印语句 复制一份消费者1 即可
消费者就绪,下面来写一个生产者来大量生产消息
packageorg.example.two;importcom.rabbitmq.client.Channel;importorg.example.utils.RabbitMQUtils;importjava.io.IOException;importjava.util.concurrent.TimeoutException;publicclassProducer{//队列的名称publicstaticfinalStringQUEUE_NAME="mu";publicstaticvoidmain(String[] args)throwsIOException,TimeoutException{// 获取信道Channel channel =RabbitMQUtils.getChannel();// 队列的声明
channel.queueDeclare(QUEUE_NAME,false,false,false,null);// 发送消息for(int i =1; i <11; i++){String message ="——— "+ i +" ———";// 发送消息
channel.basicPublish("",QUEUE_NAME,null, message.getBytes());}}}
生产者消费者都就绪了,下面启动来看看效果:
轮询案例看完,下面来学习一下如何通过 rabbitmq 管理端网页 创建队列
5. 管理端页面创建队列
版权归原作者 牧.. 所有, 如有侵权,请联系我们删除。