AT 分布式事务使用详解
一、AT 模式概述
AT (Automatic Transaction) 模式是 Seata 中最常用的分布式事务模式,它通过自动生成反向 SQL 来实现事务的回滚,对业务代码侵入性极小。
二、核心配置详解
1. 全局配置 (registry.conf)
# 注册中心配置
registry {
type = "nacos" # 支持 nacos、eureka、redis、zk、consul、etcd3、sofa
nacos {
application = "seata-server"
serverAddr = "127.0.0.1:8848"
group = "SEATA_GROUP"
namespace = ""
cluster = "default"
username = "nacos"
password = "nacos"
}
}
# 配置中心
config {
type = "nacos"
nacos {
serverAddr = "127.0.0.1:8848"
namespace = ""
group = "SEATA_GROUP"
username = "nacos"
password = "nacos"
dataId = "seata-server.properties"
}
}2. 客户端配置 (application.yml)
seata:
enabled: true
application-id: order-service # 应用唯一标识
tx-service-group: my_test_tx_group # 事务组名称
# 自动数据源代理
enable-auto-data-source-proxy: true
data-source-proxy-mode: AT # AT 模式
# 客户端配置
config:
type: nacos
nacos:
server-addr: 127.0.0.1:8848
group: SEATA_GROUP
namespace: ""
data-id: seata-client.properties
# 注册中心配置
registry:
type: nacos
nacos:
application: seata-server
server-addr: 127.0.0.1:8848
group: SEATA_GROUP
namespace: ""
cluster: default
# 客户端详细配置
client:
rm:
# 异步提交缓存队列长度
async-commit-buffer-limit: 10000
# 一阶段结果上报 TC 重试次数
report-retry-count: 5
# 自动刷新缓存中的表结构
table-meta-check-enable: true
# 分支事务与其它全局回滚事务冲突时锁策略
report-success-enable: false
# 是否上报一阶段成功
saga-branch-register-enable: false
# saga json parser
saga-json-parser: fastjson
# 一阶段全局提交结果上报 TC 重试次数
saga-retry-persist-mode-update: false
# 默认false,ture会提升性能
saga-compensate-persist-mode-update: false
# TCC 资源自动清理时间(小时)
tcc-action-interceptor-order: -2147482648
tm:
# 一阶段全局提交结果上报 TC 重试次数
commit-retry-count: 5
# 一阶段全局回滚结果上报 TC 重试次数
rollback-retry-count: 5
# 默认全局事务超时时间(毫秒)
default-global-transaction-timeout: 60000
# 降级开关,默认 false
degrade-check: false
# 服务自检周期(毫秒)
degrade-check-period: 2000
# 允许降级检查的最小业务并发数
degrade-check-allow-times: 10
# 自检失败后开启降级的持续时间(毫秒)
interceptor-order: -2147482648
undo:
# 是否开启二阶段回滚镜像校验
data-validation: true
# 二阶段回滚镜像校验失败的处理方式
log-serialization: jackson
# undo 序列化方式:jackson、fastjson、kryo
log-table: undo_log
# 自定义 undo 表名
only-care-update-columns: true
# 是否只生成被更新字段的镜像
compress:
enable: true
# 是否压缩 undo_log
type: zip
# 压缩类型
threshold: 64k
# 压缩阈值
# 负载均衡配置
load-balance:
type: RandomLoadBalance # 负载均衡类型
virtual-nodes: 10 # 虚拟节点数3. 服务端配置 (Seata Server)
# 存储模式
store.mode=db # file、db、redis
# 数据库存储配置
store.db.datasource=druid
store.db.dbType=mysql
store.db.driverClassName=com.mysql.cj.jdbc.Driver
store.db.url=jdbc:mysql://127.0.0.1:3306/seata?useSSL=false
store.db.user=root
store.db.password=root
store.db.minConn=5
store.db.maxConn=30
store.db.globalTable=global_table
store.db.branchTable=branch_table
store.db.queryLimit=100
store.db.lockTable=lock_table
store.db.maxWait=5000
# 事务、日志存储配置
server.recovery.committingRetryPeriod=1000
server.recovery.asynCommittingRetryPeriod=1000
server.recovery.rollbackingRetryPeriod=1000
server.recovery.timeoutRetryPeriod=1000三、使用示例
1. 数据库准备
每个业务库都需要创建 undo_log 表:
CREATE TABLE `undo_log` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`branch_id` bigint(20) NOT NULL,
`xid` varchar(100) NOT NULL,
`context` varchar(128) NOT NULL,
`rollback_info` longblob NOT NULL,
`log_status` int(11) NOT NULL,
`log_created` datetime NOT NULL,
`log_modified` datetime NOT NULL,
PRIMARY KEY (`id`),
UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;2. 业务代码
@Service
public class OrderServiceImpl implements OrderService {
@Autowired
private OrderMapper orderMapper;
@Autowired
private AccountService accountService;
@Autowired
private StorageService storageService;
/**
* 全局事务发起者,使用 @GlobalTransactional 注解
*/
@Override
@GlobalTransactional(name = "create-order", rollbackFor = Exception.class)
public void createOrder(Order order) {
// 1. 创建订单
orderMapper.insert(order);
// 2. 扣减库存(远程调用)
storageService.deduct(order.getProductId(), order.getCount());
// 3. 扣减账户余额(远程调用)
accountService.debit(order.getUserId(), order.getMoney());
// 4. 更新订单状态
order.setStatus(1);
orderMapper.update(order);
}
}
// 库存服务
@Service
public class StorageServiceImpl implements StorageService {
@Autowired
private StorageMapper storageMapper;
@Override
public void deduct(Long productId, Integer count) {
// 直接执行业务逻辑,无需额外事务注解
storageMapper.deduct(productId, count);
}
}四、底层实现原理
1. 核心组件架构
TC (Transaction Coordinator) - 事务协调器
├── 全局事务管理
├── 分支事务管理
└── 全局锁管理
TM (Transaction Manager) - 事务管理器
├── 全局事务开启
├── 全局事务提交
└── 全局事务回滚
RM (Resource Manager) - 资源管理器
├── 分支事务注册
├── 分支事务上报
└── 分支事务提交/回滚2. AT 模式执行流程
第一阶段(执行业务 SQL)
// DataSourceProxy 代理数据源
public class DataSourceProxy extends AbstractDataSourceProxy {
@Override
public ConnectionProxy getConnection() throws SQLException {
Connection targetConnection = targetDataSource.getConnection();
return new ConnectionProxy(this, targetConnection);
}
}
// ConnectionProxy 核心逻辑
public class ConnectionProxy extends AbstractConnectionProxy {
@Override
public void commit() throws SQLException {
try {
// 注册分支事务
register();
} catch (TransactionException e) {
// 识别并处理异常
recognizeLockKeyConflictException(e, context.buildLockKeys());
}
try {
// 执行本地事务提交
targetConnection.commit();
} catch (Throwable ex) {
// 上报事务执行失败
report(false);
throw ex;
}
// 上报事务执行成功
report(true);
}
}
// ExecuteTemplate 执行模板
public class ExecuteTemplate {
public static <T, S extends Statement> T execute(
SQLRecognizer sqlRecognizer,
StatementProxy<S> statementProxy,
StatementCallback<T, S> statementCallback,
Object... args) throws SQLException {
// 1. 前置镜像:查询修改前的数据
TableRecords beforeImage = buildBeforeImage(statementProxy, sqlRecognizer);
// 2. 执行业务 SQL
T result = statementCallback.execute(statementProxy.getTargetStatement(), args);
// 3. 后置镜像:查询修改后的数据
TableRecords afterImage = buildAfterImage(statementProxy, sqlRecognizer);
// 4. 构造 undo_log
prepareUndoLog(beforeImage, afterImage);
return result;
}
}核心数据结构
// 前置镜像和后置镜像数据结构
public class TableRecords {
private TableMeta tableMeta;
private List<Row> rows;
// 行数据
public static class Row {
private List<Field> fields;
public static class Field {
private String name; // 字段名
private int keyType; // 主键类型
private Object value; // 字段值
}
}
}
// Undo Log 结构
public class BranchUndoLog {
private String xid; // 全局事务ID
private long branchId; // 分支事务ID
private List<SQLUndoLog> sqlUndoLogs; // SQL回滚日志
public static class SQLUndoLog {
private String sqlType; // INSERT/UPDATE/DELETE
private String tableName; // 表名
private TableRecords beforeImage; // 前置镜像
private TableRecords afterImage; // 后置镜像
}
}第二阶段(提交或回滚)
提交流程:
public class AsyncWorker implements ResourceManagerInbound {
/**
* 异步提交分支事务
*/
public BranchStatus branchCommit(String xid, long branchId,
String resourceId) {
// AT 模式下,一阶段已经提交,二阶段只需删除 undo_log
return asyncCommit(xid, branchId, resourceId);
}
private BranchStatus asyncCommit(String xid, long branchId,
String resourceId) {
// 加入异步删除队列
addToCommitQueue(xid, branchId, resourceId);
return BranchStatus.PhaseTwo_Committed;
}
// 异步删除 undo_log
private void deleteUndoLog(String xid, long branchId) {
String sql = "DELETE FROM undo_log WHERE xid = ? AND branch_id = ?";
executeUpdate(sql, xid, branchId);
}
}回滚流程:
public class UndoLogManager {
/**
* 回滚分支事务
*/
public void undo(DataSourceProxy dataSourceProxy, String xid,
long branchId) throws SQLException {
Connection conn = dataSourceProxy.getConnection();
try {
// 1. 查询 undo_log
String selectSQL = "SELECT * FROM undo_log WHERE xid = ? " +
"AND branch_id = ? FOR UPDATE";
BranchUndoLog branchUndoLog = selectUndoLog(conn, xid, branchId);
if (branchUndoLog == null) {
return; // 已经回滚或提交
}
// 2. 数据校验
if (!dataValidation(conn, branchUndoLog)) {
throw new SQLException("Data validation failed");
}
// 3. 生成反向 SQL 并执行
for (SQLUndoLog sqlUndoLog : branchUndoLog.getSqlUndoLogs()) {
AbstractUndoExecutor undoExecutor =
UndoExecutorFactory.getUndoExecutor(
dataSourceProxy.getDbType(), sqlUndoLog);
undoExecutor.executeOn(conn);
}
// 4. 删除 undo_log
String deleteSQL = "DELETE FROM undo_log WHERE xid = ? " +
"AND branch_id = ?";
executeUpdate(conn, deleteSQL, xid, branchId);
conn.commit();
} catch (Exception e) {
conn.rollback();
throw e;
}
}
/**
* 数据校验:比对当前数据和后置镜像
*/
private boolean dataValidation(Connection conn,
BranchUndoLog branchUndoLog) {
for (SQLUndoLog sqlUndoLog : branchUndoLog.getSqlUndoLogs()) {
TableRecords afterImage = sqlUndoLog.getAfterImage();
TableRecords currentRecords = queryCurrentRecords(conn, afterImage);
// 比对数据是否一致
if (!afterImage.equals(currentRecords)) {
return false; // 数据被脏写
}
}
return true;
}
}反向 SQL 生成逻辑
// UPDATE 反向 SQL
public class MySQLUndoUpdateExecutor extends AbstractUndoExecutor {
@Override
protected String buildUndoSQL() {
TableRecords beforeImage = sqlUndoLog.getBeforeImage();
// 根据前置镜像生成 UPDATE 语句
StringBuilder sql = new StringBuilder("UPDATE ");
sql.append(sqlUndoLog.getTableName()).append(" SET ");
// 设置字段值为前置镜像的值
List<Field> fields = beforeImage.getRows().get(0).getFields();
for (int i = 0; i < fields.size(); i++) {
if (i > 0) sql.append(", ");
sql.append(fields.get(i).getName()).append(" = ?");
}
// WHERE 条件(主键)
sql.append(" WHERE ");
appendWhereCondition(sql, beforeImage);
return sql.toString();
}
}
// DELETE 反向 SQL(生成 INSERT)
public class MySQLUndoDeleteExecutor extends AbstractUndoExecutor {
@Override
protected String buildUndoSQL() {
TableRecords beforeImage = sqlUndoLog.getBeforeImage();
// 根据前置镜像生成 INSERT 语句
StringBuilder sql = new StringBuilder("INSERT INTO ");
sql.append(sqlUndoLog.getTableName()).append(" (");
// 字段列表
List<Field> fields = beforeImage.getRows().get(0).getFields();
for (int i = 0; i < fields.size(); i++) {
if (i > 0) sql.append(", ");
sql.append(fields.get(i).getName());
}
sql.append(") VALUES (");
for (int i = 0; i < fields.size(); i++) {
if (i > 0) sql.append(", ");
sql.append("?");
}
sql.append(")");
return sql.toString();
}
}
// INSERT 反向 SQL(生成 DELETE)
public class MySQLUndoInsertExecutor extends AbstractUndoExecutor {
@Override
protected String buildUndoSQL() {
TableRecords afterImage = sqlUndoLog.getAfterImage();
// 根据后置镜像生成 DELETE 语句
StringBuilder sql = new StringBuilder("DELETE FROM ");
sql.append(sqlUndoLog.getTableName());
sql.append(" WHERE ");
appendWhereCondition(sql, afterImage);
return sql.toString();
}
}3. 全局锁机制
public class LockManagerImpl implements LockManager {
/**
* 获取全局锁
*/
public boolean acquireLock(List<RowLock> rowLocks) {
// 锁的粒度:表名 + 主键值
// 例如:order_table:1,2,3
String lockKey = buildLockKey(rowLocks);
// 向 TC 申请全局锁
boolean result = transactionCoordinator.acquireLock(
xid, branchId, resourceId, lockKey);
if (!result) {
// 获取锁失败,进入重试逻辑
return retryAcquireLock(rowLocks);
}
return true;
}
private String buildLockKey(List<RowLock> rowLocks) {
// 格式:table1:pk1,pk2;table2:pk3,pk4
Map<String, Set<String>> lockMap = new HashMap<>();
for (RowLock rowLock : rowLocks) {
lockMap.computeIfAbsent(
rowLock.getTableName(),
k -> new HashSet<>()
).add(rowLock.getPk());
}
return lockMap.entrySet().stream()
.map(e -> e.getKey() + ":" + String.join(",", e.getValue()))
.collect(Collectors.joining(";"));
}
}五、性能优化配置
seata:
client:
rm:
# 异步提交优化
async-commit-buffer-limit: 10000
undo:
# 只记录更新字段
only-care-update-columns: true
# 压缩 undo_log
compress:
enable: true
type: zip
threshold: 64k六、常见问题
- 脏写问题:通过全局锁和数据校验解决
- 性能问题:使用异步提交、undo_log 压缩
- undo_log 膨胀:定期清理历史数据
-- 清理 7 天前的 undo_log
DELETE FROM undo_log
WHERE log_created < DATE_SUB(NOW(), INTERVAL 7 DAY);AT 模式通过自动生成前后镜像和反向 SQL,实现了对业务代码零侵入的分布式事务解决方案,是 Seata 最推荐使用的模式。