一,消息服务中间件的概述
1,大多应用中,可通过消息服务中间件来提升系统异步通信、扩展解耦能力。
2,消息服务中有两个概念:消息代理和目的地
当消息发送者发送消息以后,将由消息代理接管,消息代理保证消息传递到指定目的地。
3,消息队列主要由两种形式的目的地。
(1)队列:点对点消息通信(一对一)
(2)主题:发布/订阅消息通信(一对多)
1,消息队列的应用场景:
一个网站,用户需要注册,注册后还需要发送注册邮件,还要发送注册短信,假如每一步都需要花费5秒的话,那么注册使用同步的方式,需要花费15秒,如果使用异步的方式注册,让注册邮件和注册短信同时进行,那么花费10秒。
那么问题来了,哪里用到了消息服务中间件呢?
用户注册信息之后,花费0.5秒的时间把信息写入消息中间件,然后提示用户注册成功,用户注册就只花费了5.5秒的时间。
写入消息中间件之后就不用管了,剩下的就交给消息队列自己执行了。
2,消息服务中间件的作用详述
应用解耦:
订单系统---------库存系统
订单系统每次减少一个库存,都要访问库存系统,告诉他已经卖了一单了,库存要减少一个。
订单系统----------消息队列-----------库存系统
订单系统减少一个库存时,写入消息队列,然后库存系统订阅该消息队列,得到订单系统写入的信息,然后库存系统自己减少一个库存。
流量削峰:
双十一那天会同时有超级多的用户同时抢购某一个东西,比如淘宝,那么如果特别多的用户同时访问服务器,服务器肯定是受不了的,解决办法就是把接到的请求订单信息写入消息队列,然后让服务器订阅该消息队列,自己取信息,如果请求数量超过了消息队列的上限,那么消息队列会拒收,所以减小了服务器的压力。
3,消息队列的模式
点对点式:
消息发送消息,消息代理将其放入一个队列中,消息接收者从队列中获取消息内容,消息读取后被移出队列。
消息只有唯一的发送者和接受者。
发布订阅式:
发送者(发布者)发送消息到主题,多个接收者(订阅者)监听(订阅)这个主题,那么就会在消息到达时同时收到消息。
JMS,JAVA消息服务,基于JVM消息代理的规范,ActiveMQ,HornetMQ是JMS实现。
AMQP,高级消息队列协议,也是一个消息代理的规范,兼容JMS,RabbitMQ是AMQP的实现。
二,
RabbitMQ
的安装和使用
1,下载和安装
到
RabbitMQ
官网下载安装包:https://www.rabbitmq.com/
在安装
RabbitMQ
之前,首先需要安装
ErLang
包,因为
RabbitMQ
是基于
ErLang
语言的。
下载的
RabbitMQ
安装包和
RabbitMQ
安装包的版本要一致,否则环境会搭建失败,出现很多问题。
如果查看对应一致的版本?
在
RabbitMQ
官网有和
ErLang
版本对照表。
本次安装选择在
Linux
虚拟机进行安装,在安装之前要准备三个安装包。
ErLang的 rpm包
RabbitMQ是Erlang语言编写,所以Erang环境必须要有,注:Erlang环境一定要与RabbitMQ版本匹配:https://www.rabbitmq.com/which-erlang.html
Erlang下载地址:https://www.rabbitmq.com/releases/erlang/(根据自身需求及匹配关系,下载对应rpm包)
RabbitMQ的 rpm包
RabbitMQ下载地址:https://www.rabbitmq.com/releases/rabbitmq-server/(根据自身需求及匹配关系,下载对应rpm包)
socat的 rpm包
rabbitmq安装依赖于socat,所以需要下载socat。
socat下载地址:http://repo.iotti.biz/CentOS/6/x86_64/socat-1.7.3.2-1.el6.lux.x86_64.rpm
根据自身需求下载对应系统socat依赖:(http://repo.iotti.biz/CentOS/)
下载好之后的rpm包
打开
Linux
虚拟机,进入
/usr/local/software
文件夹,使用
xftp
工具把
rpm
安装包上传到该文件夹下。
安装顺序:1:erlang 2:socat(密钥) 3:rabbitmq
首先安装
erlang
的
rpm
安装包
[fanjiangfeng@localhost software]$ rpm -ivh erlang-18.3-1.el7.centos.x86_64.rpm
注意一点:要切换到
root
用户才有权限安装,否则没有权限。
其次安装
socat
的
rpm
安装包(该密钥为
RabbitMQ
服务所依赖,如果不安装的话,
rabbitMQ
会安装失败)
[root@localhost software]# rpm -ivh socat-1.7.3.2-1.el6.lux.x86_64.rpm
最后安装
RabbitMQ
的
rpm
安装包
[root@localhost software]# rpm -ivh rabbitmq-server-3.6.5-1.noarch.rpm
安装到此结束。
2,修改配置文件
安装完成后,
rabbitmq
会默认安装到
/usr/lib/rabbitmq/lib
路径下。
进入
ebin
目录,修改配置文件
rabbit.app
。
[root@localhost ebin]# pwd
/usr/lib/rabbitmq/lib/rabbitmq_server-3.6.5/ebin
vim rabbit.app
找到下面这个地方
修改为
RabbitMQ
安装就完成了。
3,命令行与管控台-基础操作
rabbitmqctl stop_app 关闭应用
rabbitmqctl start_app 开启应用
rabbitmqctl status 节点状态
rabbitmqctl add_user username password 添加用户
rabbitmqctl list_users 列出所有用户
rabbitmqctl delete_user usernmae 删除用户
rabbitmqctl clear_permissions -p vhostpath username 清除用户权限
rabbitmqctl add_vhost vhostpath 创建虚拟主机
rabbitmqctl list_vhosts 列出所有虚拟主机
rabbitmqctl list_permissions -p vhostpath 列出虚拟主机上所有权限
rabbitmqctl delete_vhost vhostpath 删除虚拟主机
rabbitmqctl list_queues 查看所有队列信息
rabbitmqctl -p vhostpath purge_queue blue 清除队列里的消息
4,命令行与管控台-高级操作
rabbitmqctl reset 移除所有数据,要在rabbitmqctl stop_app 之后使用
rabbitmqctl join_cluster [–ram] 组成集群命令
rabbitmqctl cluster_status 查看集群状态
5,来些基本操作
首先启动
rabbitmq
服务
[root@localhost software]# rabbitmqctl start_app
异常:Error: unable to connect to node rabbit@localhost: nodedown
如果出现上面的异常的话,解决方式如下:
systemctl enable rabbitmq-server.service
systemctl start rabbitmq-server.service
rabbitmqctl start_app
## 成功启动rabbitmq
## 启动neutrou-server,openstack恢复正常
systemctl start neutron-server.service
基本操作
[root@localhost software]# lsof -i:5672 查看rabbitmq是否启动(此为已启动的状态)
COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODE NAME
beam 6172 rabbitmq 48u IPv6 73786 0t0 TCP *:amqp (LISTEN)
[root@localhost software]# rabbitmqctl list_queues 查看消息队列(空,因为刚安装的)
Listing queues ...
[root@localhost software]# rabbitmqctl list_vhosts 查看虚拟主机(空)
Listing vhosts ...
/
[root@localhost software]#
6,管控台
啥是管控台?
说白了,命令行是命令的方式进行管理,而管控台则是可视化视图的方式进行管理,当然管控台更方便了。只要是
rabbitmqctl
命令行有的,管控台全都有!
进入
/usr/lib/rabbitmq/lib/rabbitmq_server-3.6.5/sbin
的目录
[root@localhost sbin]# ls
rabbitmqctl rabbitmq-defaults rabbitmq-env rabbitmq-plugins rabbitmq-server
运行命令查看已安装的插件列表
[root@localhost sbin]# rabbitmq-plugins list
Configured: E = explicitly enabled; e = implicitly enabled
| Status: * = running on rabbit@localhost
|/
[ ] amqp_client 3.6.5
[ ] cowboy 1.0.3
[ ] cowlib 1.0.1
[ ] mochiweb 2.13.1
[ ] rabbitmq_amqp1_0 3.6.5
[ ] rabbitmq_auth_backend_ldap 3.6.5
运行命令启动
RabbitMQ
的管控台
[root@localhost sbin]# rabbitmq-plugins enable rabbitmq_management
然后管控台就启动了,在浏览器的url地址栏输入虚拟机的 ip 和端口号为15672,就进入了管控台。
默认端口号就是15672。
账号密码默认为
guest
,登录
7,消息队列的说明
下图是一个队列。
上图队列说明:
1,第一个
msg
是要发送的消息;
2,
exchange.direct
是交换机,
direct
模式的交换机,它会规定一个
Routing Key
,如果队列的
Routing Key
和交换机指定的
Routing Key
一样,那么会把消息发送给该队列,对不上号的肯定就不会发送了。
3,
exchange.fanout
也是交换机,
fanout
模式的交换机,它不会规定
Routing Key
,类似广播模式,该交换机将发送消息到每一个队列中去。
4,中间绿色的当然是四个队列了。
5,
exchange.topic
负责从队列中取信息,它和队列的关系和交换机和队列的关系很像,都是相互绑定,但是不同的是,交换机是发送,而它是接收。
China.#和*.news是模糊匹配原则,绑定可以模糊匹配,可以对应多个队列。
三,springboot集成rabbitmq
前提是要在管控台添加消息队列完成之后才行。
1,管控台添加消息队列
第一步,先添加一个队列
第二步,添加一个交换机
第三步,添加一个消息接收者(
topic
模式的交换机)
第四步,绑定
direct
交换机和队列
在管控台点击该交换机,进入然后绑定队列。
第五步,绑定
topic
(接收者)交换机和队列,同上。
测试上面的消息队列是否创建成功?
点击交换机,发送消息
然后回到队列,查看消息
证明队列已经创建成功!
然后就可以落实到项目中去了!
2,springboot发送消息到队列
新建一个
springboot
项目,导入坐标
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
</dependency>
在
application.yml
配置文件中配置
rabbitmq
的主机地址
spring:rabbitmq:host: 192.168.186.128
在启动类上加上
RabbitMQ
的注解
@EnableRabbit
@AutowiredRabbitTemplate rabbitTemplate;@TestvoidcontextLoads(){//第一个参数:交换机名字 第二个参数:Routing Key的值 第三个参数:传递的消息对象
rabbitTemplate.convertAndSend("test.direct","test","springboot发来的消息");}
运行测试类,回到
rabbitmq
的管控台,查看名为
test
的队列收到消息。
还可以发送一个对象,前提是该对象必须序列化,不然会抛异常。可是虽然能够发送成功,但拿到的是字节流,因此发送对象可以采用
json
串的方式。
对象中必须加入无参构造才能保证序列化成功,否则会序列化失败。
先写一个配置类,该配置类是规定使用
json
格式发送。
@ConfigurationpublicclassTestConfig{@BeanpublicJackson2JsonMessageConvertermessageConverter(){returnnewJackson2JsonMessageConverter();}}
User
实体类
@DatapublicclassUserimplementsSerializable{privateString name;privateString address;publicUser(String name,String address){this.name = name;this.address = address;}publicUser(){}}
发送消息测试
@AutowiredRabbitTemplate rabbitTemplate;@TestvoidcontextLoads(){//第一个参数:交换机名字 第二个参数:Routing Key的值 第三个参数:传递的消息对象
rabbitTemplate.convertAndSend("test.direct","test",newUser("樊江锋","河南郑州"));}
查看管控台,已经拿到了该
json
格式的对象
3,springboot从队列中接收消息
@Testvoidreceive(){//指定队列名Object test = rabbitTemplate.receiveAndConvert("test");System.err.println(test);}
4,springboot监听接收到的消息
当监听到队列中收到消息时,然后可以做一些处理。下面的监听在项目启动之后会开启,前提是启动类要加
@EnableRabbit
注解。
//监听接收到的数据(请求体)@RabbitListener(queues ="test")publicvoidreceive(User user){System.out.println("收到消息:"+user);}//监听接收到的请求头@RabbitListener(queues ="test")publicvoidreceive2(Message message){System.out.println(message.getBody());System.out.println(message.getMessageProperties());}
测试一下监听,给
test
队列发送一条消息。
@AutowiredRabbitTemplate rabbitTemplate;@TestvoidcontextLoads(){//第一个参数:交换机名字 第二个参数:Routing Key的值 第三个参数:传递的消息对象
rabbitTemplate.convertAndSend("test.direct","test",newUser("樊江锋","河南郑州"));}
然后看控制台,会打印
收到消息:User{name=‘樊江锋’, address=‘河南郑州’}
而且控制台队列显示未读消息是0,显然已经删除了。
5,amqp管理组件
amqp
是用来管理组件的,上面的组件(交换机,队列,以及交换机和队列之间的绑定)都是在管控台操作的,这里使用
amqp
来管理。
@AutowiredAmqpAdmin amqpAdmin;voidcreate(){
amqpAdmin.declareExchange(newDirectExchange("test"));//创建一个名为test的direct模式的交换机
amqpAdmin.declareQueue(newQueue("test",true));//创建一个名为test的队列//第一个参数:绑定的队列名//第二个参数:绑定的类型(选择队列)//第三个参数:交换机名//第四个参数:Routing Key的值//第五个参数:需要传的参数,这里不需要
amqpAdmin.declareBinding(newBinding("test",Binding.DestinationType.QUEUE,"topic_test","routingkey",null));
amqpAdmin.deleteExchange("test");//删除交换机(参数:交换机名)
amqpAdmin.deleteQueue("test");//删除队列(参数:队列名)}
到这里结束!
版权归原作者 刻苦的樊同学 所有, 如有侵权,请联系我们删除。