【RabbitMQ】RabbitMQ配置与交换机学习
文章目录
简介
RabbitMQ是一个开源的消息代理软件,它实现了高级消息队列协议(AMQP)。RabbitMQ支持多种消息传递协议,具有高可靠性、高可用性和高性能等特点。它允许应用程序通过消息队列进行异步通信,从而实现解耦和负载均衡。RabbitMQ的核心概念包括交换机(Exchange)、队列(Queue)和绑定(Binding),它们共同构成了消息的路由和传递机制。
RabbitMQ的架构如图:
其中包含几个概念:
- **
publisher
**:生产者,也就是发送消息的一方 - **
consumer
**:消费者,也就是消费消息的一方 - **
queue
**:队列,存储消息。生产者投递的消息会暂存在消息队列中,等待消费者处理 - **
exchange
**:交换机,负责消息路由。生产者发送的消息由交换机决定投递到哪个队列。 - **
virtual host
**:虚拟主机,起到数据隔离的作用。每个虚拟主机相互独立,有各自的exchange、queue
安装和部署
这里以Centos7为例:
1. 安装RabbitMQ
docker run \-eRABBITMQ_DEFAULT_USER=shijun \-eRABBITMQ_DEFAULT_PASS=123321\-v mq-plugins:/plugins \--name mq \--hostname mq \-p15672:15672 \-p5672:5672 \--network hm-net\-d\
rabbitmq:3.8-management
-e RABBITMQ_DEFAULT_USER=shijun
-e
参数用于设置环境变量。这行代码用来设置RabbitMQ的默认用户名为shijun
。- **
-e RABBITMQ_DEFAULT_PASS=123321
**这行代码用来设置默认密码为123321
。- **
-p 15672:15672
**这行代码用来将宿主机的15672端口映射到容器的15672端口,15672端口是RabbitMQ管理控制台的默认端口。- **
-p 5672:5672
**这行代码用来将宿主机的5672端口映射到容器的5672端口,5672端口是RabbitMQ的默认通信端口。- **
--network hm-net
**这行代码将容器连接到名为hm-net
的网络。-d
-d
参数表示以后台模式运行容器。- **
rabbitmq:3.8-management
**这里是我们要运行的rabbitmq
的Docker镜像,这里选择的是RabbitMQ 3.8版本,版本需要根据自己的SpringCloud版本来选择。
安装完成后访问:http://虚拟机IP地址:15672
输入刚才的账号密码:shijun 123321,就能进入控制台界面。
2.创建virtual-host
由于RabbitMQ 每秒并发能力为几万,一般项目都不会达到这个规模,因此我们可以让多个项目使用同一个RabbitMQ 。要实现项目直接的隔离需要创建virtual-host,每个项目对应一个virtual-host。
按顺序点击,填入“Name”和“Descrption”,然后点击“Add virtual host”按钮:
然后在右上角切换到创建的virtual-host:
3. 添加依赖
<dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency><!--AMQP依赖,包含RabbitMQ--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><!--单元测试--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId></dependency>
4.修改配置文件
logging:pattern:dateformat: MM-dd HH:mm:ss:SSS
spring:rabbitmq:host: 192.168.56.101 # 你的虚拟机IPport:5672# 端口virtual-host: /mq-demo # 虚拟主机username: shijun # 用户名password:123321# 密码
WorkQueues模型
Work queues,任务模型。简单来说就是让多个消费者绑定到一个队列,共同消费队列中的消息。
当消息处理比较耗时的时候,可能生产消息的速度会远远大于消息的消费速度。长此以往,消息就会堆积越来越多,无法及时处理。
此时就可以使用work 模型,多个消费者共同处理消息处理,消息处理的速度就能大大提高了。
在控制台创建一个work.queue队列:
1.编写消息发送测试类
importstaticorg.junit.jupiter.api.Assertions.*;@SpringBootTestclassSpringAmqpTest{@AutowiredprivateRabbitTemplate rabbitTemplate;@TestpublicvoidtestWorkQueue()throwsInterruptedException{// 队列名称String queueName ="work.queue";// 消息String message ="hello, message_";for(int i =0; i <50; i++){// 发送消息,每20毫秒发送一次,相当于每秒发送50条消息
rabbitTemplate.convertAndSend(queueName, message + i);Thread.sleep(20);}}}
RabbitTemplate
:Spring AMQP提供的模板类,用于发送和接收消息。rabbitTemplate.convertAndSend(queueName, message + i);
:使用RabbitTemplate
发送消息到指定的队列。
2.编写消息接收(监听)类
@ComponentpublicclassSpringRabbitListener{/**
* 监听名为"work.queue"的RabbitMQ队列,接收并处理来自队列的消息。
* 通过延迟执行模拟消息处理时间
*
* @param msg 从队列中接收到的消息内容,以字符串形式提供
* @throws InterruptedException 如果线程在睡眠期间被中断,则抛出此异常
*/@RabbitListener(queues ="work.queue")publicvoidlistenWorkQueue1(String msg)throwsInterruptedException{// 输出接收到消息的时间,以便跟踪消息处理的时间点System.out.println("消费者1接收到消息:【"+ msg +"】"+LocalTime.now());// 模拟消息处理时间,让线程睡眠20毫秒Thread.sleep(20);}@RabbitListener(queues ="work.queue")publicvoidlistenWorkQueue2(String msg)throwsInterruptedException{System.err.println("消费者2........接收到消息:【"+ msg +"】"+LocalTime.now());Thread.sleep(200);}}
@RabbitListener
:此注解用来设置该方法应该作为RabbitMQ消息队列的监听器。这意味着当队列中有消息发布时,Spring框架会自动调用此方法来处理这些消息。queues = "work.queue"
:指定要监听的RabbitMQ队列的名称,在这个例子中是work.queue。- 注意到这两消费者,都设置了
Thead.sleep
,模拟任务耗时: - 消费者1 sleep了20毫秒,相当于每秒钟处理50个消息- 消费者2 sleep了200毫秒,相当于每秒处理5个消息
运行后查看结果:
观察可以发现:
- 一个消息只会被一个消费者处理
- 当有很多消息时,会平均分配(一个消费者负责奇数的消息,一个消费者负责偶数的消息)
但问题是消费者之间的消费能力可能不一样,有的消费能力强,有消费的弱,会出现部分消费者一直空闲而其他消费者一直繁忙的状况,没有充分利用每一个消费者的能力
3. 实现能者多劳
修改配置文件:
spring:rabbitmq:listener:simple:prefetch:1# 每次只能获取一条消息,处理完成才能获取下一个消息
再次运行,查看结果:
观察可以发现:
消费者2处理了大概5条消息,而消费者1处理了40多条消息,成功实现"能者多劳"。
交换机
在之前的RabbitMQ的架构图中我们可以看到,里面还有一项Exchange没有提到,那就是RabbitMQ中的交换机。
交换机是RabbitMQ中用于接收生产者发送的消息并将这些消息路由到一个或多个队列的组件。
Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!
交换机有不同的类型,常见的有以下几种:
- Fanout Exchange(扇出交换机):将消息广播到所有绑定的队列,不考虑路由键。
- Direct Exchange(直连交换机):根据消息的路由键精确匹配队列。
- Topic Exchange(主题交换机):根据路由键的模式匹配队列。
- Headers Exchange(头交换机):根据消息的头部信息匹配队列。
Fanout交换机
扇出交换机将消息广播到所有绑定的队列,而不考虑路由键。这种交换机非常适合需要将消息广播到多个消费者的场景。
假如说当订单服务有了一笔新订单之后就要去通知短信服务、商品服务等等,有了交换机之后就只需要将消息发给交换机,然后为每一个微服务创建一个队列并绑定,之后当有消息时,交换机就会把消息发送到所有队列,就能实现一个消息被多个消费者处理了。
- 可以有多个队列
- 每个队列都要绑定到Exchange(交换机)
- 生产者发送的消息,只能发送到交换机
- 交换机把消息发送给绑定过的所有队列
- 订阅队列的消费者都能拿到消息
- 创建Fanout交换机
- 创建两个队列
fanout.queue1
、fanout.queue2
: - 点击刚刚创建的交换机,进入:
- 将刚才创建的两个队列绑定到交换机,
1.消息发送
在SpringAmqpTest类中添加测试方法:
@TestpublicvoidtestFanoutExchange(){// 交换机名称String exchangeName ="demo.fanout";// 消息String message ="hello, everyone!";
rabbitTemplate.convertAndSend(exchangeName,"", message);}
abbitTemplate.convertAndSend(exchangeName, "", message);
中的第二个参数用来指定路由键。对于Fanout交换机,路由键没有实际意义,因此可以传递一个空字符串。
2.消息监听
在
SpringRabbitListener
中添加两个方法:
@RabbitListener(queues ="fanout.queue1")publicvoidlistenFanoutQueue1(String msg){System.out.println("消费者1接收到Fanout消息:【"+ msg +"】");}@RabbitListener(queues ="fanout.queue2")publicvoidlistenFanoutQueue2(String msg){System.out.println("消费者2接收到Fanout消息:【"+ msg +"】");}
运行代码,查看结果:
交换机的作用是什么?
- 接收publisher发送的消息
- 将消息按照规则路由到与之绑定的队列
- 不能缓存消息,路由失败,消息丢失
- FanoutExchange的会将消息路由到每个绑定的队列
Direct交换机
在Fanout模式中,一条消息,会被所有订阅的队列都消费。但是,在某些场景下,我们希望不同的消息被不同的队列消费。这时就要用到Direct类型的Exchange。
直连交换机根据消息的路由键精确匹配队列。只有当消息的路由键与绑定的路由键完全匹配时,消息才会被路由到相应的队列。
在Direct模型下:
- 队列与交换机的绑定,不能是任意绑定了,而是要指定一个
RoutingKey
(路由key) - 消息的发送方在 向 Exchange发送消息时,也必须指定消息的
RoutingKey
。 - Exchange不再把消息交给每一个绑定的队列,而是根据消息的
Routing Key
进行判断,只有队列的Routingkey
与消息的Routing key
完全一致,才会接收到消息
- 创建
direct.queue1
和direct.queue2
两个队列,之后创建一个direct类型的交换机: - 绑定队列到交换机,最终结果如图所示:
1.消息发送
在
SpringAmqpTest
类中添加测试方法:
@TestpublicvoidtestSendDirectExchange1(){// 交换机名称String exchangeName ="demo.direct";// 消息String message ="红色警报!日本乱排核废水,导致海洋生物变异,惊现哥斯拉!";// 发送消息
rabbitTemplate.convertAndSend(exchangeName,"red", message);}@TestpublicvoidtestSendDirectExchange2(){// 交换机名称String exchangeName ="demo.direct";// 消息String message ="最新报道,哥斯拉是居民自治巨型气球,虚惊一场!";// 发送消息
rabbitTemplate.convertAndSend(exchangeName,"blue", message);}
2.消息接收
在
SpringRabbitListener
中添加方法:
@RabbitListener(queues ="direct.queue1")publicvoidlistenDirectQueue1(String msg){System.out.println("消费者1接收到direct.queue1的消息:【"+ msg +"】");}@RabbitListener(queues ="direct.queue2")publicvoidlistenDirectQueue2(String msg){System.out.println("消费者2接收到direct.queue2的消息:【"+ msg +"】");}
运行测试类中的
testSendDirectExchange1
,查看结果:
运行测试类中的
testSendDirectExchange2
,查看结果:
观察可以发现:
- 当发送的消息的Routing key为red时,两个消息队列都能收到
- 当发送的消息的Routing key为red时,只有消息队列1才能收到
Topic交换机
Topic交换机(Topic Exchange)是RabbitMQ中一种功能强大的交换机类型,它通过路由键的模式匹配将消息路由到一个或多个队列。Topic交换机允许使用通配符来匹配路由键,从而实现灵活的消息路由。
通配符:在绑定键中,可以使用两个特殊字符来实现模式匹配:-
*
:匹配一个单词。-#
:匹配零个或多个单词。
如图所示,假如此时publisher发送的消息使用的RoutingKey
共有四种:
china.news
代表有中国的新闻消息;china.weather
代表中国的天气消息;japan.news
则代表日本新闻japan.weather
代表日本的天气消息;
解释:
topic.queue1
:绑定的是china.#
,凡是以china.
开头的routing key
都会被匹配到,包括: -china.news
-china.weather
topic.queue2
:绑定的是#.news
,凡是以.news
结尾的routing key
都会被匹配。包括: -china.news
-japan.news
更多范例:
假设我们有以下绑定键:
*.orange.*
*.*.rabbit
lazy.#
我们可以通过以下路由键进行消息路由:
- 路由键
quick.orange.rabbit
将匹配第一个和第二个绑定键。- 路由键
lazy.orange.elephant
将匹配第一个和第三个绑定键。- 路由键
lazy.brown.fox
将匹配第三个绑定键。- 路由键
lazy
将匹配第三个绑定键。
按照之前的流程创建Topic交换机和队列并进行绑定,最终结果如下:
1.消息发送
在
SpringAmqpTest
类中添加测试方法:
@TestpublicvoidtestSendTopicExchange1(){// 交换机名称String exchangeName ="demo.topic";// 消息String message ="喜报!孙悟空大战哥斯拉,胜!";// 发送消息
rabbitTemplate.convertAndSend(exchangeName,"china.news", message);}@TestpublicvoidtestSendTopicExchange1(){// 交换机名称String exchangeName ="demo.topic";// 消息String message ="喜报!孙悟空大战哥斯拉,胜!";// 发送消息
rabbitTemplate.convertAndSend(exchangeName,"china.weather", message);}
2.消息接收
在
SpringRabbitListener
中添加方法:
@RabbitListener(queues ="topic.queue1")publicvoidlistenTopicQueue1(String msg){System.out.println("消费者1接收到topic.queue1的消息:【"+ msg +"】");}@RabbitListener(queues ="topic.queue2")publicvoidlistenTopicQueue2(String msg){System.out.println("消费者2接收到topic.queue2的消息:【"+ msg +"】");}
运行测试类中的
testSendTopicExchange1
后观察结果:
观察发现两个消息队列都收到了,说明
china.#
和
#.news
都匹配成功了。
运行测试类中的
testSendTopicExchange2
后观察结果:
观察可以发现只有消息队列1匹配成功,说明
china.#
匹配成功。
声明队列和交换机
之前我们创建交换机是通过控制台创建的,然而实际开发中是不推荐使用这种方式,因为可能会出现一些问题,更推荐让程序员通过代码来判断交换机和队列是否存在,如果没有再进行创建。
在实际开发中,RabbitMQ的配置类一般放到消费者包下,生产者一般会关心消息是否发送成功。
声明队列
队列是RabbitMQ中用于存储消息的组件。Spring AMQP通过
Queue
类来声明队列。队列有以下几个重要属性:
- name:队列名称。
- durable:是否持久化。持久化队列在RabbitMQ重启后仍然存在,信息持久到磁盘。
- exclusive:是否排他。排他队列只能被创建它的连接使用,并且在连接断开时自动删除。
- autoDelete:是否自动删除。当最后一个消费者断开连接后,自动删除队列。
比如:
importorg.springframework.amqp.core.Queue;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;@ConfigurationpublicclassRabbitMQQueueConfig{@BeanpublicQueuemyQueue(){returnnewQueue("simple.queue");}}
声明了一个名为simple.queue的队列,默认为持久化、非排他、非自动删除。
声明交换机
使用
ExchangeBuilder
声明交换机,
ExchangeBuilder
类提供了多种方法来配置交换机的属性。以下是一些常用的方法:
- **durable()**:声明持久化交换机。
- **autoDelete()**:声明自动删除交换机。
- **withArgument()**:添加交换机的自定义参数。
importorg.springframework.amqp.core.DirectExchange;importorg.springframework.amqp.core.ExchangeBuilder;importorg.springframework.amqp.core.FanoutExchange;importorg.springframework.amqp.core.TopicExchange;importorg.springframework.amqp.core.HeadersExchange;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;@ConfigurationpublicclassRabbitMQExchangeConfig{@BeanpublicDirectExchangedirectExchange(){returnExchangeBuilder.directExchange("direct.exchange").durable(true).build();}@BeanpublicFanoutExchangefanoutExchange(){returnExchangeBuilder.fanoutExchange("fanout.exchange").durable(true).build();}@BeanpublicTopicExchangetopicExchange(){returnExchangeBuilder.topicExchange("topic.exchange").durable(true).build();}}
绑定队列和交换机
在RabbitMQ中,绑定关系(Binding)是交换机和队列之间的连接。绑定关系告诉交换机如何将消息路由到队列。在Spring AMQP中,我们可以使用
BindingBuilder
类来声明和配置绑定关系。
BindingBuilder
类提供了一些静态方法来创建绑定关系。常用的方法包括:
- **bind()**:绑定队列到交换机。
- **to()**:指定交换机。
- **with()**:指定路由键(用于直连交换机和主题交换机)。
- **where()**:指定头部信息(用于头交换机)。
1.fanout示例
importorg.springframework.amqp.core.Binding;importorg.springframework.amqp.core.BindingBuilder;importorg.springframework.amqp.core.FanoutExchange;importorg.springframework.amqp.core.Queue;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;@ConfigurationpublicclassFanoutConfig{/**
* 声明交换机
* @return Fanout类型交换机
*/@BeanpublicFanoutExchangefanoutExchange(){returnnewFanoutExchange("demo.fanout");}/**
* 第1个队列
*/@BeanpublicQueuefanoutQueue1(){returnnewQueue("fanout.queue1");}/**
* 绑定队列和交换机
*/@BeanpublicBindingbindingQueue1(Queue fanoutQueue1,FanoutExchange fanoutExchange){returnBindingBuilder.bind(fanoutQueue1).to(fanoutExchange);}/**
* 第2个队列
*/@BeanpublicQueuefanoutQueue2(){returnnewQueue("fanout.queue2");}/**
* 绑定队列和交换机
*/@BeanpublicBindingbindingQueue2(Queue fanoutQueue2,FanoutExchange fanoutExchange){returnBindingBuilder.bind(fanoutQueue2).to(fanoutExchange);}}
在控制台删除
demo.fanout
交换机和
fanout.queue2
、
fanout.queue2
这两个队列,再次运行代码会发现删除的又重新出现了:
2. direct示例
direct交换机
importorg.springframework.amqp.core.*;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;@ConfigurationpublicclassDirectConfig{/**
* 声明交换机
* @return Direct类型交换机
*/@BeanpublicDirectExchangedirectExchange(){returnExchangeBuilder.directExchange("direct.exchange").build();}/**
* 第1个队列
*/@BeanpublicQueuedirectQueue1(){returnnewQueue("direct.queue1");}/**
* 绑定队列和交换机
*/@BeanpublicBindingbindingQueue1WithRed(Queue directQueue1,DirectExchange directExchange){returnBindingBuilder.bind(directQueue1).to(directExchange).with("red");}/**
* 绑定队列和交换机
*/@BeanpublicBindingbindingQueue1WithBlue(Queue directQueue1,DirectExchange directExchange){returnBindingBuilder.bind(directQueue1).to(directExchange).with("blue");}/**
* 第2个队列
*/@BeanpublicQueuedirectQueue2(){returnnewQueue("direct.queue2");}/**
* 绑定队列和交换机
*/@BeanpublicBindingbindingQueue2WithRed(Queue directQueue2,DirectExchange directExchange){returnBindingBuilder.bind(directQueue2).to(directExchange).with("red");}/**
* 绑定队列和交换机
*/@BeanpublicBindingbindingQueue2WithYellow(Queue directQueue2,DirectExchange directExchange){returnBindingBuilder.bind(directQueue2).to(directExchange).with("yellow");}}
3.基于注解的方式声明队列和交换机
基于
@Bean
的方式声明队列和交换机比较麻烦,Spring还提供了基于注解方式来声明。
修改
SpringRabbitListener
类:
@ComponentpublicclassSpringRabbitListener{// .......@RabbitListener(bindings =@QueueBinding(
value =@Queue(name ="direct.queue1"),
exchange =@Exchange(name ="demo.direct", type =ExchangeTypes.DIRECT),
key ={"red","blue"}))publicvoidlistenDirectQueue1(String msg){System.out.println("消费者1接收到direct.queue1的消息:【"+ msg +"】");}@RabbitListener(bindings =@QueueBinding(
value =@Queue(name ="direct.queue2"),
exchange =@Exchange(name ="demo.direct", type =ExchangeTypes.DIRECT),
key ={"red","yellow"}))publicvoidlistenDirectQueue2(String msg){System.out.println("消费者2接收到direct.queue2的消息:【"+ msg +"】");}@RabbitListener(bindings =@QueueBinding(
value =@Queue(name ="topic.queue1"),
exchange =@Exchange(name ="demo.topic", type =ExchangeTypes.TOPIC),
key ="china.#"))publicvoidlistenTopicQueue1(String msg){System.out.println("消费者1接收到topic.queue1的消息:【"+ msg +"】");}@RabbitListener(bindings =@QueueBinding(
value =@Queue(name ="topic.queue2"),
exchange =@Exchange(name ="demo.topic", type =ExchangeTypes.TOPIC),
key ="#.news"))publicvoidlistenTopicQueue2(String msg){System.out.println("消费者2接收到topic.queue2的消息:【"+ msg +"】");}// .......}
@QueueBinding
注解包含以下几个主要部分:
- value:定义队列,使用
@Queue
注解。- exchange:定义交换机,使用
@Exchange
注解。- key:定义路由键,使用字符串数组。
删除交换机和队列后再次运行会发现又重新出现:
消息转换器
Spring的消息发送代码接收的消息体是一个Object:
在数据传输时,它会把你发送的消息序列化为字节发送给MQ,接收消息的时候,还会把字节反序列化为Java对象。只不过,默认情况下Spring采用的序列化方式是JDK序列化。众所周知,JDK序列化存在下列问题:
- 数据体积过大
- 有安全漏洞
- 可读性差
因此我们可以配置JSON转换器来解决这个问题。
- 引入Jackson依赖:
<dependency><groupId>com.fasterxml.jackson.dataformat</groupId><artifactId>jackson-dataformat-xml</artifactId></dependency>
- 配置消息转换器在
publisher
和consumer
两个服务的启动类中添加一个Bean即可:
@BeanpublicMessageConvertermessageConverter(){// 1.定义消息转换器Jackson2JsonMessageConverter jackson2JsonMessageConverter =newJackson2JsonMessageConverter();// 2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息
jackson2JsonMessageConverter.setCreateMessageIds(true);return jackson2JsonMessageConverter;}
测试:
在
FanoutConfig
类中声明队列:
@BeanpublicQueueobjectQueue(){returnnewQueue("object.queue");}
在
SpringAmqpTest
类中添加:
@TestpublicvoidtestSendMap()throwsInterruptedException{// 准备消息Map<String,Object> msg =newHashMap<>();
msg.put("name","柳岩");
msg.put("age",21);// 发送消息
rabbitTemplate.convertAndSend("object.queue", msg);}
在
SpringRabbitListener
类中添加:
@RabbitListener(queues ="object.queue")publicvoidlistenSimpleQueueMessage(Map<String,Object> msg)throwsInterruptedException{System.out.println("消费者接收到object.queue消息:【"+ msg +"】");}
运行测试类查看结果:
总结
本文较为详细的记录了RabbitMQ的安装配置以及交换机学习,希望本文对大家学习RabbitMQ有所帮助。
版权归原作者 詩筠 所有, 如有侵权,请联系我们删除。