👨🎓作者简介:一位大四、研0学生,正在努力准备大四暑假的实习
🌌上期文章:详解SpringCloud微服务技术栈:DockerCompose部署微服务集群
📚订阅专栏:微服务技术全家桶
希望文章对你们有所帮助
RabbitMQ的使用还是很广泛的,主要是用在异步通讯的过程中的消息中间件,而在之前我学习Redis的时候,已经分别通过阻塞队列和Redis的某种数据结构实现了异步通信,可以看我的这两篇总结文章:
Redis:原理速成+项目实战——Redis实战9(秒杀优化)
Redis:原理速成+项目实战——Redis实战10(Redis消息队列实现异步秒杀)
同步通讯与异步通讯的原理、优缺点就不在这里讲解了,之前提到过,做异步通讯的最主流的还得是RabbitMQ,所以速成一波。
RabbitMQ入门到实践
MQ常见技术介绍
MQ(MessageQueue),即存放消息的队列,也就是事件驱动架构中的Broker。
RabbitMQActiveMQRocketMQKafka可用性高一般高高单机吞吐量一般差高非常高消息延迟微秒级毫秒级毫秒级毫秒以内消息可靠性高一般高一般
RabbitMQ看起来最劣势的地方是单机吞吐量,但是其吞吐量也足够满足大多数企业的需求了。国内用的比较多的是Kafka和RabbitMQ,前者适合拥有海量数据,且对信息安全没有那么高要求的情景。
RabbitMQ快速入门
介绍和安装(基于Centos7)
RabbitMQ是基于Erlang语言开发的开源消息通信中间件,官网地址:
RabbitMQ官网
安装步骤:
1、下载镜像
方式一:在线拉取
docker pull rabbitmq:3-management
方式二:本地下载镜像包,上传到虚拟机后,使用命令进行加载:
镜像包安装:
链接:https://pan.baidu.com/s/1L-Kzd8PWMYaBwGQPwI1z9g?pwd=mjt5
提取码:mjt5
加载命令:
docker load -i mq.tar
2、安装MQ
执行下面命令来运行MQ容器:
docker run \
-e RABBITMQ_DEFAULT_USER=itcast \
-e RABBITMQ_DEFAULT_PASS=123321 \
--name mq \
--hostname mq1 \
-p 15672:15672 \
-p 5672:5672 \
-d \
rabbitmq:3-management
其中,15672是管理平台的端口,5672是做消息通信的端口。
现在就可以直接访问RabbitMQ后台管理界面并登录:
RabbitMQ的结构:
发送者将消息发送到交换机exchange,然后再发到队列,消费者从队列中获取消息并处理。每个RabbitMQ的用户都有一个自己的VirtualHost,且互相隔离。
RabbitMQ的几个概念:
channel:操作MQ的工具
exchange:路由消息到队列中
queue:缓存消息
virtual host:虚拟主机,是对queue、exchange等资源的逻辑分组
消息模型介绍
MQ的官方文档给了5种MQ的Demo,对应了几种不同的用法:
1、基本消息队列(BasicQueue)
2、工作消息队列(WorkQueue)
3、发布订阅(Publish、Subscribe),又根据交换机类型不同分为三种:
(1)Fanout Exchange:广播
(2)Direct Exchange:路由
(3)Topic Exchange:主题
官方的HelloWorld是基于最基础的消息队列模型来实现的,只包括三个角色:
publisher:消息发布者,将消息发送到队列queue
queue:消息队列,负责接受并缓存消息
consumer:订阅队列,处理队列中的消息
而并没有交换机。
简单队列模型
虽是简单队列模型,但要用的API太多了,写起来复杂,直接导入下面的工程跑一下publisher测试类再跑一下consumer测试类,自行调试并在RabbitMQ的后台界面查看相关信息(admin、connection、channel等)。
链接:https://pan.baidu.com/s/1crv6sUmKM44Crj8u–J4GA?pwd=smjn
提取码:smjn
基本消息队列的消息发送流程:
1、建立connection
2、创建channel
3、利用channel声明队列
4、利用channel向队列发送消息
基本消息队列的消息接收流程:
1、建立connection
2、创建channel
3、利用channel声明低劣
4、定义consumer的消费行为handleDilivery()
5、利用channel将消费者与队列绑定
SpringAMQP
SpringAMQP可以大大的简化消息发送与接收的代码。在这里进行SpringAMQP的介绍,并且利用它来实现RabbitMQ中的五种消息队列模型。
基本介绍
AMQP即为Advanced Message Queuing Protocol(先进消息队列协议),是用于在应用程序之间传递业务消息的开放标准。该协议与语言和平台无关,更符合微服务中独立性的要求。
SpringAMQP即为AMQP的一种实现,基于AMQP协议的定义的一套API规范,提供了模板来发送和接收消息。包含2部分,其中spring-amqp是基础抽象,spring-rabbit是底层的默认实现。
入门案例
利用SpringAMQP实现Rabbit中的入门案例——HelloWorld中的基础消息队列功能。
先操作下面的流程:
1、在父工程中引入spring-amqp的依赖(发送和接收都要用到)
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>
2、在publisher服务中利用RabbitTemplate发送消息到simple.queue这个队列
3、在consumer服务中编写消费逻辑,绑定simple.queue这个队列
消息发送
1、在publisher服务中编写application.yml,添加mq连接信息:
spring:rabbitmq:host: 192.168.177.130 # RabbitMQ的ip地址port:5672# RabbitMQ的端口username: itcast
password:123321virtual-host: /
2、在publisher服务中新建一个测试类,编写测试方法,利用RabbitTemplate发送消息到simple.queue这个队列(从之前的测试中创建出来的,没有的话去自行运行创建出来即可):
@RunWith(SpringRunner.class)@SpringBootTestpublicclassSpringAmqpTest{@ResourceprivateRabbitTemplate rabbitTemplate;@TestpublicvoidtestSendMessage2SimpleQueue(){String queueName ="simple.queue";String message ="hello, spring amqp";
rabbitTemplate.convertAndSend(queueName, message);}}
消息接收
在consumer中编写消费逻辑,监听simple.queue:
1、在consumer服务中国编写application.yml,添加mq连接信息:
spring:rabbitmq:host: 192.168.177.130 # RabbitMQ的ip地址port:5672# RabbitMQ的端口username: itcast
password:123321virtual-host: /
2、在consumer服务中新建一个类,编写消费逻辑:
@Component// 注册成一个beanpublicclassSpringRabbitListener{@RabbitListener(queues ="simple.queue")//监听的队列名publicvoidlistenSimpleQueue(String msg){System.out.println("消费者接受到simple.queue的消息:【"+ msg +"】");}}
这个消费行为应该要自动的让其进行,所以将其交给Spring托管,只需要执行Spring的启动类函数Application即可自动实现监听。
Work Queue 工作队列模型
如果消息很多,显然consumer1与consumer2要一起执行,这是一种合作关系。
如果publisher发送消息的频率很高,而一个consumer无法处理完,这时候就需要其它consumer帮助,即为WorkQueue(工作队列)模型。
接下来要模拟WorkQueue,实现一个队列绑定多个消费者。
实现的基本思路如下:
1、在publisher服务中定义测试方法,每秒产生50条消息,发送到simple.queue
@TestpublicvoidtestSendMessage2WorkQueue()throwsInterruptedException{String queueName ="simple.queue";String message ="hello, message__";for(int i =0; i <=50; i++){
rabbitTemplate.convertAndSend(queueName, message + i);Thread.sleep(20);}}
2、在consumer服务中定义两个消息监听者,都监听simple.queue队列
@RabbitListener(queues ="simple.queue")//监听的队列名publicvoidlistenWorkQueue(String msg)throwsInterruptedException{System.out.println("消费者1接受到消息:【"+ msg +"】"+LocalTime.now());Thread.sleep(20);//每秒消费50条}@RabbitListener(queues ="simple.queue")//监听的队列名publicvoidlistenWorkQueue2(String msg)throwsInterruptedException{System.err.println("消费者2接受到消息:【"+ msg +"】"+LocalTime.now());Thread.sleep(200);//每秒消费5条}
这样理论上消费者是可以自己处理完消息的。然而启动进行测试可以发现,消费者1和消费者2都执行了25条,且消费者1执行完了,消费者2还在那低效率执行,直到结束,导致1s发送的50条消息用了5s才执行完毕:
造成这个问题的原因是因为RabbitMQ的消息预取机制,它给两个消费者都平均分配了消息,但是consumer2相对没那样的能力处理那么多消息,所以应该要让consumer2少拿一点任务。
解决方式是在application.yml中设置preFetch,可以控制预取消息的上限:
发布订阅模型
之前的模型,只要消息被其中之一的消费者消费了,这个消息就会消失。而发布订阅模型与之前案例的区别就是允许将同一消息发送给多个消费者,实现的方式是加入了交换机exchange。结构如下:
当消息被交换机安排到了多个队列去,自然就可以实现该消息被多个消费者给消费。
常见exchange类型包括:
(1)Fanout:广播
(2)Direct:路由
(3)Topic:话题
FanoutExchange
FanoutExchange会将接收到的消息路由到每一个跟其绑定的queue,绑定可以由SpringAMQP提供的API去声明队列和交换机并且实现绑定。
演示流程:
1、在consumer服务中,利用代码声明队列、交换机,并将两者绑定
@ConfigurationpublicclassFanoutConfig{//声明交换机@BeanpublicFanoutExchangefanoutExchange(){returnnewFanoutExchange("itcast.fanout");}//声明队列@BeanpublicQueuefanoutQueue1(){returnnewQueue("fanout.queue1");}//将队列1绑定到交换机@BeanpublicBindingfanoutBinding1(Queue fanoutQueue1,FanoutExchange fanoutExchange){returnBindingBuilder.bind(fanoutQueue1).to(fanoutExchange);}@BeanpublicQueuefanoutQueue2(){returnnewQueue("fanout.queue2");}@BeanpublicBindingfanoutBinding2(Queue fanoutQueue2,FanoutExchange fanoutExchange){returnBindingBuilder.bind(fanoutQueue2).to(fanoutExchange);}}
执行主程序,Spring将会自动读取配置并执行:
2、在consumer服务中,编写两个消费者方法,分别监听fanout.queue1和fanout.queue2:
@RabbitListener(queues ="fanout.queue1")//监听的队列名publicvoidlistenFanoutQueue1(String msg){System.out.println("消费者接受到fanout.queue1的消息:【"+ msg +"】");}@RabbitListener(queues ="fanout.queue2")//监听的队列名publicvoidlistenFanoutQueue2(String msg){System.out.println("消费者接受到fanout.queue2的消息:【"+ msg +"】");}
编写完毕后重启ConsumerApplication。
3、在publisher中编写测试方法,向itcast.fanout发送消息
@TestpublicvoidtestSendFanoutExchange(){//交换机名称String exchangeName ="itcast.fanout";//消息String message ="hello, every one!";//发送消息
rabbitTemplate.convertAndSend(exchangeName,"", message);}
DirectExchange
DirectExchange会将接收到的消息根据规则路由到指定的Queue,因此称为路由模式(routes):
1、每一个Queue都和Exchange设置一个BindingKey
2、发布者发送消息时,指定消息的RoutingKey
3、Exchange将消息路由到BindingKey与消息RouingKey一致的队列
需要注意的是,队列之间是可以绑定相同的BindingKey的,一个队列可以绑定多个BindingKey,所以如果publisher发送消息会被多个queue读取,也就是广播,因此DirectExchange是可以模拟FanoutExchange的。
实现流程如下:
1、利用@RabbitListener声明Exchange、Queue、RoutingKey(不再用bean声明,要声明一堆的东西,太复杂了)
2、在consumer服务中,编写两个消费者方法,分别监听direct.queue1和direct.queue2
上面2步一起在consumer的监听类中完成:
@RabbitListener(bindings =@QueueBinding(
value =@Queue(name ="direct.queue1"),
exchange =@Exchange(name ="itcast.direct", type =ExchangeTypes.DIRECT),
key ={"red","blue"}))publicvoidlistenDirectQueue1(String msg){System.out.println("消费者1接受到direct.queue1的消息"+ msg +"】");}@RabbitListener(bindings =@QueueBinding(
value =@Queue(name ="direct.queue2"),
exchange =@Exchange(name ="itcast.direct", type =ExchangeTypes.DIRECT),
key ={"red","yellow"}))publicvoidlistenDirectQueue2(String msg){System.out.println("消费者1接受到direct.queue2的消息"+ msg +"】");}
3、在publisher中编写测试方法,向itcast. direct发送消息
@TestpublicvoidtestSendDirectExchange(){//交换机名称String exchangeName ="itcast.direct";//消息String message ="hello, blue!";//发送消息
rabbitTemplate.convertAndSend(exchangeName,"blue", message);}
TopicExchange
TopicExchange与DirectExchange类似,区别在于其routingKey必须是多个单词的列表,分别以
.
分割。
Queue与Exchange指定BindingKey时可以使用通配符:
#:0个或多个单词
*:一个单词
实现流程:
1、利用@RabbitListener声明Exchange、Queue、RoutingKey
2、在consumer服务中,编写两个消费者方法,分别监听topic.queue1和topic.queue2
上面两个步骤的代码:
@RabbitListener(bindings =@QueueBinding(
value =@Queue(name ="topic.queue1"),
exchange =@Exchange(name ="itcast.topic", type =ExchangeTypes.TOPIC),
key ="china.#"))publicvoidlistenTopicQueue1(String msg){System.out.println("消费者1接受到topic.queue1的消息"+ msg +"】");}@RabbitListener(bindings =@QueueBinding(
value =@Queue(name ="topic.queue2"),
exchange =@Exchange(name ="itcast.topic", type =ExchangeTypes.TOPIC),
key ="#.news"))publicvoidlistenTopicQueue2(String msg){System.out.println("消费者1接受到topic.queue2的消息"+ msg +"】");}
3、在publisher中编写测试方法,向itcast.topic发送消息
@TestpublicvoidtestSendTopicExchange(){//交换机名称String exchangeName ="itcast.topic";//消息String message ="南京航空航天大学第五轮学科评估有7个A类评分!";//发送消息
rabbitTemplate.convertAndSend(exchangeName,"china.news", message);}
消息转换器
Ctrl+P查看参数,可以发现发出的消息的类型都是Object类型的。
说明SpringAMQP允许我们发任何对象,比如List,Map,但是RabbitMQ是以字节传输的,这说明了SpringAMQP会帮我们序列化为字节后发送。
Spring的对消息对象的处理是基于JDK的ObjectOutputStream完成序列化,这种序列化的性能差,而且安全有问题,数据长度也会很长。所以最好换一个序列化方式。
推荐用JSON方式序列化,步骤如下:
1、在publisher服务引入依赖:
<dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-databind</artifactId></dependency>
2、在publisher服务声明MessageConverter:
@BeanpublicMessageConvertermessageConverter(){return(MessageConverter)newJackson2JsonMessageConverter();}
消息的接收也需要和上面一样:引入相同的依赖,然后在consumer服务中定义MessageConverter。
版权归原作者 布布要成为最负责的男人 所有, 如有侵权,请联系我们删除。