RabbitMQ简介
以商品订单场景为例,
如果商品服务和订单服务是两个不同的微服务,在下单的过程中订单服务需要调用商品服务进行扣库存操作。按照传统的方式,下单过程要等到调用完毕之后才能返回下单成功,如果网络产生波动等原因使得商品服务扣库存延迟或者失败,会带来较差的用户体验,如果在高并发的场景下,这样的处理显然是不合适的,那怎么进行优化呢?这就需要消息队列登场了。
消息队列提供一个异步通信机制,消息的发送者不必一直等待到消息被成功处理才返回,而是立即返回。消息中间件负责处理网络通信,如果网络连接不可用,消息被暂存于队列当中,当网络畅通的时候在将消息转发给相应的应用程序或者服务,当然前提是这些服务订阅了该队列。如果在商品服务和订单服务之间使用消息中间件,既可以提高并发量,又降低服务之间的耦合度。
RabbitMQ就是这样一款消息队列。RabbitMQ是一个开源的消息代理的队列服务器,用来通过普通协议在完全不同的应用之间共享数据。
(2)典型应用场景:
异步处理。把消息放入消息中间件中,等到需要的时候再去处理。
流量削峰。例如秒杀活动,在短时间内访问量急剧增加,使用消息队列,当消息队列满了就拒绝响应,跳转到错误页面,这样就可以使得系统不会因为超负载而崩溃。
日志处理
应用解耦
在docker中安装RabbitMQ
docker pull rabbitmq:management
docker run -d -p 5672:5672 -p 15672:15672 --name rabbitmq rabbitmq:management
管理后台:http://IP:15672
第一步建立rabbit-util模块封装
由于后续可能多个模块都会使用mq,所以我们把它封装成一个模块,需要的地方直接引用即可
第二步修改pom.xml
<dependencies>
<!--rabbitmq消息队列-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-bus-amqp</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
</dependency>
</dependencies>
第三步封装service方法
@Service
public class RabbitService {
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 发送消息
* @param exchange 交换机
* @param routingKey 路由键
* @param message 消息
*/
public boolean sendMessage(String exchange, String routingKey, Object message) {
rabbitTemplate.convertAndSend(exchange, routingKey, message);
return true;
}
}
第四步配置mq消息转换器
@Configuration
public class MQConfig {
@Bean
public MessageConverter messageConverter(){
return new Jackson2JsonMessageConverter();
}
}
在短信模块如果使用mq(假如是下单成功短信通知)
在短信模块引入依赖
<dependency>
<groupId>com.atguigu</groupId>
<artifactId>rabbit_util</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>
在短信模块添加配置
#rabbitmq地址
spring.rabbitmq.host=192.168.44.165
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
添加常量配置
在rabbit-util模块com.atguigu.yygh.common.constant.MqConst类添加
public class MqConst {
/**
* 预约下单
*/
public static final String EXCHANGE_DIRECT_ORDER
= "exchange.direct.order";
public static final String ROUTING_ORDER = "order";
//队列
public static final String QUEUE_ORDER = "queue.order";
/**
* 短信
*/
public static final String EXCHANGE_DIRECT_MSM = "exchange.direct.msm";
public static final String ROUTING_MSM_ITEM = "msm.item";
//队列
public static final String QUEUE_MSM_ITEM = "queue.msm.item";
}
在model模块封装短信实体
@Data
@ApiModel(description = "短信实体")
public class MsmVo {
@ApiModelProperty(value = "phone")
private String phone;
@ApiModelProperty(value = "短信模板code")
private String templateCode;
@ApiModelProperty(value = "短信模板参数")
private Map<String,Object> param;
}
在MsmService类添加接口
boolean send(MsmVo msmVo);
在MsmServiceImpl类添加接口实现
@Override
public boolean send(MsmVo msmVo) {
if(!StringUtils.isEmpty(msmVo.getPhone())) {
String code = (String)msmVo.getParam().get("code");
return this.send(msmVo.getPhone(),code);
}
return false;
}
封装mq短信消息监听器
@Component
public class SmsReceiver {
@Autowired
private MsmService msmService;
//接收mq的消息是否调用send方法
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = MqConst.QUEUE_MSM_ITEM, durable = "true"),
exchange = @Exchange(value = MqConst.EXCHANGE_DIRECT_MSM),
key = {MqConst.ROUTING_MSM_ITEM}
))
//只要Rabbitservice(参数)只要参数有MqConst.QUEUE_MSM_ITEM,MqConst.EXCHANGE_DIRECT_MSM,MqConst.ROUTING_MSM_ITEM这个方法就会执行
public void send(MsmVo msmVo, Message message, Channel channel) {
msmService.send(msmVo);
}
}
封装mq更新订单数量监听器schedule模块
在订单模块引入依赖
<!--rabbitmq消息队列-->
<dependency>
<groupId>com.atguigu.yygh</groupId>
<artifactId>rabbit-util</artifactId>
<version>1.0</version>
</dependency>
#rabbitmq地址
spring.rabbitmq.host=192.168.44.165
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
在rabbit-util模块com.atguigu.yygh.common.constant.MqConst类添加
/**
* 预约下单
*/
public static final String EXCHANGE_DIRECT_ORDER = "exchange.direct.order";
public static final String ROUTING_ORDER = "order";
//队列
public static final String QUEUE_ORDER = "queue.order";
定单实体
@Data
@ApiModel(description = "OrderMqVo")
public class OrderMqVo {
@ApiModelProperty(value = "可预约数")
private Integer reservedNumber;
@ApiModelProperty(value = "剩余预约数")
private Integer availableNumber;
@ApiModelProperty(value = "排班id")
private String scheduleId;
@ApiModelProperty(value = "短信实体")
private MsmVo msmVo;
}
封装mq监听器
@Component
public class HospitalReceiver {
@Autowired
private ScheduleService scheduleService;
@Autowired
private RabbitService rabbitService;
//只要rabbitService.sendMessage(参数)参数含有 MqConst.QUEUE_ORDER,MqConst.EXCHANGE_DIRECT_ORDER,MqConst.ROUTING_ORDER,这个receiver()方法就会执行
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = MqConst.QUEUE_ORDER, durable = "true"),
exchange = @Exchange(value = MqConst.EXCHANGE_DIRECT_ORDER),
key = {MqConst.ROUTING_ORDER}
))
public void receiver(OrderMqVo orderMqVo, Message message, Channel channel) throws IOException {
//下单成功更新预约数
Schedule schedule = scheduleService.getScheduleId(orderMqVo.getScheduleId());
schedule.setReservedNumber(orderMqVo.getReservedNumber());
schedule.setAvailableNumber(orderMqVo.getAvailableNumber());
scheduleService.update(schedule);
//发送短信
MsmVo msmVo = orderMqVo.getMsmVo();
if(null != msmVo) {
//给mq发消息发送完后,监听到发送短信的消息调用发送消息的接口
rabbitService.sendMessage(MqConst.EXCHANGE_DIRECT_MSM, MqConst.ROUTING_MSM_ITEM, msmVo);
}
}
}
说明:已统一引入,该对象放一个短信实体,预约下单成功后,我们发送一条消息,让mq来保证两个消息都发送成功
在order模块中引入依赖
<dependency>
<groupId>com.atguigu</groupId>
<artifactId>rabbit_util</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>
在resources/application.properties添加
#rabbitmq地址
spring.rabbitmq.host=192.168.44.165
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
1,发送订单mq修改OrderServiceImpl类下单方法
//给mq发送消息,监听到后调用订单修改
rabbitService.sendMessage(MqConst.EXCHANGE_DIRECT_ORDER,
MqConst.ROUTING_ORDER, orderMqVo);
2,只要执行这个方法,schedule模块中的监听器 receiver()方法就会被调用,更新订单
更新完后执行这个
//给mq发消息发送完后,监听到发送短信的消息调用发送消息的接口
rabbitService.sendMessage(MqConst.EXCHANGE_DIRECT_MSM,
MqConst.ROUTING_MSM_ITEM, msmVo);
执行完上面的,sms模块中的短信监听器就会执行SmsReceiver类中的send就会被执行,发送短信
版权归原作者 敢敢变成了憨憨 所有, 如有侵权,请联系我们删除。