同步和异步
同步调用的优势是什么?
•时效性强,等待到结果后才返回。
同步调用的问题是什么?
拓展性差 性能下降 级联失败问题
异步调用通常是基于消息通知的方式,包含三个角色:
•消息发送者:投递消息的人,就是原来的调用者
•消息接收者:接收和处理消息的人,就是原来的服务提供者
•消息代理:管理、暂存、转发消息,你可以把它理解成微信服务器
异步调用的优势是什么?
•耦合度低,拓展性强
•异步调用,无需等待,性能好
•故障隔离,下游服务故障不影响上游业务
•缓存消息,流量削峰填谷
异步调用的问题是什么?
•不能立即得到调用结果,时效性差
•不确定下游业务执行是否成功
•业务安全依赖于Broker的可靠性
MQ技术选型
RabbitMQ
ActiveMQ
RocketMQ
Kafka
公司/社区
Rabbit
Apache
阿里
Apache
开发语言
Erlang
Java
Java
Scala&Java
协议支持
AMQP,XMPP,SMTP,STOMP
OpenWire,STOMP,REST,XMPP,AMQP
自定义协议
自定义协议
可用性
高
一般
高
高
单机吞吐量
一般
差
高
非常高
消息延迟
微秒级
毫秒级
毫秒级
毫秒以内
消息可靠性
高
一般
高
一般
追求可用性:Kafka、 RocketMQ 、RabbitMQ
追求可靠性:RabbitMQ、RocketMQ
追求吞吐能力:RocketMQ、Kafka
追求消息低延迟:RabbitMQ、Kafka
据统计,目前国内消息队列使用最多的还是RabbitMQ
安装部署
1.安装压缩包
2.部署docker
docker run \ -e RABBITMQ_DEFAULT_USER=itheima \ -e RABBITMQ_DEFAULT_PASS=123321 \ -v mq-plugins:/plugins \ --name mq \ --hostname mq \ -p 15672:15672 \ -p 5672:5672 \ --network hm-net\ -d \ rabbitmq:3.8-management
2.1
可以看到在安装命令中有两个映射的端口:
- 15672:RabbitMQ提供的管理控制台的端口
- 5672:RabbitMQ的消息发送处理接口
3.安装完成后,我们访问 http://192.168.48.131:15672即可看到管理控制台。首次访问需要登录,默认的用户名和密码在配置文件中已经指定了。
核心概念
RabbitMQ的整体架构及核心概念:
•virtual-host:虚拟主机,起到数据隔离的作用
•publisher:消息发送者
•consumer:消息的消费者
•queue:队列,存储消息
•exchange:交换机,负责路由消息
数据隔离
消息发送的注意事项有哪些?
•交换机只能路由消息,无法存储消息
•交换机只会路由消息给与其绑定的队列,因此队列必须与交换机绑定
不同virtual host之间是数据隔离的,在一个virtual host 中不能处理其他虚拟机的交换机,队列数据
SpringAMQP
SpringAmqp的官方地址:Spring AMQP
AdvancedMessageQueuingProtocol,是用于在应用程序之间传递业务消息的开放标准。该协议与语言和平台无关,更符合微服务中独立性的要求。
Spring AMQP是基于AMQP协议定义的一套API规范,提供了模板来发送和接收消息。包含两部分,其中spring-amqp是基础抽象,spring-rabbit是底层的默认实现。
引入依赖
<**dependency**>
<**groupId**>org.springframework.boot</**groupId**>
<**artifactId**>spring-boot-starter-amqp</**artifactId**>
</**dependency**>
配置RabbitMQ服务端信息
spring:
rabbitmq:
**host**: 192.168.150.101 *# **主机名 *
*port: 5672 *# *端口
*virtual-host: /hmall *# *虚拟主机
*username: hmall *# *用户名
*password: 123 *# *密码
发送消息
SpringAMQP提供声明式的消息监听,我们只需要通过注解在方法上声明要监听的队列名称,将来SpringAMQP就会把消息传递给当前方法:
SpringAMQP如何收发消息?
①引入spring-boot-starter-amqp依赖
②配置rabbitmq服务端信息
③利用RabbitTemplate发送消息
④利用@RabbitListener注解声明要监听的队列,监听消息
work模式
Work queues,任务模型。简单来说就是让多个消费者绑定到一个队列,共同消费队列中的消息。
消费者消息推送限制
默认情况下,RabbitMQ的会将消息依次轮询投递给绑定在队列上的每一个消费者。但这并没有考虑到消费者是否已经处理完消息,可能出现消息堆积。
因此我们需要修改application.yml,设置preFetch值为1,确保同一时刻最多投递给消费者1条消息
Work模型的使用:
•多个消费者绑定到一个队列,可以加快消息处理速度
•同一条消息只会被一个消费者处理
•通过设置prefetch来控制消费者预取的消息数量,处理完一条再处理下一条,实现能者多劳
交换机类型
交换机的作用主要是接收发送者发送的消息,并将消息路由到与其绑定的队列。
常见交换机的类型有以下三种:
Fanout:广播
Direct:定向
Topic:话题
**Fanout Exchange **会将接收到的消息路由到每一个跟其绑定的queue,所以也叫广播模式
交换机的作用是什么?
•接收publisher发送的消息
•将消息按照规则路由到与之绑定的队列
•FanoutExchange的会将消息路由到每个绑定的队列
发送消息到交换机的API是怎样的?
Direct Exchange 会将接收到的消息根据规则路由到指定的Queue,因此称为定向路由。
l每一个Queue都与Exchange设置一个BindingKey
l发布者发送消息时,指定消息的RoutingKey
lExchange将消息路由到BindingKey与消息RoutingKey一致的队列
描述下Direct交换机与Fanout交换机的差异?
•Fanout交换机将消息路由给每一个与之绑定的队列
•Direct交换机根据RoutingKey判断路由给哪个队列
•如果多个队列具有相同RoutingKey,则与Fanout功能类似
TopicExchange与DirectExchange类似,区别在于routingKey可以是多个单词的列表,并且以 **.**分割。
Queue与Exchange指定BindingKey时可以使用通配符:
u#:代指0个或多个单词
u*:代指一个单词
描述下Direct交换机与Topic交换机的差异?
•Topic交换机接收的消息RoutingKey可以是多个单词,以 . 分割
•Topic交换机与队列绑定时的bindingKey可以指定通配符
•#:代表0个或多个词
•*:代表1个词
声明队列和交换机
SpringAMQP提供了几个类,用来声明队列、交换机及其绑定关系:
•Queue:用于声明队列,可以用工厂类QueueBuilder构建
•Exchange:用于声明交换机,可以用工厂类ExchangeBuilder构建
•Binding:用于声明队列和交换机的绑定关系,可以用工厂类BindingBuilder构建
一
二
SpringAMQP还提供了基于@RabbitListener注解来声明队列和交换机的方式:
声明队列、交换机、绑定关系的Bean是什么?
•Queue
•FanoutExchange、DirectExchange、TopicExchange
•Binding
基于@RabbitListener注解声明队列和交换机有哪些常见注解?
•@Queue
@Exchange
MQ消息转换器
Spring的对消息对象的处理是由org.springframework.amqp.support.converter.MessageConverter来处理的。而默认实现是SimpleMessageConverter,基于JDK的ObjectOutputStream完成序列化。
存在下列问题:
•JDK的序列化有安全风险
•JDK序列化的消息太大
•JDK序列化的消息可读性差
建议采用JSON序列化代替默认的JDK序列化,要做两件事情:
在publisher和consumer中都要引入jackson依赖:
<**dependency**>
<**groupId**>com.fasterxml.jackson.core</**groupId**>
<**artifactId**>jackson-databind</**artifactId**>
</**dependency**>
在publisher和consumer中都要配置MessageConverter:
@Bean
**public **MessageConvertermessageConverter(){
**return new **Jackson2JsonMessageConverter();
}
版权归原作者 韩书永 所有, 如有侵权,请联系我们删除。