文章目录
一、简介
1、消息队列简介
MQ全称为Message Queue,消息队列是应⽤程序和应⽤程序之间的通信⽅法。
RabbitMQ是⼀个Erlang开发的AMQP(Advanced Message Queuing Protocol )的开源实现。
2、MQ产品简介
ActiveMQ
:基于JMS实现, ⽐较均衡, 不是最快的, 也不是最稳定的。ZeroMQ
:基于C语⾔开发, ⽬前最好的队列系统。RabbitMQ
:基于AMQP协议,erlang语⾔开发,稳定性好, 数据基本上不会丢失。RocketMQ
:基于JMS,阿⾥巴巴产品, ⽬前已经捐献给apahce, 还在孵化器中孵化。Kafka
:类似MQ的产品;分布式消息系统,⾼吞吐量, ⽬前最快的消息服务器, 不保证数据完整性。
3、AMQP 和 JMS
MQ是消息通信的模型;实现MQ的⼤致有两种主流⽅式:AMQP、JMS。
AMQP
AMQP是⼀种协议,更准确的说是⼀种binary wire-level protocol(链接协议)。这是其和JMS的本质差别,AMQP不从API层进⾏限定,⽽是直接定义⽹络交换的数据格式。
JMS
JMS即Java消息服务(JavaMessage Service)应⽤程序接⼝,是⼀个Java平台中关于⾯向消息中间件(MOM)的API,⽤于在两个应⽤程序之间,或分布式系统中发送消息,进⾏异步通信。
AMQP 与JMS 区别
JMS是定义了统⼀的接⼝,来对消息操作进⾏统⼀;AMQP是通过规定协议来统⼀数据交互的格式JMS限定了必须
使⽤Java语⾔;AMQP只是协议,不规定实现⽅式,因此是跨语⾔的。JMS规定了两种消息模式;⽽AMQP的消息
模式更加丰富.
JMSAMQP定义Java apiWire-protocol跨语⾔否是跨平台否是
二、RabbitMQ
1、简介
RabbitMQ是由erlang语⾔开发,基于AMQP(Advanced Message Queue ⾼级消息队列协议)协议实现的消息队列,它是⼀种应⽤程序之间的通信⽅法,消息队列在分布式系统开发中应⽤⾮常⼴泛。
RabbitMQ官⽅地址:http://www.rabbitmq.com/
RabbitMQ提供了6种模式:Hello Word简单模式,work⼯作模式,Publish/Subscribe发布与订阅模式,Routing路由模式,Topics主题模式(通配符模式),RPC远程调⽤模式(远程调⽤,不太算MQ;不作介绍)
官⽹对应模式介绍:https://www.rabbitmq.com/getstarted.html
2、相关定义
- Broker: 简单来说就是消息队列服务器实体
- Exchange: 消息交换机,它指定消息按什么规则,路由到哪个队列
- Queue: 消息队列载体,每个消息都会被投⼊到⼀个或多个队列
- Binding: 绑定,它的作⽤就是把exchange和queue按照路由规则绑定起来
- Routing Key: 路由关键字,exchange根据这个关键字进⾏消息投递
- VHost: 虚拟主机,⼀个broker⾥可以开设多个vhost,⽤作不同⽤户的权限分离。
- Producer: 消息⽣产者,就是投递消息的程序
- Consumer: 消息消费者,就是接受消息的程序
- Channel: 消息通道,在客户端的每个连接⾥,可建⽴多个channel,每个channel代表⼀个会话任务
由Exchange、Queue、RoutingKey三个才能决定⼀个从Exchange到Queue的唯⼀的线路。
3、工作模式说明(6种)
- 简单模式 HelloWorld:⼀个⽣产者、⼀个消费者,不需要设置交换机(使⽤默认的交换机)
- ⼯作队列模式 Work Queue:⼀个⽣产者、多个消费者(竞争关系),不需要设置交换机(使⽤默认的交换机)
- 发布订阅模式 Publish/subscribe:需要设置类型为fanout的交换机,并且交换机和队列进⾏绑定,当发送消息到交换机后,交换机会将消息发送到绑定的队列
- 路由模式 Routing:需要设置类型为direct的交换机,交换机和队列进⾏绑定,并且指定routing key,当发送消息到交换机后,交换机会根据routing key将消息发送到对应的队列
- 通配符模式 Topic: 需要设置类型为topic的交换机,交换机和队列进⾏绑定,并且指定通配符⽅式的routing key,当发送消息到交换机后,交换机会根据routing key将消息发送到对应的队列。
简单模式、工作队列模式:
生产消息和消费消息都是针对某一队列,区别是简单模式1个消费者,工作队列模式,多个消费者,单个消息只允许消费一次。
发布订阅模式、路由模式、通配符模式(Topic):
都是通过exchange进行转发,差异点主要在于exchange转发器,其它都一致。
发布订阅模式:fanout交换机,直接绑定,无规则。
路由模式:direct的交换机,指定key,进行发送。
通配符模式(Topic):topic的交换机,可以匹配多个key。
4、⻆⾊说明:
- 超级管理员(administrator):可登陆管理控制台,可查看所有的信息,并且可以对⽤户,策略(policy)进⾏操作。
- 监控者(monitoring):可登陆管理控制台,同时可以查看rabbitmq节点的相关信息(进程数,内存使⽤情况,磁盘使⽤情况等) 。
- 策略制定者(policymaker):可登陆管理控制台, 同时可以对policy进⾏管理。但⽆法查看节点的相关信息(上图红框标识的部分)。
- 普通管理者(management):仅可登陆管理控制台,⽆法看到节点信息,也⽆法对策略进⾏管理。
- 其他:⽆法登陆管理控制台,通常就是普通的⽣产者和消费者。
5、端口号
15672
:(if management plugin is enabled.管理界⾯ )。15671
:management监听端⼝。5672, 5671
:(AMQP 0-9-1 without and with TLS 消息队列协议是⼀个消息协议)。4369
:(epmd) epmd 代表 Erlang 端⼝映射守护进程。25672
: (Erlang distribution)。
6、Exchange三种模式
Exchange有常⻅以下3种类型:
Fanout
⼴播:将消息交给所有绑定到交换机的队列, 不处理路由键。只需要简单的将队列绑定到交换机上。fanout
类型交换机转发消息是最快的。Direct
:定向 把消息交给符合指定routing key 的队列. 处理路由键。需要将⼀个队列绑定到交换机上,要求该消息与⼀个特定的路由键完全匹配。如果⼀个队列绑定到该交换机上要求路由键 “dog”,则只有被标记为 “dog” 的消息才被转发,不会转发 dog.puppy,也不会转发 dog.guard,只会转发dog。路由模式:direct
类型的交换机。Topic
:主题(通配符) 把消息交给符合routing pattern(路由模式)的队列. 将路由键和某模式进⾏匹配。此时队列需要绑定要⼀个模式上。符号 “#” 匹配⼀个或多个词,符号“”匹配不多不少⼀个词。因此“audit.#” 能够匹配到“audit.irs.corporate”,但是*“audit.*” 只会匹配到 “audit.irs”。主题模式(通配符模式):topic
类型的交换机。
Exchange
(交换机)只负责转发消息,不具备存储消息的能⼒,因此如果没有任何队列与
Exchange
绑定,或者没有符合路由规则的队列,那么消息会丢失。
三、安装RabbitMQ
1、下载文件
文件地址
https://download.csdn.net/download/weixin_44624117/85722977
参考地址
https://www.cnblogs.com/xuwenjin/p/14984910.html
2、安装
1 安装
rpm -ivh erlang-22.0.7-1.el7.x86_64.rpm
rpm -ivh socat-1.7.3.2-2.el7.x86_64.rpm
rpm -ivh rabbitmq-server-3.7.18-1.el7.noarch.rpm
2 复制配置文件
cp /usr/share/doc/rabbitmq-server-3.7.18/rabbitmq.config.example /etc/rabbitmq/rabbitmq.config
3 修改配置文件
修改配置文件:rabbitmq.config。
把上面这一行放开(注意最后的逗号要去掉)。表示开启 guest 账户
4 启动 rabbitmq 的插件管理
rabbitmq-plugins enable rabbitmq_management
5 启动rabbitmq服务
systemctl start rabbitmq-server
6 访问页面
http://localhost:15672
账号密码:guest/guest
7 其它命令
服务启动/停止/重启/查询状态
systemctl start rabbitmq-server #启动
systemctl restart rabbitmq-server #重启
systemctl stop rabbitmq-server #停止
systemctl status rabbitmq-server #查看状态
rabbitmq 管理命令:rabbitmqctl
rabbitmqctl list_users #查看用户
rabbitmqctl list_queues #查看队列
rabbitmqctl status #查看borker状态
rabbitmqctl purget_queue 队列名称 #删除指定队列中数据
rabbitmqctl delete_queue 队列名称 #删除指定队列
rabbitmqctl help#帮助命令
插件管理命令:rabbitmq-plugins
rabbitmq-plugins list #查看插件
rabbitmq-plugins enable#启动插件
rabbitmq-plugins disable #停止插件
rabbitmq-plugins help#帮助命令
四、页面配置
1、添加用户
RabbitMQ在安装好后,可以访问http://localhost:15672;其⾃带了guest/guest的⽤户名和密码;如果需要创建⾃定义⽤户;那么也可以登录管理界⾯后,如下操作:
2、Virtual Hosts配置(db)
像mysql拥有数据库的概念并且可以指定⽤户对库和表等操作的权限。RabbitMQ也有类似的权限管理;在RabbitMQ中可以虚拟消息服务器Virtual Host,每个Virtual Hosts相当于⼀个相对独⽴的RabbitMQ服务器,每个VirtualHost之间是相互隔离的。exchange、queue、message不能互通。相当于mysql的db。Virtual Name⼀般以/开头。
1. 创建Virtual Hosts
2.设置Virtual Hosts权限
3、创建队列
五、快速开始
1、页面创建队列
创建simple_queue队列⽤于演示Hello World简单模式
2、
pom.xml
依赖
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>
3、
application.properties
# IP
spring.rabbitmq.host=8.131.239.157
# 端口号
spring.rabbitmq.port=5672
# v-host(DB)
spring.rabbitmq.virtual-host=/test
spring.rabbitmq.username=lydms
spring.rabbitmq.password=lydms
4、消息生产者
importorg.junit.jupiter.api.Test;importorg.springframework.amqp.rabbit.core.RabbitTemplate;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.boot.test.context.SpringBootTest;@SpringBootTestclassRabbitTestTest{@AutowiredprivateRabbitTemplate rabbitTemplate;@TestpublicvoidtestSimpleSend(){
rabbitTemplate.convertAndSend("simple_queue","你好, ⼩兔⼦!====");}}
5、消息消费者
importorg.springframework.amqp.rabbit.annotation.RabbitHandler;importorg.springframework.amqp.rabbit.annotation.RabbitListener;importorg.springframework.stereotype.Component;@Component@RabbitListener(queues ="simple_queue")publicclassSimpleListener{@RabbitHandlerpublicvoidtestListener(String message){System.out.println("======接收到的消息为:======"+ message);}}
六、工作模式
1、
Hello World
简单模式
1.1 简介
P:⽣产者: 也就是要发送消息的程序
C:消费者:消息的接受者,会⼀直等待消息到来。
queue:消息队列,图中红⾊部分。类似⼀个邮箱,可以缓存消息;⽣产者向其中投递消息,消费者从其中取出消息。
1.2 创建队列
1.3 生产者代码
importorg.junit.jupiter.api.Test;importorg.springframework.amqp.rabbit.core.RabbitTemplate;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.boot.test.context.SpringBootTest;@SpringBootTestclassRabbitTestTest{@AutowiredprivateRabbitTemplate rabbitTemplate;@TestpublicvoidtestSimpleSend(){
rabbitTemplate.convertAndSend("simple_queue","你好, ⼩兔⼦!====");}}
1.4 消费者代码
importorg.springframework.amqp.rabbit.annotation.RabbitHandler;importorg.springframework.amqp.rabbit.annotation.RabbitListener;importorg.springframework.stereotype.Component;@Component@RabbitListener(queues ="simple_queue")publicclassSimpleListener{@RabbitHandlerpublicvoidtestListener(String message){System.out.println("======接收到的消息为:======"+ message);}}
2、
Work queues
⼯作队列模式
2.1 简介
Work Queues与⼊⻔程序的简单模式相⽐,多了⼀个或多个消费端,多个消费端共同消费同⼀个队列中的消息。
应⽤场景:对于任务过重或任务较多情况使⽤⼯作队列可以提⾼任务处理的速度。
在⼀个队列中如果有多个消费者,那么消费者之间对于同⼀个消息的关系是竞争的关系。
创建队列和代码都是一样的,只是多了一个消费者而已。
2.2 创建队列
2.3 生产者代码
importorg.junit.jupiter.api.Test;importorg.springframework.amqp.rabbit.core.RabbitTemplate;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.boot.test.context.SpringBootTest;@SpringBootTestclassRabbitTestTest{@AutowiredprivateRabbitTemplate rabbitTemplate;@TestpublicvoidtestSimpleSend(){for(int i =0; i <100; i++){
rabbitTemplate.convertAndSend("simple_queue","你好, ⼩兔⼦!===="+ i);}}}
2.4 消费者代码
importorg.springframework.amqp.rabbit.annotation.RabbitHandler;importorg.springframework.amqp.rabbit.annotation.RabbitListener;importorg.springframework.stereotype.Component;@Component@RabbitListener(queues ="simple_queue")publicclassSimpleListener{@RabbitHandlerpublicvoidtestListener(String message){System.out.println("======接收到的消息为:======"+ message);}}
3、
Publish/Subscribe
发布与订阅模式
1.1 简介
发布订阅模式:
- 每个消费者监听⾃⼰的队列。
- ⽣产者将消息发给broker,由交换机将消息转发到绑定此交换机的每个队列,每个绑定交换机的队列都将接收到消息。
2.2 创建队列
1)创建队列
2)创建交换器
3)进行绑定
2.3 生产者代码
@TestpublicvoidtestSimpleSend(){for(int i =0; i <100; i++){
rabbitTemplate.convertAndSend("fanout_exchange","","你好, ⼩兔⼦!===="+ i);}}
2.4 消费者代码
和简单模式中一致
importorg.springframework.amqp.rabbit.annotation.RabbitHandler;importorg.springframework.amqp.rabbit.annotation.RabbitListener;importorg.springframework.stereotype.Component;@Component@RabbitListener(queues ="simple_queue")publicclassSimpleListener{@RabbitHandlerpublicvoidtestListener(String message){System.out.println("======接收到的消息为:======"+ message);}}
4、
Routing
路由模式
4.1 简介
路由模式特点:队列与交换机的绑定,不能是任意绑定了,⽽是要指定⼀个RoutingKey(路由key)消息的发送⽅在向 Exchange发送消息时,也必须指定消息的RoutingKey。Exchange不再把消息交给每⼀个绑定的队列,⽽是根据消息的Routing Key进⾏判断,只有队列的Routingkey与消息的Routing key完全⼀致,才会接收到消息2.2 创建队列。
图解:
P
:⽣产者:向Exchange发送消息,发送消息时,会指定⼀个routing key。X
:Exchange(交换机):接收⽣产者的消息,然后把消息递交给与routing key完全匹配的队列。C1
:消费者:其所在队列指定了需要routing key 为 error 的消息。C2
:消费者:其所在队列指定了需要routing key 为 info、error、warning 的消息。
4.2 生产者代码
新建队列
创建两个队列分别叫做
direct_queue_insert
和
direct_queue_update
⽤户演示。
**创建交换器
direct_exchange
, 类型为
direct
, ⽤于演示路由模式。**
设置绑定:将创建的交换器
direct_exchange
和
direct_queue_insert
,
direct_queue_update
绑定在⼀起, 路由键
Routing Key
分别为
insertKey
和
updateKey
。
4.3 生产者代码
@TestpublicvoidtestDirectExchange(){for(int i =0; i <100; i++){if(i%2==0){
rabbitTemplate.convertAndSend("direct_exchange","insertKey","你好, ⼩兔⼦!==inserKey=="+ i);}else{
rabbitTemplate.convertAndSend("direct_exchange","updateKey","你好, ⼩兔⼦!==updateKey=="+ i);}}}
页面队列信息:
4.4 消费者代码
importorg.springframework.amqp.rabbit.annotation.RabbitHandler;importorg.springframework.amqp.rabbit.annotation.RabbitListener;importorg.springframework.stereotype.Component;@Component@RabbitListener(queues ="direct_queue_insert")publicclassSimpleListener{@RabbitHandlerpublicvoidtestListener(String message){System.out.println("======接收到的消息为:======"+ message);}}
收到消息:
======接收到的消息为:======你好, ⼩兔⼦!==inserKey==52
======接收到的消息为:======你好, ⼩兔⼦!==inserKey==54
======接收到的消息为:======你好, ⼩兔⼦!==inserKey==56
======接收到的消息为:======你好, ⼩兔⼦!==inserKey==58
======接收到的消息为:======你好, ⼩兔⼦!==inserKey==60
======接收到的消息为:======你好, ⼩兔⼦!==inserKey==62
======接收到的消息为:======你好, ⼩兔⼦!==inserKey==64
5、
Topics
通配符模式(主题模式)
5.1 简介
Topic
类型与
Direct
相⽐,都是可以根据
RoutingKey
把消息路由到不同的队列。只不过
Topic
类型
Exchange
可以让队列在绑定
Routing key
的时候使⽤通配符!
Routingkey
: ⼀般都是有⼀个或多个单词组成,多个单词之间以”.”分割,例如: item.insert
通配符规则:
#
:匹配⼀个或多个词。*
:匹配不多不少恰好1个词。
举例:
lydms.#
: 能够匹配
lydms.insert.abc
或者
lydms.insert
。
lydms.
:只能匹配
lydms.insert
。
图解:
- 红⾊Queue:绑定的是
usa.#
,因此凡是以usa.
开头的routing key
都会被匹配到。 - ⻩⾊Queue:绑定的是
#.news
,因此凡是以.news
结尾的routing key
都会被匹配。
5.2 创建队列
**创建队列
topic_queue1
和
topic_queue1
**
**创建交换器
topic_exchange
,
type
类型为
topic
。**
设置绑定:
topic_queue1
绑定的
Routing Key
路由键为
item.*
。
topic_queue2
绑定的
Routing Key
路由键为
item.#
。
5.3 生产者代码
@TestpublicvoidtestTopicSend(){
rabbitTemplate.convertAndSend("topic_exchange","lydms.select","你好, ⼩兔⼦===lydms.select");
rabbitTemplate.convertAndSend("topic_exchange","lydms.select.abc","你不太好, ⼩兔⼦===lydms.select.abc");}
5.4 消费者代码
- 只是接受某个队列消息
importorg.springframework.amqp.rabbit.annotation.RabbitHandler;importorg.springframework.amqp.rabbit.annotation.RabbitListener;importorg.springframework.stereotype.Component;@Component@RabbitListener(queues ="topic_queue1")publicclassSimpleListener{@RabbitHandlerpublicvoidtestListener(String message){System.out.println("======接收到的消息为:======"+ message);}}
版权归原作者 ha_lydms 所有, 如有侵权,请联系我们删除。