0


Seata 入门与实战

一、什么是 Seata

Seata 是一款开源的分布式事务解决方式,致力于提供高性能和简单易用的分布式事务服务。Seata 为用户提供了 AT、TCC、SAGA 和 XA 事务模式,为用户打造一站式的分布式事务解决方案。

二、Seata 组成

  1. 事务协调者(Transaction Coordinator):简称 TC,它就是 Seata 服务端,负责协调并管理分布式事务的执行。它是分布式事务的协调者,协调多个参与者的事务操作。事务协调者负责全局的创建、提交、回滚以及事务的状态管理。它通过全局事务ID来追踪和协调分支事务的执行。
  2. 事务管理器(Transaction Manager):简称 TM,负责管理应用程序的本地事务(分支事务)。事务管理器定义了全局事务的范围,负责将分支事务注册到全局事务中,并在全局事务的协调下,执行本地事务的提交和回滚。
  3. 资源管理器(Reaource Manager):简称 RM,管理分支事务处理的资源,注册分支事务和报告分支事务的状态,并驱动分支事务提交和回滚。

三、Seata 模式介绍

3.1 XA 模式

概念:性能相比其他事务要差一点,但能保证最严格的数据一致性。XA 模式需要设置串行化隔离级别,相当于对数据添加了读写锁。另外连接资源需要在整个事务期间保持,这样可能会导致资源锁定问题,从而影响并发事务吞吐。

优点:实现简单、无业务侵入

缺点:性能差、必须实现 XA 协议、容易产生死锁。

适用场景:隔离级别要求高,强一致性分阶段事务模型,牺牲了一定的可用性(保证了强一致性)

3.2 AT 模式(默认模式)

概念:解决了 XA 模式中锁占用时间过长的问题。它的主要特点是简单无侵入,强一致,学习成本低。缺点是需要遵守一定的开发规约,它并不是对所有的 SQL 类型都支持,有一定的使用限制。适用于通用的业务场景,但它并不适用与热点数据的高并发场景,像 SKU(Stock Keeping Unit,库存管理的一个概念,用来唯一标识一个可售卖的商品)的库存的扣减。因为 AT 模式有应用层的全局锁,当需要对相同数据修改操作时,需要使用全局锁控制事务的并发时序,实现上存在锁排队的机制

优点:简单、无业务侵入、事务强一致性、学习成本低、性能高于XA模式

缺点:需要数据库的支持、遵循一定的开发规范(Java应用、通过JDBC访问数据库)

适用场景:不适合用于连接的数据库不支持(本地事务)

3.3 TCC 模式

业务层面的分布式事务解决方案,TCC,Try-Confirm-Cancel 模式,是一种通过三个步骤(Try、Confirm、Cancel)来实现分布式事务的模式。在TCC模式下,应用程序需要自行实现Try、Confirm、Cancel 三个方法,分别标志尝试执行、确认提交和取消回滚。TCC模式,不依赖于底层数据资源的事务支持:

  • 一阶段 prepare 行为:调用自定义的 prepare 逻辑。
  • 二阶段 commit 行为:调用自定义的 commit 逻辑。
  • 三阶段 rollback 行为:调用自定义的 rollback 逻辑。

优点:灵活、性能高(不依赖全局锁、不需要生产快照)、不依赖数据库事务。

缺点:业务侵入大、实现难度高、事务最终一致性(不是强一致性)。

使用场景:连接的数据库不支持(本地)事务。

3.4 SAGA 模式

业务层面的分布式解决方案。一个基于长事务的解决方案,Saga 模式解决的是在没有二阶段提交的情况下解决分布式事务。它的核心思想是将一个业务流程中的长事务拆分成多个本地短事务,当其中一个参与者的事务执行失败,则通过补偿机制补偿给前面已经执行成功的参与者。

优点:灵活、性能高、可异步化

缺点:无锁、不保证隔离性、业务侵入大

使用场景:长事务。

四、XA 协议

4.1 什么是 XA 协议

XA 协议是一种标准的分布式事务协议,用于实现跨多个资源管理器(如数据库)的分布式事务的一致性。

XA 协议定义了在分布式环境下多个资源(如数据库)之间进行事务协作的规范和接口。它使得每个资源管理器都能参与到全局事务中,并根据全局事务的指令进行相应的操作。XA 协议的核心是两阶段提交(Two-Phase Commit ,2PC)机制:

1. 第一阶段(准备阶段):全局事务协调者向所有参与者发送准备请求,每个参与者执行本地事务并返回确认信息个协调者。如果所有参与者都返回成功,那么协调器进入第二阶段。如果任何一个参与者返回失败,协调者将发送回滚请求。

2. 第二阶段(提交/回滚阶段):全局事务协调者根据第一阶段的反馈情况,决定提交或回滚全局事务。首先,协调者发送提交或回滚指令给所有参与者,然后参与者按照指令执行相应的操作。

通过两阶段提交,XA 协议保证了分布式事务的原子性和一致性。每个参与者在第一阶段确认准备之后,就无法单独执行本地事务的提交或回滚操作,而是依赖于全局事务协调的指令。

XA 协议还定义了一些其他的接口和规范,如请求资源管理器的参与、注册、和卸载等。它提供了一种标准化的方法,使得分布式环境下的多个资源管理器能够按照统一的协议进行事务的协调和提交。

4.2 XA 模式为何要将事务设置为串行化

XA 协议要求在一个事务中的所有资源都要参与到全局事务中,并且按照一定的顺序进行协调和提交。在分布式环境下,各个资源管理器(如数据库)之间的通信和协调需要依赖于协调器(如Seata)。

为了保证多个资源管理器之间的事务顺序性和一致性,需要将事务设置为串行化。这是因为当多个事务并发执行时,如果没有串行执行的限制,可能会导致事务的顺序和预期不一致,从而导致数据的不一致和事务的错误。

通过将事务设置为串行化,可以保证事务的执行顺序和一致性,防止并发执行时出现数据冲突或不可预期的结果。串行执行可以确保每个事务按照特定的顺序进行,并且在一个事务提交或回滚后再执行下一个事务,避免了并发带来的问题。

五、Seata 服务部署(搭建TC服务)

下载地址:github.com

解压之后的目录:

5.1 修改配置文件

server:
  port: 7091

spring:
  application:
    name: seata-server

logging:
  config: classpath:logback-spring.xml
  file:
    path: ${user.home}/logs/seata
  extend:
    logstash-appender:
      destination: 127.0.0.1:4560
    kafka-appender:
      bootstrap-servers: 127.0.0.1:9092
      topic: logback_to_logstash

console:
  user:
    username: seata
    password: seata
seata:
  config:
    # support: nacos, consul, apollo, zk, etcd3
    type: nacos
    nacos:
      server-addr: 127.0.0.1:8848
      namespace:
      group: DEFAULT_GROUP
      username: nacos
      password: nacos
      context-path:
      ##if use MSE Nacos with auth, mutex with username/password attribute
      #access-key:
      #secret-key:
      data-id: seataServer.properties #Seata 再配置中心的名字
  registry:
    # support: nacos, eureka, redis, zk, consul, etcd3, sofa
    type: nacos
    nacos:
      application: seata-server
      server-addr: 127.0.0.1:8848
      group: SEATA_GROUP
      namespace:
      cluster: default
      username: nacos
      password: nacos
      context-path:
  store:
    # support: file 、 db 、 redis
    mode: db
    session:
      mode: db
    lock:
      mode: db
    db:
      datasource: druid
      db-type: mysql
      driver-class-name: com.mysql.jdbc.Driver
      url: jdbc:mysql://127.0.0.1:3306/seata?rewriteBatchedStatements=true
      user: root
      password: 123456
      min-conn: 10
      max-conn: 100
      global-table: global_table
      branch-table: branch_table
      lock-table: lock_table
      distributed-lock-table: distributed_lock
      query-limit: 1000
      max-wait: 5000
#  server:
#    service-port: 8091 #If not configured, the default is '${server.port} + 1000'
  security:
    secretKey: SeataSecretKey0c382ef121d778043159209298fd40bf3850a017
    tokenValidityInMilliseconds: 1800000
    ignore:
      urls: /,/**/*.css,/**/*.js,/**/*.html,/**/*.map,/**/*.svg,/**/*.png,/**/*.jpeg,/**/*.ico,/api/v1/auth/login

以上内容时从 seata/conf/application.example.yml 修改而来的,如下图所示:

5.2 Nacos 中新增 Seata 配置

5.3 MySql 中新建 Seata 数据库

复制 seata/script/server/db/mysal.sql 脚本,再本地 MySQL 数据库中执行,脚本存放目录如下:

包含表:

其中:

  • global_table:全局事务表。
  • branch_table:分支事务表。
  • lock_table: 全局事务表
  • distributed_lock::分支事务表(多 Server 集群下保证同时只有一个 Server 处理提交或回滚)

5.4 进入 seata/bin 目录,在 Windows 环境中只需要执行 seata-server.bat,即可启动 Seata 服务

启动成功后使用 http://localhost:7091 就可以访问 Seata 控制台。

六、Seata 使用

6.1 实现XA 模式

TM 通知 TC开启事务,调用分支事务,将分支事务注册到TC 中,然后执行本地 SQL ,完毕将本地事务状态报告给 TC ,当所有本地事务执行完毕后,TM 会通知 TC 进行 提交/回滚事务,TC 检查完分支事务状态后,就会提交/回滚事务。

6.1.1 添加 Seta 依赖

<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-starter-alibaba-seata</artifactId>
</dependency>

6.1.2 配置 Seta 信息

seata:
  application-id: seata-service-business
  registry:
    type: nacos
    nacos:
      server-addr: localhost:8848
      namespace: ""
      group: SEATA_GROUP
      application: seata-server #Nacos 中 Seata 名称
      username: nacos
      password: nacos
  tx-service-group: seata_tx_group #事务组(被一个集群管理)
  service:
    vgroup-mapping:
      seata_tx_group: default
  data-source-proxy-mode: XA #运行模式

6.1.3 开启分布式锁

使用 @GlobalTransactional 注解开启分布式事务,如下所示:

@GlobalTransactional
    public void purchase(String userId,
                         String commodityCode,
                         Integer orderCount) {
        // 添加订单
        orderService.create(userId, commodityCode, orderCount);
        // 减库存
        storageService.deduct(commodityCode, orderCount);
    }

6.2 实现 AT 模式

AT 模式执行流程:

AT模式的实现和 XA 模式的步骤完全类似,但多了以下两步:

  1. 在数据库中添加 undo_log 事务回滚表。

  2. 设置 Seata 运行模式为 AT 模式(或者删除运行模式,因为 Seata 默认的就是 AT 模式)。

6.2.1 新增 undo_log 表

在业务系统中(TM 事务管理器的项目中),添加 undo_log 表,实现 SQL 如下:

CREATE TABLE IF NOT EXISTS `undo_log`
(
    `branch_id`     BIGINT       NOT NULL COMMENT 'branch transaction id',
    `xid`           VARCHAR(128) NOT NULL COMMENT 'global transaction id',
    `context`       VARCHAR(128) NOT NULL COMMENT 'undo_log context,such as serialization',
    `rollback_info` LONGBLOB     NOT NULL COMMENT 'rollback info',
    `log_status`    INT(11)      NOT NULL COMMENT '0:normal status,1:defense status',
    `log_created`   DATETIME(6)  NOT NULL COMMENT 'create datetime',
    `log_modified`  DATETIME(6)  NOT NULL COMMENT 'modify datetime',
    UNIQUE KEY `ux_undo_log` (`xid`, `branch_id`)
    ) ENGINE = InnoDB AUTO_INCREMENT = 1 DEFAULT CHARSET = utf8mb4 COMMENT ='AT transaction mode undo table';
ALTER TABLE `undo_log` ADD INDEX `ix_log_created` (`log_created`);

6.2.1 设置 AT 模式

seata:
  application-id: seata-service-business
  registry:
    type: nacos
    nacos:
      server-addr: localhost:8848
      namespace: ""
      group: SEATA_GROUP
      application: seata-server #Nacos 中 Seata 名称
      username: nacos
      password: nacos
  tx-service-group: seata_tx_group #事务组(被一个集群管理)
  service:
    vgroup-mapping:
      seata_tx_group: default
  data-source-proxy-mode: AT

6.3 TCC 模式

以仓库服务为例,将仓库服务设置为 TCC 模式,它的实现步骤如下:

  1. 新建(仓库)冻结表
  2. 新建 TCC 接口,声明 TCC 调用方法
  3. 新建 TCC 实现类,实现接口中的方法
  4. 将业务调用的方法更改为 TCC 实现类的方法。

6.3.1 新建冻结表

将仓库设置为 TCC 模式,因此我们需要创建仓库的冻结表(冻结表一般要有状态变化的,例如仓库的库存量可以进行加减,但向订单表这种没有上下文状态的就不适合作为冻结表)

use seata_demo;
create table storage_freeze(
    xid varchar(128) NOT NULL primary key,
    commodity_code varchar(256) DEFAULT NULL COMMENT '货物编码(唯一)',
    freeze_count int(11) unsigned DEFAULT '0' comment '冻结数量',
    state int(11) DEFAULT NULL COMMENT '事务状态0:try,1:confirm,2:cancel'
)ENGINE=InnoDB DEFAULT CHARSET=utf8;

6.3.2 创建 TCC 接口

@LocalTCC
public interface TCCService{

    @TwoPhaseBusinessAction(name = "deduct",commitMethod = "commit",rollbackMethod = "cancel")
    void decuct(@BusinessActionContextParameter(paramName = "commodityCode")String commodityCode,
                @BusinessActionContextParameter(paramName = "orderCount")Integer orderCount);

    /**
     * 二阶段提交方法
     * @param context
     * @return
     */
    boolean confirm(BusinessActionContext context);

    /**
     * 二阶段回滚方法
     * @param context
     * @return
     */
    boolean cancle(BusinessActionContext context);
}

6.3.3 新建 TCC 实现类

实现思路分析,TCC 每个方法中实现的业务逻辑:

1. Try 中实现逻辑:

  • 扣减仓库表库存数量
  • 仓库冻结表中,添加冻结数据

2. Confirm 中实现逻辑:

  • 根据XID(事务ID)删除冻结表中的数据。

3. Cancel 中实现逻辑:

  • 修改仓库表,恢复冻结数量
  • 修改冻结表的状态为2,冻结数量为 0
@Service
public class TCCServiceImp implements TCCService {
    @Autowired
    StorageFreezeMapper storageFreezeMapper;

    @Autowired
    StorageMapper storageMapper;

    @Override
    @Transactional
    public void deduct(String commodityCode, Integer orderCount) {
        //获取全局事务 ID
        String xid= RootContext.getXID();
        //仓库表扣减数据
        storageMapper.deduct(commodityCode,orderCount);
        //冻结表中添加冻结数据
        StorageFreeze storageFreeze=new StorageFreeze();
        storageFreeze.setXid(xid);
        storageFreeze.setCommodityCode(commodityCode);
        storageFreeze.setFreezeCount(orderCount);
        storageFreeze.setState(0);
        storageFreezeMapper.insert(storageFreeze);
    }

    @Override
    public boolean confirm(BusinessActionContext context) {
        String xid=context.getXid();
        int result=storageFreezeMapper.deleteById(xid);
        return result==1;
    }

    @Override
    public boolean cancel(BusinessActionContext context) {
        String xid=context.getXid();
        StorageFreeze storageFreeze=storageFreezeMapper.getByid(xid);
        if(storageFreeze==null){
            return true;
        }
        //修改仓库表,恢复可用数量
        storageMapper.recover(storageFreeze.getCommodityCode(),storageFreeze.getFreezeCount());
        //修改冻结表状态为2,冻结数量为0
        storageFreeze.setState(2);
        storageFreeze.setFreezeCount(0);
        int result=storageFreezeMapper.updateById(xid);
        return result==1;
    }
}

6.3.4 更换调用方法

在接口出,将减库存的方法更换为 TCCService 的方法:

    @Autowired
    private TCCService tccService;

    @RequestMapping("/deduct")
    public void deduct(@RequestParam String commodityCode,
                       @RequestParam Integer count) {
        tccService.deduct(commodityCode, count);
    }

seata 提供的集中分布式事务方式可以混用,上述示例中 order 表就是使用的AT模式,而storage 使用的时 TCC 模式

6.4 Sage 模式

Saga 模式是 SEATA 提供的长事务解决方案,在 Saga 模式中,业务流程中每个参与者都提交本地事务,当出现某一个参与者失败则补偿前面已经成功的参与者(回滚已经提交的事务)。

七、注意事项

  1. 业务表中必须包含单列主键,若存在复合主键,目前支持 mysql,oracle,pgsql,mariadb,其他类型数据库建议先建一列自增id 主键,原复合主键改成唯一键来规避。
  2. 每个业务库中必须包含 undo_log 表,若与分库分表组件联用,分库不分表。
  3. 使用注解开启分布式事务时,若默认服务 provider 端加入 consumer 端的事务,provider 可不标注注解,但是,provider 同样需要相应的依赖和配置,仅可省略注解。
  4. 使用注解开启分布式事务时,若要求事务回滚,必须将异常抛出到事务的发起方,被事务发起方发的 @GlobalTransactional 注解感知到。

本文转载自: https://blog.csdn.net/weixin_70564024/article/details/140702905
版权归原作者 秃头的赌徒 所有, 如有侵权,请联系我们删除。

“Seata 入门与实战”的评论:

还没有评论