seata源码解析:事务状态及全局锁的存储
seata源码解析:事务状态及全局锁的存储
事务状态的存储
在seata中,无论你使用哪种事务模式,都会将全局事务状态和分支事务状态存储下来。有三种存储模式可供你选择,db,redis,file。
本文只会详细分析db这一种存储模式,其他的类似。
当使用db这一种模式时,seata server都会将全局事务的状态存在global_table表中,将分支事务的状态存在branch_table表中,如下所示
-- the table to store GlobalSession dataCREATE TABLE IF NOT EXISTS `global_table`( `xid` VARCHAR(128) NOT NULL, `transaction_id` BIGINT, `status` TINYINT NOT NULL, `application_id` VARCHAR(32), `transaction_service_group` VARCHAR(32), `transaction_name` VARCHAR(128), `timeout` INT, `begin_time` BIGINT, `application_data` VARCHAR(2000), `gmt_create` DATETIME, `gmt_modified` DATETIME, PRIMARY KEY (`xid`), KEY `idx_gmt_modified_status` (`gmt_modified`, `status`), KEY `idx_transaction_id` (`transaction_id`)) ENGINE = InnoDB DEFAULT CHARSET = utf8;-- the table to store BranchSession dataCREATE TABLE IF NOT EXISTS `branch_table`( `branch_id` BIGINT NOT NULL, `xid` VARCHAR(128) NOT NULL, `transaction_id` BIGINT, `resource_group_id` VARCHAR(32), `resource_id` VARCHAR(256), `branch_type` VARCHAR(8), `status` TINYINT, `client_id` VARCHAR(64), `application_data` VARCHAR(2000), `gmt_create` DATETIME(6), `gmt_modified` DATETIME(6), PRIMARY KEY (`branch_id`), KEY `idx_xid` (`xid`)) ENGINE = InnoDB DEFAULT CHARSET = utf8;
global_table表中status表示的含义为全局事务的状态,对应的枚举类为GlobalStatus branch_table表中status表示的含义为分支事务的状态,对应的枚举类为BranchStatus
当事务发生问题时,我们就可以通过global_table和branch_table的status字段判断事务是哪一步执行失败了。
全局事务的状态表
分支事务的状态表
状态解释来自官网:-p 8091
public class Server { public static void main(String[] args) throws IOException { // 省略部分代码... // SessionHolder负责事务日志的持久化存储 // 设置存储模式,有三种可选类型,file,db,redis SessionHolder.init(parameterParser.getStoreMode()); // 创建事务协调器 DefaultCoordinator coordinator = new DefaultCoordinator(nettyRemotingServer); // 初始化5个定时任务 coordinator.init(); // 省略部分代码... System.exit(0); }}
SessionHolder#init根据传入的存储模式初始化4个不同的SessionManager,SessionManager是用来存储全局事务和分支事务状态的。其中全局事务用GlobalSession来表示,分支事务用BranchSession来表示,一个GlobalSession包含多个BranchSession
// 保存了所有的GlobalSessionprivate static SessionManager ROOT_SESSION_MANAGER;// 需要异步commit的GlobalSessionprivate static SessionManager ASYNC_COMMITTING_SESSION_MANAGER;// 需要重试commit的GlobalSessionprivate static SessionManager RETRY_COMMITTING_SESSION_MANAGER;// 需要重试roollback的GlobalSessionprivate static SessionManager RETRY_ROLLBACKING_SESSION_MANAGER;
在init方法中通过SPI的方式实例化对应的类
// SessionHolderpublic static void init(String mode) { if (StringUtils.isBlank(mode)) { mode = CONFIG.getConfig(ConfigurationKeys.STORE_MODE); } StoreMode storeMode = StoreMode.get(mode); if (StoreMode.DB.equals(storeMode)) { // 通过spi加载SessionManager ROOT_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class, StoreMode.DB.getName()); ASYNC_COMMITTING_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class, StoreMode.DB.getName(), new Object[] {ASYNC_COMMITTING_SESSION_MANAGER_NAME}); RETRY_COMMITTING_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class, StoreMode.DB.getName(), new Object[] {RETRY_COMMITTING_SESSION_MANAGER_NAME}); RETRY_ROLLBACKING_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class, StoreMode.DB.getName(), new Object[] {RETRY_ROLLBACKING_SESSION_MANAGER_NAME}); } else if (StoreMode.FILE.equals(storeMode)) { // 省略其他存储方式的加载逻辑 } // 删除已经完成的GlobalSession reload(storeMode);}
4种类型的SessionManager都是同一个实例,只是调用的构造方法不同而已。以DataBaseSessionManager为例,ROOT_SESSION_MANAGER调用了无参数构造函数,而其他SessionManager传入了taskName属性
为什么要搞4个SessionManager?
其实就是用不同的SessionManager管理不同状态的任务,这样逻辑比较清晰。 当不同的SessionManager调用allSessions方法时,返回的就是对应状态的GlobalSession,逻辑比较清晰
DataBaseSessionManager#allSessions
我们来看一下SessionManager的继承关系
SessionLifecycleListener看接口名字就是基于观察者模式设计的,当GlobalSession状态发生改变的时候,会发布通知给监听者,然后监听者做相应动作。目前SessionLifecycleListener接口的实现类只有各种SessionManager,当收到状态改变的通知时,将其状态存储下来
AbstractSessionManager则是一个抽象类,当SessionLifecycleListener接口方法被回调时,调用SessionManager定义的动作方法
public abstract class AbstractSessionManager implements SessionManager, SessionLifecycleListener { protected TransactionStoreManager transactionStoreManager; // 省略部分代码 // 重写了SessionManager接口方法 @Override public void addGlobalSession(GlobalSession session) throws TransactionException { if (LOGGER.isDebugEnabled()) { LOGGER.debug("MANAGER[" + name + "] SESSION[" + session + "] " + LogOperation.GLOBAL_ADD); } writeSession(LogOperation.GLOBAL_ADD, session); } // 重写了SessionManager接口方法 @Override public void updateGlobalSessionStatus(GlobalSession session, GlobalStatus status) throws TransactionException { if (LOGGER.isDebugEnabled()) { LOGGER.debug("MANAGER[" + name + "] SESSION[" + session + "] " + LogOperation.GLOBAL_UPDATE); } writeSession(LogOperation.GLOBAL_UPDATE, session); } // 重写了SessionLifecycleListener接口方法 @Override public void onBegin(GlobalSession globalSession) throws TransactionException { addGlobalSession(globalSession); } // 重写了SessionLifecycleListener接口方法 @Override public void onStatusChange(GlobalSession globalSession, GlobalStatus status) throws TransactionException { updateGlobalSessionStatus(globalSession, status); }}
可以看到AbstractSessionManager并没有实现SessionManager接口的方法,而是直接抛出异常,说明具体的存储逻辑交给子类来实现了
AbstractSessionManager有3个实现类,说明seata支持3种存储模式。而最终存储的工作是交给TransactionStoreManager来实现的
这些增加事务和事务状态变化的持久化操作非常简单,就是执行插入sql和更新sql
说回我们的启动流程,在DefaultCoordinator#init方法中,初始化5个定时任务
retryRollbacking:分支事务回滚失败时,不断重试retryCommitting:分支事务提交失败时,不断重试asyncCommitting:执行异步commit,用在at模式,因为at模式的commit操作其实就是删除undolog,可以异步执行timeoutCheck:当事务处于开始状态,将状态设置为超时回滚,将其放入重试回滚管理器,让其回滚全局事务undoLogDelete:向rm端发送请求,删除7天(默认)之前的undolog
这些重试操作执行的逻辑和全局事务的提交/回滚逻辑一致,就不介绍了
// DefaultCoordinatorpublic void init() { // 重试rollback定时任务 retryRollbacking.scheduleAtFixedRate(() -> { boolean lock = SessionHolder.retryRollbackingLock(); if (lock) { try { handleRetryRollbacking(); } catch (Exception e) { LOGGER.info("Exception retry rollbacking ... ", e); } finally { SessionHolder.unRetryRollbackingLock(); } } }, 0, ROLLBACKING_RETRY_PERIOD, TimeUnit.MILLISECONDS); // 重试commit定时任务 retryCommitting.scheduleAtFixedRate(() -> { boolean lock = SessionHolder.retryCommittingLock(); if (lock) { try { handleRetryCommitting(); } catch (Exception e) { LOGGER.info("Exception retry committing ... ", e); } finally { SessionHolder.unRetryCommittingLock(); } } }, 0, COMMITTING_RETRY_PERIOD, TimeUnit.MILLISECONDS); // 省略部分代码}
全局锁
在前面的文章中我们说过,在Seata AT模式中,我们用全局锁来避免脏写,同时也可以用全局锁将默认的隔离级别从读未提交升高为读已提交。
全局锁是存在TC端的,所以在TC端要提供相应的接口,来进行加锁,解锁,相应的资源是否被加锁等操作!
当分支事务提交的时候,需要把修改的资源锁定。当全局事务提交后才会把相应的资源解锁。
各种事务的加锁状态会存在lock_table表中
-- the table to store lock dataCREATE TABLE IF NOT EXISTS `lock_table`( `row_key` VARCHAR(128) NOT NULL, -- resource_id+table_name+pk的组合 `xid` VARCHAR(128), `transaction_id` BIGINT, `branch_id` BIGINT NOT NULL, `resource_id` VARCHAR(256), `table_name` VARCHAR(32), `pk` VARCHAR(36), `gmt_create` DATETIME, `gmt_modified` DATETIME, PRIMARY KEY (`row_key`), KEY `idx_branch_id` (`branch_id`)) ENGINE = InnoDB DEFAULT CHARSET = utf8;
用个例子演示一下全局锁的工作流程,
假如account_info表中有如下2个账户数据,其中id是主键
id | user_id |
1 | 1001 |
2 | 1002 |
假如说执行如下sql
update account_info set balance = 100
当进行commit的时候,需要加全局锁,此时根据update语句构建出select语句,找出影响的主键,即1和2
通过主键构建出lockKey=account_info:1,2
lockKey的构建规则如下
如果主键是一列(id列),则形式如下
// 表名:主键值1,主键值2account_info:1,2
如果主键是多列(id列+user_id列),则形式如下
// 表名:主键值1_主键值a,主键值1_主键值baccount_info:1_1001,2_1002
当然有可能一个事务中,有可能有多个表,多个表的构建规则如下(中间通过;分隔即可)
// 表名1:主键值1,主键值2;表名2:主键值1,主键值2account_flow:1,2;account_info:1,2
加锁
当加锁是会往lock_table中假如如下2个记录(省略无关的列)
row_key | xid |
jdbc:mysql://myhost:3306/db_account_1^^^account_info^^^1 | 127.21.0.14:18091:6449339005964652705 |
jdbc:mysql://myhost:3306/db_account_2^^^account_info^^^1 | 127.21.0.14:18091:6449339005964652705 |
row_key的构建规则如下
rowKey = resourceId + "^^^" + tableName + "^^^" +
tcc模式下resourceId为@TwoPhaseBusinessAction注解的name属性 而在at和xa模式中resourceId都为数据库连接url
解锁
根据xid和row_key删除记录
查询是否能加锁
根据row_key从lock_table中查询记录,如果记录中的xid和查询传递过来的xid不一致则加锁失败,如果没有记录或者记录的xid和传递过来的xid一致,则加锁成功
源码分析
TC端定义的锁操作接口
public interface LockManager { // 对分支事务的资源加锁 boolean acquireLock(BranchSession branchSession) throws TransactionException; // 对分支事务的资源解锁 boolean releaseLock(BranchSession branchSession) throws TransactionException; // 对全局事务中的所有分支事务的资源解锁 boolean releaseGlobalSessionLock(GlobalSession globalSession) throws TransactionException; // 根据xid resourceId lockKey 查询是否已经加锁 boolean isLockable(String xid, String resourceId, String lockKey) throws TransactionException; // 清除所有锁 void cleanAllLocks() throws TransactionException;}
老规矩,我们还是只分析db这种存储模式,最终的加解锁操作会交给Locker
加锁
// AbstractLockManagerpublic boolean acquireLock(BranchSession branchSession) throws TransactionException { if (branchSession == null) { throw new IllegalArgumentException("branchSession can't be null for memory/file locker."); } String lockKey = branchSession.getLockKey(); if (StringUtils.isNullOrEmpty(lockKey)) { // no lock return true; } // get locks of branch // 创建 RowLock 集合,一条加锁记录对应一个RowLock对象 List
把需要加锁的资源转换成RowLock集合
protected List
转换RowLock的过程,基本就是对lockKey的解析过程,
protected List
构造好RowLock集合,接下来就是调用DataBaseLocker执行真正的加锁操作
首先将RowLock转为LockDO
protected List
// LockStoreDataBaseDAOpublic boolean acquireLock(List
解锁
解锁的逻辑和加锁的逻辑差不多,直接分析最终执行的部分了
public boolean unLock(List
参考博客
事务状态逻辑 [1]http://seata.io/zh-cn/docs/user/appendix/global-transaction-status.html
版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。