零、了解MQ
追求可用性:Kafka、 RocketMQ 、RabbitMQ
追求可靠性:RabbitMQ、RocketMQ
追求吞吐能力:RocketMQ、Kafka
追求消息低延迟:RabbitMQ、Kafka
据统计,目前国内消息队列使用最多的还是RabbitMQ,再加上其各方面都比较均衡,稳定性也好。
RabbitMQ支持多种协议。
- AMQP(Advanced Message Queuing Protocol):AMQP是RabbitMQ的核心协议,它是一种开放的、可互操作的消息传递协议。AMQP定义了消息的格式、消息的路由和传递规则等,使得不同的消息中间件可以进行互操作。
- MQTT(Message Queuing Telemetry Transport):MQTT是一种轻量级的、基于发布/订阅模式的消息传输协议。它适用于低带宽、高延迟或不稳定网络环境下的物联网设备通信。RabbitMQ通过插件支持MQTT协议。
- STOMP(Simple Text Oriented Messaging Protocol):STOMP是一种简单的文本导向的消息传递协议,它定义了客户端和消息中间件之间的交互规则。RabbitMQ通过插件支持STOMP协议。
- HTTP(Hypertext Transfer Protocol):RabbitMQ还提供了HTTP API,通过HTTP协议可以与RabbitMQ进行交互,包括发送消息、创建队列、管理交换器等操作。
而Spring boot框架对AMQP协议进行了封装,使用Spring AMQP的依赖调用里面的API即可实现快速使用RabbitMQ进行开发。该文章以Spring AMQP为例。
一、首先拉取,设置,启动RabbitMQ。
publisher:生产者,也就是发送消息的一方
consumer:消费者,也就是消费消息的一方
queue:队列,存储消息。生产者投递的消息会暂存在消息队列中,等待消费者处理
exchange:交换机,负责消息路由。生产者发送的消息由交换机决定投递到哪个队列。
virtual host:虚拟主机,起到数据隔离的作用。每个虚拟主机相互独立,有各自的exchange、queue
1.拉取启动RabbitMQ
docker pull rabbitmq:3.12-management #推荐使用
docker pull rabbitmq:3.12 #这个是不带Web管理页面的,是需要自己手动安装插件
docker run --name myrabbitmq -p 5672:5672 -p 15672:15672 -d rabbitmq:3.12-management
rabbitmq-plugins enable rabbitmq_management
rabbitmqctl list_users
echo management_agent.disable_metrics_collector = false > management_agent.disable_metrics_collector.conf
docker restart rabbitmq
--name 是 容器别名,
将 宿主机 5672端口映射到 容器内5672,
端口15672端口映射到 容器内15672 端口,
访问宿主机端口的时候会映射到对应容器端口,
-d 表示后台运行。
5672端口:AMQP(Advanced Message Queuing Protocol)协议的默认端口,用于客户端与RabbitMQ服务器之间的通信。
15672端口:RabbitMQ的管理界面,默认使用HTTP协议,用于监控和管理RabbitMQ服务器。
4369端口:Erlang分布式节点通信端口,用于RabbitMQ节点之间的通信。
25672端口:Erlang分布式节点通信端口,用于集群中的内部通信。
5671端口:安全的AMQP端口,使用TLS/SSL进行加密通信。
2.依赖和设置
<!--AMQP依赖,包含RabbitMQ-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
首先配置MQ地址,在
publisher
服务的
application.yml
中添加配置:
spring:
rabbitmq:
host: 192.168.150.101 # 你的虚拟机IP
port: 5672 # 端口
virtual-host: /hmall # 虚拟主机
username: guest # 用户名
password: guest # 密码
三.代码演示RabbitMQ
简单模式(Hello World)
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testSimpleQueue() {
// 队列名称
String queueName = "simple.queue";
// 消息
String message = "hello, spring amqp!";
// 发送消息
rabbitTemplate.convertAndSend(queueName, message);
}
@Component
public class consumer {
@RabbitListener(queues = "simple.queue")
public void consumerQueue(String message){
System.out.println(message);
}
}
工作队列模式(Work queues)
当消息处理比较耗时的时候,可能生产消息的速度会远远大于消息的消费速度。长此以往,消息就会堆积越来越多,无法及时处理。 此时就可以使用work 模型,多个消费者共同处理消息处理,消息处理的速度就能大大提高了。
在spring中有一个简单的配置,可以解决这个问题。我们修改consumer服务的application.yml文件,添加配置:
spring:
rabbitmq:
listener:
simple:
prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息
在配置中,prefetch: 1
表示每个消费者每次只能从队列中预取1个消息,消费完就能拿下一次,不需要等轮询。它可以帮助保证每个消息在被消费者处理时都能得到较为均匀的分配,避免某个消费者处理速度慢而导致其他消费者空闲的情况。如果不配置着东东的话,那么rabbitmq采用的就是一个公平轮询的方式,将消息依次发给一个消费,等他消费完了再发下一个给另外的消费者
交换机
在之前的两个测试案例中,都没有交换机,生产者直接发送消息到队列。而一旦引入交换机,消息发送的模式会有很大变化
可以看到,在订阅模型中,多了一个exchange角色,而且过程略有变化:
Publisher:生产者,不再发送消息到队列中,而是发给交换机
Exchange:交换机,一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。
Queue:消息队列也与以前一样,接收消息、缓存消息。不过队列一定要与交换机绑定。
Consumer:消费者,与以前一样,订阅队列,没有变化
订阅模式(Publish/Subscribe)
@Test
public void testFanoutExchange() {
// 交换机名称
String exchangeName = "hmall.fanout";
// 消息
String message = "hello, everyone!";
rabbitTemplate.convertAndSend(exchangeName, "", message);
}
@RabbitListener(queues = "fanout.queue1")
public void listenFanoutQueue1(String msg) {
System.out.println("消费者1接收到Fanout消息:【" + msg + "】");
}
@RabbitListener(queues = "fanout.queue2")
public void listenFanoutQueue2(String msg) {
System.out.println("消费者2接收到Fanout消息:【" + msg + "】");
}
路由模式(Routing)
在Fanout模式中,一条消息,会被所有订阅的队列都消费。但是,在某些场景下,我们希望不同的消息被不同的队列消费。这时就要用到Direct类型的Exchange。
声明一个名为hmall.direct的交换机
声明队列direct.queue1,绑定hmall.direct,bindingKey为blud和red
声明队列direct.queue2,绑定hmall.direct,bindingKey为yellow和red
在consumer服务中,编写两个消费者方法,分别监听direct.queue1和direct.queue2
在publisher中编写测试方法,向hmall.direct发送消息
在consumer服务的SpringRabbitListener中添加方法:
@RabbitListener(queues = "direct.queue1")
public void listenDirectQueue1(String msg) {
System.out.println("消费者1接收到direct.queue1的消息:【" + msg + "】");
}
@RabbitListener(queues = "direct.queue2")
public void listenDirectQueue2(String msg) {
System.out.println("消费者2接收到direct.queue2的消息:【" + msg + "】");
}
@Test
public void testSendDirectExchange() {
// 交换机名称
String exchangeName = "hmall.direct";
// 消息
String message = "红色警报!日本乱排核废水,导致海洋生物变异,惊现哥斯拉!";
// 发送消息
rabbitTemplate.convertAndSend(exchangeName, "red", message);
}
主题模式(Topics)
尽管使用 direct 交换机改进了我们的系统,但是它仍然存在局限性——比方说我们的交换机绑定了多个不同的routingKey,在direct模式中虽然能做到有选择性地接收日志,但是它的选择性是单一的,就是说我的一条消息,只能被一个相同routingKey的绑定缩消费,但是如果我们想要在让它的选择性变得多元,比如划分一个子组,一个消息可以根据一个组别的队列进行投递,就需要用到Topics模式
Topic
类型
Exchange
可以让队列在绑定
BindingKey
的时候使用通配符!
BindingKey
一般都是有一个或多个单词组成,多个单词之间以“
**.**”
分割,例如:
item.insert
通配符规则:
#
:匹配一个或多个词*
:匹配不多不少恰好1个词
/**
* topicExchange
*/
@Test
public void testSendTopicExchange() {
// 交换机名称
String exchangeName = "hmall.topic";
// 消息
String message = "喜报!孙悟空大战哥斯拉,胜!";
// 发送消息
rabbitTemplate.convertAndSend(exchangeName, "china.news", message);
}
@RabbitListener(queues = "topic.queue1")
public void listenTopicQueue1(String msg){
System.out.println("消费者1接收到topic.queue1的消息:【" + msg + "】");
}
@RabbitListener(queues = "topic.queue2")
public void listenTopicQueue2(String msg){
System.out.println("消费者2接收到topic.queue2的消息:【" + msg + "】");
}
描述下Direct交换机与Topic交换机的差异?
- Topic交换机接收的消息RoutingKey必须是多个单词,以
**.**
分割 - Topic交换机与队列绑定时的bindingKey可以指定通配符
#
:代表0个或多个词*
:代表1个词
API声明队列和交换机
在之前我们都是基于RabbitMQ控制台来创建队列、交换机。但是在实际开发时,队列和交换机是程序员定义的,将来项目上线,又要交给运维去创建。那么程序员就需要把程序中运行的所有队列和交换机都写下来,交给运维。在这个过程中是很容易出现错误的。 因此推荐的做法是由程序启动时检查队列和交换机是否存在,如果不存在自动创建。
fanout示例
@Configuration
public class FanoutConfig {
/**
* 声明交换机
* @return Fanout类型交换机
*/
@Bean
public FanoutExchange fanoutExchange(){
return new FanoutExchange("hmall.fanout");
}
/**
* 第1个队列
*/
@Bean
public Queue fanoutQueue1(){
return new Queue("fanout.queue1");
}
/**
* 绑定队列和交换机
*/
@Bean
public Binding bindingQueue1(Queue fanoutQueue1, FanoutExchange fanoutExchange){
return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
}
/**
* 第2个队列
*/
@Bean
public Queue fanoutQueue2(){
return new Queue("fanout.queue2");
}
/**
* 绑定队列和交换机
*/
@Bean
public Binding bindingQueue2(Queue fanoutQueue2, FanoutExchange fanoutExchange){
return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
}
}
direct示例
@Configuration
public class DirectConfig {
/**
* 声明交换机
* @return Direct类型交换机
*/
@Bean
public DirectExchange directExchange(){
return ExchangeBuilder.directExchange("hmall.direct").build();
}
/**
* 第1个队列
*/
@Bean
public Queue directQueue1(){
return new Queue("direct.queue1");
}
/**
* 绑定队列和交换机
*/
@Bean
public Binding bindingQueue1WithRed(Queue directQueue1, DirectExchange directExchange){
return BindingBuilder.bind(directQueue1).to(directExchange).with("red");
}
/**
* 绑定队列和交换机
*/
@Bean
public Binding bindingQueue1WithBlue(Queue directQueue1, DirectExchange directExchange){
return BindingBuilder.bind(directQueue1).to(directExchange).with("blue");
}
/**
* 第2个队列
*/
@Bean
public Queue directQueue2(){
return new Queue("direct.queue2");
}
/**
* 绑定队列和交换机
*/
@Bean
public Binding bindingQueue2WithRed(Queue directQueue2, DirectExchange directExchange){
return BindingBuilder.bind(directQueue2).to(directExchange).with("red");
}
/**
* 绑定队列和交换机
*/
@Bean
public Binding bindingQueue2WithYellow(Queue directQueue2, DirectExchange directExchange){
return BindingBuilder.bind(directQueue2).to(directExchange).with("yellow");
}
}
**Topic和Direct基本是一样的配置方式,这里就不演示了 **
基于注解声明
基于@Bean的方式声明队列和交换机比较麻烦,Spring还提供了基于注解方式来声明。
例如,我们同样声明Direct模式的交换机和队列:
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct.queue1"),
exchange = @Exchange(name = "hmall.direct", type = ExchangeTypes.DIRECT),
key = {"red", "blue"}
))
public void listenDirectQueue1(String msg){
System.out.println("消费者1接收到direct.queue1的消息:【" + msg + "】");
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct.queue2"),
exchange = @Exchange(name = "hmall.direct", type = ExchangeTypes.DIRECT),
key = {"red", "yellow"}
))
public void listenDirectQueue2(String msg){
System.out.println("消费者2接收到direct.queue2的消息:【" + msg + "】");
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "topic.queue1"),
exchange = @Exchange(name = "hmall.topic", type = ExchangeTypes.TOPIC),
key = "china.#"
))
public void listenTopicQueue1(String msg){
System.out.println("消费者1接收到topic.queue1的消息:【" + msg + "】");
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "topic.queue2"),
exchange = @Exchange(name = "hmall.topic", type = ExchangeTypes.TOPIC),
key = "#.news"
))
public void listenTopicQueue2(String msg){
System.out.println("消费者2接收到topic.queue2的消息:【" + msg + "】");
}
消息转换器
在数据传输时,它会把你发送的消息序列化为字节发送给MQ,接收消息的时候,还会把字节反序列化为Java对象。 只不过,默认情况下Spring采用的序列化方式是JDK序列化。众所周知,JDK序列化存在下列问题:
- 数据体积过大
- 有安全漏洞
- 可读性差
显然,JDK序列化方式并不合适。我们希望消息体的体积更小、可读性更高,因此可以使用JSON方式来做序列化和反序列化。
在
publisher
和
consumer
两个服务中都引入依赖:
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-xml</artifactId>
<version>2.9.10</version>
</dependency>
注意,如果项目中引入了spring-boot-starter-web依赖,则无需再次引入Jackson依赖。
配置消息转换器,在publisher和consumer两个服务的启动类中添加一个Bean即可:
配置消息转换器,在publisher和consumer两个服务的启动类中添加一个Bean即可:
@Bean
public MessageConverter messageConverter(){
// 1.定义消息转换器
Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
// 2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息
jackson2JsonMessageConverter.setCreateMessageIds(true);
return jackson2JsonMessageConverter;
}
消费者接收Object
我们在consumer服务中定义一个新的消费者,publisher是用什么类型发送,那么消费者也一定要用什么类型接收,格式如下:
@RabbitListener(queues = "object.queue")
public void listenSimpleQueueMessage(Map<String, Object> msg) throws InterruptedException {
System.out.println("消费者接收到object.queue消息:【" + msg + "】");
}
版权归原作者 KSYC 所有, 如有侵权,请联系我们删除。