创建消费者
第1步:基于Spring Initialzr方式创建zmall-rabbitmq消费者模块
第2步:在公共模块中添加rabbitmq相关依赖
<!--rabbitmq-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
第3步:配置子模块zmall-rabbitmq的pom.xml,引入公共模块zmall-common
<dependencies>
<dependency>
<groupId>com.zking.zmall</groupId>
<artifactId>zmall-common</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
</dependencies>
第4步:配置父模块的pom.xml,添加子模块zmall-rabbitmq
<modules>
<module>zmall-common</module>
...
<module>zmall-rabbitmq</module>
</modules>
第5步:配置application.yml
server:
port: 8050
spring:
application:
name: zmall-rabbitmq
datasource:
#type连接池类型 DBCP,C3P0,Hikari,Druid,默认为Hikari
type: com.zaxxer.hikari.HikariDataSource
driver-class-name: com.mysql.jdbc.Driver
url: jdbc:mysql://localhost:3306/zmall?characterEncoding=utf8&useSSL=false&serverTimezone=Asia/Shanghai&rewriteBatchedStatements=true
username: root
password: 1234
cloud:
nacos:
config:
server-addr: localhost:8848
redis:
host: localhost
port: 6379
password: 123456
jedis:
pool:
max-active: 100
max-wait: 10
max-idle: 10
min-idle: 10
database: 0
rabbitmq:
host: 192.168.70.132
port: 5672
username: admin
password: admin
virtual-host: my_vhost
# 发送者开启 confirm 确认机制
#publisher-confirm-type: correlated
# 发送者开启 return 确认机制
#publisher-returns: true
# 设置消费端手动 ack
listener:
simple:
#手动应答
acknowledge-mode: manual
#消费端最小并发数
concurrency: 5
#消费端最大并发数
max-concurrency: 10
#一次请求中预处理的消息数量
prefetch: 5
# 是否支持重试
retry:
#启用消费重试
enabled: true
#重试次数
max-attempts: 3
#重试间隔时间
initial-interval: 3000
cache:
channel:
#缓存的channel数量
size: 50
#mybatis-plus配置
mybatis-plus:
#所对应的 XML 文件位置
mapper-locations: classpath*:/mapper/*Mapper.xml
#别名包扫描路径
type-aliases-package: com.zking.zmall.model
configuration:
#驼峰命名规则
map-underscore-to-camel-case: true
#日志配置
logging:
level:
com.zking.zmall.mapper: debug
消费者采用的是手动消费模式,请注意设置spring.rabbitmq.listener.simple.acknowledge-mode=manual
第6步:配置启动类
@EnableFeignClients
@EnableDiscoveryClient
@MapperScan({"com.zking.zmall.mapper"})
@SpringBootApplication
public class ZmallRabbitmqApplication {
public static void main(String[] args) {
SpringApplication.run(ZmallRabbitmqApplication.class, args);
}
}
创建订单链路配置
定义RabbitMQ配置类
定义RabbitMQ配置类,设置生产者发送数据时自动转换成JSON,设置消费者获取消息自动转换成JSON。
@ConfigurationpublicclassRabbitmqConfig{@BeanpublicRabbitTemplaterabbitTemplate(ConnectionFactory connectionFactory){RabbitTemplate template =newRabbitTemplate(connectionFactory);
template.setMessageConverter(newJackson2JsonMessageConverter());return template;}@BeanpublicSimpleRabbitListenerContainerFactoryrabbitListenerContainerFactory(ConnectionFactory connectionFactory){SimpleRabbitListenerContainerFactory factory =newSimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setMessageConverter(newJackson2JsonMessageConverter());return factory;}}
设置RabbitTemplate消息转换模式为Jackson2JsonMessageConverter;
设置RabbitMQ消费者监听器的的消息转换模式为Jackson2JsonMessageConverter;
创建RabbitmqOrderConfig配置类
创建RabbitmqOrderConfig配置类,增加订单队列、交换机及绑定关系。
@ConfigurationpublicclassRabbitmqOrderConfig{publicstaticfinalStringORDER_QUEUE="order-queue";publicstaticfinalStringORDER_EXCHANGE="order-exchange";publicstaticfinalStringORDER_ROUTING_KEY="order-routing-key";@BeanpublicQueueorderQueue(){returnnewQueue(ORDER_QUEUE,true);}@BeanpublicDirectExchangeorderExchange(){returnnewDirectExchange(ORDER_EXCHANGE,true,false);}@BeanpublicBindingorderBinding(){returnBindingBuilder.bind(orderQueue()).to(orderExchange()).with(ORDER_ROUTING_KEY);}}
如何实现RabbitMQ重复投递机制
开启发送者消息确认模式
配置application.yml,开启发送者confirm确认机制和return确认机制
spring:
rabbitmq:
# 发送者开启 confirm 确认机制
publisher-confirm-type: correlated
# 发送者开启 return 确认机制
publisher-returns: true
消息发送确认
rabbitmq
的消息确认分为两部分:发送消息确认 和 消息接收确认
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-jnR5uSTO-1676642010629)(images\rabbitmq01.jpg)]
发送消息确认:用来确认生产者
producer
将消息发送到
broker
,
broker
上的交换机
exchange
再投递给队列
queue
的过程中,消息是否成功投递。
消息从
producer
到
rabbitmq broker
有一个
confirmCallback
确认模式。
消息从
exchange
到
queue
投递失败有一个
returnCallback
退回模式。
我们可以利用这两个
Callback
来确保消息100%送达。
Broker:简单来说,就是一个消息队列服务器实体。
创建ConfirmCallBack确认模式
@Slf4j
@Component
public class ConfirmCallbackService implements RabbitTemplate.ConfirmCallback {
/**
*
* @param correlationData 对象内部只有一个 id 属性,用来表示当前消息的唯一性
* @param ack 消息投递到broker 的状态,true表示成功
* @param cause 表示投递失败的原因
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (!ack) {
log.error("消息发送异常!");
} else {
log.info("发送者已经收到确认,ack={}, cause={}",ack, cause);
}
}
}
创建ReturnCallBack退回模式
@Slf4j
@Component
public class ReturnCallbackService implements RabbitTemplate.ReturnCallback {
/**
*
* @param message 消息体
* @param replyCode 响应code
* @param replyText 响应内容
* @param exchange 交换机
* @param routingKey 路由键
*/
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
log.info("returnedMessage ===> replyCode={} ,replyText={} ,exchange={} ,routingKey={}", replyCode, replyText, exchange, routingKey);
}
}
创建生产者
创建生产者,模拟发送消息
@RestController
public class ProducerController {
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private ReturnCallbackService returnCallbackService;
@Autowired
private ConfirmCallbackService confirmCallbackService;
@RequestMapping("/sendMessage")
public void sendMessage(){
Order order=new Order();
order.setId(1);
order.setUserId(2);
order.setLoginName("zhangsan");
order.setUserAddress("长沙");
order.setCreateTime(new Date());
order.setCost(120.0F);
order.setSerialNumber(0L);
order.setState(0);
//ConfirmCallback确认模式
rabbitTemplate.setConfirmCallback(confirmCallbackService);
//ReturnCallback退回模式
rabbitTemplate.setReturnCallback(returnCallbackService);
rabbitTemplate.convertAndSend(RabbitmqOrderConfig.ORDER_EXCHANGE,
RabbitmqOrderConfig.ORDER_ROUTING_KEY,order);
}
}
创建消费者(手动消费)
@Slf4j
@Component
public class OrderConsumerListener {
//最大重试次数
private static final Integer MAX_RECONSUME_COUNT=3;
//用于记录消息重试次数的集合,可以采用Redis方式实现
private static Map<String,Integer> retryMap=new HashMap<>();
@RabbitHandler
@RabbitListener(queues = {"order-queue"},ackMode = "MANUAL")
public void recieveMessage(Message message,
Order order,
Channel channel) throws IOException {
//channel内按顺序自增的消息ID
long deliverTag = message.getMessageProperties().getDeliveryTag();
try {
System.out.println("接收到消息:"+message+",消息内容:"+ JSON.toJSONString(order));
//模拟异常,开始消息重试
int i= 1/0;
} catch (Exception e) {
e.printStackTrace();
String msgId = (String) message.getMessageProperties().getHeaders().get("spring_returned_message_correlation");
Integer retryCount = retryMap.get(msgId)==null?1:retryMap.get(msgId);
log.info("即将开始第{}次消息重试....",retryCount);
if(retryCount>=MAX_RECONSUME_COUNT){
log.info("重试次数达到3次,消息被拒绝,retryCount="+retryCount);
//此处要注意:当重试次数到达3次后,将拒绝消息且不在重新入队列
channel.basicReject(deliverTag,false);
}else{
//重新发送消息到队尾
//再次发送该消息到消息队列,异常消息就放在了消息队列尾部,这样既保证消息不会丢失,又保证了正常业务的进行。
channel.basicPublish(message.getMessageProperties().getReceivedExchange(),
message.getMessageProperties().getReceivedRoutingKey(), MessageProperties.MINIMAL_PERSISTENT_BASIC,
JSON.toJSONBytes(order));
}
retryMap.put(msgId,retryCount+1);
}
//成功确认消息,非批量模式
channel.basicAck(deliverTag, false);
}
}
启动测试
从测试结果上来看,当消费者监听器出现异常后;进入消息重试模式,并且设置消息重试次数为3次,重试次数达到3次,消息被拒绝,不再重新投递到队列中。这里只是为了演示消息重试机制,并未考虑到后续的消息拒绝之后的处理。
采坑日记
异常点一:@RabbitListener
异常原因:@RabbitListener作用于类上引发异常;
解决方案:@RabbitListener移至消费者监听器的方法上,而@RabbitListener只适用于方法级别。
异常点二:手动确认消息
虽然在消费者端的application.yml中配置手动消费模式,但是在服务消费时引发了这个异常错误,导致重复消费的问题。原因是使用@RabbitListener注解会自动ACK,如果方法中再手动ACK会造成重复ACK,所以报错;解决方式就是在@RabbitListener中配置手动消费模式:ackMode = “MANUAL”。
异常点三:消息格式
在消费者消费消息时引发异常,触发消息重新投递,但是由于重新投递时导致消息格式问题引发了消息转换异常。
具体原因通过查看日志发现,重新投递的消息格式为text/plain,而我们在处理消息时采用的是json方式,导致消息转换异常。解决方案:将重新发送消息的状态由MessageProperties.PERSISTENT_TEXT_PLAIN
更改为
MessageProperties.MINIMAL_PERSISTENT_BASIC
异常点四:消息不确认
这是一个非常没技术含量的坑,但却是非常容易犯错的地方。开启消息确认机制,消费消息别忘了
channel.basicAck
,否则消息会一直存在,导致重复消费。
异常点五:消息无限投递
最开始接触消息确认机制的时候,消费端代码就像下边这样写的,思路很简单:处理完业务逻辑后确认消息,
int a = 1 / 0
发生异常后将消息重新投入队列
@RabbitHandler
public void recieveMessage(Message message,
Order order,
Channel channel) throws IOException {
//channel内按顺序自增的消息ID
long deliverTag = message.getMessageProperties().getDeliveryTag();
try {
System.out.println("接收到消息:"+message+",消息内容:"+ JSON.toJSONString(order));
int i = 1 / 0;
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
}
}
但是有个问题是,业务代码一旦出现
bug
99.9%的情况是不会自动修复,一条消息会被无限投递进队列,消费端无限执行,导致了死循环。
经过测试分析发现,当消息重新投递到消息队列时,这条消息不会回到队列尾部,仍是在队列头部。
消费者会立刻消费这条消息,业务处理再抛出异常,消息再重新入队,如此反复进行。导致消息队列处理出现阻塞,导致正常消息也无法运行。
而解决方案是,先将消息进行应答,此时消息队列会删除该条消息,同时我们再次发送该消息到消息队列,异常消息就放在了消息队列尾部,这样既保证消息不会丢失,又保证了正常业务的进行。
//重新发送消息到队尾
//再次发送该消息到消息队列,异常消息就放在了消息队列尾部,这样既保证消息不会丢失,又保证了正常业务的进行。
channel.basicPublish(message.getMessageProperties().getReceivedExchange(),
message.getMessageProperties().getReceivedRoutingKey(), MessageProperties.MINIMAL_PERSISTENT_BASIC,
JSON.toJSONBytes(order));
异常点六:重复消费
如何保证 MQ 的消费是幂等性,这个需要根据具体业务而定,可以借助
MySQL
、或者
redis
将消息持久化。
秒杀业务优化
修改秒杀订单生成方式
第1步:修改zmall-order订单模块的application.yml,加入rabbitmq相关配置
spring:
rabbitmq:
host: 192.168.70.132
port: 5672
username: admin
password: admin
virtual-host: my_vhost
# 设置消费端手动 ack
listener:
simple:
acknowledge-mode: manual
# 是否支持重试
retry:
enabled: true
max-attempts: 3
第2步:修改秒杀订单生成方式,针对抢购成功的秒杀订单直接推送到RabbitMQ中
@Transactional
@Override
public JsonResponseBody<?> createKillOrder(User user, Integer pid) {
//判断用户是否登录
if(null==user)
throw new BusinessException(JsonResponseStatus.TOKEN_ERROR);
//根据秒杀商品ID和用户ID判断是否重复抢购
Order order = redisService.getKillOrderByUidAndPid(user.getId(), pid);
if(null!=order)
return new JsonResponseBody<>(JsonResponseStatus.ORDER_REPART);
//Redis库存预减
long stock = redisService.decrement(pid);
if(stock<0){
redisService.increment(pid);
return new JsonResponseBody<>(JsonResponseStatus.STOCK_EMPTY);
}
//创建订单
order=new Order();
order.setUserId(user.getId());
order.setLoginName(user.getLoginName());
order.setPid(pid);
//将生成的秒杀订单保存到Redis中
redisService.setKillOrderToRedis(pid,order);
//将生成的秒杀订单推送到RabbitMQ中的订单队列中
rabbitTemplate.convertAndSend(RabbitmqOrderConfig.ORDER_EXCHANGE,
RabbitmqOrderConfig.ORDER_ROUTING_KEY,order);
return new JsonResponseBody<>();
}
消费者监听器完成秒杀订单生成
第1步:将zmall-order订单模块中的service业务处理接口及实现类移至消息者监听器模块。
第2步:在IOrderService及OrderServiceImpl中重新定义生成秒杀订单方法
@Service
public class OrderServiceImpl extends ServiceImpl<OrderMapper, Order> implements IOrderService {
@Autowired
private IKillService killService;
@Autowired
private ApiProductService productService;
@Autowired
private IOrderDetailService orderDetailService;
@Transactional
@Override
public void saveOrder(Order order) {
//1.根据商品ID获取商品
Product product = productService.getProductById(order.getPid());
//2.秒杀商品库存减一
boolean flag=killService.updateKillStockById(order.getPid());
if(!flag)
return;
//3.生成秒杀订单及订单项
SnowFlake snowFlake=new SnowFlake(2,3);
Long orderId=snowFlake.nextId();
//订单
order.setSerialNumber(orderId);
order.setCost(product.getPrice());
this.save(order);
//订单项
OrderDetail orderDetail=new OrderDetail();
orderDetail.setOrderId(orderId);
orderDetail.setProductId(product.getId());
orderDetail.setQuantity(1);
orderDetail.setCost(product.getPrice());
orderDetailService.save(orderDetail);
}
}
第3步:修改秒杀订单消费者监听器
@Slf4j
@Component
public class OrderConsumerListener {
//最大重试次数
private static final Integer MAX_RECONSUME_COUNT=3;
//用于记录消息重试次数的集合,可以采用Redis方式实现
private static Map<String,Integer> retryMap=new HashMap<>();
@Autowired
private IOrderService orderService;
@RabbitHandler
@RabbitListener(queues = {"order-queue"},ackMode = "MANUAL")
public void recieveMessage(Message message,
Order order,
Channel channel) throws IOException {
//channel内按顺序自增的消息ID
long deliverTag = message.getMessageProperties().getDeliveryTag();
try {
System.out.println("接收到消息:"+message+",消息内容:"+ JSON.toJSONString(order));
//模拟异常,开始消息重试
//int i= 1/0;
//保存秒杀订单及订单项
orderService.saveOrder(order);
} catch (Exception e) {
e.printStackTrace();
String msgId = (String) message.getMessageProperties().getHeaders().get("spring_returned_message_correlation");
Integer retryCount = retryMap.get(msgId)==null?1:retryMap.get(msgId);
log.info("即将开始第{}次消息重试....",retryCount);
if(retryCount>=MAX_RECONSUME_COUNT){
log.info("重试次数达到3次,消息被拒绝,retryCount="+retryCount);
//此处要注意:当重试次数到达3次后,将拒绝消息且不在重新入队列
channel.basicReject(deliverTag,false);
}else{
//重新发送消息到队尾
//再次发送该消息到消息队列,异常消息就放在了消息队列尾部,这样既保证消息不会丢失,又保证了正常业务的进行。
channel.basicPublish(message.getMessageProperties().getReceivedExchange(),
message.getMessageProperties().getReceivedRoutingKey(), MessageProperties.MINIMAL_PERSISTENT_BASIC,
JSON.toJSONBytes(order));
}
retryMap.put(msgId,retryCount+1);
}
//成功确认消息,非批量模式
channel.basicAck(deliverTag, false);
}
}
重启jmeter压测,并查看测试结果。
版权归原作者 追梦梓辰 所有, 如有侵权,请联系我们删除。