目录
2- RabbitMQ
2-1 介绍和安装
RabbitMQ 的整体架构以及核心概念
publisher
:消息发送者cunsumer
:消息消费者queue
:队列exchange
:交换机,负责路由消息virtual-host
:虚拟主机,起到数据隔离的作用;一个 MQ 中可以创建多个virtual-host
数据流转的模型:①生产者将数据发送给交换机 ——> ②交换机将消息路由给队列 ——> ③消费者监听队列拿到消息
安装
- 上传镜像文件
mq.tar
到 Linux 系统中 - 执行命令
docker load -i mq.tar
- 复制以下代码执行
- 其中 15672 ,是控制台端口
- 其中 5672 ,是收发消息的端口
docker run \
-e RABBITMQ_DEFAULT_USER=itheima \
-e RABBITMQ_DEFAULT_PASS=123321 \
-v mq-plugins:/plugins \
--name mq \
--hostname mq \
-p 15672:15672 \
-p 5672:5672 \
--network hmall \
-d \
rabbitmq:3.8-management
2-2 RabbitMQ 快速入门
需求
- 在
rabbitmq
的控制台完成下列操作: - 新建队列
hello.queue1
和hello.queue2
- 向默认的
amp.fanout
交换机发送一条消息 - 查看消息是否到达
hello.queue1
和hello.queue2
实现
- 需要创建队列,同时在交换机中需要绑定队列才能实现消息的路由。
总结规律
①如果交换机和队列没有绑定能否收到消息?
- 交换机:是负责路由和转发消息的。交换机通过绑定队列,之后可以将消息转发到队列中。
②如果绑定了所有队列是不是所有队列都可以收到消息?
- 是的,如果一个交换机绑定了多个队列,那类似于广播的效果所有队列都能收到消息。
2-3 RabbitMQ 数据隔离
- 在 RabbitMQ 中有虚拟主机的概念,对于交换机和队列而言都有二者自己的虚拟主机。
需求
- 在 RabbitMQ 的控制台下完成下列操作 - 新建一个用户 hmall- 为 hmall 用户创建一个 vitual host- 测试不同的 vitual host 之间的数据隔离现象
实现
- ① 首先在
Admin
——>Users
中创建用户 - ② 在
Admin
——>Users
中创建虚拟主机
总结
- 各个虚拟主机下都是相互隔离的。
3- Java客户端
3-1 快速入门
AMQP
什么是 AMQP?
- Advanced Message Queuing Protocol,是一种高级的消息队列协议,是用于在应用程序之间传递业务消息的开放标准。该协议与语言和平台无关,更符合微服务中独立性的要求。
Spring AMQP
Spring AMQP
是基于 AMQP 协议定义的一套 API 规范,提供了模板来发送和接收消息。包含两部分,其中Spring AMQP
是基础抽象,spring-rabbit
是底层的默认实现
快速入门
需求
- 利用控制台创建队列 simple.queue
- 在 publisher 服务中,利用 SpringAMQP 直接向 simple.queue 发送消息
- 在 consumer 服务中,利用 SpringAMQP 编写消费者监听 simple.queue 队列
实现
- ① 引入
spring-amqp
依赖,在父工程中引入spring-amqp
依赖,这样publisher
和consumer
服务都可以使用
<!-- AMQP依赖,包含RabbitMQ--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>
- ② 在每个微服务中引入 MQ 服务端信息,这样微服务才能连接到 RabbitMQ
- ③ 发送消息: SpringAMQP 提供了 RabbitTemplate 工具类,方便我们发送消息,发送消息的代码如下
@AutowiredprivateRabbitTemplate rabbitTemplate;@TestpublicvoidtestSimpleQueue(){// 队列名称String queueName ="simple.queue";// 消息String message ="hello, spring amqp!";// 发送消息
rabbitTemplate.convertAndSend(queueName, message);}
- ④ 接收消息:
SpringAMQP
提供声明式的消息监听,我们只需要通过 注解 在方法上声明要监听的队列名称,将来SpringAMQP
就会把消息传递给当前方法。
@Slf4j@ComponentpublicclassSpringRabbitListener{@RabbitListener(queues ="simple.queue")publicvoidlistenSimpleQueueMessage(String msg)throwsInterruptedException{
log.info("spring 消息接收到消息: ["+ msg +"]");if(true){thrownewMessageConversionException("故意的");}
log.info("消息处理完毕");}}
📑小结:SpringAMQP如何收发消息?
3-2 WorkQueues 任务模型
- Work queues,任务模型。简单来说就是 让多个消费者绑定到一个队列,共同消费队列中的消息。
案例-使用 WorkQueue 单队列绑定多消费者
模拟 WorkQueue 实现一个队列绑定多个消费者,基本思路如下
- ① 在 RabbitMQ 的控制台创建一个队列,名为
work.queue
- ② 在 publisher 服务中定义测试方法,在 1 秒内产生 50 条消息,发送到
work.queue
- ③ 在 consumer 服务中顶级两个消息监听者,都监听
work.queue
队列 - ④ 消费者每 1 秒处理 50 条消息,消费者每 2 秒处理 5 条消息
实现
- ① 实现消费者
@Slf4j@ComponentpublicclassMqListener{@RabbitListener(queues ="simple.queue")publicvoidlistenSimpleQueue(String msg){System.out.println("消费者收到了simple.queue的消息: ["+ msg +"]");}@RabbitListener(queues ="work.queue")publicvoidlistenWorkQueue1(String msg){System.out.println("消费者1 收到了 work.queue的消息: ["+ msg +"]");}@RabbitListener(queues ="work.queue")publicvoidlistenWorkQueue2(String msg){System.err.println("消费者2 收到了 work.queue的消息: ["+ msg +"]");}}
- ② 实现生产者
@SpringBootTestpublicclassSpringAmqpTest{@AutowiredprivateRabbitTemplate rabbitTemplate;@TestvoidtestSendMessage2Queue(){String queueName ="simple.queue";String msg ="hello, amqp!";
rabbitTemplate.convertAndSend(queueName, msg);}@TestvoidtestWorkQueue()throwsInterruptedException{String queueName ="work.queue";for(int i =1; i <=50; i++){String msg ="hello, worker, message_"+ i;
rabbitTemplate.convertAndSend(queueName, msg);Thread.sleep(20);}}}
结果
- 发现在消费的过程中,两个消费者并没有都消费 50 条消息。
- 二者消费的过程是采用轮询的方式进行消费。
通过改变消费速度
- 即便改变了消费的速度,消费的过程中消费者1 和消费者2,也是按照轮询的方式消费任务。
@RabbitListener(queues ="work.queue")publicvoidlistenWorkQueue1(String msg){System.out.println("消费者1 收到了 work.queue的消息: ["+ msg +"]");}@RabbitListener(queues ="work.queue")publicvoidlistenWorkQueue2(String msg){System.err.println("消费者2 收到了 work.queue的消息: ["+ msg +"]");Thread.sleep(20);}
📑小结:Work模型的使用
3-3 Fanout 交换机
Fanout Exchange 交换机会将收到的消息广播到每一个跟其绑定的 queue,所以也叫 广播模式
案例- FanouotExchange 使用
实现思路如下
- ① 在 RabbitMQ 控制台中,声明队列
fanout.queue1
和fanout.queue2
- ② 在 RabbitMQ 控制台中,声明交换机
hmall.fanout
将两个队列与其绑定 - ③ 在 consumer 服务中,编写两个消费者方法,分别监听
fanout.queue1
和**fanout.queue2
- ④ 在 publisher 中编写测试方法,向
hmall.fanout
发送消息。
Fanout 交换机
- 消费者
@RabbitListener(queues ="fanout.queue1")publicvoidlistenWorkQueue1(String msg){System.out.println("消费者1 收到了 fanout.queue1的消息: ["+ msg +"]");}@RabbitListener(queues ="fanout.queue2")publicvoidlistenWorkQueue2(String msg){System.err.println("消费者2 收到了 fanout.queue2的消息: ["+ msg +"]");}
- 生产者
@TestvoidtestSendFanout(){String exchangeName ="hmall.fanout";String msg ="hello,everyone";
rebbitTemplate.converAndSend(exchangeName,numm,msg);}
- 运行结果
📑小结:交换机的作用是什么?
3-4 Direct 交换机
Direct Exchange 会将接收到的消息根据路由规则路由到指定的 Queue,因此可以称为定向路由。
- 每一个 Queue 都与 Exchange 设置一个 Bindingkey
- 发布者发送消息时,制定消息的 RoutingKey
- Exchange 将消息路由到 BindingKey 与消息 RoutingKey 一致的队列
案例-利用 SpringAMQP 演示 DirectExchange 的使用
需求如下
- ① 在 RabbitMQ 控制台中,声明队列 direct.queue1 和 direct.queue2
- ② 在 RabbitMQ 控制台中,声明交换机 hmall.direct,将两个队列与其绑定
- ③ 在 consumer 服务中,编写两个消费方法,分别监听 direct.queue1 和 direct.queue2
- ④ 在 publisher 中编写测试方法,利用不同的 RoutingKey 向 hmall.direct 发送消息
📑小结:DirectExchange 的作用是什么?
:::info
- DirectExchange 交换机可以通过 bindingKey 来设置,将消息通过 bindingKey 发送到指定的队列中。通过设置合适的绑定键,您可以确保特定的消息被发送到特定的微服务进行处理。这样可以实现消息的精确路由,确保消息只被需要的消费者接收和处理。
:::
3-5 Topic 交换机
TopicExchange 与 DirectExchange 类似,区别在于 routingKey 可以是多个单词的列表,并且以
.
分割。
- Queue 与 Exchange 指定 BindingKey 时可以使用通配符 -
#
代表 0 个 或多个单词。-*
:代指一个单词
- 类似上述实现,如果一个 bindingKey 定义为了
china.#
那么,对于其而言只会接收与china.#
开头的消息。
案例-利用 SpringAMQP 演示 DirectExchange 的适用
需求如下
- ① 在 RabbitMQ 控制台中,声明队列 topic.queue1 和 topic.queue2
- ② 在 RabbitMQ 控制台中,声明交换机 hmall.topic 将两个队列与其绑定
- ③ 在 consumer 服务中,编写两个消费者方法,分别监听 topic.queue1 和 topic.queue2
- ④ 在 publisher 中编写测试方法,利用不同的 RoutingKey 向 hmall.topic 发送消息
- 消费者
@RabbitListener(queues ="topic.queue1")publicvoidlistenTopicQueue1(String msg)throwsInterruptedException{System.out.println("消费者1 收到了 topic.queue1的消息: ["+ msg +"] ");}@RabbitListener(queues ="topic.queue2")publicvoidlistenTopicQueue2(String msg)throwsInterruptedException{System.out.println("消费者2 收到了 topic.queue2的消息: ["+ msg +"] ");}
- 发送者 - 以下情况消息会被路由到
**#.news**
的队列中。
@TestvoidtestSendTopic(){String exchangeName ="hmall.topic";String msg ="蓝色通知,警报解除,哥斯拉是放的气球";
rabbitTemplate.convertAndSend(exchangeName,"japan.news", msg);}
- 发送者2 - 以下情况两个队列都会收到消息。
@TestvoidtestSendTopic(){String exchangeName ="hmall.topic";String msg ="蓝色通知,警报解除,哥斯拉是放的气球";
rabbitTemplate.convertAndSend(exchangeName,"china.news", msg);}
- 发送者3 - 以下情况只有队列1 会受到消息。
@TestvoidtestSendTopic(){String exchangeName ="hmall.topic";String msg ="蓝色通知,警报解除,哥斯拉是放的气球";
rabbitTemplate.convertAndSend(exchangeName,"china.weather", msg);}
📑小结:描述下 Direct 交换机和 Topic 交换机的差异
3-6 声明队列和交换机
- 使用 Java 代码声明队列交换机才是最靠谱的方式
声明队列和交换机的方式(Java代码实现)
SpringAMQP 提供了几个类,用来声明队列、交换机及其绑定关系
Queue
:用于声明队列,可用工厂类QueueBuilder
创建Exchange
:用于声明交换机,可以用工厂类ExchangeBuilder
构建Binding
:用于声明队列和交换机的绑定关系,可以用工厂类BindingBuilder
构建
代码实现
- 例如,声明一个 Fanout 类型的交换机,并且创建队列与其绑定
publicclassFanoutConfiguration{@BeanpublicFanoutExchangefanoutExchange(){// ExchangeBuilder.fanoutExchange("").build();returnnewFanoutExchange("hmall.fanout2");}@BeanpublicQueuefanoutQueue3(){// QueueBuilder.durable("ff").build();returnnewQueue("fanout.queue3");}@BeanpublicBindingfanoutBinding3(Queue fanoutQueue3,FanoutExchange fanoutExchange){returnBindingBuilder.bind(fanoutQueue3).to(fanoutExchange);}@BeanpublicQueuefanoutQueue4(){// QueueBuilder.durable("ff").build();returnnewQueue("fanout.queue4");}@BeanpublicBindingfanoutBinding4(){returnBindingBuilder.bind(fanoutQueue4()).to(fanoutExchange());}}
使用注解的方式解决 Direct 交换机问题
- 由于 Direct 交换机可以配置对应的 key,但对于声明式方式来说,需要对每个 key 都写一个 binding 方法,这样效率很低,所以引入注解的方式实现
- SpringAMQP 还提供了基于 @RabbitListener 注解来声明队列和交换机的方式,在 Listener 的部分通过注解实现
@RabbitListener(bindings =@QueueBinding(
value =@Queue(name ="direct.queue1",durable ="true"),
exchange =@Exchange(name ="hmall,direct",type =ExchangeTypes,DIRECT),
key ={"red","blue"}))publicvoidlistenDirectQueue1(String msg)throwsInterruptedException{System.out.println("消费者 1 收到了 direct.queueq 的消息:【"+msg+"】");}
📑小结
3-7 消息转换器
案例-利用 SpringAMQP 发送对象类型的消息
- ① 声明一个队列,名为 object.queue
- ② 编写单元测试,向队列中直接发送一条消息,消息类型为 Map
- ③ 在控制台查看消息,总结你能发现的问题
// 准备消息Map<String,Object> msg =newHashMap<>();
msg.put("name","Jack");
msg.put("age",21);
@TestvoidtestSendObject(){Map<String,Object> msg =newHashMap<>(2);
msg.put("name","jack");
msg.put("age",21);
rabbitTemplate.convertAndSend("object.queue", msg);}
问题
- Spring 对消息对象的处理是由 org.springframework,amqp.support.converter.MessageConveerter 来处理的。而默认实现是 SimpleMessageConverter,基于 JDK 的 ObjectOutputStream 完成序列化。
存在以下问题
- JDK 的序列化有安全风险
- JDK 序列化的消息太大
- JDK 序列化的消息可读性差
建议采用 JSON 薛丽华代替默认的的 JDK 序列化,要做两件事情:
- 在 publisher 和 consumer 中都要引入 Jackson 依赖:
<dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-databind</artifactId></dependency>
- 在 publisher 和 consumer 中都要配置 MessageConverter
@BeanpublicMessageConvertermessageConverter(){returnnewJackson2JsonMessageConverter();}
4- 使用 MQ 改造支付业务代码
案例
- 需求:改造余额支付功能,不再同步调用交易服务的 OpenFeign 接口,而是采用异步的 MQ 通知交易服务更改订单状态
- 业务中引入 AMQP 依赖
<!-- amqp --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>
- 2.配置 MQ 的地址
spring:
rabbitmq:
host:192.168.150.101
port:5672
virtual-host:/hmall
username: hmall
password:123
- 配置 MQ 的 Configure
@ConfigurationpublicclassMqConfig{@BeanpublicMessageConverterjackson2JsonMessageConverter(){returnnewJackson2JsonMessageConverter();}}
- 编写监听器
@Component@RequiredArgsConstructorpublicclassPayStatusListener{privatefinalIOrderService orderService;@RabbitListener(bindings =@QueueBinding(
value =@Queue(name ="mark.order.pay.queue", durable ="true"),
exchange =@Exchange(name ="pay.topic", type =ExchangeTypes.TOPIC),
key ="pay.success"))publicvoidlistenOrderPay(Long orderId){// 标记订单状态为已支付
orderService.markOrderPaySuccess(orderId);}}
- 业务异步调用
rabbitTemplate.convertAndSend("pay.topic","pay.success", po.getBizOrderNo());
版权归原作者 山脚ice 所有, 如有侵权,请联系我们删除。