RabbitMq
由于
RabbitMQ
采用了AMQP协议,因此它具备跨语言的特性。任何语言只要遵循AMQP协议收发消息,都可以与
RabbitMQ
交互。并且
RabbitMQ
官方也提供了各种不同语言的客户端。
但是,RabbitMQ官方提供的Java客户端编码相对复杂,一般生产环境下我们更多会结合Spring来使用。而Spring的官方刚好基于RabbitMQ提供了这样一套消息收发的模板工具:SpringAMQP。并且还基于SpringBoot对其实现了自动装配,使用起来非常方便。
SpringAMQP提供了三个功能:
- 自动声明队列、交换机及其绑定关系
- 基于注解的监听器模式,异步接收消息
- 封装了RabbitTemplate工具,用于发送消息
概念
**publisher**
:生产者,也就是发送消息的一方**consumer**
:消费者,也就是消费消息的一方**queue**
:队列,存储消息。生产者投递的消息会暂存在消息队列中,等待消费者处理**exchange**
:交换机,负责消息路由。生产者发送的消息由交换机决定投递到哪个队列。**virtual host**
:虚拟主机,起到数据隔离的作用。每个虚拟主机相互独立,有各自的exchange、queue
交换机
我们打开Exchanges选项卡,可以看到已经存在很多交换机:
我们点击任意交换机,即可进入交换机详情页面。仍然会利用控制台中的publish message 发送一条消息:
这里是由控制台模拟了生产者发送的消息。由于没有消费者存在,最终消息丢失了,这样说明交换机没有存储消息的能力。
队列
我们打开
Queues
选项卡,新建一个队列:
命名为
hello.queue1
:
再以相同的方式,创建一个队列,密码为
hello.queue2
,最终队列列表如下:
此时,我们再次向
amq.fanout
交换机发送一条消息。会发现消息依然没有到达队列!!
发送到交换机的消息,只会路由到与其绑定的队列,因此仅仅创建队列是不够的,我们还需要将其与交换机绑定。
绑定关系
点击
Exchanges
选项卡,点击
amq.fanout
交换机,进入交换机详情页,然后点击
Bindings
菜单,在表单中填写要绑定的队列名称:
相同的方式,将hello.queue2也绑定到改交换机。
最终,绑定结果如下:
发送消息
再次回到exchange页面,找到刚刚绑定的
amq.fanout
,点击进入详情页,再次发送一条消息:
回到
Queues
页面,可以发现
hello.queue
中已经有一条消息了:
点击队列名称,进入详情页,查看队列详情,这次我们点击get message:
可以看到消息到达队列了:
这个时候如果有消费者监听了MQ的
hello.queue1
或
hello.queue2
队列,自然就能接收到消息了。
用户管理
点击
Admin
选项卡,首先会看到RabbitMQ控制台的用户管理界面:
这里的用户都是RabbitMQ的管理或运维人员。目前只有安装RabbitMQ时添加的
itheima
这个用户。仔细观察用户表格中的字段,如下:
Name
:admin
,也就是用户名Tags
:administrator
,说明itheima
用户是超级管理员,拥有所有权限Can access virtual host
:/
,可以访问的virtual host
,这里的/
是默认的virtual host
对于小型企业而言,出于成本考虑,我们通常只会搭建一套MQ集群,公司内的多个不同项目同时使用。这个时候为了避免互相干扰, 我们会利用
virtual host
的隔离特性,将不同项目隔离。一般会做两件事情:
- 给每个项目创建独立的运维账号,将管理权限分离。
- 给每个项目创建不同的
virtual host
,将每个项目的数据隔离。
交换机
。而一旦引入交换机,消息发送的模式会有很大变化:
可以看到,在订阅模型中,多了一个exchange角色,而且过程略有变化:
- Publisher:生产者,不再发送消息到队列中,而是发给交换机
- Exchange:交换机,一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。
- Queue:消息队列也与以前一样,接收消息、缓存消息。不过队列一定要与交换机绑定。
- Consumer:消费者,与以前一样,订阅队列,没有变化
- Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!
交换机的类型有四种:
- Fanout:广播,将消息交给所有绑定到交换机的队列。我们最早在控制台使用的正是Fanout交换机
- Direct:订阅,基于RoutingKey(路由key)发送给订阅了消息的队列
- Topic:通配符订阅,与Direct类似,只不过RoutingKey可以使用通配符
- Headers:头匹配,基于MQ的消息头匹配,用的较少。
Fanout交换机
Fanout,英文翻译是扇出,我觉得在MQ中叫广播更合适。
在广播模式下,消息发送流程是这样的:
- 1) 可以有多个队列
- 2) 每个队列都要绑定到Exchange(交换机)
- 3) 生产者发送的消息,只能发送到交换机
- 4) 交换机把消息发送给绑定过的所有队列
- 5) 订阅队列的消费者都能拿到消息
我们的计划是这样的:
- 创建一个名为
hmall.fanout
的交换机,类型是Fanout
- 创建两个队列
fanout.queue1
和fanout.queue2
,绑定到交换机hmall.fanout
消息发送
在publisher服务的SpringAmqpTest类中添加测试方法:
@TestpublicvoidtestFanoutExchange(){// 交换机名称String exchangeName ="hmall.fanout";// 消息String message ="hello, everyone!";
rabbitTemplate.convertAndSend(exchangeName,"", message);}
消息接收
在consumer服务的SpringRabbitListener中添加两个方法,作为消费者:
@RabbitListener(queues ="fanout.queue1")publicvoidlistenFanoutQueue1(String msg){System.out.println("消费者1接收到Fanout消息:【"+ msg +"】");}@RabbitListener(queues ="fanout.queue2")publicvoidlistenFanoutQueue2(String msg){System.out.println("消费者2接收到Fanout消息:【"+ msg +"】");}
Direct交换机
在Fanout模式中,一条消息,会被所有订阅的队列都消费。但是,在某些场景下,我们希望不同的消息被不同的队列消费。这时就要用到Direct类型的Exchange。
在Direct模型下:
- 队列与交换机的绑定,不能是任意绑定了,而是要指定一个
RoutingKey
(路由key) - 消息的发送方在 向 Exchange发送消息时,也必须指定消息的
RoutingKey
。 - Exchange不再把消息交给每一个绑定的队列,而是根据消息的
Routing Key
进行判断,只有队列的Routingkey
与消息的Routing key
完全一致,才会接收到消息
案例需求如图:
- 声明一个名为
hmall.direct
的交换机 - 声明队列
direct.queue1
,绑定hmall.direct
,bindingKey
为blud
和red
- 声明队列
direct.queue2
,绑定hmall.direct
,bindingKey
为yellow
和red
- 在
consumer
服务中,编写两个消费者方法,分别监听direct.queue1和direct.queue2 - 在publisher中编写测试方法,向
hmall.direct
发送消息
声明队列和交换机
首先在控制台声明两个队列
direct.queue1
和
direct.queue2
,然后声明一个direct类型的交换机,命名为
hmall.direct
:
然后使用
red
和
blue
作为key,绑定
direct.queue1
到
hmall.direct
同理,使用
red
和
yellow
作为key,绑定
direct.queue2
到
hmall.direct
,步骤略,最终结果:
消息接收
在consumer服务的SpringRabbitListener中添加方法:
@RabbitListener(queues ="direct.queue1")publicvoidlistenDirectQueue1(String msg){System.out.println("消费者1接收到direct.queue1的消息:【"+ msg +"】");}@RabbitListener(queues ="direct.queue2")publicvoidlistenDirectQueue2(String msg){System.out.println("消费者2接收到direct.queue2的消息:【"+ msg +"】");}
消息发送
在publisher服务的SpringAmqpTest类中添加测试方法:
@TestpublicvoidtestSendDirectExchange(){// 交换机名称String exchangeName ="hmall.direct";// 消息String message ="红色警报!日本乱排核废水,导致海洋生物变异,惊现哥斯拉!";// 发送消息
rabbitTemplate.convertAndSend(exchangeName,"red", message);}
由于使用的red这个key,所以两个消费者都收到了消息:
我们再切换为blue这个key:
@TestpublicvoidtestSendDirectExchange(){// 交换机名称String exchangeName ="hmall.direct";// 消息String message ="最新报道,哥斯拉是居民自治巨型气球,虚惊一场!";// 发送消息
rabbitTemplate.convertAndSend(exchangeName,"blue", message);}
你会发现,只有消费者1收到了消息:
总结
描述下Direct交换机与Fanout交换机的差异?
- Fanout交换机将消息路由给每一个与之绑定的队列
- Direct交换机根据RoutingKey判断路由给哪个队列
- 如果多个队列具有相同的RoutingKey,则与Fanout功能类似
交换机
说明
Topic
类型的
Exchange
与
Direct
相比,都是可以根据
RoutingKey
把消息路由到不同的队列。
只不过
Topic
类型
Exchange
可以让队列在绑定
BindingKey
的时候使用通配符!
BindingKey
一般都是有一个或多个单词组成,多个单词之间以
.
分割,例如:
item.insert
通配符规则:
#
:匹配一个或多个词*
:匹配不多不少恰好1个词
举例:
item.#
:能够匹配item.spu.insert
或者item.spu
item.*
:只能匹配item.spu
图示:
假如此时publisher发送的消息使用的
RoutingKey
共有四种:
china.news
代表有中国的新闻消息;china.weather
代表中国的天气消息;japan.news
则代表日本新闻japan.weather
代表日本的天气消息;
解释:
topic.queue1
:绑定的是china.#
,凡是以china.
开头的routing key
都会被匹配到,包括: -china.news
-china.weather
topic.queue2
:绑定的是#.news
,凡是以.news
结尾的routing key
都会被匹配。包括: -china.news
-japan.news
接下来,我们就按照上图所示,来演示一下Topic交换机的用法。
首先,在控制台按照图示例子创建队列、交换机,并利用通配符绑定队列和交换机。此处步骤略。最终结果如下:
消息发送
在publisher服务的SpringAmqpTest类中添加测试方法:
/**
* topicExchange
*/@TestpublicvoidtestSendTopicExchange(){// 交换机名称String exchangeName ="hmall.topic";// 消息String message ="喜报!孙悟空大战哥斯拉,胜!";// 发送消息
rabbitTemplate.convertAndSend(exchangeName,"china.news", message);}
消息接收
在consumer服务的SpringRabbitListener中添加方法:
@RabbitListener(queues ="topic.queue1")publicvoidlistenTopicQueue1(String msg){System.out.println("消费者1接收到topic.queue1的消息:【"+ msg +"】");}@RabbitListener(queues ="topic.queue2")publicvoidlistenTopicQueue2(String msg){System.out.println("消费者2接收到topic.queue2的消息:【"+ msg +"】");}
声明队列和交换机
在之前我们都是基于RabbitMQ控制台来创建队列、交换机。但是在实际开发时,队列和交换机是程序员定义的,将来项目上线,又要交给运维去创建。那么程序员就需要把程序中运行的所有队列和交换机都写下来,交给运维。在这个过程中是很容易出现错误的。
因此推荐的做法是由程序启动时检查队列和交换机是否存在,如果不存在自动创建。
fanout示例
在consumer中创建一个类,声明队列和交换机:
packagecom.itheima.consumer.config;importorg.springframework.amqp.core.Binding;importorg.springframework.amqp.core.BindingBuilder;importorg.springframework.amqp.core.FanoutExchange;importorg.springframework.amqp.core.Queue;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;@ConfigurationpublicclassFanoutConfig{/**
* 声明交换机
* @return Fanout类型交换机
*/@BeanpublicFanoutExchangefanoutExchange(){returnnewFanoutExchange("hmall.fanout");}/**
* 第1个队列
*/@BeanpublicQueuefanoutQueue1(){returnnewQueue("fanout.queue1");}/**
* 绑定队列和交换机
*/@BeanpublicBindingbindingQueue1(Queue fanoutQueue1,FanoutExchange fanoutExchange){returnBindingBuilder.bind(fanoutQueue1).to(fanoutExchange);}/**
* 第2个队列
*/@BeanpublicQueuefanoutQueue2(){returnnewQueue("fanout.queue2");}/**
* 绑定队列和交换机
*/@BeanpublicBindingbindingQueue2(Queue fanoutQueue2,FanoutExchange fanoutExchange){returnBindingBuilder.bind(fanoutQueue2).to(fanoutExchange);}}
direct示例
direct模式由于要绑定多个KEY,会非常麻烦,每一个Key都要编写一个binding:
packagecom.itheima.consumer.config;importorg.springframework.amqp.core.*;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;@ConfigurationpublicclassDirectConfig{/**
* 声明交换机
* @return Direct类型交换机
*/@BeanpublicDirectExchangedirectExchange(){returnExchangeBuilder.directExchange("hmall.direct").build();}/**
* 第1个队列
*/@BeanpublicQueuedirectQueue1(){returnnewQueue("direct.queue1");}/**
* 绑定队列和交换机
*/@BeanpublicBindingbindingQueue1WithRed(Queue directQueue1,DirectExchange directExchange){returnBindingBuilder.bind(directQueue1).to(directExchange).with("red");}/**
* 绑定队列和交换机
*/@BeanpublicBindingbindingQueue1WithBlue(Queue directQueue1,DirectExchange directExchange){returnBindingBuilder.bind(directQueue1).to(directExchange).with("blue");}/**
* 第2个队列
*/@BeanpublicQueuedirectQueue2(){returnnewQueue("direct.queue2");}/**
* 绑定队列和交换机
*/@BeanpublicBindingbindingQueue2WithRed(Queue directQueue2,DirectExchange directExchange){returnBindingBuilder.bind(directQueue2).to(directExchange).with("red");}/**
* 绑定队列和交换机
*/@BeanpublicBindingbindingQueue2WithYellow(Queue directQueue2,DirectExchange directExchange){returnBindingBuilder.bind(directQueue2).to(directExchange).with("yellow");}}
基于注解声明
基于@Bean的方式声明队列和交换机比较麻烦,Spring还提供了基于注解方式来声明。
例如,我们同样声明Direct模式的交换机和队列:
@RabbitListener(bindings =@QueueBinding(
value =@Queue(name ="direct.queue1"),
exchange =@Exchange(name ="hmall.direct", type =ExchangeTypes.DIRECT),
key ={"red","blue"}))publicvoidlistenDirectQueue1(String msg){System.out.println("消费者1接收到direct.queue1的消息:【"+ msg +"】");}@RabbitListener(bindings =@QueueBinding(
value =@Queue(name ="direct.queue2"),
exchange =@Exchange(name ="hmall.direct", type =ExchangeTypes.DIRECT),
key ={"red","yellow"}))publicvoidlistenDirectQueue2(String msg){System.out.println("消费者2接收到direct.queue2的消息:【"+ msg +"】");}
是不是简单多了。
再试试Topic模式:
@RabbitListener(bindings =@QueueBinding(
value =@Queue(name ="topic.queue1"),
exchange =@Exchange(name ="hmall.topic", type =ExchangeTypes.TOPIC),
key ="china.#"))publicvoidlistenTopicQueue1(String msg){System.out.println("消费者1接收到topic.queue1的消息:【"+ msg +"】");}@RabbitListener(bindings =@QueueBinding(
value =@Queue(name ="topic.queue2"),
exchange =@Exchange(name ="hmall.topic", type =ExchangeTypes.TOPIC),
key ="#.news"))publicvoidlistenTopicQueue2(String msg){System.out.println("消费者2接收到topic.queue2的消息:【"+ msg +"】");}
消息转换器
Spring的消息发送代码接收的消息体是一个Object:
而在数据传输时,它会把你发送的消息序列化为字节发送给MQ,接收消息的时候,还会把字节反序列化为Java对象。
只不过,默认情况下Spring采用的序列化方式是JDK序列化。众所周知,JDK序列化存在下列问题:
- 数据体积过大
- 有安全漏洞
- 可读性差
配置JSON转换器
显然,JDK序列化方式并不合适。我们希望消息体的体积更小、可读性更高,因此可以使用JSON方式来做序列化和反序列化。
在
publisher
和
consumer
两个服务中都引入依赖:
<dependency><groupId>com.fasterxml.jackson.dataformat</groupId><artifactId>jackson-dataformat-xml</artifactId><version>2.9.10</version></dependency>
注意,如果项目中引入了
spring-boot-starter-web
依赖,则无需再次引入
Jackson
依赖。
配置消息转换器,在
publisher
和
consumer
两个服务的启动类中添加一个Bean即可:
@BeanpublicMessageConvertermessageConverter(){// 1.定义消息转换器Jackson2JsonMessageConverter jackson2JsonMessageConverter =newJackson2JsonMessageConverter();// 2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息
jackson2JsonMessageConverter.setCreateMessageIds(true);return jackson2JsonMessageConverter;}
消息转换器中添加的messageId可以便于我们将来做幂等性判断。
总结
以上的代码已上传到Github
https://github.com/onenewcode/mq-demo
版权归原作者 过去日记 所有, 如有侵权,请联系我们删除。