0


微服务(SpringCloud)第四篇之RabbitMQ(消息队列基础篇)

作者简介:☕️大家好,我是intelligent_M,一个Java后端开发者!
当前专栏:intelligent_M—— 微服务(SpringCloud) ,CSDN博客。

后续会更新Java相关技术栈以及链表哈希表二叉树…,回溯算法贪心算法…等等算法题。
创作不易 欢迎点赞评论!!!

微服务


RabbitMQ(基础篇)

同步和异步

  • RabbitMQ是高性能的异步通讯组件
  • 微服务一旦拆分,必然涉及到服务之间的相互调用,目前我们服务之间调用采用的都是基于OpenFeign的调用。这种调用中,调用者发起请求后需要等待服务提供者执行业务返回结果后,才能继续执行后面的业务。也就是说调用者在调用过程中处于阻塞状态,因此我们成这种调用方式为同步调用,也可以叫同步通讯。但在很多场景下,我们可能需要采用异步通讯的方式,为什么呢?

我们先来看看什么是同步通讯和异步通讯。如图:
在这里插入图片描述

  • 同步通讯:就如同打视频电话,双方的交互都是实时的。因此同一时刻你只能跟一个人打视频电话。在这里插入图片描述
  • 异步通讯:就如同发微信聊天,双方的交互不是实时的,你不需要立刻给对方回应。因此你可以多线操作,同时跟多人聊天。在这里插入图片描述

两种方式各有优劣,打电话可以立即得到响应,但是你却不能跟多个人同时通话。发微信可以同时与多个人收发微信,但是往往响应会有延迟。

所以,如果我们的业务需要实时得到服务提供方的响应,则应该选择同步通讯(同步调用)。而如果我们追求更高的效率,并且不需要实时响应,则应该选择异步通讯(异步调用)。

同步调用

在这里插入图片描述

  • 同步调用的优势是什么? - 时效性强,等待到结果后才返回
  • 同步调用的问题是什么?
  • 1.拓展性差在这里插入图片描述

目前的业务相对简单,但是随着业务规模扩大,产品的功能也在不断完善。也就是说每次有新的需求,现有支付逻辑都要跟着变化,代码经常变动,不符合开闭原则,拓展性不好。

  • 2.性能下降

由于我们采用了同步调用,调用者需要等待服务提供者执行完返回结果后,才能继续向下执行,也就是说每次远程调用,调用者都是阻塞等待状态。最终整个业务的响应时长就是每次远程调用的执行时长之和:
在这里插入图片描述
假如每个微服务的执行时长都是50ms,则最终整个业务的耗时可能高达300ms,性能太差了。

  • 3.级联失败问题 由于我们是基于OpenFeign调用交易服务、通知服务。当交易服务、通知服务出现故障时,整个事务都会回滚,交易失败。 这其实就是同步调用的级联失败问题。

但是大家思考一下,我们假设用户余额充足,扣款已经成功,此时我们应该确保支付流水单更新为已支付,确保交易成功。毕竟收到手里的钱没道理再退回去吧。因此,这里不能因为短信通知、更新订单状态失败而回滚整个事务。

异步调用

  • 异步调用方式其实就是基于消息通知的方式,一般包含三个角色: - 消息发送者:投递消息的人,就是原来的调用方- 消息代理:管理,暂存,转发消息,你可以把它理解成微信服务器- 消息接收者:接收和处理消息的人,就是原来的服务提供方在这里插入图片描述
  • 支付服务不在同步调用业务关联度低的服务,而是发送消息通知到Broker。
  • 具备下列优势:
  • 解除耦合,拓展性强
  • 无需等待,性能好
  • 故障隔离
  • 缓存消息,流量削峰填谷在这里插入图片描述QPS为每秒钟的访问量在这里插入图片描述
  • 异步调用的优势是什么? - 耦合度低,拓展性强- 异步调用,无需等待,性能好- 故障隔离,下游服务故障不影响上游业务- 缓存消息,流量削峰填谷
  • 异步调用的问题是什么? - 不能立即得到调用结果,时效性差- 不确定下游业务执行是否成功- 业务安全依赖于Broker(消息代理)的可靠性

MQ技术选型

  • MQ(MessageQueue),中文是消息队列,字面意思就是存放消息的队列。也就是异步调用中的Broker在这里插入图片描述
  • RabbitMQ的消息可靠性比较高很多公司也在用RabbitMQ,所以这里我们以RabbitMQ为例讲解,其他的MQ在OpenFeign的调用下都大同小异在这里插入图片描述在这里插入图片描述
  • RabbitMQ的整体架构及核心概念:
  • virtual-host:虚拟主机,起到数据隔离的作用
  • publisher:消息发送者
  • consumer:消息的消费者
  • queue:队列,存储消息
  • exchange:交换机,负责路由转发消息(没有存储消息的能力)在这里插入图片描述
  • 快速入门
  • 1.创建一个消息队列在这里插入图片描述在这里插入图片描述
  • 2.选择交换机并绑定消息队列在这里插入图片描述在这里插入图片描述 可以看到队列和交换机绑定成功了在这里插入图片描述在这里插入图片描述
  • 3.可以给利用交换机发送消息在这里插入图片描述
  • 4.队列可以查看消息在这里插入图片描述

数据隔离

  • 需求:在RabbitMQ的控制台完成下列操作:
  • 新建一个用户hmall
  • 为hmall用户创建一个virtual host
  • 测试不同virtual host之间的数据隔离现象在这里插入图片描述 用hmall用户登录创建虚拟主机在这里插入图片描述 选择自己的虚拟主机就看不到别的用户的队列(保证了数据隔离)在这里插入图片描述 交换机也是隔离的在这里插入图片描述
  • 所以当你有多个项目的时候,可以创建不同的用户以及不同的虚拟主机实现数据隔离

SpringAMQP(Java客户端)

快速入门

  • SpringAmqp的官方地址在这里插入图片描述在这里插入图片描述
  • 1.引入spring-amqp依赖
  • 在工程中引入spring-amqp依赖,这样publisher和consumer服务都可以使用:在这里插入图片描述
  • 2.配置RabbitMQ服务信息
  • 在每个微服务中引入MQ服务端信息,这样微服务才能连接到RabbitMQ在这里插入图片描述
  • 3.发送消息
  • SpringAMQP提供了RabbitTemplate工具类,方便我们发送消息。发送消息代码如下:
@AutowiredprivateRabbitTemplate rabbitTemplate;@TestvoidtestSendMessage2Queue(){//队列名称String queueName ="simple.queue";//消息String msg ="hello, amqp!";//发送消息
        rabbitTemplate.convertAndSend(queueName, msg);}
  • 4.接收消息
  • SpringAMQP提供声明式的消息监听,我们只需要通过注解在方法上声明要监听的队列名称,将来SpringAMQP就会把消息传递给当前方法:
@Slf4j//日志@Component//注册为一个BeanpublicclassMqListener{@RabbitListener(queues ="simple.queue")publicvoidlistenSimpleQueue(String msg){System.out.println("消费者收到了simple.queue的消息:【"+ msg +"】");}}

在这里插入图片描述

  • SpringAMQP如何发消息? - 1.引入spring-boot-starter-amqp依赖- 2.配置rabbitmq服务端信息- 3.利用RabbitTemplate发送消息- 4.利用@RabbitListener注解声明要监听的队列,监听消息

work模式(Work Queues)

  • Work queues,任务模型简单来说就是让多个消费者绑定到一个队列,共同消费队列中的消息在这里插入图片描述
  • 消费者消息推送限制 - 默认情况下,RabbitMQ的会将消息依次轮询投递给绑定在队列上的每一个消费者,但者并没有考虑到消费者是否已经处理完消息,可能出现消息堆积。- 因此我们需要修改application.yaml,设置preFetch值为1,确保同一时刻最多投递给消费者1条消息:在这里插入图片描述
  • Work模型的使用:
  • 多个消费者绑定到一个队列,可以加快消息处理速度
  • 通一条消息只会被一个消费者处理
  • 通过设置prefetch来控制消费者预取的消息数量,处理完一条再处理下一条,实现能者多劳

交换机

  • 真正生产环境都会经过exchange来发送消息,而不是直接发送到队列,交换机的类型有以下三种:
  • Fanout:广播
  • Direct:定向
  • Topic:话题在这里插入图片描述

Fanout交换机

  • FanoutExchange会将接收到的消息广播到每一个跟其绑定的queue,所以也叫广播模式在这里插入图片描述
  • 利用SpringAMQP演示FanoutExchange的使用
  • 实现思路如下:
  • 1.在RabbitMQ控制台中,声明队列fanout.queue1和fanout.queue2
  • 2.在RabbitMQ控制台中,声明交换机hmall.fanout,将两个队列与其绑定
  • 3.在consumer服务中,编写两个消费者方法,分别监听fanout.queue1和fanout.queue2
@RabbitListener(queues ="fanout.queue1")publicvoidlistenFanoutQueue1(String msg)throwsInterruptedException{System.out.println("消费者1 收到了 fanout.queue1的消息:【"+ msg +"】");}@RabbitListener(queues ="fanout.queue2")publicvoidlistenFanoutQueue2(String msg)throwsInterruptedException{System.out.println("消费者2 收到了 fanout.queue2的消息:【"+ msg +"】");}
  • 4.在publisher中编写测试方法,向hmall.fanout发送消息
@TestvoidtestSendFanout(){String exchangeName ="hmall.fanout";String msg ="hello, everyone!";
        rabbitTemplate.convertAndSend(exchangeName,null, msg);}

在这里插入图片描述

  • 交换机的作用是什么? - 接收publisher发送的消息- 将消息按照规则路由到与之绑定的队列- FanoutExchange会将消息路由到每个绑定的队列

Direct交换机

  • DirectExchange会将接收到的消息根据规则路由到指定的Queue,因此称为定向路由 - 每一个Queue都与Exchange设置一个Bindingkey- 发布者发送消息时,指定消息的RoutingKey- Exchange将消息路由到BindingKey与消息RoutingKey一致的队列在这里插入图片描述在这里插入图片描述
  • 描述下Direct交换机与Fanout交换机的差异 - Fanout交换机将消息路由给每一个与之绑定的队列- Direct交换机根据RoutingKey判断路由给哪个队列- 如果多个队列具有相同RoutingKey,则与Fanout功能类似

Topic交换机(推荐使用,功能最强大)

  • TopicExchange与DirectExchange类似,区别在于routingKey可以时多个单词的列表,并且以 . 分割,Queue与Exchange指定BindingKey时可以使用通配符:
  • #:代指0个或多个单词
  • *:代指一个单词在这里插入图片描述
  • 利用SpringAMQP演示DirectExchange的使用
  • 1.在RabbitMQ控制台中,声明队列topic.queue1和topic.queue2在这里插入图片描述
  • 2.在RabbitMQ控制台中,声明交换机hmall.topic,将两个队列与其绑定在这里插入图片描述
  • 3.在consumer服务中,编写两个消费者方法,分别监听topic.queue1和topic.queue2
@RabbitListener(queues ="topic.queue1")publicvoidlistenTopicQueue1(String msg)throwsInterruptedException{System.out.println("消费者1 收到了 topic.queue1的消息:【"+ msg +"】");}@RabbitListener(queues ="topic.queue2")publicvoidlistenTopicQueue2(String msg)throwsInterruptedException{System.out.println("消费者2 收到了 topic.queue2的消息:【"+ msg +"】");}
  • 4.在publisher中编写测试方法,利用不同的RoutingKey向hmall.topic发送消息
@TestvoidtestSendTopic(){String exchangeName ="hmall.topic";String msg ="今天天气挺不错,我的心情的挺好的";
        rabbitTemplate.convertAndSend(exchangeName,"china.weather", msg);}
  • 描述下Direct交换机和Topic交换机的差异? - Topic交换机接收的消息RoutingKey可以时多个单词,以点分割- Topic交换机与队列绑定时的bindingKey可以指定通配符- #:代表0个或多个词- *:代表1个单词

声明队列交换机

  • SpringAMQP提供了几个类,用来声明队列,交换机及其绑定关系: - Queue:用于声明队列,可以用工厂类QueueBuilder构建- Exchange:用于声明交换机,可以用工厂类ExchangeBuilder构建- Binding:用于声明队列和交换机的绑定关系,可以用工厂类BindingBuilder构建在这里插入图片描述
  • 例如,声明(基于Bean的声明方式)一个Fanout类型的交换机,并创建队列与其绑定:
  • 该方式需要写多个Bean比较麻烦不推荐
@ConfigurationpublicclassFanoutConfiguration{@Bean//声明fanout交换机publicFanoutExchangefanoutExchange(){// ExchangeBuilder.fanoutExchange("").build();returnnewFanoutExchange("hmall.fanout2");}@Bean//声明队列publicQueuefanoutQueue3(){// QueueBuilder.durable("ff").build();//durable 耐用的持久的(持久化把队列写入磁盘)returnnewQueue("fanout.queue3");}@Bean//声明绑定关系publicBindingfanoutBinding3(Queue fanoutQueue3,FanoutExchange fanoutExchange){returnBindingBuilder.bind(fanoutQueue3).to(fanoutExchange);}@Bean//声明队列publicQueuefanoutQueue4(){returnnewQueue("fanout.queue4");}@Bean//声明绑定关系publicBindingfanoutBinding4(){returnBindingBuilder.bind(fanoutQueue4()).to(fanoutExchange());}}
  • SpringAMQP还提供了基于@RabbitListener注解来声明队列和交换机的方式:
  • 该方式大大简化了创建Bean的过程简化了代码推荐使用这种方式
@RabbitListener(bindings =@QueueBinding(
           value =@Queue(name ="direct.queue1", durable ="true"),
           exchange =@Exchange(name ="hmall.direct", type =ExchangeTypes.DIRECT),
           key ={"red","blue"}))publicvoidlistenDirectQueue1(String msg)throwsInterruptedException{System.out.println("消费者1 收到了 direct.queue1的消息:【"+ msg +"】");}@RabbitListener(bindings =@QueueBinding(
           value =@Queue(name ="direct.queue2", durable ="true"),
           exchange =@Exchange(name ="hmall.direct", type =ExchangeTypes.DIRECT),
           key ={"red","yellow"}))publicvoidlistenDirectQueue2(String msg)throwsInterruptedException{System.out.println("消费者2 收到了 direct.queue2的消息:【"+ msg +"】");}
  • 声明队列,交换机,绑定关系的Bean是什么? - Queue- FanoutExchange,DirectExchange,TopicExchange- Binding
  • 基于@RabbitListener注解声明队列和交换机有哪些常见注解? - @Queue- @Exchanger

消息转换器

  • 需求:测试利用SpringAMQP发送对象类型的消息
  • 1.声明一个队列,名为object.queue
  • 2.编写单元测试,向队列中直接发送一条消息,消息类型为Map
  • 3.在控制台查看消息,总结问题在这里插入图片描述在这里插入图片描述
  • 这里可以看到它对我们发送的消息做了序列化,jdk自带的对象字节流给我们的消息转成字节了
  • Spring的对消息对象的处理是由org.springframework.amqp.support.converter.MessageConverter来处理的。而默认实现是SimpleMessageConverter,基于JDK的ObjectOutputStream完成序列化。
  • 存在以下问题- 1.JDK的序列化有安全风险- 2.JDK序列化的消息太大占用空间- 3.JDK序列化的消息可读性差
  • 建议采用JSON序列化代替默认的JDK序列化,需要做两件事情
  • 1.在publisher和consumer中都要引入依赖:
<!--Jackson--><dependency><groupId>com.fasterxml.jackson.dataformat</groupId><artifactId>jackson-dataformat-xml</artifactId></dependency>
  • 2.在publisher和consumer中都要配置MessageConverter:
@BeanpublicMessageConverterjacksonMessageConvertor(){returnnewJackson2JsonMessageConverter();}
  • 重新发送刚才的消息,效果如下在这里插入图片描述

看完本篇请前往SpringCloud微服务第五篇


本文转载自: https://blog.csdn.net/m0_62388538/article/details/133702386
版权归原作者 Intelligent_M 所有, 如有侵权,请联系我们删除。

“微服务(SpringCloud)第四篇之RabbitMQ(消息队列基础篇)”的评论:

还没有评论