0


【02基础】- RabbitMQ基础

目录

2- RabbitMQ

2-1 介绍和安装

RabbitMQ 的整体架构以及核心概念

  • publisher:消息发送者
  • cunsumer:消息消费者
  • queue:队列
  • exchange:交换机,负责路由消息
  • virtual-host :虚拟主机,起到数据隔离的作用;一个 MQ 中可以创建多个 virtual-host

数据流转的模型①生产者将数据发送给交换机 ——> ②交换机将消息路由给队列 ——> ③消费者监听队列拿到消息

安装

  1. 上传镜像文件 mq.tar 到 Linux 系统中
  2. 执行命令
docker load -i mq.tar
  1. 复制以下代码执行
  • 其中 15672 ,是控制台端口
  • 其中 5672 ,是收发消息的端口
docker run \
-e RABBITMQ_DEFAULT_USER=itheima \
-e RABBITMQ_DEFAULT_PASS=123321 \
-v mq-plugins:/plugins \
--name mq \
--hostname mq \
-p 15672:15672 \
-p 5672:5672 \
--network hmall \
-d \
rabbitmq:3.8-management

2-2 RabbitMQ 快速入门

需求

  • rabbitmq 的控制台完成下列操作:
  • 新建队列 hello.queue1hello.queue2
  • 向默认的 amp.fanout 交换机发送一条消息
  • 查看消息是否到达 hello.queue1hello.queue2

实现

  • 需要创建队列,同时在交换机中需要绑定队列才能实现消息的路由。

总结规律

①如果交换机和队列没有绑定能否收到消息?

  • 交换机:是负责路由和转发消息的。交换机通过绑定队列,之后可以将消息转发到队列中。

②如果绑定了所有队列是不是所有队列都可以收到消息?

  • 是的,如果一个交换机绑定了多个队列,那类似于广播的效果所有队列都能收到消息。

2-3 RabbitMQ 数据隔离

  • 在 RabbitMQ 中有虚拟主机的概念,对于交换机和队列而言都有二者自己的虚拟主机。

需求

  • 在 RabbitMQ 的控制台下完成下列操作 - 新建一个用户 hmall- 为 hmall 用户创建一个 vitual host- 测试不同的 vitual host 之间的数据隔离现象

实现

  • ① 首先在 Admin ——> Users 中创建用户
  • ② 在 Admin ——> Users 中创建虚拟主机

总结

  • 各个虚拟主机下都是相互隔离的。

3- Java客户端

3-1 快速入门

AMQP

什么是 AMQP?

  • Advanced Message Queuing Protocol,是一种高级的消息队列协议,是用于在应用程序之间传递业务消息的开放标准。该协议与语言和平台无关,更符合微服务中独立性的要求。

Spring AMQP

  • Spring AMQP是基于 AMQP 协议定义的一套 API 规范,提供了模板来发送和接收消息。包含两部分,其中Spring AMQP 是基础抽象,spring-rabbit 是底层的默认实现

快速入门

需求

  • 利用控制台创建队列 simple.queue
  • 在 publisher 服务中,利用 SpringAMQP 直接向 simple.queue 发送消息
  • 在 consumer 服务中,利用 SpringAMQP 编写消费者监听 simple.queue 队列

实现

  • ① 引入 spring-amqp依赖,在父工程中引入 spring-amqp 依赖,这样 publisherconsumer 服务都可以使用
<!-- AMQP依赖,包含RabbitMQ--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>
  • ② 在每个微服务中引入 MQ 服务端信息,这样微服务才能连接到 RabbitMQ

  • ③ 发送消息: SpringAMQP 提供了 RabbitTemplate 工具类,方便我们发送消息,发送消息的代码如下
@AutowiredprivateRabbitTemplate rabbitTemplate;@TestpublicvoidtestSimpleQueue(){// 队列名称String queueName ="simple.queue";// 消息String message ="hello, spring amqp!";// 发送消息
    rabbitTemplate.convertAndSend(queueName, message);}
  • ④ 接收消息
  • SpringAMQP 提供声明式的消息监听,我们只需要通过 注解 在方法上声明要监听的队列名称,将来 SpringAMQP 就会把消息传递给当前方法。
@Slf4j@ComponentpublicclassSpringRabbitListener{@RabbitListener(queues ="simple.queue")publicvoidlistenSimpleQueueMessage(String msg)throwsInterruptedException{
        log.info("spring 消息接收到消息: ["+ msg +"]");if(true){thrownewMessageConversionException("故意的");}
        log.info("消息处理完毕");}}

📑小结:SpringAMQP如何收发消息?

在这里插入图片描述


3-2 WorkQueues 任务模型

  • Work queues,任务模型。简单来说就是 让多个消费者绑定到一个队列,共同消费队列中的消息。

案例-使用 WorkQueue 单队列绑定多消费者

模拟 WorkQueue 实现一个队列绑定多个消费者,基本思路如下

  • ① 在 RabbitMQ 的控制台创建一个队列,名为 work.queue
  • ② 在 publisher 服务中定义测试方法,在 1 秒内产生 50 条消息,发送到 work.queue
  • ③ 在 consumer 服务中顶级两个消息监听者,都监听 work.queue 队列
  • ④ 消费者每 1 秒处理 50 条消息,消费者每 2 秒处理 5 条消息

实现

  • ① 实现消费者
@Slf4j@ComponentpublicclassMqListener{@RabbitListener(queues ="simple.queue")publicvoidlistenSimpleQueue(String msg){System.out.println("消费者收到了simple.queue的消息: ["+ msg +"]");}@RabbitListener(queues ="work.queue")publicvoidlistenWorkQueue1(String msg){System.out.println("消费者1 收到了 work.queue的消息: ["+ msg +"]");}@RabbitListener(queues ="work.queue")publicvoidlistenWorkQueue2(String msg){System.err.println("消费者2 收到了 work.queue的消息: ["+ msg +"]");}}
  • ② 实现生产者
@SpringBootTestpublicclassSpringAmqpTest{@AutowiredprivateRabbitTemplate rabbitTemplate;@TestvoidtestSendMessage2Queue(){String queueName ="simple.queue";String msg ="hello, amqp!";
        rabbitTemplate.convertAndSend(queueName, msg);}@TestvoidtestWorkQueue()throwsInterruptedException{String queueName ="work.queue";for(int i =1; i <=50; i++){String msg ="hello, worker, message_"+ i;
            rabbitTemplate.convertAndSend(queueName, msg);Thread.sleep(20);}}}

结果

  • 发现在消费的过程中,两个消费者并没有都消费 50 条消息。
  • 二者消费的过程是采用轮询的方式进行消费。

通过改变消费速度

  • 即便改变了消费的速度,消费的过程中消费者1 和消费者2,也是按照轮询的方式消费任务。
@RabbitListener(queues ="work.queue")publicvoidlistenWorkQueue1(String msg){System.out.println("消费者1 收到了 work.queue的消息: ["+ msg +"]");}@RabbitListener(queues ="work.queue")publicvoidlistenWorkQueue2(String msg){System.err.println("消费者2 收到了 work.queue的消息: ["+ msg +"]");Thread.sleep(20);}

📑小结:Work模型的使用

在这里插入图片描述


3-3 Fanout 交换机

Fanout Exchange 交换机会将收到的消息广播到每一个跟其绑定的 queue,所以也叫 广播模式

案例- FanouotExchange 使用

实现思路如下

  • ① 在 RabbitMQ 控制台中,声明队列 fanout.queue1fanout.queue2
  • ② 在 RabbitMQ 控制台中,声明交换机 hmall.fanout 将两个队列与其绑定
  • ③ 在 consumer 服务中,编写两个消费者方法,分别监听 fanout.queue1**fanout.queue2
  • ④ 在 publisher 中编写测试方法,向 hmall.fanout 发送消息。

Fanout 交换机

  • 消费者
@RabbitListener(queues ="fanout.queue1")publicvoidlistenWorkQueue1(String msg){System.out.println("消费者1 收到了 fanout.queue1的消息: ["+ msg +"]");}@RabbitListener(queues ="fanout.queue2")publicvoidlistenWorkQueue2(String msg){System.err.println("消费者2 收到了 fanout.queue2的消息: ["+ msg +"]");}
  • 生产者
@TestvoidtestSendFanout(){String exchangeName ="hmall.fanout";String msg ="hello,everyone";
    rebbitTemplate.converAndSend(exchangeName,numm,msg);}
  • 运行结果

📑小结:交换机的作用是什么?

在这里插入图片描述


3-4 Direct 交换机

Direct Exchange 会将接收到的消息根据路由规则路由到指定的 Queue,因此可以称为定向路由。

  • 每一个 Queue 都与 Exchange 设置一个 Bindingkey
  • 发布者发送消息时,制定消息的 RoutingKey
  • Exchange 将消息路由到 BindingKey 与消息 RoutingKey 一致的队列

案例-利用 SpringAMQP 演示 DirectExchange 的使用

需求如下

  • ① 在 RabbitMQ 控制台中,声明队列 direct.queue1 和 direct.queue2
  • ② 在 RabbitMQ 控制台中,声明交换机 hmall.direct,将两个队列与其绑定
  • ③ 在 consumer 服务中,编写两个消费方法,分别监听 direct.queue1 和 direct.queue2
  • ④ 在 publisher 中编写测试方法,利用不同的 RoutingKey 向 hmall.direct 发送消息

📑小结:DirectExchange 的作用是什么?

:::info

  • DirectExchange 交换机可以通过 bindingKey 来设置,将消息通过 bindingKey 发送到指定的队列中。通过设置合适的绑定键,您可以确保特定的消息被发送到特定的微服务进行处理。这样可以实现消息的精确路由,确保消息只被需要的消费者接收和处理。

:::


3-5 Topic 交换机

TopicExchange 与 DirectExchange 类似,区别在于 routingKey 可以是多个单词的列表,并且以

.

分割。

  • Queue 与 Exchange 指定 BindingKey 时可以使用通配符 - #代表 0 个 或多个单词。- * :代指一个单词

  • 类似上述实现,如果一个 bindingKey 定义为了 china.# 那么,对于其而言只会接收与 china.#开头的消息。

案例-利用 SpringAMQP 演示 DirectExchange 的适用

需求如下

  • ① 在 RabbitMQ 控制台中,声明队列 topic.queue1 和 topic.queue2
  • ② 在 RabbitMQ 控制台中,声明交换机 hmall.topic 将两个队列与其绑定
  • ③ 在 consumer 服务中,编写两个消费者方法,分别监听 topic.queue1 和 topic.queue2
  • ④ 在 publisher 中编写测试方法,利用不同的 RoutingKey 向 hmall.topic 发送消息

  • 消费者
@RabbitListener(queues ="topic.queue1")publicvoidlistenTopicQueue1(String msg)throwsInterruptedException{System.out.println("消费者1 收到了 topic.queue1的消息: ["+ msg +"] ");}@RabbitListener(queues ="topic.queue2")publicvoidlistenTopicQueue2(String msg)throwsInterruptedException{System.out.println("消费者2 收到了 topic.queue2的消息: ["+ msg +"] ");}
  • 发送者 - 以下情况消息会被路由到 **#.news** 的队列中。
@TestvoidtestSendTopic(){String exchangeName ="hmall.topic";String msg ="蓝色通知,警报解除,哥斯拉是放的气球";
    rabbitTemplate.convertAndSend(exchangeName,"japan.news", msg);}
  • 发送者2 - 以下情况两个队列都会收到消息。
@TestvoidtestSendTopic(){String exchangeName ="hmall.topic";String msg ="蓝色通知,警报解除,哥斯拉是放的气球";
    rabbitTemplate.convertAndSend(exchangeName,"china.news", msg);}
  • 发送者3 - 以下情况只有队列1 会受到消息。
@TestvoidtestSendTopic(){String exchangeName ="hmall.topic";String msg ="蓝色通知,警报解除,哥斯拉是放的气球";
    rabbitTemplate.convertAndSend(exchangeName,"china.weather", msg);}

📑小结:描述下 Direct 交换机和 Topic 交换机的差异

在这里插入图片描述

3-6 声明队列和交换机

  • 使用 Java 代码声明队列交换机才是最靠谱的方式

声明队列和交换机的方式(Java代码实现)

SpringAMQP 提供了几个类,用来声明队列、交换机及其绑定关系

  • Queue:用于声明队列,可用工厂类 QueueBuilder 创建
  • Exchange:用于声明交换机,可以用工厂类 ExchangeBuilder 构建
  • Binding:用于声明队列和交换机的绑定关系,可以用工厂类 BindingBuilder 构建

代码实现
  • 例如,声明一个 Fanout 类型的交换机,并且创建队列与其绑定
publicclassFanoutConfiguration{@BeanpublicFanoutExchangefanoutExchange(){// ExchangeBuilder.fanoutExchange("").build();returnnewFanoutExchange("hmall.fanout2");}@BeanpublicQueuefanoutQueue3(){// QueueBuilder.durable("ff").build();returnnewQueue("fanout.queue3");}@BeanpublicBindingfanoutBinding3(Queue fanoutQueue3,FanoutExchange fanoutExchange){returnBindingBuilder.bind(fanoutQueue3).to(fanoutExchange);}@BeanpublicQueuefanoutQueue4(){// QueueBuilder.durable("ff").build();returnnewQueue("fanout.queue4");}@BeanpublicBindingfanoutBinding4(){returnBindingBuilder.bind(fanoutQueue4()).to(fanoutExchange());}}
使用注解的方式解决 Direct 交换机问题
  • 由于 Direct 交换机可以配置对应的 key,但对于声明式方式来说,需要对每个 key 都写一个 binding 方法,这样效率很低,所以引入注解的方式实现
  • SpringAMQP 还提供了基于 @RabbitListener 注解来声明队列和交换机的方式,在 Listener 的部分通过注解实现
@RabbitListener(bindings =@QueueBinding(
        value =@Queue(name ="direct.queue1",durable ="true"),
        exchange =@Exchange(name ="hmall,direct",type =ExchangeTypes,DIRECT),
        key ={"red","blue"}))publicvoidlistenDirectQueue1(String msg)throwsInterruptedException{System.out.println("消费者 1 收到了 direct.queueq 的消息:【"+msg+"】");}

📑小结

在这里插入图片描述


3-7 消息转换器

案例-利用 SpringAMQP 发送对象类型的消息

  • ① 声明一个队列,名为 object.queue
  • ② 编写单元测试,向队列中直接发送一条消息,消息类型为 Map
  • ③ 在控制台查看消息,总结你能发现的问题
// 准备消息Map<String,Object> msg =newHashMap<>();
msg.put("name","Jack");
msg.put("age",21);
@TestvoidtestSendObject(){Map<String,Object> msg =newHashMap<>(2);
    msg.put("name","jack");
    msg.put("age",21);
    rabbitTemplate.convertAndSend("object.queue", msg);}

问题

  • Spring 对消息对象的处理是由 org.springframework,amqp.support.converter.MessageConveerter 来处理的。而默认实现是 SimpleMessageConverter,基于 JDK 的 ObjectOutputStream 完成序列化。

存在以下问题

  • JDK 的序列化有安全风险
  • JDK 序列化的消息太大
  • JDK 序列化的消息可读性差

建议采用 JSON 薛丽华代替默认的的 JDK 序列化,要做两件事情:

  • 在 publisher 和 consumer 中都要引入 Jackson 依赖:
<dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-databind</artifactId></dependency>
  • 在 publisher 和 consumer 中都要配置 MessageConverter
@BeanpublicMessageConvertermessageConverter(){returnnewJackson2JsonMessageConverter();}

4- 使用 MQ 改造支付业务代码

案例

  • 需求:改造余额支付功能,不再同步调用交易服务的 OpenFeign 接口,而是采用异步的 MQ 通知交易服务更改订单状态

    1. 业务中引入 AMQP 依赖
<!-- amqp --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>
  • 2.配置 MQ 的地址
spring:
  rabbitmq:
    host:192.168.150.101
    port:5672
    virtual-host:/hmall
    username: hmall
    password:123
    1. 配置 MQ 的 Configure
@ConfigurationpublicclassMqConfig{@BeanpublicMessageConverterjackson2JsonMessageConverter(){returnnewJackson2JsonMessageConverter();}}
    1. 编写监听器
@Component@RequiredArgsConstructorpublicclassPayStatusListener{privatefinalIOrderService orderService;@RabbitListener(bindings =@QueueBinding(
        value =@Queue(name ="mark.order.pay.queue", durable ="true"),
        exchange =@Exchange(name ="pay.topic", type =ExchangeTypes.TOPIC),
        key ="pay.success"))publicvoidlistenOrderPay(Long orderId){// 标记订单状态为已支付
        orderService.markOrderPaySuccess(orderId);}}
    1. 业务异步调用
rabbitTemplate.convertAndSend("pay.topic","pay.success", po.getBizOrderNo());
标签: rabbitmq

本文转载自: https://blog.csdn.net/weixin_44382896/article/details/143305452
版权归原作者 山脚ice 所有, 如有侵权,请联系我们删除。

“【02基础】- RabbitMQ基础”的评论:

还没有评论