0


RabbitMQ(黑马spring cloud笔记)

MQ

目录

一、同步通讯和异步通讯

1. 同步通讯

优点

  • 时效性强,立即获取结果

缺点

  • 耦合度高
  • 性能和吞吐能力不如异步
  • 额外资源消耗
  • 级联失败问题

2. 异步通讯

在这里插入图片描述

优点

  • 服务解耦
  • 性能提升,吞吐量提高
  • 服务没有强依赖,不担心级联问题
  • 流量削峰

缺点

  • 依赖Broker的可靠性、安全性、吞吐能力
  • 架构复杂的情况下,业务没有明显的流程线,不好追踪管理

MQ即是事件驱动架构中的Broker。

二、RabbitMQ

1. 部署

直接docker拉一个:

# 拉取镜像docker pull rabbitmq:3-management
#启动容器docker run \-eRABBITMQ_DEFAULT_USER=root \-eRABBITMQ_DEFAULT_PASS=123456\--name mq \--hostname mq1 \-p15672:15672 \-p5672:5672 \-d\
 rabbitmq:3-management
 # 15672是管理口

2. 架构

在这里插入图片描述

几个概念:

  • channel:操作MQ的工具
  • exchange:路由消息到队列中
  • queue:缓存消息
  • virtual host:虚拟主机,对queue、exchange等资源的逻辑分组

3. 常见消息模型

3.1 基本消息队列(Basic Queue)

在这里插入图片描述

  1. 依赖<!--AMQP依赖,包含RabbitMQ--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>amqp是高级消息队列协议,springAMQP则是一种实现。
  2. 配置spring:rabbitmq:host: 190.92.246.107 # 主机名port:5672# 端口virtual-host: / # 虚拟主机username: root password:123456
  3. 实现- 发布者publicclassPublisherTest{@AutowiredprivateRabbitTemplate rabbitTemplate;@TestpublicvoidtestSimpleQueue(){String queueName ="simple.queue";String message ="hello, spring amqp"; rabbitTemplate.convertAndSend(queueName, message);}}- 消费者配置都是一样的@ComponentpublicclassSpringRabbitListener{@RabbitListener(queues ={"simple.queue"})publicvoidlistenSimpleQueue(String msg){System.out.println(msg);}}启动main函数,成功:在这里插入图片描述

3.2 工作消息队列(Work Queue)

在这里插入图片描述

两个消费者合作处理消息,避免消息堆积。

AMQP有一个消息预取机制,预取多少条消息是可以配置的。

spring:rabbitmq:host: 190.92.246.107 # 主机名port:5672# 端口virtual-host: / # 虚拟主机username: root
    password:123456listener:simple:prefetch:1
  • 发布者:@TestpublicvoidtestSimpleQueue()throwsInterruptedException{String queueName ="simple.queue";String message ="hello, spring amqp";for(int i =0; i <50; i++){ rabbitTemplate.convertAndSend(queueName, message + i);Thread.sleep(20);}}
  • 消费者@ComponentpublicclassSpringRabbitListener{@RabbitListener(queues ={"simple.queue"})publicvoidlistenSimpleQueue1(String msg)throwsInterruptedException{System.out.println("消费者1"+"【"+ msg +"】"+LocalTime.now());Thread.sleep(20);}@RabbitListener(queues ={"simple.queue"})publicvoidlistenSimpleQueue2(String msg)throwsInterruptedException{System.err.println("消费者2"+"【"+ msg +"】"+LocalTime.now());Thread.sleep(200);}}

如果消息预取机制不设置,意味着不设限,那么在这个例子中每个消费者无论处理能力如何,都会处理25条消息,设置为1后,则按照能力分配。

3.3 发布订阅(Publish、Subscribe)

和之前不同的是,可以将一条消息发送给多个消费者,实现方式是加入了交换机。

根据交换机类型不同分为三种:广播、路由和主题

  • Fanout Exchange 广播这个交换机会将消息路由到每一个和它绑定的队列在这里插入图片描述- 发布者不同的是,我们发送消息到交换机@TestpublicvoidtestSendFanoutExchange(){String exchangeName ="root.fanout";String message ="hello everyone"; rabbitTemplate.convertAndSend(exchangeName,"", message);}- 订阅者首先创建交换机和队列,并将队列绑定到交换机上(有注解的写法,像后文路由模式那样)@ConfigurationpublicclassFanoutConfig{@BeanpublicFanoutExchangefanoutExchange(){returnnewFanoutExchange("root.fanout");}@BeanpublicQueuefanoutQueue1(){returnnewQueue("fanout.queue1");}@BeanpublicQueuefanoutQueue2(){returnnewQueue("fanout.queue2");}@BeanpublicBindingbindQueue1(Queue fanoutQueue1,FanoutExchange fanoutExchange){returnBindingBuilder.bind(fanoutQueue1).to(fanoutExchange);}@BeanpublicBindingbindQueue2(Queue fanoutQueue2,FanoutExchange fanoutExchange){returnBindingBuilder.bind(fanoutQueue2).to(fanoutExchange);}}然后监听队列:@RabbitListener(queues ={"fanout.queue1"})publicvoidlistenFanoutQueue1(String msg)throwsInterruptedException{System.out.println("fanout.queue1消费者"+"【"+ msg +"】"+LocalTime.now());}@RabbitListener(queues ={"fanout.queue2"})publicvoidlistenFanoutQueue2(String msg)throwsInterruptedException{System.err.println("fanout.queue2消费者"+"【"+ msg +"】"+LocalTime.now());}启动测试:在这里插入图片描述
  • Direct Exchange 路由在这里插入图片描述特点:- 每个Queue都与Exchange设置一个BindingKey- 发布者发送消息时,指定消息的RoutingKey- Exchange将消息路由到BindingKey与消息Routingkey一致的队列接下来就可以测试一下:有一个交换机,两个队列,两个消费者分别有两个BindingKey。@RabbitListener(bindings =@QueueBinding( value =@Queue(name ="direct.queue1"), exchange =@Exchange(name ="root.direct", type =ExchangeTypes.DIRECT), key ={"blue","red"}))publicvoidlistenDirectQueue1(String msg){System.err.println("direct.queue1消费者"+"【"+ msg +"】"+LocalTime.now());}@RabbitListener(bindings =@QueueBinding( value =@Queue(name ="direct.queue2"), exchange =@Exchange(name ="root.direct", type =ExchangeTypes.DIRECT), key ={"yellow","red"}))publicvoidlistenDirectQueue2(String msg){System.err.println("direct.queue2消费者"+"【"+ msg +"】"+LocalTime.now());}发布者:@TestpublicvoidtestSendDirectExchange(){String exchangeName ="root.direct";String message ="hello red"; rabbitTemplate.convertAndSend(exchangeName,"red", message);}不断更换routingKey,观察订阅者日志。

  • Topic Exchange 主题在这里插入图片描述和路由模式类似,区别是这个模式的key是多个单词的列表,以 “ . ” 分割。在指定BIndingKey时可以使用通配符。例如:#代表0个或多个单词,*代表一个单词。- 订阅@RabbitListener(bindings =@QueueBinding( value =@Queue(name ="topic.queue1"), exchange =@Exchange(name ="root.topic", type =ExchangeTypes.TOPIC), key ={"china.#"}))publicvoidlistenTopicQueue1(String msg){System.err.println("topic.queue1消费者"+"【"+ msg +"】"+LocalTime.now());}@RabbitListener(bindings =@QueueBinding( value =@Queue(name ="topic.queue2"), exchange =@Exchange(name ="root.topic", type =ExchangeTypes.TOPIC), key ={"#.news"}))publicvoidlistenTopicQueue2(String msg){System.err.println("topic.queue2消费者"+"【"+ msg +"】"+LocalTime.now());}- 发布@TestpublicvoidtestSendTopicExchange(){String exchangeName ="root.topic";String message ="hello world"; rabbitTemplate.convertAndSend(exchangeName,"china.news", message);}改变routingkey,观察日志。

4. 消息转换器

我们不仅仅可以发送字符串消息,还可以发送对象,默认情况下,需要传统的序列化方式,对象需要实现Serializable接口,不太方便,我们使用json。

  1. 引入依赖<dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-databind</artifactId></dependency>
  2. 自定义MessageConverter@BeanpublicMessageConvertermessageConverter(){returnnewJackson2JsonMessageConverter();}这个时候发送的消息就会经过json序列化了。
  3. 测试创建队列@BeanpublicQueuefanoutExchange(){returnnewQueue("object.queue");}消费者(需要像发布者一样的,引入jackson,然后定义messageConverter)@RabbitListener(queues ="object.queue")publicvoidlistenObjectQueue(Map<String,Object> msg){System.err.println("object.queue消费者"+"【"+ msg.get("name")+"】"+LocalTime.now());System.err.println("object.queue消费者"+"【"+ msg.get("date")+"】"+LocalTime.now());}发布消息@TestpublicvoidtestSendObj(){String queue ="object.queue";Map<String,Object> msg =newHashMap<>(); msg.put("name","root"); msg.put("date",newDate()); rabbitTemplate.convertAndSend(queue, msg);}成功:在这里插入图片描述

本文转载自: https://blog.csdn.net/yangsf_/article/details/128997592
版权归原作者 yangsf_ 所有, 如有侵权,请联系我们删除。

“RabbitMQ(黑马spring cloud笔记)”的评论:

还没有评论