万籁俱寂,万字将成。
刘耀文
Stay hungry. Stay foolish.
© 2024-2026
Powered by Mix Space&
余白 / Yohaku
.
正在被0人看爆
关于
关于本站关于我
更多
时间线友链
联系
写留言发邮件 ↗
刘耀文
Stay hungry. Stay foolish.
链接
关于本站·关于我·时间线·友链·写留言·发邮件
© 2024-2026 Powered by Mix Space&
余白 / Yohaku
.
正在被0人看爆
赣ICP备2024031666号
RSS 订阅·站点地图·
··|
RSS 订阅·站点地图·|··|赣ICP备2024031666号
稍候片刻,月出文自明。

AT 分布式事务使用详解

7
AI·GEN

关键洞察

AT 分布式事务使用详解

  • Loading...
  • Loading...
  • Loading...
  • Loading...
  • Loading...
  • 一、AT 模式概述

    AT (Automatic Transaction) 模式是 Seata 中最常用的分布式事务模式,它通过自动生成反向 SQL 来实现事务的回滚,对业务代码侵入性极小。

    二、核心配置详解

    1. 全局配置 (registry.conf)

    PROPERTIES
    # 注册中心配置
    registry {
      type = "nacos"  # 支持 nacos、eureka、redis、zk、consul、etcd3、sofa
      
      nacos {
        application = "seata-server"
        serverAddr = "127.0.0.1:8848"
        group = "SEATA_GROUP"
        namespace = ""
        cluster = "default"
        username = "nacos"
        password = "nacos"
      }
    }
    
    # 配置中心
    config {
      type = "nacos"
      
      nacos {
        serverAddr = "127.0.0.1:8848"
        namespace = ""
        group = "SEATA_GROUP"
        username = "nacos"
        password = "nacos"
        dataId = "seata-server.properties"
      }
    }
    

    2. 客户端配置 (application.yml)

    YAML
    seata:
      enabled: true
      application-id: order-service  # 应用唯一标识
      tx-service-group: my_test_tx_group  # 事务组名称
      
      # 自动数据源代理
      enable-auto-data-source-proxy: true
      data-source-proxy-mode: AT  # AT 模式
      
      # 客户端配置
      config:
        type: nacos
        nacos:
          server-addr: 127.0.0.1:8848
          group: SEATA_GROUP
          namespace: ""
          data-id: seata-client.properties
      
      # 注册中心配置
      registry:
        type: nacos
        nacos:
          application: seata-server
          server-addr: 127.0.0.1:8848
          group: SEATA_GROUP
          namespace: ""
          cluster: default
      
      # 客户端详细配置
      client:
        rm:
          # 异步提交缓存队列长度
          async-commit-buffer-limit: 10000
          # 一阶段结果上报 TC 重试次数
          report-retry-count: 5
          # 自动刷新缓存中的表结构
          table-meta-check-enable: true
          # 分支事务与其它全局回滚事务冲突时锁策略
          report-success-enable: false
          # 是否上报一阶段成功
          saga-branch-register-enable: false
          # saga json parser
          saga-json-parser: fastjson
          # 一阶段全局提交结果上报 TC 重试次数
          saga-retry-persist-mode-update: false
          # 默认false,ture会提升性能
          saga-compensate-persist-mode-update: false
          # TCC 资源自动清理时间(小时)
          tcc-action-interceptor-order: -2147482648
          
        tm:
          # 一阶段全局提交结果上报 TC 重试次数
          commit-retry-count: 5
          # 一阶段全局回滚结果上报 TC 重试次数
          rollback-retry-count: 5
          # 默认全局事务超时时间(毫秒)
          default-global-transaction-timeout: 60000
          # 降级开关,默认 false
          degrade-check: false
          # 服务自检周期(毫秒)
          degrade-check-period: 2000
          # 允许降级检查的最小业务并发数
          degrade-check-allow-times: 10
          # 自检失败后开启降级的持续时间(毫秒)
          interceptor-order: -2147482648
          
        undo:
          # 是否开启二阶段回滚镜像校验
          data-validation: true
          # 二阶段回滚镜像校验失败的处理方式
          log-serialization: jackson
          # undo 序列化方式:jackson、fastjson、kryo
          log-table: undo_log
          # 自定义 undo 表名
          only-care-update-columns: true
          # 是否只生成被更新字段的镜像
          compress:
            enable: true
            # 是否压缩 undo_log
            type: zip
            # 压缩类型
            threshold: 64k
            # 压缩阈值
        
        # 负载均衡配置
        load-balance:
          type: RandomLoadBalance  # 负载均衡类型
          virtual-nodes: 10  # 虚拟节点数
    

    3. 服务端配置 (Seata Server)

    PROPERTIES
    # 存储模式
    store.mode=db  # file、db、redis
    
    # 数据库存储配置
    store.db.datasource=druid
    store.db.dbType=mysql
    store.db.driverClassName=com.mysql.cj.jdbc.Driver
    store.db.url=jdbc:mysql://127.0.0.1:3306/seata?useSSL=false
    store.db.user=root
    store.db.password=root
    store.db.minConn=5
    store.db.maxConn=30
    store.db.globalTable=global_table
    store.db.branchTable=branch_table
    store.db.queryLimit=100
    store.db.lockTable=lock_table
    store.db.maxWait=5000
    
    # 事务、日志存储配置
    server.recovery.committingRetryPeriod=1000
    server.recovery.asynCommittingRetryPeriod=1000
    server.recovery.rollbackingRetryPeriod=1000
    server.recovery.timeoutRetryPeriod=1000
    

    三、使用示例

    1. 数据库准备

    每个业务库都需要创建 undo_log 表:

    CodeBlock Loading...

    2. 业务代码

    CodeBlock Loading...

    四、底层实现原理

    1. 核心组件架构

    TC (Transaction Coordinator) - 事务协调器
    ├── 全局事务管理
    ├── 分支事务管理
    └── 全局锁管理
    
    TM (Transaction Manager) - 事务管理器
    ├── 全局事务开启
    ├── 全局事务提交
    └── 全局事务回滚
    
    RM (Resource Manager) - 资源管理器
    ├── 分支事务注册
    ├── 分支事务上报
    └── 分支事务提交/回滚
    

    2. AT 模式执行流程

    第一阶段(执行业务 SQL)

    CodeBlock Loading...

    核心数据结构

    CodeBlock Loading...

    第二阶段(提交或回滚)

    提交流程:

    CodeBlock Loading...

    回滚流程:

    CodeBlock Loading...

    反向 SQL 生成逻辑

    CodeBlock Loading...

    3. 全局锁机制

    CodeBlock Loading...

    五、性能优化配置

    CodeBlock Loading...

    六、常见问题

    1. 脏写问题:通过全局锁和数据校验解决
    2. 性能问题:使用异步提交、undo_log 压缩
    3. undo_log 膨胀:定期清理历史数据
    CodeBlock Loading...

    AT 模式通过自动生成前后镜像和反向 SQL,实现了对业务代码零侵入的分布式事务解决方案,是 Seata 最推荐使用的模式。​​​​​​​​​​​​​​​​

    SQL
    CREATE TABLE `undo_log` (
      `id` bigint(20) NOT NULL AUTO_INCREMENT,
      `branch_id` bigint(20) NOT NULL,
      `xid` varchar(100) NOT NULL,
      `context` varchar(128) NOT NULL,
      `rollback_info` longblob NOT NULL,
      `log_status` int(11) NOT NULL,
      `log_created` datetime NOT NULL,
      `log_modified` datetime NOT NULL,
      PRIMARY KEY (`id`),
      UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
    
    JAVA
    @Service
    public class OrderServiceImpl implements OrderService {
        
        @Autowired
        private OrderMapper orderMapper;
        
        @Autowired
        private AccountService accountService;
        
        @Autowired
        private StorageService storageService;
        
        /**
         * 全局事务发起者,使用 @GlobalTransactional 注解
         */
        @Override
        @GlobalTransactional(name = "create-order", rollbackFor = Exception.class)
        public void createOrder(Order order) {
            // 1. 创建订单
            orderMapper.insert(order);
            
            // 2. 扣减库存(远程调用)
            storageService.deduct(order.getProductId(), order.getCount());
            
            // 3. 扣减账户余额(远程调用)
            accountService.debit(order.getUserId(), order.getMoney());
            
            // 4. 更新订单状态
            order.setStatus(1);
            orderMapper.update(order);
        }
    }
    
    // 库存服务
    @Service
    public class StorageServiceImpl implements StorageService {
        
        @Autowired
        private StorageMapper storageMapper;
        
        @Override
        public void deduct(Long productId, Integer count) {
            // 直接执行业务逻辑,无需额外事务注解
            storageMapper.deduct(productId, count);
        }
    }
    
    JAVA
    // DataSourceProxy 代理数据源
    public class DataSourceProxy extends AbstractDataSourceProxy {
        
        @Override
        public ConnectionProxy getConnection() throws SQLException {
            Connection targetConnection = targetDataSource.getConnection();
            return new ConnectionProxy(this, targetConnection);
        }
    }
    
    // ConnectionProxy 核心逻辑
    public class ConnectionProxy extends AbstractConnectionProxy {
        
        @Override
        public void commit() throws SQLException {
            try {
                // 注册分支事务
                register();
            } catch (TransactionException e) {
                // 识别并处理异常
                recognizeLockKeyConflictException(e, context.buildLockKeys());
            }
            
            try {
                // 执行本地事务提交
                targetConnection.commit();
            } catch (Throwable ex) {
                // 上报事务执行失败
                report(false);
                throw ex;
            }
            
            // 上报事务执行成功
            report(true);
        }
    }
    
    // ExecuteTemplate 执行模板
    public class ExecuteTemplate {
        
        public static <T, S extends Statement> T execute(
                SQLRecognizer sqlRecognizer,
                StatementProxy<S> statementProxy,
                StatementCallback<T, S> statementCallback,
                Object... args) throws SQLException {
            
            // 1. 前置镜像:查询修改前的数据
            TableRecords beforeImage = buildBeforeImage(statementProxy, sqlRecognizer);
            
            // 2. 执行业务 SQL
            T result = statementCallback.execute(statementProxy.getTargetStatement(), args);
            
            // 3. 后置镜像:查询修改后的数据
            TableRecords afterImage = buildAfterImage(statementProxy, sqlRecognizer);
            
            // 4. 构造 undo_log
            prepareUndoLog(beforeImage, afterImage);
            
            return result;
        }
    }
    
    JAVA
    // 前置镜像和后置镜像数据结构
    public class TableRecords {
        private TableMeta tableMeta;
        private List<Row> rows;
        
        // 行数据
        public static class Row {
            private List<Field> fields;
            
            public static class Field {
                private String name;        // 字段名
                private int keyType;        // 主键类型
                private Object value;       // 字段值
            }
        }
    }
    
    // Undo Log 结构
    public class BranchUndoLog {
        private String xid;                    // 全局事务ID
        private long branchId;                 // 分支事务ID
        private List<SQLUndoLog> sqlUndoLogs;  // SQL回滚日志
        
        public static class SQLUndoLog {
            private String sqlType;            // INSERT/UPDATE/DELETE
            private String tableName;          // 表名
            private TableRecords beforeImage;  // 前置镜像
            private TableRecords afterImage;   // 后置镜像
        }
    }
    
    JAVA
    public class AsyncWorker implements ResourceManagerInbound {
        
        /**
         * 异步提交分支事务
         */
        public BranchStatus branchCommit(String xid, long branchId, 
                                         String resourceId) {
            // AT 模式下,一阶段已经提交,二阶段只需删除 undo_log
            return asyncCommit(xid, branchId, resourceId);
        }
        
        private BranchStatus asyncCommit(String xid, long branchId, 
                                         String resourceId) {
            // 加入异步删除队列
            addToCommitQueue(xid, branchId, resourceId);
            return BranchStatus.PhaseTwo_Committed;
        }
        
        // 异步删除 undo_log
        private void deleteUndoLog(String xid, long branchId) {
            String sql = "DELETE FROM undo_log WHERE xid = ? AND branch_id = ?";
            executeUpdate(sql, xid, branchId);
        }
    }
    
    JAVA
    public class UndoLogManager {
        
        /**
         * 回滚分支事务
         */
        public void undo(DataSourceProxy dataSourceProxy, String xid, 
                         long branchId) throws SQLException {
            
            Connection conn = dataSourceProxy.getConnection();
            
            try {
                // 1. 查询 undo_log
                String selectSQL = "SELECT * FROM undo_log WHERE xid = ? " +
                                 "AND branch_id = ? FOR UPDATE";
                BranchUndoLog branchUndoLog = selectUndoLog(conn, xid, branchId);
                
                if (branchUndoLog == null) {
                    return;  // 已经回滚或提交
                }
                
                // 2. 数据校验
                if (!dataValidation(conn, branchUndoLog)) {
                    throw new SQLException("Data validation failed");
                }
                
                // 3. 生成反向 SQL 并执行
                for (SQLUndoLog sqlUndoLog : branchUndoLog.getSqlUndoLogs()) {
                    AbstractUndoExecutor undoExecutor = 
                        UndoExecutorFactory.getUndoExecutor(
                            dataSourceProxy.getDbType(), sqlUndoLog);
                    undoExecutor.executeOn(conn);
                }
                
                // 4. 删除 undo_log
                String deleteSQL = "DELETE FROM undo_log WHERE xid = ? " +
                                 "AND branch_id = ?";
                executeUpdate(conn, deleteSQL, xid, branchId);
                
                conn.commit();
                
            } catch (Exception e) {
                conn.rollback();
                throw e;
            }
        }
        
        /**
         * 数据校验:比对当前数据和后置镜像
         */
        private boolean dataValidation(Connection conn, 
                                       BranchUndoLog branchUndoLog) {
            for (SQLUndoLog sqlUndoLog : branchUndoLog.getSqlUndoLogs()) {
                TableRecords afterImage = sqlUndoLog.getAfterImage();
                TableRecords currentRecords = queryCurrentRecords(conn, afterImage);
                
                // 比对数据是否一致
                if (!afterImage.equals(currentRecords)) {
                    return false;  // 数据被脏写
                }
            }
            return true;
        }
    }
    
    JAVA
    // UPDATE 反向 SQL
    public class MySQLUndoUpdateExecutor extends AbstractUndoExecutor {
        
        @Override
        protected String buildUndoSQL() {
            TableRecords beforeImage = sqlUndoLog.getBeforeImage();
            
            // 根据前置镜像生成 UPDATE 语句
            StringBuilder sql = new StringBuilder("UPDATE ");
            sql.append(sqlUndoLog.getTableName()).append(" SET ");
            
            // 设置字段值为前置镜像的值
            List<Field> fields = beforeImage.getRows().get(0).getFields();
            for (int i = 0; i < fields.size(); i++) {
                if (i > 0) sql.append(", ");
                sql.append(fields.get(i).getName()).append(" = ?");
            }
            
            // WHERE 条件(主键)
            sql.append(" WHERE ");
            appendWhereCondition(sql, beforeImage);
            
            return sql.toString();
        }
    }
    
    // DELETE 反向 SQL(生成 INSERT)
    public class MySQLUndoDeleteExecutor extends AbstractUndoExecutor {
        
        @Override
        protected String buildUndoSQL() {
            TableRecords beforeImage = sqlUndoLog.getBeforeImage();
            
            // 根据前置镜像生成 INSERT 语句
            StringBuilder sql = new StringBuilder("INSERT INTO ");
            sql.append(sqlUndoLog.getTableName()).append(" (");
            
            // 字段列表
            List<Field> fields = beforeImage.getRows().get(0).getFields();
            for (int i = 0; i < fields.size(); i++) {
                if (i > 0) sql.append(", ");
                sql.append(fields.get(i).getName());
            }
            
            sql.append(") VALUES (");
            for (int i = 0; i < fields.size(); i++) {
                if (i > 0) sql.append(", ");
                sql.append("?");
            }
            sql.append(")");
            
            return sql.toString();
        }
    }
    
    // INSERT 反向 SQL(生成 DELETE)
    public class MySQLUndoInsertExecutor extends AbstractUndoExecutor {
        
        @Override
        protected String buildUndoSQL() {
            TableRecords afterImage = sqlUndoLog.getAfterImage();
            
            // 根据后置镜像生成 DELETE 语句
            StringBuilder sql = new StringBuilder("DELETE FROM ");
            sql.append(sqlUndoLog.getTableName());
            sql.append(" WHERE ");
            
            appendWhereCondition(sql, afterImage);
            
            return sql.toString();
        }
    }
    
    JAVA
    public class LockManagerImpl implements LockManager {
        
        /**
         * 获取全局锁
         */
        public boolean acquireLock(List<RowLock> rowLocks) {
            // 锁的粒度:表名 + 主键值
            // 例如:order_table:1,2,3
            String lockKey = buildLockKey(rowLocks);
            
            // 向 TC 申请全局锁
            boolean result = transactionCoordinator.acquireLock(
                xid, branchId, resourceId, lockKey);
            
            if (!result) {
                // 获取锁失败,进入重试逻辑
                return retryAcquireLock(rowLocks);
            }
            
            return true;
        }
        
        private String buildLockKey(List<RowLock> rowLocks) {
            // 格式:table1:pk1,pk2;table2:pk3,pk4
            Map<String, Set<String>> lockMap = new HashMap<>();
            
            for (RowLock rowLock : rowLocks) {
                lockMap.computeIfAbsent(
                    rowLock.getTableName(), 
                    k -> new HashSet<>()
                ).add(rowLock.getPk());
            }
            
            return lockMap.entrySet().stream()
                .map(e -> e.getKey() + ":" + String.join(",", e.getValue()))
                .collect(Collectors.joining(";"));
        }
    }
    
    YAML
    seata:
      client:
        rm:
          # 异步提交优化
          async-commit-buffer-limit: 10000
          
        undo:
          # 只记录更新字段
          only-care-update-columns: true
          
          # 压缩 undo_log
          compress:
            enable: true
            type: zip
            threshold: 64k
    
    SQL
    -- 清理 7 天前的 undo_log
    DELETE FROM undo_log 
    WHERE log_created < DATE_SUB(NOW(), INTERVAL 7 DAY);