0


SpringCloud学习路线(9)——服务异步通讯RabbitMQ

一、初见MQ

(一)什么是MQ?

MQ(MessageQueue),意思是消息队列,也就是事件驱动架构中的Broker。

(二)同步调用

1、概念: 同步调用是指,某一服务需要多个服务共同参与,但多个服务之间有一定的执行顺序,当每一个服务都需要等待前面一个服务完成才能继续执行。

2、存在的问题

  • 耦合度高: 新需求需要改动原代码
  • 性能下降: 调用者需要等待服务提供者相应,如果调用链过长则响应时间等于每次调用的时间之和。
  • 资源浪费: 调用链的每个服务在等待响应过程中,不会释放请求资源,高并发场景下会浪费系统资源。
  • 级联失败: 若服务提供者出现宕机,所有调用者都会因故障而导致整个服务集群故障。

(三)异步调用

1、实现模式: 异步调用常见实现的就是事件驱动模式。

2、事件驱动的优势

  • 服务解耦: 只需要将请求交付给事件管理器进行管理即可完成服务。
  • 性能提升: 与客户交互的服务短时间就能完成,并不需要等待后续服务完成。
  • 服务弱依赖: 其它服务宕机不影响服务集群的使用
  • 流量缓冲: 事件管理器通过任务队列的方式,使得订阅的服务按照自身速度进行执行。

3、事件驱动的缺点

  • 高度依赖Broker的可靠性、安全性、吞吐能力
  • 架构复杂时,业务没有明显的流程线,不便于跟踪管理

(四)MQ常见框架
RabbitMQ(中小企业)ActiveMQRocketMQ(大型企业)Kafka公司/社区RabbitApacheAlibabaApache开发语言ErlangJavaJavaJava协议支持AMQP,XMPP,SMTP,STOMPOpenWire,STOMP,REST,XMPP,AMQP自定义协议自定义协议可用性高一般高高单机吞吐量一般差高极高消息延迟微妙级毫秒级毫秒级毫秒以内消息可靠高一般高一般
二、使用MQ

(一)RabbitMQ概述

RqbbitMQ是基于Erlang语言开发的开源消息通讯中间件,官方地址:https://rabbitmq.com/

(二)安装MQ

docker pull rabbitmq:3-management

在这里插入图片描述

(三)运行RabbitMQ

#配置 MQ的用户名和密码,容器名和主机名,端口,镜像名 ,注意:15672端口是MQ的控制台访问端口,5672是对外暴露的消息通信端口
docker run -e RABBITMQ_DEFAULT_USER=xxx -e RABBITMQ_DEFAULT_PASS=xxxx --name mq --hostname mq1 -p 15672:15672 -p 5672:5672 -d rabbitmq:3-management

在这里插入图片描述

访问MQ的控制台

在这里插入图片描述
(4)RabbitMQ的整体结构

在这里插入图片描述

(5)RabbitMQ中的几个概念

  • channel: 操作MQ的工具
  • exchange: 路由消息到队列
  • queue: 缓存消息
  • Virtual Host: 虚拟主机,是对queue,exchange等资源进行逻辑分组

(6)常见的MQ模型

  • 基本消息队列(BasicQueue): Publisher —1:1— Queue —1:1— Customer
  • 工作消息队列(WorkQueue): Publisher —1:1— Queue —1:n— Customer
  • 发布/订阅(Publish、Subscribe): 根据交换机类型又有三种模型 - Fanout Exchange: 广播,Publisher—1:1—Exchange—1:n—Queue—1:1—Customer- Direct Exchange: 路由,Publisher—1:1—Exchange—1:n—Queue—1:1—Customer- Topic Exchange: 主题,
  • RPC
  • 发布者确认

第一种:基本消息队列的基本使用

包含三种角色:publisherqueueconsumer

  • publisher: 消费发布者,将消息发布到队列queue
  • queue: 消息队列,负责接受并缓存消息
  • consumer: 订阅队列,处理队列中的消息

收发消息的过程: 获取连接 》 建立通信通道 》 创建消息队列 》 收发消息 》 释放资源

1、publisher和consumer引入依赖

<dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId></dependency>

2、Publisher创建发送消息通道

@SpringBootTestclassPublisherApplicationTests{@TestvoidtestSendMessage()throwsIOException,TimeoutException{//        1、建立连接ConnectionFactory connectionFactory =newConnectionFactory();//        2、设置连接参数
        connectionFactory.setHost("192.168.92.131");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setUsername("root");
        connectionFactory.setPassword("root");//        3、建立连接Connection connection = connectionFactory.newConnection();//        4、建立通信通道ChannelChannel channel = connection.createChannel();//        5、创建队列String queueName ="simple.queue";
        channel.queueDeclare(queueName,false,false,false,null);//        6、发送信息String message ="hello,rabbitmq!";
        channel.basicPublish("",queueName,null,message.getBytes(StandardCharsets.UTF_8));System.out.println("发送消息成功:【"+message+"】");//        7、关闭通道和连接
        channel.close();
        connection.close();}}

2、Consumer创建订阅通道

classConsumerApplicationTests{publicstaticvoidmain(String[] args)throwsIOException,TimeoutException{//        1、建立连接ConnectionFactory connectionFactory =newConnectionFactory();//        2、设置连接参数
        connectionFactory.setHost("192.168.92.131");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setUsername("root");
        connectionFactory.setPassword("root");//        3、建立连接Connection connection = connectionFactory.newConnection();//        4、建立通信通道ChannelChannel channel = connection.createChannel();//        5、创建队列String queueName ="simple.queue";
        channel.queueDeclare(queueName,false,false,false,null);//        6、订阅消息
        channel.basicConsume(queueName,true,newDefaultConsumer(channel){@OverridepublicvoidhandleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throwsIOException{//                7、处理消息String message =newString(body);System.out.println("接收到消息:【"+message+"】");}});System.out.println("等待接收消息....");}}

第二种:Work Queue 工作队列

与基本队列的区别在于,它能使用多个订阅队列进行高效的处理请求。(因为一个订阅队列的处理速度是有限的)

使用过程与基本队列几乎一致,只是开启了多个订阅队列。

在使用过程中我们会发现,多个订阅队列对任务的分配是平均的,这就是预取机制

我们需要的是快速处理的订阅队列获取更多的请求,慢速处理的订阅队列获取少量的请求,它如何实现呢?

通过修改配置文件,设置一个 preFetch 值。

spring:
  rabbitmq:
    host: 192.168.92.131 #IP
    port: 5672 #端口
    virtual-host: / #虚拟主机
    username: root #用户名
    password: root #密码
    listener:
      simple:
        prefetch: 1 # 每次取 1 个请求,处理完才能取下一个。

第三种:FanoutQueue 广播消息队列

SpringAMQP提供声明交换机、队列、绑定关系的API

主要使用的是Exchange.FanoutExchange类。

实现思路:
1、在consumer服务,声明队列,交换机,并将两者绑定。

@ConfigurationpublicclassFanoutConfig{//交换机@BeanpublicFanoutExchangefanoutExchange(){returnnewFanoutExchange("com.fanout");}//队列@BeanpublicQueuefanoutQueue1(){returnnewQueue("com.queue1");}//绑定关系@BeanpublicBindingbindingQueue(Queue fanoutQueue1,FanoutExchange fanoutExchange){returnBindingBuilder.bind(fanoutQueue).to(fanoutExchange);}//...以相同方式声明第2个队列,并完成绑定}

2、在consumer服务,编写两个消费者方法,分别监听fanout.queue1和fanout.queue2

@ComponentpublicclassSpringRabbitListener{@RabbitListener(queues ="com.queue1")publicvoidlistenFanoutQueue1(String msg)throwsInterruptedException{//...处理结果}@RabbitListener(queues ="com.queue2")publicvoidlistenFanoutQueue2(String msg)throwsInterruptedException{//...处理结果}}

3、在publisher编写测试方法,向交换机发送信息

@TestpublicvoidsendFanoutExchange(){//1、交换机String exchangeName ="com.fanout";//2、消息String message ="Hello Fanout";//3、发送消息
    rabbitTemplate.covertAndSend(exchangeName,"", message);}

第四种:路由信息队列

路由模式的流程: 即设置密钥的绑定关系,只有携带相应的密钥才能进入相应的队列

  • 每一个 QueueExchange 设置一个 BindingKey
  • 发布者发送消息时,需要指定消息的 RoutingKey
  • Exchange根据消息路由到 BindingKeyRoutingKey 一致的队列

实现思路:
1、利用 @RabbitListener 声明Exchange、Queue、RoutingKey

@RabbitListener(bindings =@QueueBinding(value =@Queue(name ="direct.queue1"), exchange =@Exchange(name ="com.exchange", type =ExcahngeTypes.DIRECT), key ={"red","blue"}))publicvoidlistenRoutingQueue1(String msg)throwsInterruptedException{//...处理结果}@RabbitListener(bindings =@QueueBinding(value =@Queue(name ="direct.queue2"), exchange =@Exchange(name ="com.exchange", type =ExcahngeTypes.DIRECT), key ={"red","green"}))publicvoidlistenRoutingQueue2(String msg)throwsInterruptedException{//...处理结果

2、发送消息实现

//指定队列处理@TestpublicvoidsendRoutingExchange1(){//交换机,消息String exchangeName ="com.exchange";String message ="Hello,RoutingMQ";//发送消息
    rabbitTemplate.covertAndSend(exchangeName,"blue", message);}//多队列处理@TestpublicvoidsendRoutingExchange2(){//交换机,消息String exchangeName ="com.exchange";String message ="Hello,RoutingMQ";//发送消息
    rabbitTemplate.covertAndSend(exchangeName,"red", message);}

第五种:主题信息队列(通配key)

TopicExchange 与 DirectExchange 的区别: routingkey必须是多个单词的列表,并且以

,

分割。并且Queue与Exchange指定的BindingKey时可使用通配符:

  • #:代指 0 / n 个单词
  • * 代指一个单词

实现思路:
1、通过 @RabbitListener 声明Exchange、Queue、RoutingKey

@RabbitListener(bingdings =@QueueBinding(exchange =@Exchange(name ="com.exchange", type =ExchangeTypes.TOPIC), queue =@Queue(name ="com.queue1"), key ={"china.#"}))publicvoidlistenTopicQueue1(String msg){//处理代码....}@RabbitListener(bingdings =@QueueBinding(exchange =@Exchange(name ="com.exchange", type =ExchangeTypes.TOPIC), queue =@Queue(name ="com.queue2"), key ={"#.news"}))publicvoidlistenTopicQueue2(String msg){//处理代码....}

2、在publisher服务中,向交换机发送消息

@TestpublicvoidsendTopicMessage(){//交换机,消息String exchangeName ="com.exchange";String message ="Hello,Topic";
    rabbitTemplate.convertAndSend(exchangeName,"china.call",message);}

四、SpringAMQP

(一)概念

  • AMQP: Advanced Message Queuing Protocol 传递消息队列协议,是用于在应用程序或之间传递业务消息的开放标准。该协议与语言及平台无关,更符合为服务中独立性的要求。
  • Spring AMQP: Spring AMQP是基于AMQP协议定义的一套API规范,提供了模板来发送和接收消息。其中 spring-amqp是基础抽象,spring-rabbit是底层的默认实现。

(二)实现基础消息队列

1、引入spring-amqp依赖

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>

2、publisher服务中利用RabbitTemplate发送消息到任务队列

  • 配置mq连接信息
spring:rabbitmq:host: 192.168.92.131 #IPport:5672virtual-host: /
    username: root
    password: root
  • 编写发送方法
@AutowiredprivateRabbitTemplate rabbitTemplate;@TestpublicvoidsendMessage(){String queueName ="simple.queue";String message ="Hello World";
        rabbitTemplate.convertAndSend(queueName,message);}

3、在consumer服务中编写消费逻辑,绑定simple.queue队列

  • 配置mq连接信息
spring:rabbitmq:host: 192.168.92.131 #IPport:5672virtual-host: /
    username: root
    password: root
  • 编写发送方法1
@AutowiredprivateRabbitTemplate rabbitTemplate;@TestpublicvoidgetMessage(){String queueName ="simple.queue";// receive 表示接收方法,接收到的信息会封装到Message,可以看receive的返回值Message message = rabbitTemplate.receive(queueName);// Message.getBody 是 byte[]System.out.println(newString(message.getBody()));}
  • 编写发送方法2- 创建一个监听类
// 注册成 Bean 对象@ComponentpublicclassSpringRabbitListener{// 监听器注释,queues = 订阅队列,并将返回值注入参数列表中    @RabbitListener(queues ="simple.queue")publicvoidListenSimpleQueueMessage(String msg){System.out.println("Spring 消费者接收到消息:【"+ msg +"】");}}

(三)消息转换器

为了让我们能够自由识别consumer发送的消息,则需要使用的是消息转换器

消息转换器如何使用?

Spring对消息对象的处理是由org.springframework.amqp.support.converter.MessageConverter来处理,默认实现的是SimpleMessageConverter,基于ObjectObjectOutputStream完成序列化。

我们只需要定义一个 MessageConverter 类型的Bean即可,推荐使用JSON序列化

1、publisher引入依赖

<!-- 接收消息需要使用jackson的转换依赖 --><dependency><groupId>com.fasterxml.jackson.dataformat</groupId><artifactId>jackson-dataformat-xml</artifactId><version>2.9.10</version></dependency><!-- 发送消息需要使用jackson的核心依赖 --><dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-databind</artifactId></dependency>

2、publisher启动类,声明MessageConverter

@BeanpublicMessageConverterjsonMessageConverter(){returnnewJackson2JsonMessageConverter();}

3、consumer启动类,声明MessageConverter

@BeanpublicMessageConverterjsonMessageConverter(){returnnewJackson2JsonMessageConverter();}

4、监听队列消息

@RabbitListener(queues ="object.queue")publicvoidlistenObjectMessage(Object msg){//处理数据....}

本文转载自: https://blog.csdn.net/Zain_horse/article/details/131842138
版权归原作者 Zain_horse 所有, 如有侵权,请联系我们删除。

“SpringCloud学习路线(9)——服务异步通讯RabbitMQ”的评论:

还没有评论