文章目录
一、分布式事务存在的问题
在分布式系统下,一个业务跨越多个服务或数据源,每个服务都是一个分支事务,要保证所有分支事务最终状态一致,这样的事务就是分布式事务。
二、分布式事务理论
、CAP定理
CAP是指:
Consistency(一致性)
Availability(可用性)
Partition tolerance(分区容错性)
1、一致性
用户访问分布式系统中的任意节点,得到的数据必须是一致的。
节点1的数据一旦发生修改,节点2的数据必须进行同步,与节点1的数据保持一致。
2、可用性
用户访问集群中的任意健康节点时,必须能得到响应,而不是超时或者拒绝。
当节点3发生故障时,就会对请求进行阻塞或者拒绝,此时节点3就是不可用的。
3、分区容错性
分区:因为网络故障或其它原因导致分布式系统中的部分系欸DNA与其它节点失去连接,形成独立分区。
容错:在集群出现分区时,整个系统也要持续对外提供服务。
起初节点1、2、3是互相连接的,即任意节点数据发生变化,其余两个节点的数据也将进行同步修改,但是假设节点3与其他两个节点的网络连接断开了,但是节点本身并没有故障,就会形成新的分区,此时一旦有人对节点2的数据进行了修改,并将修改信息同步到了节点1,此时用户访问不同的节点,假设访问节点1之后我们又访问了节点3,此时拿到的结果就是不一致的。此时就不满足一致性,如果我们非要满足一致性,我们怎么办?让节点3进入短暂的禁止访问状态,等待和节点2的网络通信回复,在此期间对于到来的一切请求都进行拒绝或者阻塞,这样一来确实一致性问题得到了解决,但是我们又说了可用性,即系统健康时就必须对请求做出响应,此时我们的节点3只是和其余两个节点不通,并没有不健康,所以这就是一种悖论,我们只能在一致性和可用性之间做出抉择!
CAP定理的主要内容:
分布式系统节点通过网络连接,一定会出现分区问题(P)
当分区出现时,系统的一致性(C)和可用性(A)就无法同时满足
CP:保证了系统的一致性,但是牺牲了系统的可用性。
AP:保证了系统的可用性,但是牺牲了系统的一致性。
BASE理论
BASE理论是对CAP的一种解决思路,包含三个思想:
Basically Available (基本可用):分布式系统在出现故障时,允许损失部分可用性,即保证核心可用。
Soft State(软状态):在一定时间内,允许出现中间状态,比如临时的不一致状态。
Eventually Consistent(最终一致性):虽然无法保证强一致性,但是在软状态结束后,最终达到数据一致。
而分布式事务最大的问题是各个子事务的一致性问题,因此可以借鉴CAP定理和BASE理论:
AP模式:各子事务分别执行和提交,允许出现结果不一致(软状态),然后采用弥补措施恢复数据即可,实现最终一致。
CP模式:各个子事务执行后互相等待,同时提交,同时回滚,达成强一致。但事务等待过程中,处于弱可用状态。
解决分布式事务,各个子系统之间必须能感知到彼此的事务状态,才能保证状态一致,因此需要一个事务协调者来协调每一个事务的参与者(子系统事务)。 这里的子系统事务,称为分支事务;有关联的各个分支事务在一起称为全局事务。
三、认识Seata
Seata事务管理中有三个重要的角色:
TC (Transaction Coordinator) - 事务协调者:维护全局和分支事务的状态,协调全局事务提交或回滚。
TM (Transaction Manager) - 事务管理器:定义全局事务的范围、开始全局事务、提交或回滚全局事务。
RM (Resource Manager) - 资源管理器:管理分支事务处理的资源,与TC交谈以注册分支事务和报告分支事务的状态,并驱动分支事务提交或回滚。
Seata分布式事务解决方案
Seata提供了四种不同的分布式事务解决方案:
XA模式:强一致性分阶段事务模式,牺牲了一定的可用性,无业务侵入
TCC模式:最终一致的分阶段事务模式,有业务侵入
AT模式:最终一致的分阶段事务模式,无业务侵入,也是Seata的默认模式
SAGA模式:长事务模式,有业务侵入
1、XA模式
XA 规范 是 X/Open 组织定义的分布式事务处理(DTP,Distributed Transaction Processing)标准,XA 规范 描述了全局的TM与局部的RM之间的接口,几乎所有主流的数据库都对 XA 规范 提供了支持。
XA是规范,目前主流的数据库都实现了这种规范,实现的原理都是基于两阶段提交。
正常情况:
异常情况:
一阶段:
事务协调者通知每个事务参与者分别执行本地事务
本地事务执行完成后向事务协调者告知事务的执行状态,此时本地事务并没有提交,继续持有数据库锁
二阶段:
事务协调者根据一阶段的报告判断下一步操作
①、如果一阶段所有分支事务都执行成功,则告知所有事务参与者提交事务。
②、如果一阶段任意一个事务失败,则告知所有事务参与者回滚事务。
下面我们来看看Seata当中的XA模型:
RM一阶段的工作:
注册分支事务到TC
执行分支业务sql但不提交
报告执行状态给TC
TC二阶段的工作:
TC检测各分支事务的执行状态
a.如果都成功,通知所有RM提交事务
b.如果有失败,通知所有RM回滚事务
RM二阶段的工作:
接收TC的指令,提交或回滚事务。
XA模式的优缺点:
优点:
事务的强一致性,满足ACID原则。
常用数据库都支持,实现简单,并且没有代码侵入。
缺点:
因为一阶段需要锁定数据库资源,等待二阶段结束才会释放,性能较差。
依赖关系型数据库实现事务。
如何利用Seata实现XA模式:
1、修改配置文件(每个参与事务的微服务),开启XA模式
seata:
data-source-proxy-mode:XA
2、给发起全局事务的入口方法添加@GlobalTransactional注解
3、重启服务测试
2、AT模式
AT模式是对XA模式的弊端进行了完善(执行事务完并不会持续占有数据库资源)。我们来看看AT模式具体是怎么做的:
在XA模式当中,一阶段分支事务执行完事务后会等待TC的通知进行事务的提交或者事务的回滚,等待过程中会持有DB锁防止其他事务进行操作,这样会大大降低性能,在AT模式下,分支事务不需要等待TC的通知即可提交事务,但是会生成一份数据快照(undo_log),该快照记录了数据修改前和修改后的值,当TC通知分支事务进行事务提交或者回滚时,如果是提交,分支事务则仅需要删除生成的快照即可,如果是回滚,则需要根据快照当中的信息恢复数据。
AT模式下,该分支事务的执行流程如下:
一阶段:
1)TM发起并注册全局事务到TC
2)TM调用分支事务
3)分支事务准备执行业务SQL
4)RM拦截业务SQL,根据where条件查询原始数据,形成快照
5)RM执行业务SQL,提交本地事务,释放DB锁,此时money=90
6)RM报告本地事务状态给TC
二阶段:
1)TM通知TC事务结束
2)TC检查分支事务状态
a)都成功,则通知RM删除快照
b)有失败,通知RM根据快照恢复数据
流程图:
XA模式与AT模式的区别:
XA模式一阶段不提交事务,锁定资源;AT模式一阶段直接提交,不锁定资源。
XA模式依赖数据库机制实现回滚;AT模式利用数据快照实现数据回滚。
XA模式强一致;AT模式最终一致
AT模式的弊端——脏写问题
解决方案就是引入全局锁的概念,在释放DB锁之前,先要拿到全局锁,避免同一时刻有另外一个事务来操作当前持有的数据。
这个全局锁就限制了哪个事务可以操作该数据,当事务1执行业务SQL前,会先获取全局锁,TC就会进行记录,此时事务1释放DB锁去等待TC的通知(删除快照或根据快照修复数据),事务2拿到了DB锁开始执行业务SQL,当事务2尝试获取全局锁时,就会发现获取不到,因为全局锁现在被事务1所持有,此时假设恰好TC告知事务1进行数据回滚,事务1则会重新获取DB锁,但是DB锁此时被事务2所持有,这样就形成了死锁,怎么办?事务2在尝试获取一段时间全局锁后一直拿不到,就会放弃获取全局锁,此时事务1就拿到了DB锁,进行快照修复即可。前提是两个事务均是由seata控制的事务才会这样。假设事务2不是由seata控制的事务将会是怎样的?
我们上面说过,事务1记录undo_log时会记录数据修改前后的值,当我们进行数据恢复时,发现money此时为80,此时需要我们进行报警处理,人工的根据undo_log当中的数据进行数据恢复。
AT模式的优点:
一阶段完成直接提交事务,释放数据库资源,性能比较好
利用全局锁实现读写隔离
没有代码侵入,框架自动完成回滚和提交
AT模式的缺点:
两阶段之间属于软状态,属于最终一致
框架的快照功能会影响性能,但比XA模式要好很多
3、TCC模式
TCC模式与AT模式非常相似,每阶段都是独立事务,区别在于TCC是通过人工代码控制来实现数据恢复,需要实现三个方法:
Try:资源的检测和预留。
Confirm:完成资源操作业务;要求 Try 成功 Confirm 一定要能成功。
Cancel:预留资源释放,可以理解为try的反向操作。
举例:一个扣减用户余额的业务。假设账户A原来余额是100,需要余额扣减30元。
阶段一( Try ):检查余额是否充足,如果充足则冻结金额增加30元,可用余额扣除30
初始余额:
此时,总金额 = 冻结金额 + 可用金额,数量依然是100不变。事务直接提交无需等待其它事务。
阶段二(Confirm):假如要提交(Confirm),则冻结金额扣减30
确认可以提交,不过之前可用金额已经扣减过了,这里只要清除冻结金额就好了:
此时,总金额 = 冻结金额 + 可用金额 = 0 + 70 = 70元
阶段二(Canncel):如果要回滚(Cancel),则冻结金额扣减30,可用余额增加30
需要回滚,那么就要释放冻结金额,恢复可用金额:
TCC模型:
可以看到,TCC模式和AT模式极其相似,但是区别在于,AT模式采用全局锁+undo_log的方式进行事务提交和回滚,这里我们不再使用锁,而是使用try-confirm-cancel的方式对操作进行记录。当TC通知分支事务可以提交的时候,分支事务直接执行confirm方法即可,通知回滚时则执行cancel方法
TCC模式的优缺点:
TCC模式的每个阶段是做什么的?
Try:资源检查和预留
Confirm:业务执行和提交
Cancel:预留资源的释放
TCC的优点是什么?
一阶段完成直接提交事务,释放数据库资源,性能好
相比AT模型,无需生成快照,无需使用全局锁,性能最强
不依赖数据库事务,而是依赖补偿操作,可以用于非事务型数据库
TCC的缺点是什么?
有代码侵入,需要人为编写try、Confirm和Cancel接口,太麻烦
软状态,事务是最终一致
需要考虑Confirm和Cancel的失败情况,做好幂等处理
TCC模式的两个重要问题:事务悬挂和空回滚
事务悬挂
对于已经空回滚的业务,之前被阻塞的try操作恢复,继续执行try,就永远不可能confirm或cancel ,事务一直处于中间状态,这就是业务悬挂。
执行try操作时,应当判断cancel是否已经执行过了,如果已经执行,应当阻止空回滚后的try操作,避免悬挂
空回滚
当某分支事务的try阶段阻塞时,可能导致全局事务超时而触发二阶段的cancel操作。在未执行try操作时先执行了cancel操作,这时cancel不能做回滚,就是空回滚。
执行cancel操作时,应当判断try是否已经执行,如果尚未执行,则应该空回
3、SAGA模式
原理:
在 Saga 模式下,分布式事务内有多个参与者,每一个参与者都是一个冲正补偿服务,需要用户根据业务场景实现其正向操作和逆向回滚操作。
分布式事务执行过程中,依次执行各参与者的正向操作,如果所有正向操作均执行成功,那么分布式事务提交。如果任何一个正向操作执行失败,那么分布式事务会去退回去执行前面各参与者的逆向回滚操作,回滚已提交的参与者,使分布式事务回到初始状态。
Saga也分为两个阶段:
一阶段:直接提交本地事务
二阶段:成功则什么都不做;失败则通过编写补偿业务来回滚
4.SAGA模式优缺点:
优点:
事务参与者可以基于事件驱动实现异步调用,吞吐高
一阶段直接提交事务,无锁,性能好
不用编写TCC中的三个阶段,实现简单
缺点:
软状态持续时间不确定,时效性差
没有锁,没有事务隔离,会有脏写
5.四种模式对比
四、微服务整合Seata AT案例
分布式事务和传统形式的事务区别有什么?众所周知,我们采用微服务框架开发项目时,不同服务之间通过相互调用的方式完成业务处理,用以下案例来描述分布式业务存在的问题:
假设我们此时现在有一个系统,其中包括下单系统、用户系统、仓库系统。当我们进行下单时,就需要创建订单信息,同时需要对用户的余额进行扣减,还需要对仓储系统的商品数量进行减少,由于每个系统分别负责一部分的业务,且其拥有独立的数据库信息,因此我们就会有三个事务,订单创建、余额扣减、商品剩余量扣减。
Seata配置
4.1.1Seata下载
下载Seata服务端压缩包:https://github.com/seata/seata/releases
4.1.2、修改conf目录中 flie.conf 文件
修改事务日志存储模式为 db 及数据库连接信息,且新增service模块,如下:
store {
## store mode: file、db、redis
## 使用数据库
mode ="db"
db {
## the implement of javax.sql.DataSource, such as DruidDataSource(druid)/BasicDataSource(dbcp)/HikariDataSource(hikari) etc.
datasource ="druid"
## mysql/oracle/postgresql/h2/oceanbase etc.
dbType ="mysql"
driverClassName ="com.mysql.cj.jdbc.Driver"
## 数据库地址
url ="jdbc:mysql://localhost:3306/seata?useUnicode=true&allowPublicKeyRetrieval=true&characterEncoding=utf-8&useSSL=false&serverTimezone=GMT%2b8"
## 数据库账号
user ="root"
## 数据库密码
password ="123456"
minConn =5
maxConn =100
globalTable ="global_table"
branchTable ="branch_table"
lockTable ="lock_table"
queryLimit =100
maxWait =5000}}
4.1.3、修改conf目录中 registry.conf文件
这里使用eureka注册中心
registry {
type ="eureka"
eureka {
serviceUrl ="http://eureka7001.com:7001/eureka/,http://eureka7002.com:7002/eureka/" #改成自己的eureka注册中心地址
application ="seata-server" #本客户端注册到eureka的微服务名称
weight ="1"}}
4.1.4、初始化seata数据库
由于我们使用了db模式存储事务日志,所以我们需要创建一个seata数据库,Seata数据库表初始化脚本:
----------------------------------The script used when storeMode is 'db'---------------------------------- the table tostoreGlobalSession data
CREATETABLEIFNOTEXISTS `global_table`
(
`xid` VARCHAR(128)NOTNULL,
`transaction_id` BIGINT,
`status` TINYINTNOTNULL,
`application_id` VARCHAR(32),
`transaction_service_group` VARCHAR(32),
`transaction_name` VARCHAR(128),
`timeout` INT,
`begin_time` BIGINT,
`application_data` VARCHAR(2000),
`gmt_create` DATETIME,
`gmt_modified` DATETIME,PRIMARYKEY(`xid`),KEY `idx_status_gmt_modified` (`status` , `gmt_modified`),KEY `idx_transaction_id` (`transaction_id`))ENGINE=InnoDBDEFAULTCHARSET= utf8mb4;-- the table tostoreBranchSession data
CREATETABLEIFNOTEXISTS `branch_table`
(
`branch_id` BIGINTNOTNULL,
`xid` VARCHAR(128)NOTNULL,
`transaction_id` BIGINT,
`resource_group_id` VARCHAR(32),
`resource_id` VARCHAR(256),
`branch_type` VARCHAR(8),
`status` TINYINT,
`client_id` VARCHAR(64),
`application_data` VARCHAR(2000),
`gmt_create` DATETIME(6),
`gmt_modified` DATETIME(6),PRIMARYKEY(`branch_id`),KEY `idx_xid` (`xid`))ENGINE=InnoDBDEFAULTCHARSET= utf8mb4;-- the table tostore lock data
CREATETABLEIFNOTEXISTS `lock_table`
(
`row_key` VARCHAR(128)NOTNULL,
`xid` VARCHAR(128),
`transaction_id` BIGINT,
`branch_id` BIGINTNOTNULL,
`resource_id` VARCHAR(256),
`table_name` VARCHAR(32),
`pk` VARCHAR(36),
`status` TINYINTNOTNULLDEFAULT'0'COMMENT '0:locked ,1:rollbacking',
`gmt_create` DATETIME,
`gmt_modified` DATETIME,PRIMARYKEY(`row_key`),KEY `idx_status` (`status`),KEY `idx_branch_id` (`branch_id`),KEY `idx_xid` (`xid`))ENGINE=InnoDBDEFAULTCHARSET= utf8mb4;CREATETABLEIFNOTEXISTS `distributed_lock`
(
`lock_key` CHAR(20)NOTNULL,
`lock_value` VARCHAR(20)NOTNULL,
`expire` BIGINT,
primary key (`lock_key`))ENGINE=InnoDBDEFAULTCHARSET= utf8mb4;INSERTINTO `distributed_lock` (lock_key, lock_value, expire)VALUES('AsyncCommitting', ' ',0);INSERTINTO `distributed_lock` (lock_key, lock_value, expire)VALUES('RetryCommitting', ' ',0);INSERTINTO `distributed_lock` (lock_key, lock_value, expire)VALUES('RetryRollbacking', ' ',0);INSERTINTO `distributed_lock` (lock_key, lock_value, expire)VALUES('TxTimeoutCheck', ' ',0);DROPTABLEIFEXISTS `undo_log`;CREATETABLE `undo_log`
(
`id` bigint(20)NOTNULLAUTO_INCREMENT,
`branch_id` bigint(20)NOTNULL,
`xid` varchar(100)NOTNULL,
`context` varchar(128)NOTNULL,
`rollback_info` longblob NOTNULL,
`log_status` int(11)NOTNULL,
`log_created` datetime NOTNULL,
`log_modified` datetime NOTNULL,
`ext` varchar(100)DEFAULTNULL,PRIMARYKEY(`id`),UNIQUEKEY `ux_undo_log` (`xid`,`branch_id`))ENGINE=InnoDBAUTO_INCREMENT=1DEFAULTCHARSET=utf8;
微服务整合
2.1、父工程项目创建
引入依赖
<?xml version="1.0" encoding="UTF-8"?><project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.my.springcloud</groupId><artifactId>cloud-seata</artifactId><version>1.0-SNAPSHOT</version><packaging>pom</packaging><modules><module>seata-account-service</module><module>seata-order-service</module><module>seata-storage-service</module><module>cloud-eureka-server7001</module><module>cloud-eureka-server7002</module><module>cloud-gateway-gateway</module></modules><!-- 统一管理jar包版本 --><properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><maven.compiler.source>1.8</maven.compiler.source><maven.compiler.target>1.8</maven.compiler.target><junit.version>4.12</junit.version><log4j.version>1.2.17</log4j.version><lombok.version>1.16.18</lombok.version><mysql.version>8.0.28</mysql.version><druid.version>1.2.4</druid.version><mybatis.spring.boot.version>2.0.0</mybatis.spring.boot.version></properties><!-- 子模块继承之后,提供作用:锁定版本+子modlue不用写groupId和version --><dependencyManagement><dependencies><!--spring boot 2.3.2--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-dependencies</artifactId><version>2.3.2.RELEASE</version><type>pom</type><scope>import</scope></dependency><!--spring cloud Hoxton.SR9--><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-dependencies</artifactId><version>Hoxton.SR9</version><type>pom</type><scope>import</scope></dependency><!--spring cloud alibaba 2.2.5.RELEASE--><dependency><groupId>com.alibaba.cloud</groupId><artifactId>spring-cloud-alibaba-dependencies</artifactId><version>2.2.5.RELEASE</version><type>pom</type><scope>import</scope></dependency><dependency><groupId>org.mybatis.spring.boot</groupId><artifactId>mybatis-spring-boot-starter</artifactId><version>${mybatis.spring.boot.version}</version></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>${mysql.version}</version></dependency><dependency><groupId>com.alibaba</groupId><artifactId>druid-spring-boot-starter</artifactId><version>${druid.version}</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>${lombok.version}</version><optional>true</optional></dependency><dependency><groupId>log4j</groupId><artifactId>log4j</artifactId><version>${log4j.version}</version></dependency><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>${junit.version}</version></dependency><!--seata依赖--><dependency><groupId>com.alibaba.cloud</groupId><artifactId>spring-cloud-starter-alibaba-seata</artifactId><version>2021.1</version><exclusions><!--版本较低,1.3.0,因此排除--><exclusion><artifactId>seata-spring-boot-starter</artifactId><groupId>io.seata</groupId></exclusion></exclusions></dependency><dependency><groupId>io.seata</groupId><artifactId>seata-spring-boot-starter</artifactId><version>1.4.1</version></dependency></dependencies></dependencyManagement><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId><configuration><addResources>true</addResources></configuration></plugin></plugins></build></project>
2.2、Eureka集群搭建
eureka搭建 添加链接描述
2.3、搭建账户微服务
2.3.1 新建seata-account-service微服务
2.3.2引入依赖
<dependencies><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-netflix-eureka-client</artifactId></dependency><!--seata依赖--><dependency><groupId>com.alibaba.cloud</groupId><artifactId>spring-cloud-starter-alibaba-seata</artifactId><exclusions><exclusion><artifactId>seata-all</artifactId><groupId>io.seata</groupId></exclusion></exclusions></dependency><dependency><groupId>io.seata</groupId><artifactId>seata-spring-boot-starter</artifactId><version>1.4.1</version></dependency><!--feign--><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-openfeign</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.mybatis.spring.boot</groupId><artifactId>mybatis-spring-boot-starter</artifactId></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId></dependency><dependency><groupId>com.alibaba</groupId><artifactId>druid-spring-boot-starter</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency></dependencies>
2.3.3seata事务用到的表
DROPTABLEIFEXISTS `undo_log`;CREATETABLE `undo_log`
(
`id` bigint(20)NOTNULLAUTO_INCREMENT,
`branch_id` bigint(20)NOTNULL,
`xid` varchar(100)NOTNULL,
`context` varchar(128)NOTNULL,
`rollback_info` longblob NOTNULL,
`log_status` int(11)NOTNULL,
`log_created` datetime NOTNULL,
`log_modified` datetime NOTNULL,
`ext` varchar(100)DEFAULTNULL,PRIMARYKEY(`id`),UNIQUEKEY `ux_undo_log` (`xid`,`branch_id`))ENGINE=InnoDBAUTO_INCREMENT=1DEFAULTCHARSET=utf8;
2.3.4seata微服务配置
在resources 下 新增2个配置文件 , file.conf 和 registry.conf
file.conf
transport {
# tcp udt unix-domain-socket
type ="TCP"
#NIONATIVE
server ="NIO"
#enable heartbeat
heartbeat =true
# the client batch send request enable
enableClientBatchSendRequest =true
#thread factory for netty
threadFactory {
bossThreadPrefix ="NettyBoss"
workerThreadPrefix ="NettyServerNIOWorker"
serverExecutorThread-prefix ="NettyServerBizHandler"
shareBossWorker =false
clientSelectorThreadPrefix ="NettyClientSelector"
clientSelectorThreadSize =1
clientWorkerThreadPrefix ="NettyClientWorkerThread"
# netty boss thread size,will not be used forUDT
bossThreadSize =1
#auto default pin or 8
workerThreadSize ="default"}
shutdown {
# when destroy server, wait seconds
wait =3}
serialization ="seata"
compressor ="none"}
service {
#这里注意,等号前后都是配置,前面是yml里配置的事务组,后面是register.conf里定义的seata-server
vgroupMapping.my_test_tx_group ="seata-server"
#only support when registry.type=file, please don"t set multiple addresses
seata_tc_server.grouplist ="127.0.0.1:8091"
#degrade, current not support
enableDegrade =false
#disable seata
disableGlobalTransaction =false}
client {
rm {
asyncCommitBufferLimit =10000
lock {
retryInterval =10
retryTimes =30
retryPolicyBranchRollbackOnConflict =true}
reportRetryCount =5
tableMetaCheckEnable =false
reportSuccessEnable =false}
tm {
commitRetryCount =5
rollbackRetryCount =5}
undo {
dataValidation =true
logSerialization ="jackson"
logTable ="undo_log"}
log {
exceptionRate =100}}
registry.conf
registry {
# file 、nacos 、eureka、redis、zk、consul、etcd3、sofa
## 注册中心为eureka
type ="eureka"
loadBalance ="RandomLoadBalance"
loadBalanceVirtualNodes =10
nacos {
application ="seata-server"
serverAddr ="127.0.0.1:8848"
group ="SEATA_GROUP"
namespace =""
cluster ="default"
username =""
password =""}
eureka {
## eureka地址
serviceUrl ="http://eureka7001.com:7001/eureka/,http://eureka7002.com:7002/eureka/"
application ="seata-server"
weight ="1"}
redis {
serverAddr ="localhost:6379"
db =0
password ="123456"
cluster ="default"
timeout =0}
zk {
cluster ="default"
serverAddr ="127.0.0.1:2181"
sessionTimeout =6000
connectTimeout =2000
username =""
password =""}
consul {
cluster ="default"
serverAddr ="127.0.0.1:8500"}
etcd3 {
cluster ="default"
serverAddr ="http://localhost:2379"}
sofa {
serverAddr ="127.0.0.1:9603"
application ="default"
region ="DEFAULT_ZONE"
datacenter ="DefaultDataCenter"
cluster ="default"
group ="SEATA_GROUP"
addressWaitTime ="3000"}
file {
name ="file.conf"}}
config {
# file、nacos 、apollo、zk、consul、etcd3
type ="file"
nacos {
serverAddr ="127.0.0.1:8848"
namespace =""
group ="SEATA_GROUP"
username =""
password =""}
consul {
serverAddr ="127.0.0.1:8500"}
apollo {
appId ="seata-server"
apolloMeta ="http://192.168.1.204:8801"
namespace ="application"
apolloAccesskeySecret =""}
zk {
serverAddr ="127.0.0.1:2181"
sessionTimeout =6000
connectTimeout =2000
username =""
password =""}
etcd3 {
serverAddr ="http://localhost:2379"}
file {
name ="file.conf"}}
2.3.5yml配置seata事务
需要在yml配置文件加上配置项,指明当前服务使用了 seata分布式事务组件,且需要加入的分布式事务组是哪个:
server:
# 服务器的HTTP端口,默认为80
port:2000
# Spring配置
spring:
application:
#微服务名称
name: seata-account-service
datasource:
type:com.alibaba.druid.pool.DruidDataSource # 当前数据源操作类型
driverClassName:com.mysql.cj.jdbc.Driver # mysql驱动包
url: jdbc:mysql://localhost:3306/seata_account?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=GMT%2B8 # 数据库名称
username: root
password:123456
seata:
enabled:true
application-id: ${spring.application.name}
tx-service-group: seata-server
enable-auto-data-source-proxy:true
service:default:127.0.0.1:8091
vgroup-mapping:
seata-server:default
config:
type: file
file:
name: file.conf
registry:
type: file
file:
name: file
# MyBatis
mybatis:
# 所有Entity别名类所在包
typeAliasesPackage: com.my.springcloud.domain
# 配置mapper的扫描,找到所有的mapper.xml映射文件
mapperLocations: classpath*:mapper/*Mapper.xml
# 加载全局的配置文件
# configLocation: classpath:mybatis/mybatis-config.xml
# eureka客户端配置
eureka:
client:
#表示是否向Eureka注册中心注册自己
register-with-eureka: true
fetch-registry: true # false表示自己就是注册中心,我的职责就是维护服务实例,并不需要去检索服务
service-url:
#defaultZone: http://eureka7001.com:7001/eureka/,http://eureka7002.com:7002/eureka/
defaultZone: http://eureka7001.com:7001/eureka/,http://eureka7002.com:7002/eureka/
instance:
#向注册中心注册服务ID
instance-id: ${spring.cloud.client.ip-address}:${server.port}
prefer-ip-address: true #显示IP地址
# Eureka客户端向服务端发送心跳的时间间隔,单位为秒(默认是30秒)
lease-renewal-interval-in-seconds: 30
#Eureka服务端在收到最后一次心跳后等待时间上限,单位为秒(默认是90秒),超时将剔除服务
lease-expiration-duration-in-seconds: 90
logging:
level:
io:
seata: info
2.3.6数据源交给seata去代理
importcom.alibaba.druid.pool.DruidDataSource;importio.seata.rm.datasource.DataSourceProxy;importorg.apache.ibatis.session.SqlSessionFactory;importorg.mybatis.spring.SqlSessionFactoryBean;importorg.mybatis.spring.transaction.SpringManagedTransactionFactory;importorg.springframework.beans.factory.annotation.Value;importorg.springframework.boot.context.properties.ConfigurationProperties;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;importorg.springframework.core.io.support.PathMatchingResourcePatternResolver;importjavax.sql.DataSource;/**
* 使用Seata对数据源进行代理
*/@ConfigurationpublicclassDataSourceProxyConfig{//加载数据源@Bean@ConfigurationProperties(prefix ="spring.datasource")publicDataSourcedruidDataSource(){returnnewDruidDataSource();}@Value("${mybatis.mapperLocations}")privateString mapperLocations;@BeanpublicDataSourcedataSourceProxy(DataSource dataSource){returnnewDataSourceProxy(dataSource);}@BeanpublicSqlSessionFactorysqlSessionFactoryBean(DataSource dataSourceProxy)throwsException{SqlSessionFactoryBean sqlSessionFactoryBean =newSqlSessionFactoryBean();
sqlSessionFactoryBean.setDataSource(dataSourceProxy);
sqlSessionFactoryBean.setMapperLocations(newPathMatchingResourcePatternResolver().getResources(mapperLocations));
sqlSessionFactoryBean.setTransactionFactory(newSpringManagedTransactionFactory());return sqlSessionFactoryBean.getObject();}}
2.3.7主启动类去掉默认自动加载数据源
importorg.springframework.boot.SpringApplication;importorg.springframework.boot.autoconfigure.SpringBootApplication;importorg.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;importorg.springframework.cloud.netflix.eureka.EnableEurekaClient;importorg.springframework.cloud.openfeign.EnableFeignClients;@EnableEurekaClient//开启Eureka的客户端服务,将会自动注册到注册中心@SpringBootApplication(exclude =DataSourceAutoConfiguration.class)publicclassSeataAccountApp{publicstaticvoidmain(String[] args){SpringApplication.run(SeataAccountApp.class, args);}}
2.3.8业务数据
db
(1)创建account数据库
(2)在数据库下创建seata_account表
DROPTABLEIFEXISTS `t_account`;CREATETABLE `t_account`
(
`id` BIGINT(20)NOTNULLAUTO_INCREMENT comment '主键',
`user_id` BIGINT(20) comment '用户id',
`total` NUMERIC(20) comment '总额度',
`used` NUMERIC(20) comment '已用额度',
`residue` NUMERIC(20) comment '剩余额度',PRIMARYKEY(`id`))ENGINE=InnoDBAUTO_INCREMENT=1DEFAULTCHARSET=utf8 comment '账户表';
DO
@Data@AllArgsConstructor@NoArgsConstructorpublicclassAccount{privateLong id;/**
* 用户id
*/privateLong userId;/**
* 总额度
*/privateBigDecimal total;/**
* 已用额度
*/privateBigDecimal used;/**
* 剩余额度
*/privateBigDecimal residue;}
dao
@MapperpublicinterfaceAccountDao{/**
* 扣减账户余额
*/voiddecrease(@Param("userId")Long userId,@Param("money")BigDecimal money);}
service
publicinterfaceAccountService{/**
* 扣减账户余额
* @param userId 用户id
* @param money 金额
*/voiddecrease(@RequestParam("userId")Long userId,@RequestParam("money")BigDecimal money);}
mapper.xml
<?xml version="1.0" encoding="UTF-8"?><!DOCTYPE mapper PUBLIC"-//mybatis.org//DTD Mapper 3.0//EN""http://mybatis.org/dtd/mybatis-3-mapper.dtd"><mapper namespace="com.my.springcloud.dao.AccountDao"><resultMap id="BaseResultMap" type="com.my.springcloud.domain.Account"><id column="id" property="id" jdbcType="BIGINT"/><result column="user_id" property="userId" jdbcType="BIGINT"/><result column="total" property="total" jdbcType="DECIMAL"/><result column="used" property="used" jdbcType="DECIMAL"/><result column="residue" property="residue" jdbcType="DECIMAL"/></resultMap><update id="decrease">UPDATE t_account
SET
residue = residue - #{money},used = used + #{money}WHERE
user_id = #{userId};</update></mapper>
controller
public class AccountController {
@Resource
AccountService accountService;
/**
* 扣减账户余额
*/
@RequestMapping("/account/decrease")
public CommonResult decrease(@RequestParam("userId") Long userId, @RequestParam("money") BigDecimal money) {
accountService.decrease(userId, money);
return new CommonResult(200, "扣减账户余额成功!");
}
}
2.4、搭建库存微服务
2.4.1 新建seata-storage-service微服务
2.4.2引入依赖
<dependencies><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-netflix-eureka-client</artifactId></dependency><!--seata依赖--><dependency><groupId>com.alibaba.cloud</groupId><artifactId>spring-cloud-starter-alibaba-seata</artifactId><exclusions><exclusion><artifactId>seata-all</artifactId><groupId>io.seata</groupId></exclusion></exclusions></dependency><dependency><groupId>io.seata</groupId><artifactId>seata-spring-boot-starter</artifactId><version>1.4.1</version></dependency><!--feign--><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-openfeign</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.mybatis.spring.boot</groupId><artifactId>mybatis-spring-boot-starter</artifactId></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId></dependency><dependency><groupId>com.alibaba</groupId><artifactId>druid-spring-boot-starter</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency></dependencies>
2.4.3seata事务用到的表
DROPTABLEIFEXISTS `undo_log`;CREATETABLE `undo_log`
(
`id` bigint(20)NOTNULLAUTO_INCREMENT,
`branch_id` bigint(20)NOTNULL,
`xid` varchar(100)NOTNULL,
`context` varchar(128)NOTNULL,
`rollback_info` longblob NOTNULL,
`log_status` int(11)NOTNULL,
`log_created` datetime NOTNULL,
`log_modified` datetime NOTNULL,
`ext` varchar(100)DEFAULTNULL,PRIMARYKEY(`id`),UNIQUEKEY `ux_undo_log` (`xid`,`branch_id`))ENGINE=InnoDBAUTO_INCREMENT=1DEFAULTCHARSET=utf8;
2.4.4seata微服务配置
在resources 下 新增2个配置文件 , file.conf 和 registry.conf
file.conf
transport {
# tcp udt unix-domain-socket
type ="TCP"
#NIONATIVE
server ="NIO"
#enable heartbeat
heartbeat =true
# the client batch send request enable
enableClientBatchSendRequest =true
#thread factory for netty
threadFactory {
bossThreadPrefix ="NettyBoss"
workerThreadPrefix ="NettyServerNIOWorker"
serverExecutorThread-prefix ="NettyServerBizHandler"
shareBossWorker =false
clientSelectorThreadPrefix ="NettyClientSelector"
clientSelectorThreadSize =1
clientWorkerThreadPrefix ="NettyClientWorkerThread"
# netty boss thread size,will not be used forUDT
bossThreadSize =1
#auto default pin or 8
workerThreadSize ="default"}
shutdown {
# when destroy server, wait seconds
wait =3}
serialization ="seata"
compressor ="none"}
service {
#这里注意,等号前后都是配置,前面是yml里配置的事务组,后面是register.conf里定义的seata-server
vgroupMapping.my_test_tx_group ="seata-server"
#only support when registry.type=file, please don"t set multiple addresses
seata_tc_server.grouplist ="127.0.0.1:8091"
#degrade, current not support
enableDegrade =false
#disable seata
disableGlobalTransaction =false}
client {
rm {
asyncCommitBufferLimit =10000
lock {
retryInterval =10
retryTimes =30
retryPolicyBranchRollbackOnConflict =true}
reportRetryCount =5
tableMetaCheckEnable =false
reportSuccessEnable =false}
tm {
commitRetryCount =5
rollbackRetryCount =5}
undo {
dataValidation =true
logSerialization ="jackson"
logTable ="undo_log"}
log {
exceptionRate =100}}
registry.conf
registry {
# file 、nacos 、eureka、redis、zk、consul、etcd3、sofa
## 注册中心为eureka
type ="eureka"
loadBalance ="RandomLoadBalance"
loadBalanceVirtualNodes =10
nacos {
application ="seata-server"
serverAddr ="127.0.0.1:8848"
group ="SEATA_GROUP"
namespace =""
cluster ="default"
username =""
password =""}
eureka {
## eureka地址
serviceUrl ="http://eureka7001.com:7001/eureka/,http://eureka7002.com:7002/eureka/"
application ="seata-server"
weight ="1"}
redis {
serverAddr ="localhost:6379"
db =0
password ="123456"
cluster ="default"
timeout =0}
zk {
cluster ="default"
serverAddr ="127.0.0.1:2181"
sessionTimeout =6000
connectTimeout =2000
username =""
password =""}
consul {
cluster ="default"
serverAddr ="127.0.0.1:8500"}
etcd3 {
cluster ="default"
serverAddr ="http://localhost:2379"}
sofa {
serverAddr ="127.0.0.1:9603"
application ="default"
region ="DEFAULT_ZONE"
datacenter ="DefaultDataCenter"
cluster ="default"
group ="SEATA_GROUP"
addressWaitTime ="3000"}
file {
name ="file.conf"}}
config {
# file、nacos 、apollo、zk、consul、etcd3
type ="file"
nacos {
serverAddr ="127.0.0.1:8848"
namespace =""
group ="SEATA_GROUP"
username =""
password =""}
consul {
serverAddr ="127.0.0.1:8500"}
apollo {
appId ="seata-server"
apolloMeta ="http://192.168.1.204:8801"
namespace ="application"
apolloAccesskeySecret =""}
zk {
serverAddr ="127.0.0.1:2181"
sessionTimeout =6000
connectTimeout =2000
username =""
password =""}
etcd3 {
serverAddr ="http://localhost:2379"}
file {
name ="file.conf"}}
2.4.5yml配置seata事务
需要在yml配置文件加上配置项,指明当前服务使用了 seata分布式事务组件,且需要加入的分布式事务组是哪个:
server:
# 服务器的HTTP端口,默认为80
port:2002
#Spring配置
spring:
application:
name: seata-storage-service
#----------------------------数据源----------------------------
datasource:
type:com.alibaba.druid.pool.DruidDataSource
driverClassName:com.mysql.cj.jdbc.Driver
url: jdbc:mysql://localhost:3306/seata_storage?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=GMT%2B8
username: root
password:123456
druid:
initial-size:5
min-idle:5
max-active:20
test-while-idle:true
test-on-borrow:false
test-on-return:false
pool-prepared-statements:true
max-pool-prepared-statement-per-connection-size:20
max-wait:60000
time-between-eviction-runs-millis:60000
min-evictable-idle-time-millis:30000
filters: stat
async-init:true
#----------------------------seata----------------------------
seata:
enabled:true
application-id: ${spring.application.name}
tx-service-group: seata-server
enable-auto-data-source-proxy:true
service:default:127.0.0.1:8091
vgroup-mapping:
seata-server:default
config:
type: file
file:
name: file.conf
registry:
type: file
file:
name: file
# eureka客户端配置
eureka:
client:
#表示是否向Eureka注册中心注册自己
register-with-eureka:true
fetch-registry:true # false表示自己就是注册中心,我的职责就是维护服务实例,并不需要去检索服务
service-url:
#defaultZone: http://eureka7001.com:7001/eureka/,http://eureka7002.com:7002/eureka/
defaultZone: http://eureka7001.com:7001/eureka/,http://eureka7002.com:7002/eureka/
instance:
#向注册中心注册服务ID
instance-id: ${spring.cloud.client.ip-address}:${server.port}
prefer-ip-address:true #显示IP地址
# Eureka客户端向服务端发送心跳的时间间隔,单位为秒(默认是30秒)
lease-renewal-interval-in-seconds:30
#Eureka服务端在收到最后一次心跳后等待时间上限,单位为秒(默认是90秒),超时将剔除服务
lease-expiration-duration-in-seconds:90
#MyBatis
mybatis:
#所有Entity别名类所在包,多个路径用逗号分割
typeAliasesPackage: com.my.springcloud.domain
# 配置mapper的扫描,找到所有的mapper,多个目录用逗号或者分号分隔
mapperLocations: classpath*:mapper/*Mapper.xml
# 加载全局的配置文件
# configLocation: classpath:mybatis/mybatis-config.xml
2.4.6数据源交给seata去代理
importcom.alibaba.druid.pool.DruidDataSource;importio.seata.rm.datasource.DataSourceProxy;importorg.apache.ibatis.session.SqlSessionFactory;importorg.mybatis.spring.SqlSessionFactoryBean;importorg.mybatis.spring.transaction.SpringManagedTransactionFactory;importorg.springframework.beans.factory.annotation.Value;importorg.springframework.boot.context.properties.ConfigurationProperties;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;importorg.springframework.core.io.support.PathMatchingResourcePatternResolver;importjavax.sql.DataSource;/**
* 使用Seata对数据源进行代理
*/@ConfigurationpublicclassDataSourceProxyConfig{//加载数据源@Bean@ConfigurationProperties(prefix ="spring.datasource")publicDataSourcedruidDataSource(){returnnewDruidDataSource();}@Value("${mybatis.mapperLocations}")privateString mapperLocations;@BeanpublicDataSourcedataSourceProxy(DataSource dataSource){returnnewDataSourceProxy(dataSource);}@BeanpublicSqlSessionFactorysqlSessionFactoryBean(DataSource dataSourceProxy)throwsException{SqlSessionFactoryBean sqlSessionFactoryBean =newSqlSessionFactoryBean();
sqlSessionFactoryBean.setDataSource(dataSourceProxy);
sqlSessionFactoryBean.setMapperLocations(newPathMatchingResourcePatternResolver().getResources(mapperLocations));
sqlSessionFactoryBean.setTransactionFactory(newSpringManagedTransactionFactory());return sqlSessionFactoryBean.getObject();}}
2.4.7主启动类去掉默认自动加载数据源
@EnableEurekaClient//开启Eureka的客户端服务,将会自动注册到注册中心@SpringBootApplication(exclude =DataSourceAutoConfiguration.class)publicclassSeataStorageApp{publicstaticvoidmain(String[] args){SpringApplication.run(SeataStorageApp.class, args);}}
2.4.8业务数据
db
(1)创建storage数据库
(2)在数据库下创建seata_storage表
DROPTABLEIFEXISTS `t_storage`;CREATETABLE `t_storage`
(
`id` bigint(20)NOTNULLAUTO_INCREMENT comment '主键',
`product_id` bigint(30) comment '产品id',
`total` INTEGER(20) comment '总库存',
`used` INTEGER(20) comment '已用库存',
`residue` INTEGER(30) comment '剩余库存',PRIMARYKEY(`id`))ENGINE=InnoDBAUTO_INCREMENT=1DEFAULTCHARSET=utf8 comment '库存表';
DO
@DatapublicclassStorage{privateLong id;/**
* 产品id
*/privateLong productId;/**
* 总库存
*/privateInteger total;/**
* 已用库存
*/privateInteger used;/**
* 剩余库存
*/privateInteger residue;}
dao
@MapperpublicinterfaceStorageDao{//扣减库存voiddecrease(@Param("productId")Long productId,@Param("count")Integer count);}
service
publicinterfaceStorageService{/**
* 扣减库存
*/voiddecrease(Long productId,Integer count);}
mapper.xml
<?xml version="1.0" encoding="UTF-8"?><!DOCTYPE mapper PUBLIC"-//mybatis.org//DTD Mapper 3.0//EN""http://mybatis.org/dtd/mybatis-3-mapper.dtd"><mapper namespace="com.my.springcloud.dao.StorageDao"><resultMap id="BaseResultMap" type="com.my.springcloud.domain.Storage"><id column="id" property="id" jdbcType="BIGINT"/><result column="product_id" property="productId" jdbcType="BIGINT"/><result column="total" property="total" jdbcType="INTEGER"/><result column="used" property="used" jdbcType="INTEGER"/><result column="residue" property="residue" jdbcType="INTEGER"/></resultMap><update id="decrease">UPDATE
t_storage
SET used = used + #{count},
residue = residue - #{count}WHERE product_id = #{productId}</update></mapper>
controller
@RestControllerpublicclassStorageController{@AutowiredprivateStorageService storageService;/**
* 扣减库存
*/@RequestMapping("/storage/decrease")publicCommonResultdecrease(Long productId,Integer count){
storageService.decrease(productId, count);returnnewCommonResult(200,"扣减库存成功!");}}
2.5、搭建订单微服务
2.5.1 新建seata-order-service微服务
2.5.2引入依赖
<dependencies><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-netflix-eureka-client</artifactId></dependency><!--seata依赖--><dependency><groupId>com.alibaba.cloud</groupId><artifactId>spring-cloud-starter-alibaba-seata</artifactId><exclusions><exclusion><artifactId>seata-all</artifactId><groupId>io.seata</groupId></exclusion></exclusions></dependency><dependency><groupId>io.seata</groupId><artifactId>seata-spring-boot-starter</artifactId><version>1.4.1</version></dependency><!--feign--><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-openfeign</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.mybatis.spring.boot</groupId><artifactId>mybatis-spring-boot-starter</artifactId></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId></dependency><dependency><groupId>com.alibaba</groupId><artifactId>druid-spring-boot-starter</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><!-- 添加 httpclient 框架依赖 --><dependency><groupId>io.github.openfeign</groupId><artifactId>feign-httpclient</artifactId></dependency><dependency><groupId>io.github.openfeign</groupId><artifactId>feign-okhttp</artifactId></dependency></dependencies>
2.5.3seata事务用到的表
DROPTABLEIFEXISTS `undo_log`;CREATETABLE `undo_log`
(
`id` bigint(20)NOTNULLAUTO_INCREMENT,
`branch_id` bigint(20)NOTNULL,
`xid` varchar(100)NOTNULL,
`context` varchar(128)NOTNULL,
`rollback_info` longblob NOTNULL,
`log_status` int(11)NOTNULL,
`log_created` datetime NOTNULL,
`log_modified` datetime NOTNULL,
`ext` varchar(100)DEFAULTNULL,PRIMARYKEY(`id`),UNIQUEKEY `ux_undo_log` (`xid`,`branch_id`))ENGINE=InnoDBAUTO_INCREMENT=1DEFAULTCHARSET=utf8;
2.5.4seata微服务配置
在resources 下 新增2个配置文件 , file.conf 和 registry.conf
file.conf
transport {
# tcp udt unix-domain-socket
type ="TCP"
#NIONATIVE
server ="NIO"
#enable heartbeat
heartbeat =true
# the client batch send request enable
enableClientBatchSendRequest =true
#thread factory for netty
threadFactory {
bossThreadPrefix ="NettyBoss"
workerThreadPrefix ="NettyServerNIOWorker"
serverExecutorThread-prefix ="NettyServerBizHandler"
shareBossWorker =false
clientSelectorThreadPrefix ="NettyClientSelector"
clientSelectorThreadSize =1
clientWorkerThreadPrefix ="NettyClientWorkerThread"
# netty boss thread size,will not be used forUDT
bossThreadSize =1
#auto default pin or 8
workerThreadSize ="default"}
shutdown {
# when destroy server, wait seconds
wait =3}
serialization ="seata"
compressor ="none"}
service {
#这里注意,等号前后都是配置,前面是yml里配置的事务组,后面是register.conf里定义的seata-server
vgroupMapping.my_test_tx_group ="seata-server"
#only support when registry.type=file, please don"t set multiple addresses
seata_tc_server.grouplist ="127.0.0.1:8091"
#degrade, current not support
enableDegrade =false
#disable seata
disableGlobalTransaction =false}
client {
rm {
asyncCommitBufferLimit =10000
lock {
retryInterval =10
retryTimes =30
retryPolicyBranchRollbackOnConflict =true}
reportRetryCount =5
tableMetaCheckEnable =false
reportSuccessEnable =false}
tm {
commitRetryCount =5
rollbackRetryCount =5}
undo {
dataValidation =true
logSerialization ="jackson"
logTable ="undo_log"}
log {
exceptionRate =100}}
registry.conf
registry {
# file 、nacos 、eureka、redis、zk、consul、etcd3、sofa
## 注册中心为eureka
type ="eureka"
loadBalance ="RandomLoadBalance"
loadBalanceVirtualNodes =10
nacos {
application ="seata-server"
serverAddr ="127.0.0.1:8848"
group ="SEATA_GROUP"
namespace =""
cluster ="default"
username =""
password =""}
eureka {
## eureka地址
serviceUrl ="http://eureka7001.com:7001/eureka/,http://eureka7002.com:7002/eureka/"
application ="seata-server"
weight ="1"}
redis {
serverAddr ="localhost:6379"
db =0
password ="123456"
cluster ="default"
timeout =0}
zk {
cluster ="default"
serverAddr ="127.0.0.1:2181"
sessionTimeout =6000
connectTimeout =2000
username =""
password =""}
consul {
cluster ="default"
serverAddr ="127.0.0.1:8500"}
etcd3 {
cluster ="default"
serverAddr ="http://localhost:2379"}
sofa {
serverAddr ="127.0.0.1:9603"
application ="default"
region ="DEFAULT_ZONE"
datacenter ="DefaultDataCenter"
cluster ="default"
group ="SEATA_GROUP"
addressWaitTime ="3000"}
file {
name ="file.conf"}}
config {
# file、nacos 、apollo、zk、consul、etcd3
type ="file"
nacos {
serverAddr ="127.0.0.1:8848"
namespace =""
group ="SEATA_GROUP"
username =""
password =""}
consul {
serverAddr ="127.0.0.1:8500"}
apollo {
appId ="seata-server"
apolloMeta ="http://192.168.1.204:8801"
namespace ="application"
apolloAccesskeySecret =""}
zk {
serverAddr ="127.0.0.1:2181"
sessionTimeout =6000
connectTimeout =2000
username =""
password =""}
etcd3 {
serverAddr ="http://localhost:2379"}
file {
name ="file.conf"}}
2.5.5yml配置seata事务
需要在yml配置文件加上配置项,指明当前服务使用了 seata分布式事务组件,且需要加入的分布式事务组是哪个:
server:
# 服务器的HTTP端口,默认为80
port:2001
#Spring配置
spring:
application:
name: seata-order-service
#----------------------------数据源----------------------------
datasource:
type:com.alibaba.druid.pool.DruidDataSource
driverClassName:com.mysql.cj.jdbc.Driver
url: jdbc:mysql://localhost:3306/seata_order?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=GMT%2B8
username: root
password:123456
druid:
initial-size:5
min-idle:5
max-active:20
test-while-idle:true
test-on-borrow:false
test-on-return:false
pool-prepared-statements:true
max-pool-prepared-statement-per-connection-size:20
max-wait:60000
time-between-eviction-runs-millis:60000
min-evictable-idle-time-millis:30000
filters: stat
async-init:true
#----------------------------seata----------------------------
seata:
enabled:true
application-id: ${spring.application.name}
tx-service-group: seata-server
enable-auto-data-source-proxy:true
service:default:127.0.0.1:8091
vgroup-mapping:
seata-server:default
config:
type: file
file:
name: file.conf
registry:
type: file
file:
name: file
# eureka客户端配置
eureka:
client:
#表示是否向Eureka注册中心注册自己
register-with-eureka:true
fetch-registry:true # false表示自己就是注册中心,我的职责就是维护服务实例,并不需要去检索服务
service-url:
#defaultZone: http://eureka7001.com:7001/eureka/,http://eureka7002.com:7002/eureka/
defaultZone: http://eureka7001.com:7001/eureka/,http://eureka7002.com:7002/eureka/
instance:
#向注册中心注册服务ID
instance-id: ${spring.cloud.client.ip-address}:${server.port}
prefer-ip-address:true #显示IP地址
# Eureka客户端向服务端发送心跳的时间间隔,单位为秒(默认是30秒)
lease-renewal-interval-in-seconds:30
#Eureka服务端在收到最后一次心跳后等待时间上限,单位为秒(默认是90秒),超时将剔除服务
lease-expiration-duration-in-seconds:90
#MyBatis
mybatis:
#所有Entity别名类所在包,多个路径用逗号分割
typeAliasesPackage: com.my.springcloud.domain
# 配置mapper的扫描,找到所有的mapper,多个目录用逗号或者分号分隔
mapperLocations: classpath*:mapper/*Mapper.xml
# 加载全局的配置文件
# configLocation: classpath:mybatis/mybatis-config.xml
# feign超时设置
feign:
client:
config:
default: # default全局的配置
loggerLevel: BASIC # 日志级别,BASIC就是基本的请求和响应信息
httpclient:
enabled: true # 开启feign对HttpClient的支持
max-connections: 200 # 最大的连接数
max-connections-per-route: 50 # 每个路径的最大连接数
logging:
level:
io:
seata: info
2.5.6数据源交给seata去代理
importcom.alibaba.druid.pool.DruidDataSource;importio.seata.rm.datasource.DataSourceProxy;importorg.apache.ibatis.session.SqlSessionFactory;importorg.mybatis.spring.SqlSessionFactoryBean;importorg.mybatis.spring.transaction.SpringManagedTransactionFactory;importorg.springframework.beans.factory.annotation.Value;importorg.springframework.boot.context.properties.ConfigurationProperties;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;importorg.springframework.core.io.support.PathMatchingResourcePatternResolver;importjavax.sql.DataSource;/**
* 使用Seata对数据源进行代理
*/@ConfigurationpublicclassDataSourceProxyConfig{//加载数据源@Bean@ConfigurationProperties(prefix ="spring.datasource")publicDataSourcedruidDataSource(){returnnewDruidDataSource();}@Value("${mybatis.mapperLocations}")privateString mapperLocations;@BeanpublicDataSourcedataSourceProxy(DataSource dataSource){returnnewDataSourceProxy(dataSource);}@BeanpublicSqlSessionFactorysqlSessionFactoryBean(DataSource dataSourceProxy)throwsException{SqlSessionFactoryBean sqlSessionFactoryBean =newSqlSessionFactoryBean();
sqlSessionFactoryBean.setDataSource(dataSourceProxy);
sqlSessionFactoryBean.setMapperLocations(newPathMatchingResourcePatternResolver().getResources(mapperLocations));
sqlSessionFactoryBean.setTransactionFactory(newSpringManagedTransactionFactory());return sqlSessionFactoryBean.getObject();}}
2.5.7主启动类去掉默认自动加载数据源
@EnableFeignClients@EnableEurekaClient//开启Eureka的客户端服务,将会自动注册到注册中心@SpringBootApplication(exclude =DataSourceAutoConfiguration.class)//取消数据源的自动创建publicclassSeataOrderApp{publicstaticvoidmain(String[] args){SpringApplication.run(SeataOrderApp.class, args);}}
2.5.8业务数据
db
(1)创建storage数据库
(2)在数据库下创建seata_storage表
DROPTABLEIFEXISTS `t_storage`;CREATETABLE `t_storage`
(
`id` bigint(20)NOTNULLAUTO_INCREMENT comment '主键',
`product_id` bigint(30) comment '产品id',
`total` INTEGER(20) comment '总库存',
`used` INTEGER(20) comment '已用库存',
`residue` INTEGER(30) comment '剩余库存',PRIMARYKEY(`id`))ENGINE=InnoDBAUTO_INCREMENT=1DEFAULTCHARSET=utf8 comment '库存表';
DO
@Data@Data@AllArgsConstructor@NoArgsConstructorpublicclassOrder{privateLong id;privateLong userId;privateLong productId;privateInteger count;privateBigDecimal money;privateInteger status;//订单状态:0:创建中;1:已完结}
dao
@MapperpublicinterfaceOrderDao{//1 新建订单voidcreate(Order order);//2 修改订单状态,从零改为1voidupdate(@Param("userId")Long userId,@Param("status")Integer status);}
service
publicinterfaceOrderService{voidcreate(Order order);}
service实现类添加@GlobalTransactional
@Service@Slf4jpublicclassOrderServiceImplimplementsOrderService{@ResourceprivateOrderDao orderDao;@ResourceprivateStorageService storageService;@ResourceprivateAccountService accountService;/**
* 创建订单->调用库存服务扣减库存->调用账户服务扣减账户余额->修改订单状态
* 简单说:下订单->扣库存->减余额->改状态
*/@Override@GlobalTransactional( rollbackFor =Exception.class)publicvoidcreate(Order order){
log.info("----->开始新建订单");//1 新建订单
orderDao.create(order);//2 扣减库存
log.info("----->订单微服务开始调用库存,做扣减Count");
storageService.decrease(order.getProductId(), order.getCount());
log.info("----->订单微服务开始调用库存,做扣减end");//3 扣减账户
log.info("----->订单微服务开始调用账户,做扣减Money");
accountService.decrease(order.getUserId(), order.getMoney());
log.info("----->订单微服务开始调用账户,做扣减end");//4 修改订单状态,从零到1,1代表已经完成
log.info("----->修改订单状态开始");
orderDao.update(order.getUserId(),0);
log.info("----->修改订单状态结束");
log.info("----->下订单结束了,O(∩_∩)O哈哈~");}}
mapper.xml
<?xml version="1.0" encoding="UTF-8"?><!DOCTYPE mapper PUBLIC"-//mybatis.org//DTD Mapper 3.0//EN""http://mybatis.org/dtd/mybatis-3-mapper.dtd"><mapper namespace="com.my.springcloud.dao.OrderDao"><resultMap id="BaseResultMap" type="com.my.springcloud.domain.Order"><id column="id" property="id" jdbcType="BIGINT"/><result column="user_id" property="userId" jdbcType="BIGINT"/><result column="product_id" property="productId" jdbcType="BIGINT"/><result column="count" property="count" jdbcType="INTEGER"/><result column="money" property="money" jdbcType="DECIMAL"/><result column="status" property="status" jdbcType="INTEGER"/></resultMap><insert id="create">
insert into t_order (id, user_id, product_id, count, money, status)
values (null, #{userId}, #{productId}, #{count}, #{money},0);</insert><update id="update">
update t_order
set status =1
where user_id = #{userId}
and status = #{status};</update></mapper>
fegin
@FeignClient(value ="seata-account-service")publicinterfaceAccountService{@PostMapping(value ="/account/decrease")CommonResultdecrease(@RequestParam("userId")Long userId,@RequestParam("money")BigDecimal money);}
@FeignClient(value ="seata-storage-service")publicinterfaceStorageService{@PostMapping(value ="/storage/decrease")CommonResultdecrease(@RequestParam("productId")Long productId,@RequestParam("count")Integer count);}
controller
@RestControllerpublicclassOrderController{@ResourceprivateOrderService orderService;@GetMapping("/order/create")publicCommonResultcreate(Order order){
orderService.create(order);returnnewCommonResult(200,"订单创建成功");}}
2.6、测试
启动eureka7001和7002
启动seata service
启动库存微服务,账户微服务,订单微服务
访问 http://localhost:2001/order/create
业务流程
1.创建订单
2 扣减库存
3 扣减账户
4 修改订单状态,从零到1,1代表已经完成
添加@GlobalTransactional注解开启seata事务
扣减账户的时候模拟异常
获得全局事务状态
// 开启全局事务地方获取全局事务xidString xid =RootContext.getXID();// 通过全局事务xid获得GlobalStatus枚举类GlobalTransaction globalTransaction =GlobalTransactionContext.reload(xid);GlobalStatus globalStatus = globalTransaction.getStatus();// 通过GlobalStatus枚举类获取全局事务状态值int code = globalStatus.getCode();
使用idea控制台获取全局事务id
查看数据库
查看数据库表数据已经回滚
版权归原作者 人生就像一场戏! 所有, 如有侵权,请联系我们删除。