0


RabbitMQ 实战:电商下单流程

使用 RabbitMQ 实现异步处理与服务解耦

一、RabbitMQ 实战场景

1.1 场景描述

在电商平台中,用户下单时需要完成多个操作,例如扣除库存、创建订单、预扣除优惠券和积分、通知商家等。整个流程需要高效且可靠的处理。

1.2 场景搭建

为了实现该场景,需要搭建多个微服务,并使用 RabbitMQ 进行异步消息处理。

1.2.1 构建聚合工程

创建一个父工程来管理所有子模块,使用 Maven 作为构建工具。

xml

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.3.12.RELEASE</version>
    <relativePath />
</parent>

<groupId>com.mashibing</groupId>
<artifactId>rabbitmq</artifactId>
<version>1.0-SNAPSHOT</version>
<packaging>pom</packaging>

<properties>
    <spring.cloud-version>Hoxton.SR12</spring.cloud-version>
    <spring.cloud.alibaba-version>2.2.7.RELEASE</spring.cloud.alibaba-version>
</properties>

技术与功能:使用 Maven 作为构建工具,管理项目的依赖和模块。设置 Spring Boot 作为基础框架,以便于快速开发微服务。

1.2.2 创建子服务

创建六个子服务,分别处理下单、库存、订单、优惠券、用户积分和商家通知。

二、下单服务的实现

2.1 导入依赖

在下单服务的

pom.xml

中添加必要的依赖。

xml

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>com.alibaba.cloud</groupId>
        <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-openfeign</artifactId>
    </dependency>
</dependencies>

技术与功能:引入 Spring Boot Web、Nacos 服务发现和 OpenFeign 依赖,实现微服务的 RESTful 接口以及服务间的调用。

2.2 构建启动类

创建下单服务的启动类,并启用服务发现与 Feign 客户端。

java

@SpringBootApplication
@EnableDiscoveryClient // 启用服务发现
@EnableFeignClients // 启用 OpenFeign
public class PlaceOrderStarterApp {
    public static void main(String[] args) {
        SpringApplication.run(PlaceOrderStarterApp.class, args); // 启动应用
    }
}

技术与功能:使用 Spring Boot 启动应用,

@EnableDiscoveryClient

注解使服务能够注册到 Nacos,

@EnableFeignClients

使得可以通过 Feign 进行服务调用。

2.3 配置文件

application.yml

中配置服务信息。

yaml

server:
  port: 80 // 设置服务端口

spring:
  application:
    name: placeorder // 设置服务名称
  cloud:
    nacos:
      discovery:
        server-addr: 114.116.226.76:8848 // Nacos 服务地址

技术与功能:配置 Spring Boot 应用的基本信息,包括服务名称和 Nacos 服务发现地址。

2.4 下单接口实现

在下单服务中实现下单接口。

java

@RestController
public class PlaceOrderController {

    @Autowired
    private ItemStockClient itemStockClient; // 注入库存服务客户端
    @Autowired
    private OrderManageClient orderManageClient; // 注入订单服务客户端
    @Autowired
    private CouponClient couponClient; // 注入优惠券服务客户端
    @Autowired
    private UserPointsClient userPointsClient; // 注入用户积分服务客户端
    @Autowired
    private BusinessClient businessClient; // 注入商家通知服务客户端

    @GetMapping("/po") // 定义下单接口
    public String po() {
        long start = System.currentTimeMillis(); // 记录开始时间
        // 调用各个服务进行处理
        itemStockClient.decr(); // 扣除库存
        orderManageClient.create(); // 创建订单
        couponClient.coupon(); // 预扣除优惠券
        userPointsClient.up(); // 预扣除用户积分
        businessClient.notifyBusiness(); // 通知商家

        long end = System.currentTimeMillis(); // 记录结束时间
        System.out.println(end - start); // 打印耗时
        return "place order is ok!"; // 返回成功信息
    }
}

技术与功能:使用 Spring MVC 的

@RestController

注解实现 RESTful API,调用其他微服务的接口来完成下单操作。通过依赖注入来调用不同的服务。

三、实现异步调用

3.1 下单服务配置 RabbitMQ

在下单服务中引入 RabbitMQ 的依赖,并配置 RabbitMQ 连接信息。

xml

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId> <!-- 引入 RabbitMQ 依赖 -->
</dependency>

yaml

spring:
  rabbitmq:
    host: 114.116.226.76 // RabbitMQ 服务器地址
    port: 5672 // RabbitMQ 端口
    username: rabbitmq // RabbitMQ 用户名
    password: rabbitmq // RabbitMQ 密码
    virtual-host: rabbitmq // RabbitMQ 虚拟主机

技术与功能:引入 RabbitMQ 的 Spring Boot Starter,配置 RabbitMQ 连接信息,使下单服务能够与 RabbitMQ 进行通信。

3.2 构建交换机和队列

在 RabbitMQ 配置类中创建交换机和队列。

java

@Configuration
public class RabbitMQConfig {

    public static final String PLACE_ORDER_EXCHANGE = "place_order_exchange"; // 下单服务的交换机
    public static final String COUPON_QUEUE = "coupon_queue"; // 优惠券服务的队列
    public static final String USER_POINTS_QUEUE = "user_points_queue"; // 用户积分服务的队列
    public static final String BUSINESS_QUEUE = "business_queue"; // 商家通知服务的队列

    @Bean
    public Exchange placeOrderExchange() {
        return ExchangeBuilder.fanoutExchange(PLACE_ORDER_EXCHANGE).build(); // 创建 Fanout 类型的交换机
    }

    @Bean
    public Queue couponQueue() {
        return QueueBuilder.durable(COUPON_QUEUE).build(); // 创建持久化的优惠券队列
    }

    @Bean
    public Queue userPointsQueue() {
        return QueueBuilder.durable(USER_POINTS_QUEUE).build(); // 创建持久化的用户积分队列
    }

    @Bean
    public Queue businessQueue() {
        return QueueBuilder.durable(BUSINESS_QUEUE).build(); // 创建持久化的商家通知队列
    }

    @Bean
    public Binding couponBinding(Exchange placeOrderExchange, Queue couponQueue) {
        return BindingBuilder.bind(couponQueue).to(placeOrderExchange).with("").noargs(); // 绑定优惠券队列到交换机
    }

    @Bean
    public Binding userPointsBinding(Exchange placeOrderExchange, Queue userPointsQueue) {
        return BindingBuilder.bind(userPointsQueue).to(placeOrderExchange).with("").noargs(); // 绑定用户积分队列到交换机
    }

    @Bean
    public Binding businessBinding(Exchange placeOrderExchange, Queue businessQueue) {
        return BindingBuilder.bind(businessQueue).to(placeOrderExchange).with("").noargs(); // 绑定商家通知队列到交换机
    }
}

技术与功能:使用 RabbitMQ 的 Spring Boot 提供的 API 创建交换机和队列,并设置它们之间的绑定关系。使用 Fanout 类型的交换机可以将消息广播到所有绑定的队列中。

3.3 修改下单接口

在下单接口中,将同步调用改为异步调用。

java

@GetMapping("/po")
public String po() {
    long start = System.currentTimeMillis(); // 记录开始时间
    itemStockClient.decr(); // 扣除库存
    orderManageClient.create(); // 创建订单

    String userAndOrderInfo = "用户信息&订单信息&优惠券信息等等…………"; // 准备消息内容
    // 发送消息到 RabbitMQ
    rabbitTemplate.convertAndSend(RabbitMQConfig.PLACE_ORDER_EXCHANGE, "", userAndOrderInfo); // 发送异步消息

    long end = System.currentTimeMillis(); // 记录结束时间
    System.out.println(end - start); // 打印耗时
    return "place order is ok!"; // 返回成功信息
}

技术与功能:通过 RabbitTemplate 实现消息的异步发送,将下单相关的信息发送到 RabbitMQ,其他服务可以通过监听相应的队列来处理这些消息,从而实现解耦。

四、其他服务的实现

其他服务(优惠券、用户积分、商家通知)也需要配置 RabbitMQ,并实现对应的监听器。

4.1 优惠券服务监听器

java

@Component
public class CouponListener {

    @RabbitListener(queues = {RabbitMQConfig.COUPON_QUEUE}) // 监听优惠券队列
    public void consume(String msg, Channel channel, Message message) throws Exception {
        // 预扣除优惠券
        Thread.sleep(400); // 模拟处理时间
        System.out.println("优惠券预扣除成功!" + msg); // 打印成功信息
        // 手动ACK
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); // 确认消息已消费
    }
}

技术与功能:使用

@RabbitListener

注解监听 RabbitMQ 中的优惠券队列。当接收到消息时,执行预扣除优惠券的逻辑,并手动确认消息已消费。通过这种方式,确保消息的消费过程是可控的。

4.2 用户积分服务和商家服务

用户积分服务和商家服务的实现类似于优惠券服务,注意监听对应的队列。

java

@Component
public class UserPointsListener {

    @RabbitListener(queues = {RabbitMQConfig.USER_POINTS_QUEUE}) // 监听用户积分队列
    public void consume(String msg, Channel channel, Message message) throws Exception {
        // 预扣除用户积分
        Thread.sleep(400); // 模拟处理时间
        System.out.println("用户积分预扣除成功!" + msg); // 打印成功信息
        // 手动ACK
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); // 确认消息已消费
    }
}

@Component
public class BusinessListener {

    @RabbitListener(queues = {RabbitMQConfig.BUSINESS_QUEUE}) // 监听商家通知队列
    public void consume(String msg, Channel channel, Message message) throws Exception {
        // 通知商家
        Thread.sleep(400); // 模拟处理时间
        System.out.println("通知商家成功!" + msg); // 打印成功信息
        // 手动ACK
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); // 确认消息已消费
    }
}

技术与功能:通过 RabbitMQ 的监听机制,分别处理用户积分和商家通知的消息。每个服务都能够独立处理自己的业务逻辑,从而实现微服务的解耦。

五、确保消息可靠性

为了确保消息能够可靠地发送到 RabbitMQ,我们需要实现以下功能:

5.1 配置 RabbitTemplate

在 RabbitTemplate 中配置确认和返回的回调处理。

java

@Configuration
public class RabbitTemplateConfig {

    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(); // 创建 RabbitTemplate 对象
        rabbitTemplate.setConnectionFactory(connectionFactory); // 设置连接工厂
        rabbitTemplate.setConfirmCallback(confirmCallback()); // 设置确认回调
        rabbitTemplate.setReturnCallback(returnCallback()); // 设置返回回调
        rabbitTemplate.setMandatory(true); // 设置为强制模式
        return rabbitTemplate; // 返回 RabbitTemplate 对象
    }

    public RabbitTemplate.ConfirmCallback confirmCallback() {
        return (correlationData, ack, cause) -> { // 确认回调
            if (correlationData == null) return; // 如果没有 correlationData,直接返回
            String msgId = correlationData.getId(); // 获取消息 ID
            if (ack) {
                System.out.println("消息发送到Exchange成功!! msgId = " + msgId); // 打印成功信息
                GlobalCache.remove(msgId); // 从全局缓存中移除消息
            } else {
                System.out.println("消息发送到Exchange失败!! msgId = " + msgId); // 打印失败信息
                // 记录失败消息,进行重试或存储
            }
        };
    }

    public RabbitTemplate.ReturnCallback returnCallback() {
        return (message, replyCode, replyText, exchange, routingKey) -> { // 返回回调
            System.out.println("消息未路由到队列"); // 打印未路由信息
            System.out.println("return:消息为:" + new String(message.getBody())); // 打印消息内容
            System.out.println("return:交换机为:" + exchange); // 打印交换机信息
            System.out.println("return:路由为:" + routingKey); // 打印路由信息
        };
    }
}

技术与功能:配置 RabbitTemplate 的确认和返回回调,以确保每条消息都能被成功发送到 RabbitMQ。通过回调机制,可以在消息发送失败时进行相应的处理。

5.2 消息发送时绑定 ID

在下单服务中,将消息 ID 绑定到全局缓存中,确保在确认失败时能够进行相应处理。

java

public class GlobalCache {

    private static Map<String, Object> map = new HashMap<>(); // 存储全局缓存

    public static void set(String key, Object value) {
        map.put(key, value); // 设置缓存
    }

    public static Object get(String key) {
        return map.get(key); // 获取缓存
    }

    public static void remove(String key) {
        map.remove(key); // 移除缓存
    }
}

技术与功能:使用全局缓存来存储消息 ID 和相关信息。确保在消息发送过程中,如果发生错误,可以根据 ID 进行重试或记录。

六、消费者避免重复消费

为了避免消费者重复消费,我们可以使用数据库的幂等性表。

6.1 准备幂等表

sql

CREATE TABLE `user_points_idempotent` (
  `id` varchar(255) NOT NULL,
  `createtime` timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; // 创建幂等表

技术与功能:创建数据库表用于存储已消费的消息 ID,确保每条消息只被处理一次。

6.2 实现幂等性检查

在用户积分服务中实现幂等性检查,避免重复消费。

java

@Service
@Slf4j
public class UserPointsConsumeImpl implements UserPointsConsume {

    @Resource
    private UserPointsIdempotentMapper userPointsIdempotentMapper; // 注入 Mapper

    @Override
    @Transactional
    public void consume(Message message) {
        String id = message.getMessageProperties().getHeader("spring_returned_message_correlation"); // 获取消息头中的 ID
        int count = userPointsIdempotentMapper.findById(id); // 查询幂等表
        if (count == 1) { // 如果存在,直接返回
            log.info("消息已经被消费!!!无需重复消费!");
            return;
        }
        userPointsIdempotentMapper.save(id); // 插入消息标识到幂等表
        // 执行消费逻辑
        System.out.println("扣除用户积分成功!!"); // 打印成功信息
    }
}

技术与功能:通过查询幂等表来判断消息是否已经消费,如果未消费则进行处理,并将 ID 插入幂等表中。这样可以防止重复消费,确保业务逻辑的正确性。

七、实现延迟取消订单状态

7.1 准备订单表

创建订单表并实现订单状态的管理。

sql

CREATE TABLE `tb_order` (
  `id` varchar(36) NOT NULL AUTO_INCREMENT,
  `total` decimal(10,2) DEFAULT NULL,
  `order_state` int(11) DEFAULT '0' COMMENT '订单状态 0-待支付, 1-已支付,2-待发货,3-已发货,-1-已取消',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; // 创建订单表

技术与功能:创建订单表用于存储订单信息,包括订单状态。通过状态管理,能够实现订单的生命周期管理。

7.2 实现订单状态管理

在订单服务中实现订单状态的修改逻辑,并将消息发送到死信队列。

java

@Service
public class TBOrderServiceImpl implements TBOrderService {

    @Resource
    private TBOrderMapper orderMapper; // 注入 Mapper

    @Autowired
    private RabbitTemplate rabbitTemplate; // 注入 RabbitTemplate

    @Override
    @Transactional
    public void save() {
        // 生成主键ID
        String id = UUID.randomUUID().toString();
        // 创建订单
        orderMapper.save(id);
        // 订单构建成功~
        // 发送消息到RabbitMQ的死信队列
        rabbitTemplate.convertAndSend(RabbitMQConfig.ORDER_EXCHANGE, "", id, new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                // 设置消息的生存时间为15s,当然,也可以在构建队列时,指定队列的生存时间。
                message.getMessageProperties().setExpiration("15000");
                return message; // 返回消息
            }
        });
    }
}

技术与功能:在订单创建成功后,将订单 ID 发送到 RabbitMQ 的死信队列,并设置消息的生存时间。通过死信队列,可以在超时后执行取消订单的逻辑。

7.3 声明消费者处理延迟取消订单

实现消费者,处理死信队列中的消息,取消未支付的订单。

java

@Component
public class DelayMessageListener {

    @Autowired
    private TBOrderService orderService; // 注入订单服务

    @RabbitListener(queues = RabbitMQConfig.DEAD_QUEUE) // 监听死信队列
    public void consume(String id, Channel channel, Message message) throws IOException {
        orderService.delayCancelOrder(id); // 调用服务取消订单
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); // 确认消息已消费
    }
}

技术与功能:使用 RabbitMQ 的监听机制处理死信队列中的消息,执行取消订单的逻辑。通过这种方式,能够在订单超时未支付时,自动取消订单。

总结

通过以上步骤,我们实现了一个基于 RabbitMQ 的电商下单流程,展示了如何通过消息队列实现异步处理与服务之间的解耦。同时,采用幂等性设计和消息可靠性机制,确保了系统的稳定性与可靠性。这种架构能够有效提高系统的响应速度,并降低系统间的耦合度,使得各个服务可以独立扩展和维护。


本文转载自: https://blog.csdn.net/weixin_51052174/article/details/141792245
版权归原作者 拾柒mm 所有, 如有侵权,请联系我们删除。

“RabbitMQ 实战:电商下单流程”的评论:

还没有评论