0


服务异步通讯——RabbitMQ

目录

在这里插入图片描述

一、同步通讯的优缺点

在这里插入图片描述
微服务间基于Feign的调用就属于同步方式

优点:
时效性强,可立即得到结果

缺点
1、耦合度高:每次加入新的需求,都要修改原来的代码
2、性能和吞吐能力下降:调用者需要等待服务提供者响应,如果调用链过长则响应时间等于每次调用的时间之和
3、有额外的资源消耗:调用链中的每个服务都在等待响应过程中,不能释放请求占用的资源,高并发场景下会极度浪费系统资源
4、级联失败:如果服务提供者出现问题,所有的调用方都会跟着出问题,如同多米诺骨牌一样,迅速导致整个微服务群故障

二、异步通讯的优缺点

异步调用常见实现就是事件驱动模式(事件代理)
在这里插入图片描述优点(基于Broker事件代理实现)

1、服务解耦
2、性能提升,吞吐量提升(不存在调用,不需要等待)
3、故障隔离(不用担心联级问题)
4、流量消峰

缺点
1、依赖于Broker的可靠性、安全性、吞吐能力
2、架构复杂,业务没有明显的流程线,不好追踪管理

同步与异步均有优缺点,应该在什么时候使用同步,什么情况下使用异步?
大多情况下,我们会使用同步,多数情况下对并发没有很高的要求,相反对时效性要求较高

三、初识MQ

MQ(Message Queue)消息队列,字面看来就是存放消息的队列。也就是事件驱动架构中的Broker

消息队列中间件是分布式系统中重要的组件,主要解决应用解耦,异步消息,流量削锋等问题,实现高性能,高可用,可伸缩和最终一致性架构。
    解耦:一个业务需要多个模块共同实现,或者一条消息有多个系统需要对应处理,只需要
          主业务完成以后,发送一条MQ,其余模块消费MQ消息,即可实现业务,降低模块之间的耦合。
          
    异步:主业务执行结束后从属业务通过MQ,异步执行,减低业务的响应时间,提高用户体验。
    
    削峰:高并发情况下,业务异步处理,提供高峰期业务处理能力,避免系统瘫痪。

MQ的缺点
1、系统可用性降低。依赖服务越多,服务越容易挂掉。需要考虑MQ瘫痪的情况
2、系统复杂性提高。需要考虑消息丢失、消息重复消费、消息传递的顺序性
3、业务一致性。主业务和从属业务一致性的处理

主要的MQ产品包括:RabbitMQ、ActiveMQ、RocketMQ、ZeroMQ、Kafka、IBM WebSphere 等。

MQ常见技术介绍
RabbitMQActiveMQRocketMQKafka公司/社区RabbitApache阿里Apache开发语言ErlangJavaJavaScala&Java协议支持AMQP,XMPP,SMTP,STOMPOpenWire,STOMP,REST,XMPP,AMQP自定义协议自定义协议可用性高(支持主从集群)一般高高单击吞吐量一般差高非常高消息延迟微秒级毫秒级毫米级毫米以内消息可靠性高一般高一般
MQ常见框架

四、RabbitMQ快速入门

RabbitMQ介绍与安装

RabbitMQ是基于Erlang语言开发的开源消息通信中间件,官网地址:https://www.rabbitmq.com/

资源链接https://pan.baidu.com/s/1LOgrXBgR-x6crmlhWvUP0w
提取码:GY66

单机部署

此处我们采用在Centos7虚拟机中使用Docker来单机部署。

1、下载镜像

方式一:在线拉取

docker pull rabbitmq:3-management

方式二:从本地加载

资源链接中已经提供了镜像包:
上传到虚拟机中后,使用命令加载镜像即可:

docker load -i mq.tar

在这里插入图片描述

2、安装MQ

执行下面的命令来运行MQ容器:

docker run \
 -e RABBITMQ_DEFAULT_USER=GY \       #用户名
 -e RABBITMQ_DEFAULT_PASS=123456 \   #密码
 --name mq \                         #名称
 --hostname mq1 \                    #主机名(不配置也可以,集群部署时需要配置)
 -p 15672:15672 \                    #端口映射(RabbitMQ管理平台端口——提供ui界面,管理较方便)
 -p 5672:5672 \                      #端口映射(消息通信端口)
 -d \                                #后台运行
 rabbitmq:3-management               #镜像名称

在这里插入图片描述

浏览器访问,成功进入RabbitMQ管理界面
在这里插入图片描述

输入用户名、密码成功进入RabbitMQ管理平台
在这里插入图片描述

集群部署

接下来,我们看看如何安装RabbitMQ的集群。

1、集群分类
在RabbitMQ的官方文档中,讲述了两种集群的配置方式:

  • 普通模式:普通模式集群不进行数据同步,每个MQ都有自己的队列、数据信息(其它元数据信息如交换机等会同步)。例如我们有2个MQ:mq1,和mq2,如果你的消息在mq1,而你连接到了mq2,那么mq2会去mq1拉取消息,然后返回给你。如果mq1宕机,消息就会丢失。
  • 镜像模式:与普通模式不同,队列会在各个mq的镜像节点之间同步,因此你连接到任何一个镜像节点,均可获取到消息。而且如果一个节点宕机,并不会导致数据丢失。不过,这种方式增加了数据同步的带宽消耗。

我们先来看普通模式集群。

2、设置网络

首先,我们需要让3台MQ互相知道对方的存在。
分别在3台机器中,设置 /etc/hosts文件,添加如下内容:

192.168.150.101 mq1
192.168.150.102 mq2
192.168.150.103 mq3

并在每台机器上测试,是否可以ping通对方:

RabbitMQ概述

RabbitMQ的结构和概念
在这里插入图片描述

RabbitMQ中的几个概念
● Publisher:消息发送者

● consumer:消息消费者

● channel:操作MQ的工具

● exchange:交换机,路由消息到队列中

● queue:队列,缓存消息

● virtual host:虚拟主机,是对queue、exchange等资源的逻辑分组

常见消息模型

MQ的官方文档https://www.rabbitmq.com/
MQ的官方文档中给出了个Demo示例,对应了几种不同的用法:

● 基本消息队列(BasicQueue)
● 工作消息队列(WorkQueue)

在这里插入图片描述

●  发布订阅(Publish、Subscribe),又根据<font color=red>交换机类型不同</font>分为三种
        ◆ Fanout  Exchange:广播
        ◆ Direct  Exchange:路由
        ◆ Topic  Exchange:主题

在这里插入图片描述

一、HelloWorld案例
官方的HelloWorld是基于最基础的消息队列模型来实现的,只包括三个角色:
在这里插入图片描述

●  publisher:消息发布者,将消息发送到队列queue
●  queue:消息队列,负责接收并缓存消息
●  consumer:订阅队列,处理队列中的消息

在这里插入图片描述
资源链接
demo工程https://pan.baidu.com/s/1UShyzNDNzcUdIEZ4ektXUw
提取码:GY66

1、导入demo工程
2、运行publisher(消息发布者)服务中的测试类PublisherTest中的测试方法testMessage()
3、查看RabbitMQ控制台的消息
4、启动consumer(消息消费者)服务,查看是否能接收消息

消息发送测试

publicclassPublisherTest{@TestpublicvoidtestSendMessage()throws IOException, TimeoutException {// 1.建立连接
        ConnectionFactory factory =newConnectionFactory();// 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码
        factory.setHost("192.168.197.140");
        factory.setPort(5672);
        factory.setVirtualHost("/");
        factory.setUsername("GY");
        factory.setPassword("123456");// 1.2.建立连接
        Connection connection = factory.newConnection();// 2.创建通道Channel
        Channel channel = connection.createChannel();// 3.创建队列
        String queueName ="simple.queue";
        channel.queueDeclare(queueName,false,false,false, null);// 4.发送消息
        String message ="hello, rabbitmq!";
        channel.basicPublish("", queueName, null, message.getBytes());
        System.out.println("发送消息成功:【"+ message +"】");// 5.关闭通道和连接
        channel.close();
        connection.close();}}

消息接收测试

publicclassConsumerTest{publicstaticvoidmain(String[] args)throws IOException, TimeoutException {// 1.建立连接
        ConnectionFactory factory =newConnectionFactory();// 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码
        factory.setHost("192.168.197.140");
        factory.setPort(5672);
        factory.setVirtualHost("/");
        factory.setUsername("GY");
        factory.setPassword("123456");// 1.2.建立连接
        Connection connection = factory.newConnection();// 2.创建通道Channel
        Channel channel = connection.createChannel();// 3.创建队列
        String queueName ="simple.queue";
        channel.queueDeclare(queueName,false,false,false, null);// 4.订阅消息
        channel.basicConsume(queueName,true,newDefaultConsumer(channel){@OverridepublicvoidhandleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties,byte[] body)throws IOException {// 5.处理消息
                String message =newString(body);
                System.out.println("接收到消息:【"+ message +"】");}});
        System.out.println("等待接收消息。。。。");}}
基本队列的消息发送流程:
1.  建立connection
2.   创建channel
3.   利用channel声明队列
4.   利用channel向队列发送消息

基本队列的消息接收流程:
1、   建立connection
2、  创建channel
3、  利用channel声明队列
4、  定义consumer的消费行为handleDelivery()5、 利用channel将消费者与队列绑定

生产者与消费者之所以都要声明队列,是因为避免队列不存在

五、SpringAMQP

AMQP :  Advanced  Message     Queuing  Protocol,是用于在应用程序或之间传递业务消息的
         开放标准。该协议与语言和平台无关,更符合微服务中独立性的要求。(协议/标准)

Spring  AMQP:  Spring  AMQP是基于AMQP协议定义的一套API规范,提供了模板来发送和接收消
               息。包含两部分,其中spring-amqp是基础抽象,spring-rabbit是底层的默认实现

特征

●  侦听器容器,用于异步处理入栈消息
●  用于发送和接收消息的RabbitTemplate
●  RabbitAdmin用于自动声明队列,交换和绑定

SpringAmqp的官方地址:https://spring.io/projects/spring-amqp

Basic Queue 简单队列模型

在这里插入图片描述

案例:利用SpringAMQP实现HelloWorld中的基础消息队列功能

1、在父工程中引入spring-amqp的依赖(因为publisher和consumer服务都需要amqp依赖,因此将依赖直接放到父工程中mq-demo中)

<!--AMQP依赖,包含RabbitMQ--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>

2、在publisher服务中利用RabbitTemplate发送消息到simple.queue队列
① 在publisher服务中编写application.yml,添加mq连接信息:

spring:rabbitmq:host: 192.168.197.140 # rabbitMQ的ip地址port:5672 # 端口username: GY
    password:123456virtual-host: /  #虚拟主机

② 在publisher服务中新建一个测试类,编写测试方法:

@RunWith(SpringRunner.class)@SpringBootTestpublicclassSpringAmqpTest{@Autowiredprivate RabbitTemplate rabbitTemplate;@TestpublicvoidtestSendMessage2SimpleQueue(){
        String queueName ="simple.queue";
        String message ="hello, spring amqp!";
        rabbitTemplate.convertAndSend(queueName, message);}

运行测试方法在这里插入图片描述

进入浏览器访问,消息成功发送到队列
在这里插入图片描述

3、在consumer服务中编写消费逻辑,绑定simple.queue这个队列()监听simple.queue
① 在consumer服务中编写application.yml,添加mq连接信息:

spring:rabbitmq:host: 192.168.197.140 # rabbitMQ的ip地址port:5672 # 端口username: GY
    password:123456virtual-host: /      # 虚拟主机

② 在consumer服务中创建一个类,编写消费逻辑:

@ComponentpublicclassSpringRabbitListener{@RabbitListener(queues ="simple.queue")publicvoidlistenSimpleQueue(String msg){
         System.out.println("消费者接收到simple.queue的消息:【"+ msg +"】");}

启动服务,成功接收消息

消息一旦消费就会从队列中删除,RabbitMQ没有消息回溯功能

Work Queue 工作队列模型

work模型的使用:

        ●  多个消费者绑定到一个队列,同一个消息只会被同一个消费者处理
        ●  通过设置prefetch来控制消费者预取的消息数量

在这里插入图片描述
与简单队列模型相比,多了一个消息消费者(可提高消息处理速度,避免队列消息堆积)

案例:模拟WorkQueue,实现一个队列绑定多个消费者
1、 在publisher服务中定义测试方法,每秒产生50条消息,发送到simple.queue

@RunWith(SpringRunner.class)@SpringBootTestpublicclassSpringAmqpTest{@Autowiredprivate RabbitTemplate rabbitTemplate;@TestpublicvoidtestSendMessage2WorkQueue()throws InterruptedException {
        String queueName ="simple.queue";
        String message ="hello, message__";for(int i =1; i <=50; i++){
            rabbitTemplate.convertAndSend(queueName, message + i);
            Thread.sleep(20);//使消息发送慢些}}

2、 在consumer服务中定义两个消息监听者,都监听simple.queue队列

3、 消费者1每秒处理50条消息,消费者2每秒处理10条消息

@ComponentpublicclassSpringRabbitListener{@RabbitListener(queues ="simple.queue")publicvoidlistenWorkQueue1(String msg)throws InterruptedException {
        System.out.println("消费者1接收到消息:【"+ msg +"】"+ LocalTime.now());
        Thread.sleep(20);}@RabbitListener(queues ="simple.queue")publicvoidlistenWorkQueue2(String msg)throws InterruptedException {
        System.err.println("消费者2........接收到消息:【"+ msg +"】"+ LocalTime.now());
        Thread.sleep(200);}}

发送消息,查看消息接收日志
在这里插入图片描述
实际上消息平均分配给了消费者,消费者1(偶数消息)与消费者2(奇数消息)个消费25条消息,这种分配方式没有考虑两个消费者的能力,这是由于RabbitTemplate内部的消息预取机制造成的。

消息预取机制
修改application.yml文件,设置prefetch这个值,可以控制预取消息的上限

spring:rabbitmq:host: 192.168.197.140 # rabbitMQ的ip地址port:5672 # 端口username: GY
    password:123456virtual-host: /      # 虚拟主机listener:simple:prefetch:1  # 每次只能获取一条消息,处理完才能获取下一个消息

重新发送消息,查看日志
在这里插入图片描述
此时便可起到能者多劳的效果!!!————>这种模式可以提高整个队列的消息处理速度,避免消息的堆积

发布、订阅模型

发布订阅模式与之前案例的区别就是允许将同一消息发送给多个消费者。实现方式是加入了exchange(交换机)。

常见exchange类型包括:
●   Fanout:广播
●   Direct:路由
●   Topic:话题

注意:exchange负责消息路由,而不是存储,路由失败则消息丢失

发布、订阅模型——FanoutExchange

Fanout Exchange 会将接收到的消息广播到每一个跟其绑定的queue
在这里插入图片描述

SpringAMQP提供了声明交换机、队列、绑定关系的API,例如:
在这里插入图片描述

案例:利用SpringAMQP演示FanoutExchange的使用

1、在consumer服务中,声明 Queue队列、 Exchange 交换机和 Binding 绑定关系对象

@ConfigurationpublicclassFanoutConfig{// itcast.fanout@Beanpublic FanoutExchange fanoutExchange(){returnnewFanoutExchange("GY.fanout");}// fanout.queue1@Beanpublic Queue fanoutQueue1(){returnnewQueue("fanout.queue1");}// 绑定队列1到交换机@Beanpublic Binding fanoutBinding1(Queue fanoutQueue1, FanoutExchange fanoutExchange){return BindingBuilder
                .bind(fanoutQueue1).to(fanoutExchange);}// fanout.queue2@Beanpublic Queue fanoutQueue2(){returnnewQueue("fanout.queue2");}// 绑定队列2到交换机@Beanpublic Binding fanoutBinding2(Queue fanoutQueue2, FanoutExchange fanoutExchange){return BindingBuilder
                .bind(fanoutQueue2).to(fanoutExchange);}}

启动consumer服务,进入浏览器查看,队列成功与交换机绑定关系
在这里插入图片描述

2、在consumer服务中,编写两个消费者方法,分别监听fanout.queue1和fanout.queue2

@ComponentpublicclassSpringRabbitListener{@RabbitListener(queues ="fanout.queue1")publicvoidlistenFanoutQueue1(String msg){
        System.out.println("消费者接收到fanout.queue1的消息:【"+ msg +"】");}@RabbitListener(queues ="fanout.queue2")publicvoidlistenFanoutQueue2(String msg){
        System.out.println("消费者接收到fanout.queue2的消息:【"+ msg +"】");}}

3、在publisher中编写测试方法,向itcast.fanout发送消息

@TestpublicvoidtestSendFanoutExchange(){// 交换机名称
        String exchangeName ="GY.fanout";// 消息
        String message ="hello, every one!";// 发送消息,参数分别是:交换机名称、RoutingKey(暂时为空)、消息
        rabbitTemplate.convertAndSend(exchangeName,"", message);}

在这里插入图片描述
两个队列收到相同消息(实现了一次发送,多个消费者均能接收消息)

交换机的作用是什么?
      ●    接收publisher发送的消息
      ●    将消息按照规则路由到与之绑定的队列
      ●    不能缓存消息,路由失败,消息将丢失
      ●    FanoutExchange的会将消息路由到每个绑定的队列
声明队列、交换机、绑定关系的Bean是什么?
      ●    Queue
      ●    FanoutExchange
      ●    Binding

发布、订阅模型——DirectExchange

Direct Exchange 会将接收到的消息根据规则(Key)路由到指定的Queue,因此称为路由模式(routes)
● 每一个Queue都与Exchange设置一个BindingKey
● 发布者发送消息时,指定消息的RoutingKey
● Exchange将消息路由到BindingKey与消息RoutingKey一致的队列

在这里插入图片描述

也可绑定相同的Key,一个队列在与交换机绑定时,可指定多个Key
在这里插入图片描述

案例:利用SpringAMQP演示DirectExchange的使用
1、利用@RabbitListener声明Exchange、Queue、RoutingKey

2、在consumer服务中,编写两个消费者方法,分别监听direct.queue1和direct.queue2

@RabbitListener(bindings =@QueueBinding(
            value =@Queue(name ="direct.queue1"),
            exchange =@Exchange(name ="GY.direct", type = ExchangeTypes.DIRECT),
            key ={"red","blue"}))publicvoidlistenDirectQueue1(String msg){
        System.out.println("消费者接收到direct.queue1的消息:【"+ msg +"】");}@RabbitListener(bindings =@QueueBinding(
            value =@Queue(name ="direct.queue2"),
            exchange =@Exchange(name ="GY.direct", type = ExchangeTypes.DIRECT),
            key ={"yellow"}))publicvoidlistenDirectQueue2(String msg){
        System.out.println("消费者接收到direct.queue2的消息:【"+ msg +"】");}

3、在publisher中编写测试方法,向GY.direct发送消息

@TestpublicvoidtestSendDirectExchange(){// 交换机名称
    String exchangeName ="GY.direct";// 消息
    String message ="hello, red!";// 发送消息
    rabbitTemplate.convertAndSend(exchangeName,"red", message);}
Direct交换机与Fanout交换机的差异?
        ●      Fanout交换机将消息路由给每一个与之绑定的队列
        ●      Direct交换机根据RoutingKey判断路由给哪个队列
        ●      如果多个队列具有相同的RoutingKey,则与Fanout功能类似
基于@RabbitListener注解声明队列和交换机有哪些常见注解?
        ●        @QueueBinding
        ●      @Queue
        ●      @Exchange

发布-订阅模型——TopicExchange

TopicExchange与DirectExchange类似,区别在于routingKey必须是多个单词的列表,并且以 . 分割。

Queue与Exchange指定BindingKey时可以使用通配符:
#:代指0个或多个单词
*:代指一个单词
在这里插入图片描述

案例:利用SpringAMQP演示TopicExchange的使用

1、并利用@RabbitListener声明Exchange、Queue、RoutingKey

2、在consumer服务中,编写两个消费者方法,分别监听topic.queue1和topic.queue2

@RabbitListener(bindings =@QueueBinding(
            value =@Queue(name ="topic.queue1"),
            exchange =@Exchange(name ="GY.topic", type = ExchangeTypes.TOPIC),
            key ="china.#"))publicvoidlistenTopicQueue1(String msg){
        System.out.println("消费者接收到topic.queue1的消息:【"+ msg +"】");}@RabbitListener(bindings =@QueueBinding(
            value =@Queue(name ="topic.queue2"),
            exchange =@Exchange(name ="GY.topic", type = ExchangeTypes.TOPIC),
            key ="#.weather"))publicvoidlistenTopicQueue2(String msg){
        System.out.println("消费者接收到topic.queue2的消息:【"+ msg +"】");}

3、在publisher中编写测试方法,向itcast. topic发送消息

@TestpublicvoidtestSendTopicExchange(){// 交换机名称
        String exchangeName ="GY.topic";// 消息
        String message ="iPhone 15 pro或取消物理按键!";// 发送消息
        rabbitTemplate.convertAndSend(exchangeName,"china.news", message);}

消息转换器

案例:测试发送Object类型消息

在这里插入图片描述

通过Ctrl+P查看参数列表,在SpringAMQP的发送方法中,接收消息的类型是Object,也就是说我们可以发送任意对象类型的消息,SpringAMQP会帮我们序列化为字节后发送。

1、在consumer中利用@Bean声明一个队列

@Beanpublic Queue objectQueue(){returnnewQueue("object.queue");}

2、在publisher中发送消息以测试

@TestpublicvoidtestSendObjectQueue(){
        Map<String,Object> msg=newHashMap<>();
        msg.put("name","GY");
        msg.put("project","Java");
        rabbitTemplate.convertAndSend("object.queue",msg);}

进入浏览器查看消息却发现消息为字节且较长(RabbitMQ只支持字节)

消息转换器

Spring的对消息对象的处理是由org.springframework.amqp.support.converter.MessageConverter来处理的。而默认实现是SimpleMessageConverter,基于JDK的ObjectOutputStream完成序列化。
如果要修改只需要定义一个MessageConverter 类型的Bean即可。推荐用JSON方式序列化,步骤如下
1、在publisher服务引入依赖

<dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-databind</artifactId></dependency>

2、我们在consumer服务定义MessageConverter

@SpringBootApplicationpublicclassPublisherApplication{publicstaticvoidmain(String[] args){
        SpringApplication.run(PublisherApplication.class);}@Beanpublic MessageConverter messageConverter(){returnnewJackson2JsonMessageConverter();}}

3、定义一个消费者,监听object.queue队列并消费消息

@RabbitListener(queues ="object.queue")publicvoidlistenObjectQueue(Map<String,Object> msg){
        System.out.println("接收到object.queue的消息:"+ msg);}

发送消息进行测试,接收消息成功!!!
在这里插入图片描述

控制台成功打印
在这里插入图片描述
成功接收消息!!!

SpringAMQP中消息的序列化和反序列化是怎么实现的?
        ●        利用MessageConverter实现,默认是JDK的序列化
        ●       注意发送方与接收方必须使用相同的MessageConverter
标签: 中间件 rabbitmq java

本文转载自: https://blog.csdn.net/m0_56188609/article/details/127576258
版权归原作者 new一个对象_ 所有, 如有侵权,请联系我们删除。

“服务异步通讯——RabbitMQ”的评论:

还没有评论