seata源码解析:事务状态及全局锁的存储

网友投稿 661 2022-09-02 04:05:01

seata源码解析:事务状态及全局锁的存储

事务状态的存储

在seata中,无论你使用哪种事务模式,都会将全局事务状态和分支事务状态存储下来。有三种存储模式可供你选择,db,redis,file。

本文只会详细分析db这一种存储模式,其他的类似。

当使用db这一种模式时,seata server都会将全局事务的状态存在global_table表中,将分支事务的状态存在branch_table表中,如下所示

-- the table to store GlobalSession dataCREATE TABLE IF NOT EXISTS `global_table`( `xid` VARCHAR(128) NOT NULL, `transaction_id` BIGINT, `status` TINYINT NOT NULL, `application_id` VARCHAR(32), `transaction_service_group` VARCHAR(32), `transaction_name` VARCHAR(128), `timeout` INT, `begin_time` BIGINT, `application_data` VARCHAR(2000), `gmt_create` DATETIME, `gmt_modified` DATETIME, PRIMARY KEY (`xid`), KEY `idx_gmt_modified_status` (`gmt_modified`, `status`), KEY `idx_transaction_id` (`transaction_id`)) ENGINE = InnoDB DEFAULT CHARSET = utf8;-- the table to store BranchSession dataCREATE TABLE IF NOT EXISTS `branch_table`( `branch_id` BIGINT NOT NULL, `xid` VARCHAR(128) NOT NULL, `transaction_id` BIGINT, `resource_group_id` VARCHAR(32), `resource_id` VARCHAR(256), `branch_type` VARCHAR(8), `status` TINYINT, `client_id` VARCHAR(64), `application_data` VARCHAR(2000), `gmt_create` DATETIME(6), `gmt_modified` DATETIME(6), PRIMARY KEY (`branch_id`), KEY `idx_xid` (`xid`)) ENGINE = InnoDB DEFAULT CHARSET = utf8;

global_table表中status表示的含义为全局事务的状态,对应的枚举类为GlobalStatus branch_table表中status表示的含义为分支事务的状态,对应的枚举类为BranchStatus

当事务发生问题时,我们就可以通过global_table和branch_table的status字段判断事务是哪一步执行失败了。

全局事务的状态表

分支事务的状态表

状态解释来自官网:-p 8091

public class Server { public static void main(String[] args) throws IOException { // 省略部分代码... // SessionHolder负责事务日志的持久化存储 // 设置存储模式,有三种可选类型,file,db,redis SessionHolder.init(parameterParser.getStoreMode()); // 创建事务协调器 DefaultCoordinator coordinator = new DefaultCoordinator(nettyRemotingServer); // 初始化5个定时任务 coordinator.init(); // 省略部分代码... System.exit(0); }}

SessionHolder#init根据传入的存储模式初始化4个不同的SessionManager,SessionManager是用来存储全局事务和分支事务状态的。其中全局事务用GlobalSession来表示,分支事务用BranchSession来表示,一个GlobalSession包含多个BranchSession

// 保存了所有的GlobalSessionprivate static SessionManager ROOT_SESSION_MANAGER;// 需要异步commit的GlobalSessionprivate static SessionManager ASYNC_COMMITTING_SESSION_MANAGER;// 需要重试commit的GlobalSessionprivate static SessionManager RETRY_COMMITTING_SESSION_MANAGER;// 需要重试roollback的GlobalSessionprivate static SessionManager RETRY_ROLLBACKING_SESSION_MANAGER;

在init方法中通过SPI的方式实例化对应的类

// SessionHolderpublic static void init(String mode) { if (StringUtils.isBlank(mode)) { mode = CONFIG.getConfig(ConfigurationKeys.STORE_MODE); } StoreMode storeMode = StoreMode.get(mode); if (StoreMode.DB.equals(storeMode)) { // 通过spi加载SessionManager ROOT_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class, StoreMode.DB.getName()); ASYNC_COMMITTING_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class, StoreMode.DB.getName(), new Object[] {ASYNC_COMMITTING_SESSION_MANAGER_NAME}); RETRY_COMMITTING_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class, StoreMode.DB.getName(), new Object[] {RETRY_COMMITTING_SESSION_MANAGER_NAME}); RETRY_ROLLBACKING_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class, StoreMode.DB.getName(), new Object[] {RETRY_ROLLBACKING_SESSION_MANAGER_NAME}); } else if (StoreMode.FILE.equals(storeMode)) { // 省略其他存储方式的加载逻辑 } // 删除已经完成的GlobalSession reload(storeMode);}

4种类型的SessionManager都是同一个实例,只是调用的构造方法不同而已。以DataBaseSessionManager为例,ROOT_SESSION_MANAGER调用了无参数构造函数,而其他SessionManager传入了taskName属性

为什么要搞4个SessionManager?

其实就是用不同的SessionManager管理不同状态的任务,这样逻辑比较清晰。 当不同的SessionManager调用allSessions方法时,返回的就是对应状态的GlobalSession,逻辑比较清晰

DataBaseSessionManager#allSessions

我们来看一下SessionManager的继承关系

SessionLifecycleListener看接口名字就是基于观察者模式设计的,当GlobalSession状态发生改变的时候,会发布通知给监听者,然后监听者做相应动作。目前SessionLifecycleListener接口的实现类只有各种SessionManager,当收到状态改变的通知时,将其状态存储下来

AbstractSessionManager则是一个抽象类,当SessionLifecycleListener接口方法被回调时,调用SessionManager定义的动作方法

public abstract class AbstractSessionManager implements SessionManager, SessionLifecycleListener { protected TransactionStoreManager transactionStoreManager; // 省略部分代码 // 重写了SessionManager接口方法 @Override public void addGlobalSession(GlobalSession session) throws TransactionException { if (LOGGER.isDebugEnabled()) { LOGGER.debug("MANAGER[" + name + "] SESSION[" + session + "] " + LogOperation.GLOBAL_ADD); } writeSession(LogOperation.GLOBAL_ADD, session); } // 重写了SessionManager接口方法 @Override public void updateGlobalSessionStatus(GlobalSession session, GlobalStatus status) throws TransactionException { if (LOGGER.isDebugEnabled()) { LOGGER.debug("MANAGER[" + name + "] SESSION[" + session + "] " + LogOperation.GLOBAL_UPDATE); } writeSession(LogOperation.GLOBAL_UPDATE, session); } // 重写了SessionLifecycleListener接口方法 @Override public void onBegin(GlobalSession globalSession) throws TransactionException { addGlobalSession(globalSession); } // 重写了SessionLifecycleListener接口方法 @Override public void onStatusChange(GlobalSession globalSession, GlobalStatus status) throws TransactionException { updateGlobalSessionStatus(globalSession, status); }}

可以看到AbstractSessionManager并没有实现SessionManager接口的方法,而是直接抛出异常,说明具体的存储逻辑交给子类来实现了

AbstractSessionManager有3个实现类,说明seata支持3种存储模式。而最终存储的工作是交给TransactionStoreManager来实现的

这些增加事务和事务状态变化的持久化操作非常简单,就是执行插入sql和更新sql

说回我们的启动流程,在DefaultCoordinator#init方法中,初始化5个定时任务

retryRollbacking:分支事务回滚失败时,不断重试retryCommitting:分支事务提交失败时,不断重试asyncCommitting:执行异步commit,用在at模式,因为at模式的commit操作其实就是删除undolog,可以异步执行timeoutCheck:当事务处于开始状态,将状态设置为超时回滚,将其放入重试回滚管理器,让其回滚全局事务undoLogDelete:向rm端发送请求,删除7天(默认)之前的undolog

这些重试操作执行的逻辑和全局事务的提交/回滚逻辑一致,就不介绍了

// DefaultCoordinatorpublic void init() { // 重试rollback定时任务 retryRollbacking.scheduleAtFixedRate(() -> { boolean lock = SessionHolder.retryRollbackingLock(); if (lock) { try { handleRetryRollbacking(); } catch (Exception e) { LOGGER.info("Exception retry rollbacking ... ", e); } finally { SessionHolder.unRetryRollbackingLock(); } } }, 0, ROLLBACKING_RETRY_PERIOD, TimeUnit.MILLISECONDS); // 重试commit定时任务 retryCommitting.scheduleAtFixedRate(() -> { boolean lock = SessionHolder.retryCommittingLock(); if (lock) { try { handleRetryCommitting(); } catch (Exception e) { LOGGER.info("Exception retry committing ... ", e); } finally { SessionHolder.unRetryCommittingLock(); } } }, 0, COMMITTING_RETRY_PERIOD, TimeUnit.MILLISECONDS); // 省略部分代码}

全局锁

在前面的文章中我们说过,在Seata AT模式中,我们用全局锁来避免脏写,同时也可以用全局锁将默认的隔离级别从读未提交升高为读已提交。

全局锁是存在TC端的,所以在TC端要提供相应的接口,来进行加锁,解锁,相应的资源是否被加锁等操作!

当分支事务提交的时候,需要把修改的资源锁定。当全局事务提交后才会把相应的资源解锁。

各种事务的加锁状态会存在lock_table表中

-- the table to store lock dataCREATE TABLE IF NOT EXISTS `lock_table`( `row_key` VARCHAR(128) NOT NULL, -- resource_id+table_name+pk的组合 `xid` VARCHAR(128), `transaction_id` BIGINT, `branch_id` BIGINT NOT NULL, `resource_id` VARCHAR(256), `table_name` VARCHAR(32), `pk` VARCHAR(36), `gmt_create` DATETIME, `gmt_modified` DATETIME, PRIMARY KEY (`row_key`), KEY `idx_branch_id` (`branch_id`)) ENGINE = InnoDB DEFAULT CHARSET = utf8;

用个例子演示一下全局锁的工作流程,

假如account_info表中有如下2个账户数据,其中id是主键

id

user_id

1

1001

2

1002

假如说执行如下sql

update account_info set balance = 100

当进行commit的时候,需要加全局锁,此时根据update语句构建出select语句,找出影响的主键,即1和2

通过主键构建出lockKey=account_info:1,2

lockKey的构建规则如下

如果主键是一列(id列),则形式如下

// 表名:主键值1,主键值2account_info:1,2

如果主键是多列(id列+user_id列),则形式如下

// 表名:主键值1_主键值a,主键值1_主键值baccount_info:1_1001,2_1002

当然有可能一个事务中,有可能有多个表,多个表的构建规则如下(中间通过;分隔即可)

// 表名1:主键值1,主键值2;表名2:主键值1,主键值2account_flow:1,2;account_info:1,2

加锁

当加锁是会往lock_table中假如如下2个记录(省略无关的列)

row_key

xid

jdbc:mysql://myhost:3306/db_account_1^^^account_info^^^1

127.21.0.14:18091:6449339005964652705

jdbc:mysql://myhost:3306/db_account_2^^^account_info^^^1

127.21.0.14:18091:6449339005964652705

row_key的构建规则如下

rowKey = resourceId + "^^^" + tableName + "^^^" +

tcc模式下resourceId为@TwoPhaseBusinessAction注解的name属性 而在at和xa模式中resourceId都为数据库连接url

解锁

根据xid和row_key删除记录

查询是否能加锁

根据row_key从lock_table中查询记录,如果记录中的xid和查询传递过来的xid不一致则加锁失败,如果没有记录或者记录的xid和传递过来的xid一致,则加锁成功

源码分析

TC端定义的锁操作接口

public interface LockManager { // 对分支事务的资源加锁 boolean acquireLock(BranchSession branchSession) throws TransactionException; // 对分支事务的资源解锁 boolean releaseLock(BranchSession branchSession) throws TransactionException; // 对全局事务中的所有分支事务的资源解锁 boolean releaseGlobalSessionLock(GlobalSession globalSession) throws TransactionException; // 根据xid resourceId lockKey 查询是否已经加锁 boolean isLockable(String xid, String resourceId, String lockKey) throws TransactionException; // 清除所有锁 void cleanAllLocks() throws TransactionException;}

老规矩,我们还是只分析db这种存储模式,最终的加解锁操作会交给Locker

加锁

// AbstractLockManagerpublic boolean acquireLock(BranchSession branchSession) throws TransactionException { if (branchSession == null) { throw new IllegalArgumentException("branchSession can't be null for memory/file locker."); } String lockKey = branchSession.getLockKey(); if (StringUtils.isNullOrEmpty(lockKey)) { // no lock return true; } // get locks of branch // 创建 RowLock 集合,一条加锁记录对应一个RowLock对象 List locks = collectRowLocks(branchSession); if (CollectionUtils.isEmpty(locks)) { // no lock return true; } return getLocker(branchSession).acquireLock(locks);}

把需要加锁的资源转换成RowLock集合

protected List collectRowLocks(BranchSession branchSession) { List locks = new ArrayList<>(); if (branchSession == null || StringUtils.isBlank(branchSession.getLockKey())) { return locks; } String xid = branchSession.getXid(); // 得到资源id,也就是数据库连接url String resourceId = branchSession.getResourceId(); long transactionId = branchSession.getTransactionId(); String lockKey = branchSession.getLockKey(); return collectRowLocks(lockKey, resourceId, xid, transactionId, branchSession.getBranchId());}

转换RowLock的过程,基本就是对lockKey的解析过程,

protected List collectRowLocks(String lockKey, String resourceId, String xid, Long transactionId, Long branchID) { List locks = new ArrayList(); // 对多个记录加锁,中间使用;分隔 String[] tableGroupedLockKeys = lockKey.split(";"); for (String tableGroupedLockKey : tableGroupedLockKeys) { // 表名和记录主键值之间用:分隔 int idx = tableGroupedLockKey.indexOf(":"); if (idx < 0) { return locks; } // 要加锁的表名 String tableName = tableGroupedLockKey.substring(0, idx); // 加锁的记录主键值,如果需要一次加锁多条记录,记录之间用,分隔 String mergedPKs = tableGroupedLockKey.substring(idx + 1); if (StringUtils.isBlank(mergedPKs)) { return locks; } String[] pks = mergedPKs.split(","); if (pks == null || pks.length == 0) { return locks; } // 一个主键创建一个RowLock对象 for (String pk : pks) { if (StringUtils.isNotBlank(pk)) { RowLock rowLock = new RowLock(); rowLock.setXid(xid); rowLock.setTransactionId(transactionId); rowLock.setBranchId(branchID); rowLock.setTableName(tableName); rowLock.setPk(pk); rowLock.setResourceId(resourceId); locks.add(rowLock); } } } return locks;}

构造好RowLock集合,接下来就是调用DataBaseLocker执行真正的加锁操作

首先将RowLock转为LockDO

protected List convertToLockDO(List locks) { List lockDOs = new ArrayList<>(); if (CollectionUtils.isEmpty(locks)) { return lockDOs; } for (RowLock rowLock : locks) { LockDO lockDO = new LockDO(); lockDO.setBranchId(rowLock.getBranchId()); lockDO.setPk(rowLock.getPk()); lockDO.setResourceId(rowLock.getResourceId()); // rowKey = resourceId + "^^^" + tableName + "^^^" + pk lockDO.setRowKey(getRowKey(rowLock.getResourceId(), rowLock.getTableName(), rowLock.getPk())); lockDO.setXid(rowLock.getXid()); lockDO.setTransactionId(rowLock.getTransactionId()); lockDO.setTableName(rowLock.getTableName()); lockDOs.add(lockDO); } return lockDOs;}

// LockStoreDataBaseDAOpublic boolean acquireLock(List lockDOs) { Connection conn = null; PreparedStatement ps = null; ResultSet rs = null; Set dbExistedRowKeys = new HashSet<>(); boolean originalAutoCommit = true; if (lockDOs.size() > 1) { // 过滤掉重复的加锁记录 lockDOs = lockDOs.stream().filter(LambdaUtils.distinctByKey(LockDO::getRowKey)).collect(Collectors.toList()); } try { conn = lockStoreDataSource.getConnection(); if (originalAutoCommit = conn.getAutoCommit()) { conn.setAutoCommit(false); } //check lock StringJoiner sj = new StringJoiner(","); for (int i = 0; i < lockDOs.size(); i++) { sj.add("?"); } boolean canLock = true; //query // select xid, transaction_id, branch_id, resource_id, table_name, pk, row_key, gmt_create, gmt_modified // from lock_table where row_key in ('?', '?', '?') String checkLockSQL = LockStoreSqlFactory.getLogStoreSql(dbType).getCheckLockableSql(lockTable, sj.toString()); ps = conn.prepareStatement(checkLockSQL); for (int i = 0; i < lockDOs.size(); i++) { ps.setString(i + 1, lockDOs.get(i).getRowKey()); } rs = ps.executeQuery(); String currentXID = lockDOs.get(0).getXid(); while (rs.next()) { String dbXID = rs.getString(ServerTableColumnsName.LOCK_TABLE_XID); // 查出来的记录的row_key和当前的不一样,则说明被别的事务加锁了 if (!StringUtils.equals(dbXID, currentXID)) { if (LOGGER.isInfoEnabled()) { String dbPk = rs.getString(ServerTableColumnsName.LOCK_TABLE_PK); String dbTableName = rs.getString(ServerTableColumnsName.LOCK_TABLE_TABLE_NAME); Long dbBranchId = rs.getLong(ServerTableColumnsName.LOCK_TABLE_BRANCH_ID); LOGGER.info("Global lock on [{}:{}] is holding by xid {} branchId {}", dbTableName, dbPk, dbXID, dbBranchId); } canLock &= false; break; } // 已经被自己加锁的记录 dbExistedRowKeys.add(rs.getString(ServerTableColumnsName.LOCK_TABLE_ROW_KEY)); } // 有记录已经加过锁,回滚退出 if (!canLock) { conn.rollback(); return false; } // 此次需要加锁的记录 List unrepeatedLockDOs = null; if (CollectionUtils.isNotEmpty(dbExistedRowKeys)) { unrepeatedLockDOs = lockDOs.stream().filter(lockDO -> !dbExistedRowKeys.contains(lockDO.getRowKey())) .collect(Collectors.toList()); } else { unrepeatedLockDOs = lockDOs; } if (CollectionUtils.isEmpty(unrepeatedLockDOs)) { conn.rollback(); return true; } //lock // 执行加锁操作 if (unrepeatedLockDOs.size() == 1) { LockDO lockDO = unrepeatedLockDOs.get(0); if (!doAcquireLock(conn, lockDO)) { if (LOGGER.isInfoEnabled()) { LOGGER.info("Global lock acquire failed, xid {} branchId {} pk {}", lockDO.getXid(), lockDO.getBranchId(), lockDO.getPk()); } conn.rollback(); return false; } } else { if (!doAcquireLocks(conn, unrepeatedLockDOs)) { if (LOGGER.isInfoEnabled()) { LOGGER.info("Global lock batch acquire failed, xid {} branchId {} pks {}", unrepeatedLockDOs.get(0).getXid(), unrepeatedLockDOs.get(0).getBranchId(), unrepeatedLockDOs.stream().map(lockDO -> lockDO.getPk()).collect(Collectors.toList())); } conn.rollback(); return false; } } conn.commit(); return true; } catch (SQLException e) { throw new StoreException(e); } finally { IOUtil.close(rs, ps); if (conn != null) { try { if (originalAutoCommit) { conn.setAutoCommit(true); } conn.close(); } catch (SQLException e) { } } }}

解锁

解锁的逻辑和加锁的逻辑差不多,直接分析最终执行的部分了

public boolean unLock(List lockDOs) { Connection conn = null; PreparedStatement ps = null; try { conn = lockStoreDataSource.getConnection(); conn.setAutoCommit(true); StringJoiner sj = new StringJoiner(","); for (int i = 0; i < lockDOs.size(); i++) { sj.add("?"); } //batch release lock // delete from lock_table where xid = ? and row_key in (?, ?, ?) String batchDeleteSQL = LockStoreSqlFactory.getLogStoreSql(dbType).getBatchDeleteLockSql(lockTable, sj.toString()); ps = conn.prepareStatement(batchDeleteSQL); ps.setString(1, lockDOs.get(0).getXid()); for (int i = 0; i < lockDOs.size(); i++) { ps.setString(i + 2, lockDOs.get(i).getRowKey()); } ps.executeUpdate(); } catch (SQLException e) { throw new StoreException(e); } finally { IOUtil.close(ps, conn); } return true;}

参考博客

事务状态逻辑 [1]http://seata.io/zh-cn/docs/user/appendix/global-transaction-status.html

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:学PHP必知PHP岗位面试题(php面试试题)
下一篇:seata源码解析:seata AT模式是如何实现的?
相关文章