0


手把手教你SpringBoot集成消息服务中间件RabbitMQ

一,消息服务中间件的概述

1,大多应用中,可通过消息服务中间件来提升系统异步通信、扩展解耦能力。

2,消息服务中有两个概念:消息代理和目的地

当消息发送者发送消息以后,将由消息代理接管,消息代理保证消息传递到指定目的地。

3,消息队列主要由两种形式的目的地。

​ (1)队列:点对点消息通信(一对一)

​ (2)主题:发布/订阅消息通信(一对多)

1,消息队列的应用场景:

一个网站,用户需要注册,注册后还需要发送注册邮件,还要发送注册短信,假如每一步都需要花费5秒的话,那么注册使用同步的方式,需要花费15秒,如果使用异步的方式注册,让注册邮件和注册短信同时进行,那么花费10秒。

那么问题来了,哪里用到了消息服务中间件呢?

用户注册信息之后,花费0.5秒的时间把信息写入消息中间件,然后提示用户注册成功,用户注册就只花费了5.5秒的时间。

写入消息中间件之后就不用管了,剩下的就交给消息队列自己执行了。

2,消息服务中间件的作用详述

应用解耦:

订单系统---------库存系统

订单系统每次减少一个库存,都要访问库存系统,告诉他已经卖了一单了,库存要减少一个。

订单系统----------消息队列-----------库存系统

订单系统减少一个库存时,写入消息队列,然后库存系统订阅该消息队列,得到订单系统写入的信息,然后库存系统自己减少一个库存。

流量削峰:

双十一那天会同时有超级多的用户同时抢购某一个东西,比如淘宝,那么如果特别多的用户同时访问服务器,服务器肯定是受不了的,解决办法就是把接到的请求订单信息写入消息队列,然后让服务器订阅该消息队列,自己取信息,如果请求数量超过了消息队列的上限,那么消息队列会拒收,所以减小了服务器的压力。

3,消息队列的模式

点对点式:

消息发送消息,消息代理将其放入一个队列中,消息接收者从队列中获取消息内容,消息读取后被移出队列。

消息只有唯一的发送者和接受者。

发布订阅式:

发送者(发布者)发送消息到主题,多个接收者(订阅者)监听(订阅)这个主题,那么就会在消息到达时同时收到消息。

JMS,JAVA消息服务,基于JVM消息代理的规范,ActiveMQ,HornetMQ是JMS实现。

AMQP,高级消息队列协议,也是一个消息代理的规范,兼容JMS,RabbitMQ是AMQP的实现。

二,

  1. RabbitMQ

的安装和使用

1,下载和安装

  1. RabbitMQ

官网下载安装包:https://www.rabbitmq.com/

在安装

  1. RabbitMQ

之前,首先需要安装

  1. ErLang

包,因为

  1. RabbitMQ

是基于

  1. ErLang

语言的。

下载的

  1. RabbitMQ

安装包和

  1. RabbitMQ

安装包的版本要一致,否则环境会搭建失败,出现很多问题。

如果查看对应一致的版本?

  1. RabbitMQ

官网有和

  1. ErLang

版本对照表。

在这里插入图片描述

本次安装选择在

  1. Linux

虚拟机进行安装,在安装之前要准备三个安装包。

ErLang的 rpm包

RabbitMQ是Erlang语言编写,所以Erang环境必须要有,注:Erlang环境一定要与RabbitMQ版本匹配:https://www.rabbitmq.com/which-erlang.html

Erlang下载地址:https://www.rabbitmq.com/releases/erlang/(根据自身需求及匹配关系,下载对应rpm包)

RabbitMQ的 rpm包

RabbitMQ下载地址:https://www.rabbitmq.com/releases/rabbitmq-server/(根据自身需求及匹配关系,下载对应rpm包)

socat的 rpm包

rabbitmq安装依赖于socat,所以需要下载socat。

socat下载地址:http://repo.iotti.biz/CentOS/6/x86_64/socat-1.7.3.2-1.el6.lux.x86_64.rpm

根据自身需求下载对应系统socat依赖:(http://repo.iotti.biz/CentOS/)

下载好之后的rpm包

在这里插入图片描述

打开

  1. Linux

虚拟机,进入

  1. /usr/local/software

文件夹,使用

  1. xftp

工具把

  1. rpm

安装包上传到该文件夹下。

在这里插入图片描述

安装顺序:1:erlang 2:socat(密钥) 3:rabbitmq

首先安装

  1. erlang

  1. rpm

安装包

  1. [fanjiangfeng@localhost software]$ rpm -ivh erlang-18.3-1.el7.centos.x86_64.rpm

注意一点:要切换到

  1. root

用户才有权限安装,否则没有权限。

在这里插入图片描述

其次安装

  1. socat

  1. rpm

安装包(该密钥为

  1. RabbitMQ

服务所依赖,如果不安装的话,

  1. rabbitMQ

会安装失败)

  1. [root@localhost software]# rpm -ivh socat-1.7.3.2-1.el6.lux.x86_64.rpm

最后安装

  1. RabbitMQ

  1. rpm

安装包

  1. [root@localhost software]# rpm -ivh rabbitmq-server-3.6.5-1.noarch.rpm

安装到此结束。

2,修改配置文件

安装完成后,

  1. rabbitmq

会默认安装到

  1. /usr/lib/rabbitmq/lib

路径下。

进入

  1. ebin

目录,修改配置文件

  1. rabbit.app

  1. [root@localhost ebin]# pwd
  2. /usr/lib/rabbitmq/lib/rabbitmq_server-3.6.5/ebin
  1. vim rabbit.app

在这里插入图片描述

找到下面这个地方

在这里插入图片描述

修改为

在这里插入图片描述

  1. RabbitMQ

安装就完成了。

3,命令行与管控台-基础操作

rabbitmqctl stop_app 关闭应用

rabbitmqctl start_app 开启应用

rabbitmqctl status 节点状态

rabbitmqctl add_user username password 添加用户

rabbitmqctl list_users 列出所有用户

rabbitmqctl delete_user usernmae 删除用户

rabbitmqctl clear_permissions -p vhostpath username 清除用户权限

rabbitmqctl add_vhost vhostpath 创建虚拟主机

rabbitmqctl list_vhosts 列出所有虚拟主机

rabbitmqctl list_permissions -p vhostpath 列出虚拟主机上所有权限

rabbitmqctl delete_vhost vhostpath 删除虚拟主机

rabbitmqctl list_queues 查看所有队列信息

rabbitmqctl -p vhostpath purge_queue blue 清除队列里的消息

4,命令行与管控台-高级操作

rabbitmqctl reset 移除所有数据,要在rabbitmqctl stop_app 之后使用

rabbitmqctl join_cluster [–ram] 组成集群命令

rabbitmqctl cluster_status 查看集群状态

5,来些基本操作

首先启动

  1. rabbitmq

服务

  1. [root@localhost software]# rabbitmqctl start_app

异常:Error: unable to connect to node rabbit@localhost: nodedown

如果出现上面的异常的话,解决方式如下:

  1. systemctl enable rabbitmq-server.service
  2. systemctl start rabbitmq-server.service
  3. rabbitmqctl start_app
  4. ## 成功启动rabbitmq
  5. ## 启动neutrou-server,openstack恢复正常
  6. systemctl start neutron-server.service

基本操作

  1. [root@localhost software]# lsof -i:5672 查看rabbitmq是否启动(此为已启动的状态)
  2. COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODE NAME
  3. beam 6172 rabbitmq 48u IPv6 73786 0t0 TCP *:amqp (LISTEN)
  4. [root@localhost software]# rabbitmqctl list_queues 查看消息队列(空,因为刚安装的)
  5. Listing queues ...
  6. [root@localhost software]# rabbitmqctl list_vhosts 查看虚拟主机(空)
  7. Listing vhosts ...
  8. /
  9. [root@localhost software]#

6,管控台

啥是管控台?

说白了,命令行是命令的方式进行管理,而管控台则是可视化视图的方式进行管理,当然管控台更方便了。只要是

  1. rabbitmqctl

命令行有的,管控台全都有!

进入

  1. /usr/lib/rabbitmq/lib/rabbitmq_server-3.6.5/sbin

的目录

  1. [root@localhost sbin]# ls
  2. rabbitmqctl rabbitmq-defaults rabbitmq-env rabbitmq-plugins rabbitmq-server

运行命令查看已安装的插件列表

  1. [root@localhost sbin]# rabbitmq-plugins list
  2. Configured: E = explicitly enabled; e = implicitly enabled
  3. | Status: * = running on rabbit@localhost
  4. |/
  5. [ ] amqp_client 3.6.5
  6. [ ] cowboy 1.0.3
  7. [ ] cowlib 1.0.1
  8. [ ] mochiweb 2.13.1
  9. [ ] rabbitmq_amqp1_0 3.6.5
  10. [ ] rabbitmq_auth_backend_ldap 3.6.5

运行命令启动

  1. RabbitMQ

的管控台

  1. [root@localhost sbin]# rabbitmq-plugins enable rabbitmq_management

然后管控台就启动了,在浏览器的url地址栏输入虚拟机的 ip 和端口号为15672,就进入了管控台。

默认端口号就是15672。

在这里插入图片描述

账号密码默认为

  1. guest

,登录

在这里插入图片描述

7,消息队列的说明

下图是一个队列。

在这里插入图片描述

上图队列说明:

1,第一个

  1. msg

是要发送的消息;

2,

  1. exchange.direct

是交换机,

  1. direct

模式的交换机,它会规定一个

  1. Routing Key

,如果队列的

  1. Routing Key

和交换机指定的

  1. Routing Key

一样,那么会把消息发送给该队列,对不上号的肯定就不会发送了。

3,

  1. exchange.fanout

也是交换机,

  1. fanout

模式的交换机,它不会规定

  1. Routing Key

,类似广播模式,该交换机将发送消息到每一个队列中去。

4,中间绿色的当然是四个队列了。

5,

  1. exchange.topic

负责从队列中取信息,它和队列的关系和交换机和队列的关系很像,都是相互绑定,但是不同的是,交换机是发送,而它是接收。

China.#和*.news是模糊匹配原则,绑定可以模糊匹配,可以对应多个队列。

三,springboot集成rabbitmq

前提是要在管控台添加消息队列完成之后才行。

1,管控台添加消息队列

第一步,先添加一个队列

在这里插入图片描述

第二步,添加一个交换机

在这里插入图片描述

第三步,添加一个消息接收者(

  1. topic

模式的交换机)

在这里插入图片描述

第四步,绑定

  1. direct

交换机和队列

在管控台点击该交换机,进入然后绑定队列。

在这里插入图片描述

第五步,绑定

  1. topic

(接收者)交换机和队列,同上。

测试上面的消息队列是否创建成功?

点击交换机,发送消息

在这里插入图片描述

然后回到队列,查看消息

在这里插入图片描述

证明队列已经创建成功!

然后就可以落实到项目中去了!

2,springboot发送消息到队列

新建一个

  1. springboot

项目,导入坐标

  1. <dependency>
  2. <groupId>org.springframework.amqp</groupId>
  3. <artifactId>spring-rabbit</artifactId>
  4. </dependency>

  1. application.yml

配置文件中配置

  1. rabbitmq

的主机地址

  1. spring:rabbitmq:host: 192.168.186.128

在启动类上加上

  1. RabbitMQ

的注解

  1. @EnableRabbit
  1. @AutowiredRabbitTemplate rabbitTemplate;@TestvoidcontextLoads(){//第一个参数:交换机名字 第二个参数:Routing Key的值 第三个参数:传递的消息对象
  2. rabbitTemplate.convertAndSend("test.direct","test","springboot发来的消息");}

运行测试类,回到

  1. rabbitmq

的管控台,查看名为

  1. test

的队列收到消息。

在这里插入图片描述

还可以发送一个对象,前提是该对象必须序列化,不然会抛异常。可是虽然能够发送成功,但拿到的是字节流,因此发送对象可以采用

  1. json

串的方式。

对象中必须加入无参构造才能保证序列化成功,否则会序列化失败。

在这里插入图片描述

先写一个配置类,该配置类是规定使用

  1. json

格式发送。

  1. @ConfigurationpublicclassTestConfig{@BeanpublicJackson2JsonMessageConvertermessageConverter(){returnnewJackson2JsonMessageConverter();}}
  1. User

实体类

  1. @DatapublicclassUserimplementsSerializable{privateString name;privateString address;publicUser(String name,String address){this.name = name;this.address = address;}publicUser(){}}

发送消息测试

  1. @AutowiredRabbitTemplate rabbitTemplate;@TestvoidcontextLoads(){//第一个参数:交换机名字 第二个参数:Routing Key的值 第三个参数:传递的消息对象
  2. rabbitTemplate.convertAndSend("test.direct","test",newUser("樊江锋","河南郑州"));}

查看管控台,已经拿到了该

  1. json

格式的对象

3,springboot从队列中接收消息

  1. @Testvoidreceive(){//指定队列名Object test = rabbitTemplate.receiveAndConvert("test");System.err.println(test);}

4,springboot监听接收到的消息

当监听到队列中收到消息时,然后可以做一些处理。下面的监听在项目启动之后会开启,前提是启动类要加

  1. @EnableRabbit

注解。

  1. //监听接收到的数据(请求体)@RabbitListener(queues ="test")publicvoidreceive(User user){System.out.println("收到消息:"+user);}//监听接收到的请求头@RabbitListener(queues ="test")publicvoidreceive2(Message message){System.out.println(message.getBody());System.out.println(message.getMessageProperties());}

测试一下监听,给

  1. test

队列发送一条消息。

  1. @AutowiredRabbitTemplate rabbitTemplate;@TestvoidcontextLoads(){//第一个参数:交换机名字 第二个参数:Routing Key的值 第三个参数:传递的消息对象
  2. rabbitTemplate.convertAndSend("test.direct","test",newUser("樊江锋","河南郑州"));}

然后看控制台,会打印

收到消息:User{name=‘樊江锋’, address=‘河南郑州’}

而且控制台队列显示未读消息是0,显然已经删除了。

5,amqp管理组件

  1. amqp

是用来管理组件的,上面的组件(交换机,队列,以及交换机和队列之间的绑定)都是在管控台操作的,这里使用

  1. amqp

来管理。

  1. @AutowiredAmqpAdmin amqpAdmin;voidcreate(){
  2. amqpAdmin.declareExchange(newDirectExchange("test"));//创建一个名为test的direct模式的交换机
  3. amqpAdmin.declareQueue(newQueue("test",true));//创建一个名为test的队列//第一个参数:绑定的队列名//第二个参数:绑定的类型(选择队列)//第三个参数:交换机名//第四个参数:Routing Key的值//第五个参数:需要传的参数,这里不需要
  4. amqpAdmin.declareBinding(newBinding("test",Binding.DestinationType.QUEUE,"topic_test","routingkey",null));
  5. amqpAdmin.deleteExchange("test");//删除交换机(参数:交换机名)
  6. amqpAdmin.deleteQueue("test");//删除队列(参数:队列名)}

到这里结束!


本文转载自: https://blog.csdn.net/weixin_44001965/article/details/143613765
版权归原作者 刻苦的樊同学 所有, 如有侵权,请联系我们删除。

“手把手教你SpringBoot集成消息服务中间件RabbitMQ”的评论:

还没有评论