0


Springboot+RocketMQ通过事务消息优雅的实现订单支付功能

1. 事务消息

RocketMQ的事务消息,是指发送消息事件和其他事件需要同时成功或同时失败。比如银行转账, A银行的某账户要转一万元到B银行的某账户。A银行发送“B银行账户增加一万元”这个消息,要和“从A银 行账户扣除一万元”这个操作同时成功或者同时失败。RocketMQ采用两阶段提交的方式实现事务消息。

1.1 RocketMQ事务消息的原理

  1. 半事务消息发送:生产者将半事务消息发送至RocketMQ服务端。
  2. 消息持久化及返回Ack确认:RocketMQ服务端接收到半事务消息并持久化成功后,向生产者返回Ack确认消息已经发送成功。此时消息状态为半事务消息。
  3. 执行本地事务逻辑:根据发送结果执行本地事务,如果写入失败,此时half消息对业务不可见,本地事务逻辑不执行。
  4. 提交二次确认结果:根据本地事务状态执行Commit或者Rollback。RocketMQ 的事务消息分为3种状态,分别是提交状态、回滚状态、未知状态。**TransactionStatus.CommitTransaction: 提交事务,它允许消费者消费此消息。TransactionStatus.RollbackTransaction: 回滚事务,它代表该消息将被删除,不允许被消费。TransactionStatus.Unknown: **未知状态,它代表需要检查消息队列来确定状态。
  5. 消息回查:(1) 对没有Commit/Rollback的事务消息,从服务端发起一次回查 (2) Producer收到回查消息,检查回查消息对应的本地事务的状态 (3) 根据本地事务状态,重新Commit或者Rollback。第一次回查后仍未获取到事务状态,则之后每隔30s会再次回查,最多重试15次,超过了就会默认丢弃此消息。

1.2 RocketMQ订单支付功能设计

数据库设计

  1. /*
  2. SQLyog Community v13.2.0 (64 bit)
  3. MySQL - 8.0.33 : Database - shop
  4. *********************************************************************
  5. */
  6. /*!40101 SET NAMES utf8 */;
  7. /*!40101 SET SQL_MODE=''*/;
  8. /*!40014 SET @OLD_UNIQUE_CHECKS=@@UNIQUE_CHECKS, UNIQUE_CHECKS=0 */;
  9. /*!40014 SET @OLD_FOREIGN_KEY_CHECKS=@@FOREIGN_KEY_CHECKS, FOREIGN_KEY_CHECKS=0 */;
  10. /*!40101 SET @OLD_SQL_MODE=@@SQL_MODE, SQL_MODE='NO_AUTO_VALUE_ON_ZERO' */;
  11. /*!40111 SET @OLD_SQL_NOTES=@@SQL_NOTES, SQL_NOTES=0 */;
  12. CREATE DATABASE /*!32312 IF NOT EXISTS*/`shop` /*!40100 DEFAULT CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci */ /*!80016 DEFAULT ENCRYPTION='N' */;
  13. USE `shop`;
  14. /*Table structure for table `shop_order` */
  15. DROP TABLE IF EXISTS `shop_order`;
  16. CREATE TABLE `shop_order` (
  17. `id` VARCHAR(50) CHARACTER SET utf8mb3 COLLATE utf8mb3_bin NOT NULL COMMENT '订单id',
  18. `total_num` INT DEFAULT NULL COMMENT '数量合计',
  19. `moneys` INT DEFAULT NULL COMMENT '金额合计',
  20. `pay_type` VARCHAR(1) CHARACTER SET utf8mb3 COLLATE utf8mb3_bin DEFAULT NULL COMMENT '支付类型,1、在线支付、0 货到付款',
  21. `create_time` DATETIME DEFAULT NULL COMMENT '订单创建时间',
  22. `update_time` DATETIME DEFAULT NULL COMMENT '订单更新时间',
  23. `pay_time` DATETIME DEFAULT NULL COMMENT '付款时间',
  24. `consign_time` DATETIME DEFAULT NULL COMMENT '发货时间',
  25. `end_time` DATETIME DEFAULT NULL COMMENT '交易完成时间',
  26. `username` VARCHAR(50) CHARACTER SET utf8mb3 COLLATE utf8mb3_bin DEFAULT NULL COMMENT '用户名称',
  27. `recipients` VARCHAR(50) CHARACTER SET utf8mb3 COLLATE utf8mb3_bin DEFAULT NULL COMMENT '收货人',
  28. `recipients_mobile` VARCHAR(12) CHARACTER SET utf8mb3 COLLATE utf8mb3_bin DEFAULT NULL COMMENT '收货人手机',
  29. `recipients_address` VARCHAR(200) CHARACTER SET utf8mb3 COLLATE utf8mb3_bin DEFAULT NULL COMMENT '收货人地址',
  30. `weixin_transaction_id` VARCHAR(30) CHARACTER SET utf8mb3 COLLATE utf8mb3_bin DEFAULT NULL COMMENT '交易流水号',
  31. `order_status` INT DEFAULT NULL COMMENT '订单状态,0:未完成,1:已完成,2:已退货',
  32. `pay_status` INT DEFAULT NULL COMMENT '支付状态,0:未支付,1:已支付,2:支付失败',
  33. `is_delete` INT DEFAULT NULL COMMENT '是否删除',
  34. PRIMARY KEY (`id`),
  35. KEY `create_time` (`create_time`),
  36. KEY `status` (`order_status`),
  37. KEY `payment_type` (`pay_type`)
  38. ) ENGINE=INNODB DEFAULT CHARSET=utf8mb3 COLLATE=utf8mb3_bin;
  39. /*Data for the table `shop_order` */
  40. /*!40101 SET SQL_MODE=@OLD_SQL_MODE */;
  41. /*!40014 SET FOREIGN_KEY_CHECKS=@OLD_FOREIGN_KEY_CHECKS */;
  42. /*!40014 SET UNIQUE_CHECKS=@OLD_UNIQUE_CHECKS */;
  43. /*!40111 SET SQL_NOTES=@OLD_SQL_NOTES */;

添加RocketMQ依赖

  1. <dependency>
  2. <groupId>org.apache.rocketmq</groupId>
  3. <artifactId>rocketmq-spring-boot-starter</artifactId>
  4. </dependency>

bootstrap.yaml配置

  1. server:
  2. port: 8085
  3. spring:
  4. application:
  5. name: mall-order
  6. datasource:
  7. driver-class-name: com.mysql.cj.jdbc.Driver
  8. url: jdbc:mysql://localhost:3306/shop?useUnicode=true&characterEncoding=UTF-8&serverTimezone=UTC
  9. username: root
  10. password: 123456
  11. cloud:
  12. nacos:
  13. config:
  14. file-extension: yaml
  15. server-addr: localhost:8848
  16. discovery:
  17. #Nacos的注册地址
  18. server-addr: localhost:8848
  19. rocketmq:
  20. name-server: localhost:9876
  21. producer:
  22. group: test-group-producer

Service层

  1. public interface OrderService extends IService<Order> {
  2. //添加订单
  3. void add(Order order);
  4. //修改订单支付状态
  5. void pay(String id);
  6. }
  7. @Service
  8. public class OrderServiceImpl extends ServiceImpl<OrderMapper, Order>
  9. implements OrderService{
  10. @Autowired
  11. OrderMapper orderMapper;
  12. @Transactional(rollbackFor = Exception.class)
  13. @Override
  14. public void add(Order order) {
  15. order.setCreateTime(new Date());
  16. orderMapper.insert(order); //这里仅仅生成订单,还有扣减库存等等一系列操作省略
  17. }
  18. @Transactional(rollbackFor = Exception.class)
  19. @Override
  20. public void pay(String id) {
  21. //模拟支付完成,修改订单的支付状态
  22. Order order = orderMapper.selectById(id);
  23. order.setPayStatus(1);
  24. order.setPayTime(new Date());
  25. orderMapper.updateById(order);
  26. }
  27. }

创建生产者

  1. @RestController
  2. @Slf4j
  3. public class TestController {
  4. @Autowired
  5. OrderMapper orderMapper;
  6. @Autowired
  7. RocketMQTemplate rocketMQTemplate;
  8. @RequestMapping("/send")
  9. public String send(){
  10. String id = UUID.randomUUID().toString();
  11. String msg = "订单"+id+"支付成功";
  12. Order order=new Order();
  13. order.setId(id);
  14. order.setCreateTime(new Date());
  15. order.setMoneys(100);
  16. order.setUsername("张三");
  17. Message<String> message = MessageBuilder.withPayload(msg).setHeader("key",id).build();
  18. TransactionSendResult result = rocketMQTemplate.sendMessageInTransaction("order", message, order);
  19. String transactionId = result.getTransactionId();
  20. String status = result.getSendStatus().name();
  21. log.info("发送消息成功 transactionId={} status={} ",transactionId,status);
  22. return "success";
  23. }
  24. }

创建消费者

  1. @Component
  2. @Slf4j
  3. @RocketMQMessageListener(consumerGroup = "test-consumer",topic = "order",messageModel = MessageModel.CLUSTERING)
  4. public class RocketMQListen implements RocketMQListener<MessageExt> {
  5. @Override
  6. public void onMessage(MessageExt messageExt) {
  7. String body = new String(messageExt.getBody(), StandardCharsets.UTF_8);
  8. System.out.println(body);
  9. }
  10. }

生产者消息监听器

  1. @Component
  2. @RocketMQTransactionListener
  3. public class TransactionMsgListener implements RocketMQLocalTransactionListener {
  4. @Autowired
  5. OrderService orderService;
  6. @Override
  7. public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {
  8. Order order = (Order) o;
  9. try {
  10. //生成订单
  11. orderService.add(order);
  12. return RocketMQLocalTransactionState.UNKNOWN;
  13. }catch (Throwable throwable){
  14. throwable.printStackTrace();
  15. return RocketMQLocalTransactionState.ROLLBACK;
  16. }
  17. }
  18. @Override
  19. public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
  20. String key = message.getHeaders().get("key").toString();
  21. System.out.println("回查订单id "+key+" 回查时间"+new Date());
  22. Order order = orderService.getById(key);
  23. if(order!=null) {
  24. long l = new Date().getTime() - order.getCreateTime().getTime();
  25. long time = l / (1000 * 60);
  26. //超时1分钟后,就会把未支付的订单进行删除
  27. if (time > 1) {
  28. orderService.removeById(key);
  29. System.out.println("订单" + key + "删除");
  30. //订单,库存等一系列操作
  31. return RocketMQLocalTransactionState.ROLLBACK;
  32. }
  33. Integer payStatus = order.getPayStatus();
  34. if (payStatus == 1) {
  35. return RocketMQLocalTransactionState.COMMIT;
  36. }
  37. return RocketMQLocalTransactionState.UNKNOWN;
  38. }
  39. else
  40. return RocketMQLocalTransactionState.ROLLBACK;
  41. }
  42. }

测试

这里通过生产者发送五个事务消息,生成五个订单,然后两个订单在一分钟内修改支付状态为已支付,超时一分钟未支付就会删除订单回退。运行截图如下:


本文转载自: https://blog.csdn.net/qq_43649937/article/details/135461751
版权归原作者 山河亦问安 所有, 如有侵权,请联系我们删除。

“Springboot+RocketMQ通过事务消息优雅的实现订单支付功能”的评论:

还没有评论