0


RabbitMQ笔记

RabbitMQ

安装MQ
docker run \-eRABBITMQ_DEFAULT_USER=itheima \-eRABBITMQ_DEFAULT_PASS=123321\-v mq-plugins:/plugins \--name mq \--hostname mq \-p15672:15672 \-p5672:5672 \--network hmall \-d\
 rabbitmq:3.8-management

可以看到在安装命令中有两个映射的端口:

  • 15672:RabbitMQ提供的管理控制台的端口
  • 5672:RabbitMQ的消息发送处理接口

安装完成后,我们访问 http://192.168.150.101:15672即可看到管理控制台。首次访问需要登录,默认的用户名和密码在配置文件中已经指定了。

  • **publisher**:生产者,也就是发送消息的一方
  • **consumer**:消费者,也就是消费消息的一方
  • **queue**:队列,存储消息。生产者投递的消息会暂存在消息队列中,等待消费者处理
  • **exchange**:交换机,负责消息路由。生产者发送的消息由交换机决定投递到哪个队列。
  • **virtual host**:虚拟主机,起到数据隔离的作用。每个虚拟主机相互独立,有各自的exchange、queue
SpringAMQP
<!--AMQP依赖,包含RabbitMQ--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>
消息发送

首先配置MQ地址,在

publisher

服务的

application.yml

中添加配置:

spring:rabbitmq:host: 192.168.150.101 # 你的虚拟机IPport:5672# 端口virtual-host: /hmall # 虚拟主机username: hmall # 用户名password:123# 密码

然后在

publisher

服务中编写测试类

SpringAmqpTest

,并利用

RabbitTemplate

实现消息发送:

packagecom.itheima.publisher.amqp;importorg.junit.jupiter.api.Test;importorg.springframework.amqp.rabbit.core.RabbitTemplate;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.boot.test.context.SpringBootTest;@SpringBootTestpublicclassSpringAmqpTest{@AutowiredprivateRabbitTemplate rabbitTemplate;@TestpublicvoidtestSimpleQueue(){// 队列名称String queueName ="simple.queue";// 消息String message ="hello, spring amqp!";// 发送消息
        rabbitTemplate.convertAndSend(queueName, message);}}
消息接收

首先配置MQ地址,在

consumer

服务的

application.yml

中添加配置:

spring:rabbitmq:host: 192.168.150.101 # 你的虚拟机IPport:5672# 端口virtual-host: /hmall # 虚拟主机username: hmall # 用户名password:123# 密码

然后在

consumer

服务的

com.itheima.consumer.listener

包中新建一个类

SpringRabbitListener

,代码如下:

packagecom.itheima.consumer.listener;importorg.springframework.amqp.rabbit.annotation.RabbitListener;importorg.springframework.stereotype.Component;@ComponentpublicclassSpringRabbitListener{// 利用RabbitListener来声明要监听的队列信息// 将来一旦监听的队列中有了消息,就会推送给当前服务,调用当前方法,处理消息。// 可以看到方法体中接收的就是消息体的内容@RabbitListener(queues ="simple.queue")publicvoidlistenSimpleQueueMessage(String msg)throwsInterruptedException{System.out.println("spring 消费者接收到消息:【"+ msg +"】");}}
work模型

在spring中有一个简单的配置,可以解决这个问题。我们修改consumer服务的application.yml文件,添加配置:

spring:rabbitmq:listener:simple:prefetch:1# 每次只能获取一条消息,处理完成才能获取下一个消息

充分利用了每一个消费者的处理能力,可以有效避免消息积压问题。

Work模型的使用:

  • 多个消费者绑定到一个队列,同一条消息只会被一个消费者处理
  • 通过设置prefetch来控制消费者预取的消息数量
交换机

Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!

交换机的类型有四种:

  • Fanout:广播,将消息交给所有绑定到交换机的队列。我们最早在控制台使用的正是Fanout交换机
  • Direct:订阅,基于RoutingKey(路由key)发送给订阅了消息的队列
  • Topic:通配符订阅,与Direct类似,只不过RoutingKey可以使用通配符
  • Headers:头匹配,基于MQ的消息头匹配,用的较少。
Fanout交换机

在publisher服务的SpringAmqpTest类中添加测试方法生产者:

@TestpublicvoidtestFanoutExchange(){// 交换机名称String exchangeName ="hmall.fanout";// 消息String message ="hello, everyone!";
    rabbitTemplate.convertAndSend(exchangeName,"", message);}

在consumer服务的SpringRabbitListener中添加两个方法,作为消费者:

@RabbitListener(queues ="fanout.queue1")publicvoidlistenFanoutQueue1(String msg){System.out.println("消费者1接收到Fanout消息:【"+ msg +"】");}@RabbitListener(queues ="fanout.queue2")publicvoidlistenFanoutQueue2(String msg){System.out.println("消费者2接收到Fanout消息:【"+ msg +"】");}
Direct交换机
  • 队列与交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey(路由key)
  • 消息的发送方在 向 Exchange发送消息时,也必须指定消息的 RoutingKey
  • Exchange不再把消息交给每一个绑定的队列,而是根据消息的Routing Key进行判断,只有队列的Routingkey与消息的 Routing key完全一致,才会接收到消息
Topic交换机
Topic

类型的

Exchange

Direct

相比,都是可以根据

RoutingKey

把消息路由到不同的队列。
只不过

Topic

类型

Exchange

可以让队列在绑定

BindingKey

的时候使用通配符!

BindingKey

一般都是有一个或多个单词组成,多个单词之间以

.

分割,例如:

item.insert

通配符规则:

  • #:匹配一个或多个词
  • *:匹配不多不少恰好1个词

举例:

  • item.#:能够匹配item.spu.insert 或者 item.spu
  • item.*:只能匹配item.spu
声明队列和交换机

由程序启动时检查队列和交换机是否存在,如果不存在自动创建。

fanout示例

在consumer中创建一个类,声明队列和交换机:

packagecom.itheima.consumer.config;importorg.springframework.amqp.core.Binding;importorg.springframework.amqp.core.BindingBuilder;importorg.springframework.amqp.core.FanoutExchange;importorg.springframework.amqp.core.Queue;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;@ConfigurationpublicclassFanoutConfig{/**
     * 声明交换机
     * @return Fanout类型交换机
     */@BeanpublicFanoutExchangefanoutExchange(){returnnewFanoutExchange("hmall.fanout");}/**
     * 第1个队列
     */@BeanpublicQueuefanoutQueue1(){returnnewQueue("fanout.queue1");}/**
     * 绑定队列和交换机
     */@BeanpublicBindingbindingQueue1(Queue fanoutQueue1,FanoutExchange fanoutExchange){returnBindingBuilder.bind(fanoutQueue1).to(fanoutExchange);}/**
     * 第2个队列
     */@BeanpublicQueuefanoutQueue2(){returnnewQueue("fanout.queue2");}/**
     * 绑定队列和交换机
     */@BeanpublicBindingbindingQueue2(Queue fanoutQueue2,FanoutExchange fanoutExchange){returnBindingBuilder.bind(fanoutQueue2).to(fanoutExchange);}}
direct示例

direct模式由于要绑定多个KEY,会非常麻烦,每一个Key都要编写一个binding:

packagecom.itheima.consumer.config;importorg.springframework.amqp.core.*;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;@ConfigurationpublicclassDirectConfig{/**
     * 声明交换机
     * @return Direct类型交换机
     */@BeanpublicDirectExchangedirectExchange(){returnExchangeBuilder.directExchange("hmall.direct").build();}/**
     * 第1个队列
     */@BeanpublicQueuedirectQueue1(){returnnewQueue("direct.queue1");}/**
     * 绑定队列和交换机
     */@BeanpublicBindingbindingQueue1WithRed(Queue directQueue1,DirectExchange directExchange){returnBindingBuilder.bind(directQueue1).to(directExchange).with("red");}/**
     * 绑定队列和交换机
     */@BeanpublicBindingbindingQueue1WithBlue(Queue directQueue1,DirectExchange directExchange){returnBindingBuilder.bind(directQueue1).to(directExchange).with("blue");}/**
     * 第2个队列
     */@BeanpublicQueuedirectQueue2(){returnnewQueue("direct.queue2");}/**
     * 绑定队列和交换机
     */@BeanpublicBindingbindingQueue2WithRed(Queue directQueue2,DirectExchange directExchange){returnBindingBuilder.bind(directQueue2).to(directExchange).with("red");}/**
     * 绑定队列和交换机
     */@BeanpublicBindingbindingQueue2WithYellow(Queue directQueue2,DirectExchange directExchange){returnBindingBuilder.bind(directQueue2).to(directExchange).with("yellow");}}
基于注解声明

基于@Bean的方式声明队列和交换机比较麻烦,Spring还提供了基于注解方式来声明。

例如,我们同样声明Direct模式的交换机和队列:

@RabbitListener(bindings =@QueueBinding(
    value =@Queue(name ="direct.queue1"),
    exchange =@Exchange(name ="hmall.direct", type =ExchangeTypes.DIRECT),
    key ={"red","blue"}))publicvoidlistenDirectQueue1(String msg){System.out.println("消费者1接收到direct.queue1的消息:【"+ msg +"】");}@RabbitListener(bindings =@QueueBinding(
    value =@Queue(name ="direct.queue2"),
    exchange =@Exchange(name ="hmall.direct", type =ExchangeTypes.DIRECT),
    key ={"red","yellow"}))publicvoidlistenDirectQueue2(String msg){System.out.println("消费者2接收到direct.queue2的消息:【"+ msg +"】");}

是不是简单多了。
再试试Topic模式:

@RabbitListener(bindings =@QueueBinding(
    value =@Queue(name ="topic.queue1"),
    exchange =@Exchange(name ="hmall.topic", type =ExchangeTypes.TOPIC),
    key ="china.#"))publicvoidlistenTopicQueue1(String msg){System.out.println("消费者1接收到topic.queue1的消息:【"+ msg +"】");}@RabbitListener(bindings =@QueueBinding(
    value =@Queue(name ="topic.queue2"),
    exchange =@Exchange(name ="hmall.topic", type =ExchangeTypes.TOPIC),
    key ="#.news"))publicvoidlistenTopicQueue2(String msg){System.out.println("消费者2接收到topic.queue2的消息:【"+ msg +"】");}
消息转换器

Spring的消息发送代码接收的消息体是一个Object,而在数据传输时,它会把你发送的消息序列化为字节发送给MQ,接收消息的时候,还会把字节反序列化为Java对象。

只不过,默认情况下Spring采用的序列化方式是JDK序列化。众所周知,JDK序列化存在下列问题:

  • 数据体积过大
  • 有安全漏洞
  • 可读性差

JDK序列化方式并不合适。我们希望消息体的体积更小、可读性更高,因此可以使用JSON方式来做序列化和反序列化。

publisher

consumer

两个服务中都引入依赖:

<dependency><groupId>com.fasterxml.jackson.dataformat</groupId><artifactId>jackson-dataformat-xml</artifactId><version>2.9.10</version></dependency>

注意,如果项目中引入了

spring-boot-starter-web

依赖,则无需再次引入

Jackson

依赖。

配置消息转换器,在

publisher

consumer

两个服务的启动类中添加一个Bean即可:

@BeanpublicMessageConvertermessageConverter(){// 1.定义消息转换器Jackson2JsonMessageConverter jackson2JsonMessageConverter =newJackson2JsonMessageConverter();// 2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息
    jackson2JsonMessageConverter.setCreateMessageIds(true);return jackson2JsonMessageConverter;}

消息转换器中添加的messageId可以便于我们将来做幂等性判断。

消费者接收Object

我们在consumer服务中定义一个新的消费者,publisher是用Map发送,那么消费者也一定要用Map接收,格式如下:

@RabbitListener(queues ="object.queue")publicvoidlistenSimpleQueueMessage(Map<String,Object> msg)throwsInterruptedException{System.out.println("消费者接收到object.queue消息:【"+ msg +"】");}
业务改造

案例需求:改造余额支付功能,将支付成功后基于OpenFeign的交易服务的更新订单状态接口的同步调用,改为基于RabbitMQ的异步通知。

我们只关注交易服务,步骤如下:

  • 定义topic类型交换机,命名为pay.topic
  • 定义消息队列,命名为mark.order.pay.queue
  • mark.order.pay.queuepay.topic绑定,BindingKeypay.success
  • 支付成功时不再调用交易服务更新订单状态的接口,而是发送一条消息到pay.topic,发送消息的RoutingKeypay.success,消息内容是订单id
  • 交易服务监听mark.order.pay.queue队列,接收到消息后更新订单状态为已支付
配置MQ

不管是生产者还是消费者,都需要配置MQ的基本信息。分为两步:
1)添加依赖:

<!--消息发送--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>

2)配置MQ地址:

spring:rabbitmq:host: 192.168.150.101 # 你的虚拟机IPport:5672# 端口virtual-host: /hmall # 虚拟主机username: hmall # 用户名password:123# 密码
接收消息

在trade-service服务中定义一个消息监听类:
其代码如下:

packagecom.hmall.trade.listener;importcom.hmall.trade.service.IOrderService;importlombok.RequiredArgsConstructor;importorg.springframework.amqp.core.ExchangeTypes;importorg.springframework.amqp.rabbit.annotation.Exchange;importorg.springframework.amqp.rabbit.annotation.Queue;importorg.springframework.amqp.rabbit.annotation.QueueBinding;importorg.springframework.amqp.rabbit.annotation.RabbitListener;importorg.springframework.stereotype.Component;@Component@RequiredArgsConstructorpublicclassPayStatusListener{privatefinalIOrderService orderService;@RabbitListener(bindings =@QueueBinding(
            value =@Queue(name ="mark.order.pay.queue", durable ="true"),
            exchange =@Exchange(name ="pay.topic", type =ExchangeTypes.TOPIC),
            key ="pay.success"))publicvoidlistenPaySuccess(Long orderId){
        orderService.markOrderPaySuccess(orderId);}}
发送消息

修改

pay-service

服务下的

com.hmall.pay.service.impl.PayOrderServiceImpl

类中的

tryPayOrderByBalance

方法:

privatefinalRabbitTemplate rabbitTemplate;@Override@TransactionalpublicvoidtryPayOrderByBalance(PayOrderDTO payOrderDTO){// 1.查询支付单PayOrder po =getById(payOrderDTO.getId());// 2.判断状态if(!PayStatus.WAIT_BUYER_PAY.equalsValue(po.getStatus())){// 订单不是未支付,状态异常thrownewBizIllegalException("交易已支付或关闭!");}// 3.尝试扣减余额
    userClient.deductMoney(payOrderDTO.getPw(), po.getAmount());// 4.修改支付单状态boolean success =markPayOrderSuccess(payOrderDTO.getId(),LocalDateTime.now());if(!success){thrownewBizIllegalException("交易已支付或关闭!");}// 5.修改订单状态// tradeClient.markOrderPaySuccess(po.getBizOrderNo());try{
        rabbitTemplate.convertAndSend("pay.topic","pay.success", po.getBizOrderNo());}catch(Exception e){
        log.error("支付成功的消息发送失败,支付单id:{}, 交易单id:{}", po.getId(), po.getBizOrderNo(), e);}}
标签: rabbitmq 笔记

本文转载自: https://blog.csdn.net/qwerasdctds/article/details/135460316
版权归原作者 miss_you1213 所有, 如有侵权,请联系我们删除。

“RabbitMQ笔记”的评论:

还没有评论