RabbitMQ
安装MQ
docker run \-eRABBITMQ_DEFAULT_USER=itheima \-eRABBITMQ_DEFAULT_PASS=123321\-v mq-plugins:/plugins \--name mq \--hostname mq \-p15672:15672 \-p5672:5672 \--network hmall \-d\
rabbitmq:3.8-management
可以看到在安装命令中有两个映射的端口:
- 15672:RabbitMQ提供的管理控制台的端口
- 5672:RabbitMQ的消息发送处理接口
安装完成后,我们访问 http://192.168.150.101:15672即可看到管理控制台。首次访问需要登录,默认的用户名和密码在配置文件中已经指定了。
**publisher**
:生产者,也就是发送消息的一方**consumer**
:消费者,也就是消费消息的一方**queue**
:队列,存储消息。生产者投递的消息会暂存在消息队列中,等待消费者处理**exchange**
:交换机,负责消息路由。生产者发送的消息由交换机决定投递到哪个队列。**virtual host**
:虚拟主机,起到数据隔离的作用。每个虚拟主机相互独立,有各自的exchange、queue
SpringAMQP
<!--AMQP依赖,包含RabbitMQ--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>
消息发送
首先配置MQ地址,在
publisher
服务的
application.yml
中添加配置:
spring:rabbitmq:host: 192.168.150.101 # 你的虚拟机IPport:5672# 端口virtual-host: /hmall # 虚拟主机username: hmall # 用户名password:123# 密码
然后在
publisher
服务中编写测试类
SpringAmqpTest
,并利用
RabbitTemplate
实现消息发送:
packagecom.itheima.publisher.amqp;importorg.junit.jupiter.api.Test;importorg.springframework.amqp.rabbit.core.RabbitTemplate;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.boot.test.context.SpringBootTest;@SpringBootTestpublicclassSpringAmqpTest{@AutowiredprivateRabbitTemplate rabbitTemplate;@TestpublicvoidtestSimpleQueue(){// 队列名称String queueName ="simple.queue";// 消息String message ="hello, spring amqp!";// 发送消息
rabbitTemplate.convertAndSend(queueName, message);}}
消息接收
首先配置MQ地址,在
consumer
服务的
application.yml
中添加配置:
spring:rabbitmq:host: 192.168.150.101 # 你的虚拟机IPport:5672# 端口virtual-host: /hmall # 虚拟主机username: hmall # 用户名password:123# 密码
然后在
consumer
服务的
com.itheima.consumer.listener
包中新建一个类
SpringRabbitListener
,代码如下:
packagecom.itheima.consumer.listener;importorg.springframework.amqp.rabbit.annotation.RabbitListener;importorg.springframework.stereotype.Component;@ComponentpublicclassSpringRabbitListener{// 利用RabbitListener来声明要监听的队列信息// 将来一旦监听的队列中有了消息,就会推送给当前服务,调用当前方法,处理消息。// 可以看到方法体中接收的就是消息体的内容@RabbitListener(queues ="simple.queue")publicvoidlistenSimpleQueueMessage(String msg)throwsInterruptedException{System.out.println("spring 消费者接收到消息:【"+ msg +"】");}}
work模型
在spring中有一个简单的配置,可以解决这个问题。我们修改consumer服务的application.yml文件,添加配置:
spring:rabbitmq:listener:simple:prefetch:1# 每次只能获取一条消息,处理完成才能获取下一个消息
充分利用了每一个消费者的处理能力,可以有效避免消息积压问题。
Work模型的使用:
- 多个消费者绑定到一个队列,同一条消息只会被一个消费者处理
- 通过设置prefetch来控制消费者预取的消息数量
交换机
Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!
交换机的类型有四种:
- Fanout:广播,将消息交给所有绑定到交换机的队列。我们最早在控制台使用的正是Fanout交换机
- Direct:订阅,基于RoutingKey(路由key)发送给订阅了消息的队列
- Topic:通配符订阅,与Direct类似,只不过RoutingKey可以使用通配符
- Headers:头匹配,基于MQ的消息头匹配,用的较少。
Fanout交换机
在publisher服务的SpringAmqpTest类中添加测试方法生产者:
@TestpublicvoidtestFanoutExchange(){// 交换机名称String exchangeName ="hmall.fanout";// 消息String message ="hello, everyone!";
rabbitTemplate.convertAndSend(exchangeName,"", message);}
在consumer服务的SpringRabbitListener中添加两个方法,作为消费者:
@RabbitListener(queues ="fanout.queue1")publicvoidlistenFanoutQueue1(String msg){System.out.println("消费者1接收到Fanout消息:【"+ msg +"】");}@RabbitListener(queues ="fanout.queue2")publicvoidlistenFanoutQueue2(String msg){System.out.println("消费者2接收到Fanout消息:【"+ msg +"】");}
Direct交换机
- 队列与交换机的绑定,不能是任意绑定了,而是要指定一个
RoutingKey
(路由key) - 消息的发送方在 向 Exchange发送消息时,也必须指定消息的
RoutingKey
。 - Exchange不再把消息交给每一个绑定的队列,而是根据消息的
Routing Key
进行判断,只有队列的Routingkey
与消息的Routing key
完全一致,才会接收到消息
Topic交换机
Topic
类型的
Exchange
与
Direct
相比,都是可以根据
RoutingKey
把消息路由到不同的队列。
只不过
Topic
类型
Exchange
可以让队列在绑定
BindingKey
的时候使用通配符!
BindingKey
一般都是有一个或多个单词组成,多个单词之间以
.
分割,例如:
item.insert
通配符规则:
#
:匹配一个或多个词*
:匹配不多不少恰好1个词
举例:
item.#
:能够匹配item.spu.insert
或者item.spu
item.*
:只能匹配item.spu
声明队列和交换机
由程序启动时检查队列和交换机是否存在,如果不存在自动创建。
fanout示例
在consumer中创建一个类,声明队列和交换机:
packagecom.itheima.consumer.config;importorg.springframework.amqp.core.Binding;importorg.springframework.amqp.core.BindingBuilder;importorg.springframework.amqp.core.FanoutExchange;importorg.springframework.amqp.core.Queue;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;@ConfigurationpublicclassFanoutConfig{/**
* 声明交换机
* @return Fanout类型交换机
*/@BeanpublicFanoutExchangefanoutExchange(){returnnewFanoutExchange("hmall.fanout");}/**
* 第1个队列
*/@BeanpublicQueuefanoutQueue1(){returnnewQueue("fanout.queue1");}/**
* 绑定队列和交换机
*/@BeanpublicBindingbindingQueue1(Queue fanoutQueue1,FanoutExchange fanoutExchange){returnBindingBuilder.bind(fanoutQueue1).to(fanoutExchange);}/**
* 第2个队列
*/@BeanpublicQueuefanoutQueue2(){returnnewQueue("fanout.queue2");}/**
* 绑定队列和交换机
*/@BeanpublicBindingbindingQueue2(Queue fanoutQueue2,FanoutExchange fanoutExchange){returnBindingBuilder.bind(fanoutQueue2).to(fanoutExchange);}}
direct示例
direct模式由于要绑定多个KEY,会非常麻烦,每一个Key都要编写一个binding:
packagecom.itheima.consumer.config;importorg.springframework.amqp.core.*;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;@ConfigurationpublicclassDirectConfig{/**
* 声明交换机
* @return Direct类型交换机
*/@BeanpublicDirectExchangedirectExchange(){returnExchangeBuilder.directExchange("hmall.direct").build();}/**
* 第1个队列
*/@BeanpublicQueuedirectQueue1(){returnnewQueue("direct.queue1");}/**
* 绑定队列和交换机
*/@BeanpublicBindingbindingQueue1WithRed(Queue directQueue1,DirectExchange directExchange){returnBindingBuilder.bind(directQueue1).to(directExchange).with("red");}/**
* 绑定队列和交换机
*/@BeanpublicBindingbindingQueue1WithBlue(Queue directQueue1,DirectExchange directExchange){returnBindingBuilder.bind(directQueue1).to(directExchange).with("blue");}/**
* 第2个队列
*/@BeanpublicQueuedirectQueue2(){returnnewQueue("direct.queue2");}/**
* 绑定队列和交换机
*/@BeanpublicBindingbindingQueue2WithRed(Queue directQueue2,DirectExchange directExchange){returnBindingBuilder.bind(directQueue2).to(directExchange).with("red");}/**
* 绑定队列和交换机
*/@BeanpublicBindingbindingQueue2WithYellow(Queue directQueue2,DirectExchange directExchange){returnBindingBuilder.bind(directQueue2).to(directExchange).with("yellow");}}
基于注解声明
基于@Bean的方式声明队列和交换机比较麻烦,Spring还提供了基于注解方式来声明。
例如,我们同样声明Direct模式的交换机和队列:
@RabbitListener(bindings =@QueueBinding(
value =@Queue(name ="direct.queue1"),
exchange =@Exchange(name ="hmall.direct", type =ExchangeTypes.DIRECT),
key ={"red","blue"}))publicvoidlistenDirectQueue1(String msg){System.out.println("消费者1接收到direct.queue1的消息:【"+ msg +"】");}@RabbitListener(bindings =@QueueBinding(
value =@Queue(name ="direct.queue2"),
exchange =@Exchange(name ="hmall.direct", type =ExchangeTypes.DIRECT),
key ={"red","yellow"}))publicvoidlistenDirectQueue2(String msg){System.out.println("消费者2接收到direct.queue2的消息:【"+ msg +"】");}
是不是简单多了。
再试试Topic模式:
@RabbitListener(bindings =@QueueBinding(
value =@Queue(name ="topic.queue1"),
exchange =@Exchange(name ="hmall.topic", type =ExchangeTypes.TOPIC),
key ="china.#"))publicvoidlistenTopicQueue1(String msg){System.out.println("消费者1接收到topic.queue1的消息:【"+ msg +"】");}@RabbitListener(bindings =@QueueBinding(
value =@Queue(name ="topic.queue2"),
exchange =@Exchange(name ="hmall.topic", type =ExchangeTypes.TOPIC),
key ="#.news"))publicvoidlistenTopicQueue2(String msg){System.out.println("消费者2接收到topic.queue2的消息:【"+ msg +"】");}
消息转换器
Spring的消息发送代码接收的消息体是一个Object,而在数据传输时,它会把你发送的消息序列化为字节发送给MQ,接收消息的时候,还会把字节反序列化为Java对象。
只不过,默认情况下Spring采用的序列化方式是JDK序列化。众所周知,JDK序列化存在下列问题:
- 数据体积过大
- 有安全漏洞
- 可读性差
JDK序列化方式并不合适。我们希望消息体的体积更小、可读性更高,因此可以使用JSON方式来做序列化和反序列化。
在
publisher
和
consumer
两个服务中都引入依赖:
<dependency><groupId>com.fasterxml.jackson.dataformat</groupId><artifactId>jackson-dataformat-xml</artifactId><version>2.9.10</version></dependency>
注意,如果项目中引入了
spring-boot-starter-web
依赖,则无需再次引入
Jackson
依赖。
配置消息转换器,在
publisher
和
consumer
两个服务的启动类中添加一个Bean即可:
@BeanpublicMessageConvertermessageConverter(){// 1.定义消息转换器Jackson2JsonMessageConverter jackson2JsonMessageConverter =newJackson2JsonMessageConverter();// 2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息
jackson2JsonMessageConverter.setCreateMessageIds(true);return jackson2JsonMessageConverter;}
消息转换器中添加的messageId可以便于我们将来做幂等性判断。
消费者接收Object
我们在consumer服务中定义一个新的消费者,publisher是用Map发送,那么消费者也一定要用Map接收,格式如下:
@RabbitListener(queues ="object.queue")publicvoidlistenSimpleQueueMessage(Map<String,Object> msg)throwsInterruptedException{System.out.println("消费者接收到object.queue消息:【"+ msg +"】");}
业务改造
案例需求:改造余额支付功能,将支付成功后基于OpenFeign的交易服务的更新订单状态接口的同步调用,改为基于RabbitMQ的异步通知。
我们只关注交易服务,步骤如下:
- 定义topic类型交换机,命名为
pay.topic
- 定义消息队列,命名为
mark.order.pay.queue
- 将
mark.order.pay.queue
与pay.topic
绑定,BindingKey
为pay.success
- 支付成功时不再调用交易服务更新订单状态的接口,而是发送一条消息到
pay.topic
,发送消息的RoutingKey
为pay.success
,消息内容是订单id - 交易服务监听
mark.order.pay.queue
队列,接收到消息后更新订单状态为已支付
配置MQ
不管是生产者还是消费者,都需要配置MQ的基本信息。分为两步:
1)添加依赖:
<!--消息发送--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>
2)配置MQ地址:
spring:rabbitmq:host: 192.168.150.101 # 你的虚拟机IPport:5672# 端口virtual-host: /hmall # 虚拟主机username: hmall # 用户名password:123# 密码
接收消息
在trade-service服务中定义一个消息监听类:
其代码如下:
packagecom.hmall.trade.listener;importcom.hmall.trade.service.IOrderService;importlombok.RequiredArgsConstructor;importorg.springframework.amqp.core.ExchangeTypes;importorg.springframework.amqp.rabbit.annotation.Exchange;importorg.springframework.amqp.rabbit.annotation.Queue;importorg.springframework.amqp.rabbit.annotation.QueueBinding;importorg.springframework.amqp.rabbit.annotation.RabbitListener;importorg.springframework.stereotype.Component;@Component@RequiredArgsConstructorpublicclassPayStatusListener{privatefinalIOrderService orderService;@RabbitListener(bindings =@QueueBinding(
value =@Queue(name ="mark.order.pay.queue", durable ="true"),
exchange =@Exchange(name ="pay.topic", type =ExchangeTypes.TOPIC),
key ="pay.success"))publicvoidlistenPaySuccess(Long orderId){
orderService.markOrderPaySuccess(orderId);}}
发送消息
修改
pay-service
服务下的
com.hmall.pay.service.impl.PayOrderServiceImpl
类中的
tryPayOrderByBalance
方法:
privatefinalRabbitTemplate rabbitTemplate;@Override@TransactionalpublicvoidtryPayOrderByBalance(PayOrderDTO payOrderDTO){// 1.查询支付单PayOrder po =getById(payOrderDTO.getId());// 2.判断状态if(!PayStatus.WAIT_BUYER_PAY.equalsValue(po.getStatus())){// 订单不是未支付,状态异常thrownewBizIllegalException("交易已支付或关闭!");}// 3.尝试扣减余额
userClient.deductMoney(payOrderDTO.getPw(), po.getAmount());// 4.修改支付单状态boolean success =markPayOrderSuccess(payOrderDTO.getId(),LocalDateTime.now());if(!success){thrownewBizIllegalException("交易已支付或关闭!");}// 5.修改订单状态// tradeClient.markOrderPaySuccess(po.getBizOrderNo());try{
rabbitTemplate.convertAndSend("pay.topic","pay.success", po.getBizOrderNo());}catch(Exception e){
log.error("支付成功的消息发送失败,支付单id:{}, 交易单id:{}", po.getId(), po.getBizOrderNo(), e);}}
版权归原作者 miss_you1213 所有, 如有侵权,请联系我们删除。