0


手把手教你SpringBoot集成消息服务中间件RabbitMQ

一,消息服务中间件的概述

1,大多应用中,可通过消息服务中间件来提升系统异步通信、扩展解耦能力。

2,消息服务中有两个概念:消息代理和目的地

当消息发送者发送消息以后,将由消息代理接管,消息代理保证消息传递到指定目的地。

3,消息队列主要由两种形式的目的地。

​ (1)队列:点对点消息通信(一对一)

​ (2)主题:发布/订阅消息通信(一对多)

1,消息队列的应用场景:

一个网站,用户需要注册,注册后还需要发送注册邮件,还要发送注册短信,假如每一步都需要花费5秒的话,那么注册使用同步的方式,需要花费15秒,如果使用异步的方式注册,让注册邮件和注册短信同时进行,那么花费10秒。

那么问题来了,哪里用到了消息服务中间件呢?

用户注册信息之后,花费0.5秒的时间把信息写入消息中间件,然后提示用户注册成功,用户注册就只花费了5.5秒的时间。

写入消息中间件之后就不用管了,剩下的就交给消息队列自己执行了。

2,消息服务中间件的作用详述

应用解耦:

订单系统---------库存系统

订单系统每次减少一个库存,都要访问库存系统,告诉他已经卖了一单了,库存要减少一个。

订单系统----------消息队列-----------库存系统

订单系统减少一个库存时,写入消息队列,然后库存系统订阅该消息队列,得到订单系统写入的信息,然后库存系统自己减少一个库存。

流量削峰:

双十一那天会同时有超级多的用户同时抢购某一个东西,比如淘宝,那么如果特别多的用户同时访问服务器,服务器肯定是受不了的,解决办法就是把接到的请求订单信息写入消息队列,然后让服务器订阅该消息队列,自己取信息,如果请求数量超过了消息队列的上限,那么消息队列会拒收,所以减小了服务器的压力。

3,消息队列的模式

点对点式:

消息发送消息,消息代理将其放入一个队列中,消息接收者从队列中获取消息内容,消息读取后被移出队列。

消息只有唯一的发送者和接受者。

发布订阅式:

发送者(发布者)发送消息到主题,多个接收者(订阅者)监听(订阅)这个主题,那么就会在消息到达时同时收到消息。

JMS,JAVA消息服务,基于JVM消息代理的规范,ActiveMQ,HornetMQ是JMS实现。

AMQP,高级消息队列协议,也是一个消息代理的规范,兼容JMS,RabbitMQ是AMQP的实现。

二,

RabbitMQ

的安装和使用

1,下载和安装

RabbitMQ

官网下载安装包:https://www.rabbitmq.com/

在安装

RabbitMQ

之前,首先需要安装

ErLang

包,因为

RabbitMQ

是基于

ErLang

语言的。

下载的

RabbitMQ

安装包和

RabbitMQ

安装包的版本要一致,否则环境会搭建失败,出现很多问题。

如果查看对应一致的版本?

RabbitMQ

官网有和

ErLang

版本对照表。

在这里插入图片描述

本次安装选择在

Linux

虚拟机进行安装,在安装之前要准备三个安装包。

ErLang的 rpm包

RabbitMQ是Erlang语言编写,所以Erang环境必须要有,注:Erlang环境一定要与RabbitMQ版本匹配:https://www.rabbitmq.com/which-erlang.html

Erlang下载地址:https://www.rabbitmq.com/releases/erlang/(根据自身需求及匹配关系,下载对应rpm包)

RabbitMQ的 rpm包

RabbitMQ下载地址:https://www.rabbitmq.com/releases/rabbitmq-server/(根据自身需求及匹配关系,下载对应rpm包)

socat的 rpm包

rabbitmq安装依赖于socat,所以需要下载socat。

socat下载地址:http://repo.iotti.biz/CentOS/6/x86_64/socat-1.7.3.2-1.el6.lux.x86_64.rpm

根据自身需求下载对应系统socat依赖:(http://repo.iotti.biz/CentOS/)

下载好之后的rpm包

在这里插入图片描述

打开

Linux

虚拟机,进入

/usr/local/software

文件夹,使用

xftp

工具把

rpm

安装包上传到该文件夹下。

在这里插入图片描述

安装顺序:1:erlang 2:socat(密钥) 3:rabbitmq

首先安装

erlang

rpm

安装包

[fanjiangfeng@localhost software]$ rpm -ivh erlang-18.3-1.el7.centos.x86_64.rpm 

注意一点:要切换到

root

用户才有权限安装,否则没有权限。

在这里插入图片描述

其次安装

socat

rpm

安装包(该密钥为

RabbitMQ

服务所依赖,如果不安装的话,

rabbitMQ

会安装失败)

[root@localhost software]# rpm -ivh socat-1.7.3.2-1.el6.lux.x86_64.rpm 

最后安装

RabbitMQ

rpm

安装包

[root@localhost software]# rpm -ivh rabbitmq-server-3.6.5-1.noarch.rpm 

安装到此结束。

2,修改配置文件

安装完成后,

rabbitmq

会默认安装到

/usr/lib/rabbitmq/lib

路径下。

进入

ebin

目录,修改配置文件

rabbit.app

[root@localhost ebin]# pwd
/usr/lib/rabbitmq/lib/rabbitmq_server-3.6.5/ebin
vim rabbit.app

在这里插入图片描述

找到下面这个地方

在这里插入图片描述

修改为

在这里插入图片描述

RabbitMQ

安装就完成了。

3,命令行与管控台-基础操作

rabbitmqctl stop_app 关闭应用

rabbitmqctl start_app 开启应用

rabbitmqctl status 节点状态

rabbitmqctl add_user username password 添加用户

rabbitmqctl list_users 列出所有用户

rabbitmqctl delete_user usernmae 删除用户

rabbitmqctl clear_permissions -p vhostpath username 清除用户权限

rabbitmqctl add_vhost vhostpath 创建虚拟主机

rabbitmqctl list_vhosts 列出所有虚拟主机

rabbitmqctl list_permissions -p vhostpath 列出虚拟主机上所有权限

rabbitmqctl delete_vhost vhostpath 删除虚拟主机

rabbitmqctl list_queues 查看所有队列信息

rabbitmqctl -p vhostpath purge_queue blue 清除队列里的消息

4,命令行与管控台-高级操作

rabbitmqctl reset 移除所有数据,要在rabbitmqctl stop_app 之后使用

rabbitmqctl join_cluster [–ram] 组成集群命令

rabbitmqctl cluster_status 查看集群状态

5,来些基本操作

首先启动

rabbitmq

服务

[root@localhost software]# rabbitmqctl start_app

异常:Error: unable to connect to node rabbit@localhost: nodedown

如果出现上面的异常的话,解决方式如下:

systemctl enable rabbitmq-server.service
systemctl start rabbitmq-server.service
rabbitmqctl start_app
## 成功启动rabbitmq
## 启动neutrou-server,openstack恢复正常
systemctl start neutron-server.service

基本操作

[root@localhost software]# lsof -i:5672   查看rabbitmq是否启动(此为已启动的状态)
COMMAND  PID     USER   FD   TYPE DEVICE SIZE/OFF NODE NAME
beam    6172 rabbitmq   48u  IPv6  73786      0t0  TCP *:amqp (LISTEN)
[root@localhost software]# rabbitmqctl list_queues  查看消息队列(空,因为刚安装的)
Listing queues ...
[root@localhost software]# rabbitmqctl list_vhosts  查看虚拟主机(空)
Listing vhosts ...
/
[root@localhost software]#

6,管控台

啥是管控台?

说白了,命令行是命令的方式进行管理,而管控台则是可视化视图的方式进行管理,当然管控台更方便了。只要是

rabbitmqctl

命令行有的,管控台全都有!

进入

/usr/lib/rabbitmq/lib/rabbitmq_server-3.6.5/sbin

的目录

[root@localhost sbin]# ls
rabbitmqctl  rabbitmq-defaults  rabbitmq-env  rabbitmq-plugins  rabbitmq-server

运行命令查看已安装的插件列表

[root@localhost sbin]# rabbitmq-plugins list
 Configured: E = explicitly enabled; e = implicitly enabled
 | Status:   * = running on rabbit@localhost
 |/
[  ] amqp_client                       3.6.5
[  ] cowboy                            1.0.3
[  ] cowlib                            1.0.1
[  ] mochiweb                          2.13.1
[  ] rabbitmq_amqp1_0                  3.6.5
[  ] rabbitmq_auth_backend_ldap        3.6.5

运行命令启动

RabbitMQ

的管控台

[root@localhost sbin]# rabbitmq-plugins enable rabbitmq_management

然后管控台就启动了,在浏览器的url地址栏输入虚拟机的 ip 和端口号为15672,就进入了管控台。

默认端口号就是15672。

在这里插入图片描述

账号密码默认为

guest

,登录

在这里插入图片描述

7,消息队列的说明

下图是一个队列。

在这里插入图片描述

上图队列说明:

1,第一个

msg

是要发送的消息;

2,

exchange.direct

是交换机,

direct

模式的交换机,它会规定一个

Routing Key

,如果队列的

Routing Key

和交换机指定的

Routing Key

一样,那么会把消息发送给该队列,对不上号的肯定就不会发送了。

3,

exchange.fanout

也是交换机,

fanout

模式的交换机,它不会规定

Routing Key

,类似广播模式,该交换机将发送消息到每一个队列中去。

4,中间绿色的当然是四个队列了。

5,

exchange.topic

负责从队列中取信息,它和队列的关系和交换机和队列的关系很像,都是相互绑定,但是不同的是,交换机是发送,而它是接收。

China.#和*.news是模糊匹配原则,绑定可以模糊匹配,可以对应多个队列。

三,springboot集成rabbitmq

前提是要在管控台添加消息队列完成之后才行。

1,管控台添加消息队列

第一步,先添加一个队列

在这里插入图片描述

第二步,添加一个交换机

在这里插入图片描述

第三步,添加一个消息接收者(

topic

模式的交换机)

在这里插入图片描述

第四步,绑定

direct

交换机和队列

在管控台点击该交换机,进入然后绑定队列。

在这里插入图片描述

第五步,绑定

topic

(接收者)交换机和队列,同上。

测试上面的消息队列是否创建成功?

点击交换机,发送消息

在这里插入图片描述

然后回到队列,查看消息

在这里插入图片描述

证明队列已经创建成功!

然后就可以落实到项目中去了!

2,springboot发送消息到队列

新建一个

springboot

项目,导入坐标

        <dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-rabbit</artifactId>
        </dependency>

application.yml

配置文件中配置

rabbitmq

的主机地址

spring:rabbitmq:host: 192.168.186.128

在启动类上加上

RabbitMQ

的注解

@EnableRabbit
@AutowiredRabbitTemplate rabbitTemplate;@TestvoidcontextLoads(){//第一个参数:交换机名字  第二个参数:Routing Key的值  第三个参数:传递的消息对象
       rabbitTemplate.convertAndSend("test.direct","test","springboot发来的消息");}

运行测试类,回到

rabbitmq

的管控台,查看名为

test

的队列收到消息。

在这里插入图片描述

还可以发送一个对象,前提是该对象必须序列化,不然会抛异常。可是虽然能够发送成功,但拿到的是字节流,因此发送对象可以采用

json

串的方式。

对象中必须加入无参构造才能保证序列化成功,否则会序列化失败。

在这里插入图片描述

先写一个配置类,该配置类是规定使用

json

格式发送。

@ConfigurationpublicclassTestConfig{@BeanpublicJackson2JsonMessageConvertermessageConverter(){returnnewJackson2JsonMessageConverter();}}
User

实体类

@DatapublicclassUserimplementsSerializable{privateString name;privateString address;publicUser(String name,String address){this.name = name;this.address = address;}publicUser(){}}

发送消息测试

@AutowiredRabbitTemplate rabbitTemplate;@TestvoidcontextLoads(){//第一个参数:交换机名字  第二个参数:Routing Key的值  第三个参数:传递的消息对象
     rabbitTemplate.convertAndSend("test.direct","test",newUser("樊江锋","河南郑州"));}

查看管控台,已经拿到了该

json

格式的对象

3,springboot从队列中接收消息

@Testvoidreceive(){//指定队列名Object test = rabbitTemplate.receiveAndConvert("test");System.err.println(test);}

4,springboot监听接收到的消息

当监听到队列中收到消息时,然后可以做一些处理。下面的监听在项目启动之后会开启,前提是启动类要加

@EnableRabbit

注解。

//监听接收到的数据(请求体)@RabbitListener(queues ="test")publicvoidreceive(User user){System.out.println("收到消息:"+user);}//监听接收到的请求头@RabbitListener(queues ="test")publicvoidreceive2(Message message){System.out.println(message.getBody());System.out.println(message.getMessageProperties());}

测试一下监听,给

test

队列发送一条消息。

@AutowiredRabbitTemplate rabbitTemplate;@TestvoidcontextLoads(){//第一个参数:交换机名字  第二个参数:Routing Key的值  第三个参数:传递的消息对象
    rabbitTemplate.convertAndSend("test.direct","test",newUser("樊江锋","河南郑州"));}

然后看控制台,会打印

收到消息:User{name=‘樊江锋’, address=‘河南郑州’}

而且控制台队列显示未读消息是0,显然已经删除了。

5,amqp管理组件

amqp

是用来管理组件的,上面的组件(交换机,队列,以及交换机和队列之间的绑定)都是在管控台操作的,这里使用

amqp

来管理。

@AutowiredAmqpAdmin amqpAdmin;voidcreate(){
        amqpAdmin.declareExchange(newDirectExchange("test"));//创建一个名为test的direct模式的交换机
        amqpAdmin.declareQueue(newQueue("test",true));//创建一个名为test的队列//第一个参数:绑定的队列名//第二个参数:绑定的类型(选择队列)//第三个参数:交换机名//第四个参数:Routing Key的值//第五个参数:需要传的参数,这里不需要
        amqpAdmin.declareBinding(newBinding("test",Binding.DestinationType.QUEUE,"topic_test","routingkey",null));
        amqpAdmin.deleteExchange("test");//删除交换机(参数:交换机名)
        amqpAdmin.deleteQueue("test");//删除队列(参数:队列名)}

到这里结束!


本文转载自: https://blog.csdn.net/weixin_44001965/article/details/143613765
版权归原作者 刻苦的樊同学 所有, 如有侵权,请联系我们删除。

“手把手教你SpringBoot集成消息服务中间件RabbitMQ”的评论:

还没有评论