Deep Dive into the Seata RM (Resource Manager) Module: Source Code and Mechanisms

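This article walks through the core implementation of the RM (Resource Manager) module in the Seata distributed transaction framework. It covers branch registration, status reporting, global locks, undo logs, and concurrency and exception handling, using a flowchart, a class diagram, and annotated source code to show how the RM is designed and implemented.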


I. Role and Responsibilities of the RM Module

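In the Seata architecture, the RM (Resource Manager) manages branch transaction resources. Its main responsibilities, which match the methods of the ResourceManager interface in the class diagram below, are registering and managing resources, registering branch transactions with the TC, reporting branch status to the TC, querying global locks, and executing branch commit and rollback when the TC instructs it to.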

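Typical scenario: in AT mode, the RM proxies the business SQL, automatically generates undo logs, intercepts the SQL to register a branch transaction, and thereby keeps the global transaction consistent.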


II. Core Flow Overview

1. Branch Registration and Status Reporting

flowchart TD
    A[Business SQL execution / ConnectionProxy] --> B[Branch registration / branchRegister]
    B --> C[Build BranchRegisterRequest]
    C --> D[RPC to TC / RmNettyRemotingClient]
    D --> E[TC checks global locks / registers branch]
    E --> F[Return branch ID]
    F --> G[Write undo log / flushUndoLogs]
    G --> H[Business commit / rollback]
    H --> I[Status report / branchReport]
    I --> J[RPC report to TC]
    J --> K[TC updates branch status]

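Flow description: when the business SQL is committed through ConnectionProxy, the RM first registers a branch with the TC via branchRegister, carrying the row lock keys. The TC checks the global locks and returns a branch ID. The RM then flushes the undo log and commits the local transaction, so the undo log and the business data are committed together. Finally, the RM reports the branch status to the TC through branchReport: a failed local commit is always reported, while a successful one is reported only when IS_REPORT_SUCCESS_ENABLE is on.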
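The flow above can be condensed into a small, self-contained Java model. This is a sketch under stated assumptions, not Seata's code: FakeTc and PhaseOneCommitSketch are hypothetical names, the TC is an in-memory stub, and the undo log flush and local commit are just log entries. It only shows the ordering of register, undo log flush plus local commit, and report, including the optional success report.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory stand-in for the TC: records calls and hands out branch IDs.
class FakeTc {
    final List<String> calls = new ArrayList<>();
    private long nextBranchId = 1;

    long branchRegister(String xid, String lockKeys) {
        calls.add("register " + xid + " " + lockKeys);
        return nextBranchId++;
    }

    void branchReport(String xid, long branchId, boolean phaseOneDone) {
        calls.add("report " + branchId + (phaseOneDone ? " PhaseOne_Done" : " PhaseOne_Failed"));
    }
}

// Hypothetical, simplified model of the AT-mode phase-one commit path.
class PhaseOneCommitSketch {
    private final FakeTc tc;
    private final boolean reportSuccessEnabled; // plays the role of IS_REPORT_SUCCESS_ENABLE
    final List<String> localLog = new ArrayList<>();

    PhaseOneCommitSketch(FakeTc tc, boolean reportSuccessEnabled) {
        this.tc = tc;
        this.reportSuccessEnabled = reportSuccessEnabled;
    }

    long commit(String xid, String lockKeys, boolean failLocalCommit) {
        // 1. Register the branch; the TC checks global locks and returns a branch ID.
        long branchId = tc.branchRegister(xid, lockKeys);
        try {
            // 2. Undo log and business changes belong to the same local transaction.
            localLog.add("flushUndoLogs branch=" + branchId);
            if (failLocalCommit) {
                throw new IllegalStateException("local commit failed");
            }
            localLog.add("commit branch=" + branchId);
        } catch (RuntimeException ex) {
            // 3a. A failed local commit is always reported, then rethrown.
            tc.branchReport(xid, branchId, false);
            throw ex;
        }
        // 3b. A successful commit is reported only when the switch is on.
        if (reportSuccessEnabled) {
            tc.branchReport(xid, branchId, true);
        }
        return branchId;
    }
}

class PhaseOneCommitDemo {
    public static void main(String[] args) {
        FakeTc tc = new FakeTc();
        new PhaseOneCommitSketch(tc, true).commit("xid-1", "t_order:1", false);
        System.out.println(tc.calls);
    }
}
```

With the success report switched off, a successful commit produces only the register call, which mirrors why the real switch saves one RPC per branch.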


III. Source Walkthrough of Key Mechanisms

1. Branch Registration Call Chain

Core data structure

public class BranchRegisterRequest {
    private String xid;             // global transaction ID
    private BranchType branchType;  // branch type (AT, TCC, SAGA, XA)
    private String resourceId;      // resource ID
    private String lockKey;         // row lock keys
    private String applicationData; // application-defined data
    // getters and setters omitted
}

Key source snippet: branch registration

// AbstractResourceManager.java
@Override
public Long branchRegister(
        BranchType branchType,
        String resourceId,
        String clientId,
        String xid,
        String applicationData,
        String lockKeys) throws TransactionException {
    try {
        BranchRegisterRequest request = new BranchRegisterRequest();
        request.setXid(xid);
        request.setLockKey(lockKeys);
        request.setResourceId(resourceId);
        request.setBranchType(branchType);
        request.setApplicationData(applicationData);

        // Send RPC to TC
        BranchRegisterResponse response =
            (BranchRegisterResponse) RmNettyRemotingClient.getInstance().sendSyncRequest(request);
        if (response.getResultCode() == ResultCode.Failed) {
            throw new RmTransactionException(
                response.getTransactionExceptionCode(),
                String.format("branch register failed, xid: %s, errMsg: %s ", xid, response.getMsg()));
        }
        return response.getBranchId();
    } catch (TimeoutException toe) {
        throw new RmTransactionException(TransactionExceptionCode.IO, "branch register timeout, xid:" + xid, toe);
    }
}
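The pattern in branchRegister, a synchronous request with a timeout whose failures are mapped to error codes, can be sketched without Seata on the classpath. Everything here is hypothetical: RegisterRequest, RegisterResponse, ErrorCode, BranchRegisterException and RegisterClientSketch only mimic the roles of BranchRegisterRequest, BranchRegisterResponse, TransactionExceptionCode and RmTransactionException, and the transport is any function returning a CompletableFuture. Unlike RmTransactionException, the sketch's exception is unchecked to keep the example short.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;

// Hypothetical, simplified message types.
final class RegisterRequest {
    final String xid, resourceId, lockKey;
    RegisterRequest(String xid, String resourceId, String lockKey) {
        this.xid = xid; this.resourceId = resourceId; this.lockKey = lockKey;
    }
}

final class RegisterResponse {
    final boolean success; final long branchId; final String msg;
    RegisterResponse(boolean success, long branchId, String msg) {
        this.success = success; this.branchId = branchId; this.msg = msg;
    }
}

enum ErrorCode { IO, BRANCH_REGISTER_FAILED }

// Unchecked here for brevity; Seata's RmTransactionException is a checked exception.
class BranchRegisterException extends RuntimeException {
    final ErrorCode code;
    BranchRegisterException(ErrorCode code, String message, Throwable cause) {
        super(message, cause);
        this.code = code;
    }
}

final class RegisterClientSketch {
    private final Function<RegisterRequest, CompletableFuture<RegisterResponse>> transport;
    private final long timeoutMillis;

    RegisterClientSketch(Function<RegisterRequest, CompletableFuture<RegisterResponse>> transport, long timeoutMillis) {
        this.transport = transport;
        this.timeoutMillis = timeoutMillis;
    }

    long branchRegister(String xid, String resourceId, String lockKeys) {
        RegisterRequest request = new RegisterRequest(xid, resourceId, lockKeys);
        RegisterResponse response;
        try {
            // Block until the TC answers or the timeout expires (synchronous RPC).
            response = transport.apply(request).get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException toe) {
            throw new BranchRegisterException(ErrorCode.IO, "branch register timeout, xid:" + xid, toe);
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            throw new BranchRegisterException(ErrorCode.IO, "branch register interrupted, xid:" + xid, ie);
        } catch (ExecutionException ee) {
            throw new BranchRegisterException(ErrorCode.BRANCH_REGISTER_FAILED, "branch register error, xid:" + xid, ee.getCause());
        }
        // A well-formed "failed" answer from the TC becomes an exception carrying the TC's message.
        if (!response.success) {
            throw new BranchRegisterException(ErrorCode.BRANCH_REGISTER_FAILED,
                String.format("branch register failed, xid: %s, errMsg: %s", xid, response.msg), null);
        }
        return response.branchId;
    }
}

class RegisterClientDemo {
    public static void main(String[] args) {
        RegisterClientSketch client = new RegisterClientSketch(
            req -> CompletableFuture.completedFuture(new RegisterResponse(true, 42L, "")), 100);
        System.out.println(client.branchRegister("xid-1", "jdbc:mysql://db/demo", "t_order:1"));
    }
}
```

Separating "the TC said no" from "the TC did not answer in time" is what lets the caller treat timeouts as I/O problems and rejections as business failures.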

2. Status Reporting Mechanism

Status report data structure

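public class BranchReportRequest {
    private String xid;             // global transaction ID
    private long branchId;          // branch ID returned by the TC at registration
    private String resourceId;      // resource ID
    private BranchStatus status;    // branch status being reported
    private String applicationData; // application-defined data
    private BranchType branchType;  // branch type
    // getters and setters omitted
}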

Key source snippet: status reporting

// AbstractResourceManager.java
@Override
public void branchReport(
        BranchType branchType, String xid, long branchId, BranchStatus status, String applicationData)
        throws TransactionException {
    try {
        BranchReportRequest request = new BranchReportRequest();
        request.setXid(xid);
        request.setBranchId(branchId);
        request.setStatus(status);
        request.setApplicationData(applicationData);

        BranchReportResponse response =
            (BranchReportResponse) RmNettyRemotingClient.getInstance().sendSyncRequest(request);
        if (response.getResultCode() == ResultCode.Failed) {
            throw new RmTransactionException(
                response.getTransactionExceptionCode(),
                String.format("branch report failed, xid: %s, errMsg: %s ", xid, response.getMsg()));
        }
    } catch (TimeoutException toe) {
        throw new RmTransactionException(TransactionExceptionCode.IO, "branch report timeout, xid:" + xid, toe);
    }
}
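On the caller side, ConnectionProxy.report(boolean) turns the local commit outcome into a phase-one status (PhaseOne_Done or PhaseOne_Failed) and, to our understanding, retries the report a bounded number of times before giving up. The self-contained sketch below illustrates that idea only; PhaseOneStatus, BranchReporterSketch and the retry count are hypothetical, and the sender stands in for the branchReport RPC.

```java
import java.util.function.Consumer;

// Hypothetical phase-one statuses, named after Seata's BranchStatus values.
enum PhaseOneStatus { PhaseOne_Done, PhaseOne_Failed }

final class BranchReporterSketch {
    private final Consumer<PhaseOneStatus> sender; // stands in for the branchReport RPC; may throw
    private final int retryCount;

    BranchReporterSketch(Consumer<PhaseOneStatus> sender, int retryCount) {
        this.sender = sender;
        this.retryCount = retryCount;
    }

    // Returns the attempt number that succeeded; throws once all attempts fail.
    int report(boolean commitDone) {
        PhaseOneStatus status = commitDone ? PhaseOneStatus.PhaseOne_Done : PhaseOneStatus.PhaseOne_Failed;
        RuntimeException last = null;
        for (int attempt = 1; attempt <= retryCount; attempt++) {
            try {
                sender.accept(status);
                return attempt;
            } catch (RuntimeException ex) {
                last = ex; // remember the failure and try again
            }
        }
        throw new IllegalStateException("Failed to report branch status " + commitDone, last);
    }
}

class BranchReporterDemo {
    public static void main(String[] args) {
        int[] calls = {0};
        BranchReporterSketch reporter = new BranchReporterSketch(s -> {
            if (++calls[0] < 3) throw new RuntimeException("network glitch");
        }, 5);
        System.out.println("succeeded on attempt " + reporter.report(true));
    }
}
```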

3. Global Lock Management

Global lock storage structure
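As a simplified picture of what a global lock store holds, the sketch below keeps one entry per locked row, keyed by a row key and mapped to the XID that owns it. It is an illustration under assumptions, not Seata's storage: InMemoryLockStore is a hypothetical name, Seata's file and DB lockers use their own structures (the DB mode persists lock rows in a lock table), and the resourceId^^^tableName^^^pk row-key layout is, to our knowledge, the one Seata's lockers build. Acquisition here is all-or-nothing and re-entrant for the same XID.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory global lock store: rowKey -> XID of the owner.
final class InMemoryLockStore {
    private final Map<String, String> locks = new HashMap<>();

    // Row key layout resourceId^^^tableName^^^pk (assumed to match Seata's lockers).
    static String rowKey(String resourceId, String tableName, String pk) {
        return resourceId + "^^^" + tableName + "^^^" + pk;
    }

    // All-or-nothing: either every row ends up owned by xid, or nothing changes.
    synchronized boolean acquire(String xid, List<String> rowKeys) {
        List<String> newlyLocked = new ArrayList<>();
        for (String key : rowKeys) {
            String owner = locks.putIfAbsent(key, xid);
            if (owner == null) {
                newlyLocked.add(key);          // fresh lock taken in this call
            } else if (!owner.equals(xid)) {
                for (String k : newlyLocked) { // conflict: undo this call's locks
                    locks.remove(k, xid);
                }
                return false;
            }                                  // owner == xid: already held, re-entrant
        }
        return true;
    }

    // In this sketch, called once the global transaction has finished.
    synchronized void releaseAll(String xid) {
        locks.values().removeIf(xid::equals);
    }

    synchronized String ownerOf(String rowKey) {
        return locks.get(rowKey);
    }
}

class LockStoreDemo {
    public static void main(String[] args) {
        InMemoryLockStore store = new InMemoryLockStore();
        String row = InMemoryLockStore.rowKey("jdbc:mysql://db/demo", "t_order", "1");
        System.out.println(store.acquire("xid-1", List.of(row)));
        System.out.println(store.acquire("xid-2", List.of(row)));
    }
}
```

Rolling back partially taken locks on conflict keeps a failed registration from leaving stray locks behind, which is the property a real lock store also has to guarantee.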

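Key source snippet: acquiring global locks. Note that AbstractLockManager lives in the TC (seata-server), not in the RM: the RM sends the lockKeys with the branch registration request, and the TC acquires the corresponding row locks while registering the branch.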

// AbstractLockManager.java
@Override
public boolean acquireLock(BranchSession branchSession, boolean autoCommit, boolean skipCheckLock)
        throws TransactionException {
    if (branchSession == null) {
        throw new IllegalArgumentException("branchSession can't be null for memory/file locker.");
    }
    String lockKey = branchSession.getLockKey();
    if (StringUtils.isNullOrEmpty(lockKey)) {
        // no lock
        return true;
    }
    // get locks of branch
    List<RowLock> locks = collectRowLocks(branchSession);
    if (CollectionUtils.isEmpty(locks)) {
        // no lock
        return true;
    }
    return getLocker(branchSession).acquireLock(locks, autoCommit, skipCheckLock);
}
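collectRowLocks turns the branch's lockKey string into individual row locks. Assuming the AT-mode layout `table1:pk1,pk2;table2:pk3` (our understanding of what ConnectionContext.buildLockKeys() produces), a minimal parser could look like the sketch below. ParsedRowLock and LockKeyParser are hypothetical names, and Seata's real RowLock objects also carry the XID, branch ID and resource ID.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, trimmed-down row lock: real RowLock objects carry more fields.
final class ParsedRowLock {
    final String tableName;
    final String pk;

    ParsedRowLock(String tableName, String pk) {
        this.tableName = tableName;
        this.pk = pk;
    }

    @Override
    public String toString() {
        return tableName + ":" + pk;
    }
}

final class LockKeyParser {
    // Parses "table1:pk1,pk2;table2:pk3" into one ParsedRowLock per (table, pk) pair.
    static List<ParsedRowLock> parse(String lockKey) {
        List<ParsedRowLock> result = new ArrayList<>();
        if (lockKey == null || lockKey.isEmpty()) {
            return result; // same idea as the "no lock" early return in acquireLock
        }
        for (String group : lockKey.split(";")) {
            int idx = group.indexOf(':');
            if (idx <= 0) {
                throw new IllegalArgumentException("malformed lock key group: " + group);
            }
            String table = group.substring(0, idx);
            for (String pk : group.substring(idx + 1).split(",")) {
                if (!pk.isEmpty()) {
                    result.add(new ParsedRowLock(table, pk));
                }
            }
        }
        return result;
    }
}

class LockKeyParserDemo {
    public static void main(String[] args) {
        System.out.println(LockKeyParser.parse("t_order:1,2;t_stock:9"));
    }
}
```

Running the demo prints `[t_order:1, t_order:2, t_stock:9]`: one lock per row, which is the granularity at which the TC detects conflicts.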

4. Undo Log in AT Mode

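Key source snippet: persisting the undo log at commit time (the before and after images are collected earlier by the SQL executors; here they are flushed and committed together with the business data)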

// ConnectionProxy.java
private void processGlobalTransactionCommit() throws SQLException {
    try {
        register();
    } catch (TransactionException e) {
        recognizeLockKeyConflictException(e, context.buildLockKeys());
    }
    try {
        UndoLogManagerFactory.getUndoLogManager(this.getDbType()).flushUndoLogs(this);
        targetConnection.commit();
    } catch (Throwable ex) {
        LOGGER.error("process connectionProxy commit error: {}", ex.getMessage(), ex);
        report(false);
        throw new SQLException(ex);
    }
    if (IS_REPORT_SUCCESS_ENABLE) {
        report(true);
    }
    context.reset();
}
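The key property of processGlobalTransactionCommit is that flushUndoLogs and targetConnection.commit() run in the same local transaction, so the undo log and the business change are committed or discarded together. The toy model below illustrates that with an in-memory table. ToyDatabase and UndoRecord are hypothetical, rows are plain strings instead of full row images, and the atomic local commit is simulated by staging changes in copies.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical undo record: one row's before/after image for a given branch.
final class UndoRecord {
    final String xid;
    final long branchId;
    final String pk;
    final String beforeImage;
    final String afterImage;

    UndoRecord(String xid, long branchId, String pk, String beforeImage, String afterImage) {
        this.xid = xid;
        this.branchId = branchId;
        this.pk = pk;
        this.beforeImage = beforeImage;
        this.afterImage = afterImage;
    }
}

// Toy database: a single table (pk -> value) plus an undo_log list.
final class ToyDatabase {
    final Map<String, String> rows = new HashMap<>();
    final List<UndoRecord> undoLog = new ArrayList<>();

    // Simulates an AT-mode UPDATE followed by flushUndoLogs and the local commit.
    UndoRecord updateAndCommit(String xid, long branchId, String pk, String newValue, boolean failCommit) {
        // Stage the work on copies so a failed "commit" leaves both untouched.
        Map<String, String> stagedRows = new HashMap<>(rows);
        List<UndoRecord> stagedUndo = new ArrayList<>(undoLog);

        String before = stagedRows.get(pk);   // before image, taken before the change
        stagedRows.put(pk, newValue);         // the business UPDATE
        String after = stagedRows.get(pk);    // after image, re-read after the change
        UndoRecord record = new UndoRecord(xid, branchId, pk, before, after);
        stagedUndo.add(record);               // flushUndoLogs: same local transaction

        if (failCommit) {
            throw new IllegalStateException("local commit failed"); // nothing is published
        }
        // "Commit": publish business data and undo log together.
        rows.clear();
        rows.putAll(stagedRows);
        undoLog.clear();
        undoLog.addAll(stagedUndo);
        return record;
    }
}

class ToyDatabaseDemo {
    public static void main(String[] args) {
        ToyDatabase db = new ToyDatabase();
        db.rows.put("1", "v1");
        UndoRecord r = db.updateAndCommit("xid-1", 7L, "1", "v2", false);
        System.out.println(r.beforeImage + " -> " + r.afterImage);
    }
}
```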

5. Branch Transaction Rollback

Key source snippet: rolling back with the undo log

// DataSourceManager.java
@Override
public BranchStatus branchRollback(
        BranchType branchType, String xid, long branchId, String resourceId, String applicationData)
        throws TransactionException {
    DataSourceProxy dataSourceProxy = get(resourceId);
    if (dataSourceProxy == null) {
        throw new ShouldNeverHappenException(String.format("resource: %s not found", resourceId));
    }
    try {
        UndoLogManagerFactory.getUndoLogManager(dataSourceProxy.getDbType()).undo(dataSourceProxy, xid, branchId);
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("branch rollback success, xid:{}, branchId:{}", xid, branchId);
        }
    } catch (TransactionException te) {
        StackTraceLogger.error(
                LOGGER,
                te,
                "branchRollback failed. branchType:[{}], xid:[{}], branchId:[{}], resourceId:[{}], applicationData:[{}]. reason:[{}]",
                new Object[] {branchType, xid, branchId, resourceId, applicationData, te.getMessage()});
        if (te.getCode() == TransactionExceptionCode.BranchRollbackFailed_Unretriable) {
            return BranchStatus.PhaseTwo_RollbackFailed_Unretryable;
        } else {
            return BranchStatus.PhaseTwo_RollbackFailed_Retryable;
        }
    }
    return BranchStatus.PhaseTwo_Rollbacked;
}
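Conceptually, undo restores each row's before image, but only after checking that the row still equals the after image recorded in phase one. If something outside the global transaction changed the row, restoring would overwrite that write. To our understanding, Seata's undo executors perform such a check and treat dirty data as a non-retryable failure, which surfaces as PhaseTwo_RollbackFailed_Unretryable. The single-row sketch below shows that decision; UndoSketch and RollbackOutcome are hypothetical names, and a null before image means the row was inserted in phase one.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical outcomes, loosely mirroring Rollbacked / skip / Unretryable.
enum RollbackOutcome { ROLLED_BACK, SKIPPED, FAILED_UNRETRYABLE }

final class UndoSketch {
    // Tries to restore beforeImage for pk; afterImage is what phase one wrote.
    static RollbackOutcome undo(Map<String, String> rows, String pk, String beforeImage, String afterImage) {
        if (Objects.equals(beforeImage, afterImage)) {
            return RollbackOutcome.SKIPPED;            // phase one changed nothing
        }
        String current = rows.get(pk);
        if (!Objects.equals(current, afterImage)) {
            if (Objects.equals(current, beforeImage)) {
                return RollbackOutcome.SKIPPED;        // already back to the before image
            }
            return RollbackOutcome.FAILED_UNRETRYABLE; // dirty write: retrying will not help
        }
        if (beforeImage == null) {
            rows.remove(pk);                           // row was inserted in phase one
        } else {
            rows.put(pk, beforeImage);                 // restore the original value
        }
        return RollbackOutcome.ROLLED_BACK;
    }
}

class UndoSketchDemo {
    public static void main(String[] args) {
        Map<String, String> rows = new HashMap<>();
        rows.put("1", "v2");
        System.out.println(UndoSketch.undo(rows, "1", "v1", "v2") + " " + rows.get("1"));
    }
}
```

Treating "already restored" as a skip makes the undo idempotent, so a TC-driven retry of a retryable failure does not do harm.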

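IV. Concurrency and Exception Handling

The snippets above already show the main failure paths in the RM. An RPC timeout toward the TC is wrapped in an RmTransactionException with TransactionExceptionCode.IO, and a Failed response is rethrown with the TC's own error code. A lock-key conflict during registration goes through recognizeLockKeyConflictException. A failed undo log flush or local commit is reported with report(false) and rethrown as an SQLException. A failed undo during rollback is returned as PhaseTwo_RollbackFailed_Retryable or PhaseTwo_RollbackFailed_Unretryable, which tells the TC whether retrying the rollback makes sense.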
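One concurrency case worth illustrating is a global lock conflict: another global transaction holds a row lock, so the current branch cannot proceed yet. Seata's client can retry in that situation, configured (to our knowledge) through settings such as client.rm.lock.retryInterval and client.rm.lock.retryTimes. The sketch below shows only the bounded-retry idea; LockConflictException and LockRetrySketch are hypothetical names, not Seata's own retry classes.

```java
import java.util.function.Supplier;

// Hypothetical signal that another global transaction holds a needed row lock.
class LockConflictException extends RuntimeException {
    LockConflictException(String message) {
        super(message);
    }
}

// Hypothetical bounded retry: at most retryTimes extra attempts, fixed interval.
final class LockRetrySketch {
    private final int retryTimes;
    private final long retryIntervalMillis;

    LockRetrySketch(int retryTimes, long retryIntervalMillis) {
        this.retryTimes = retryTimes;
        this.retryIntervalMillis = retryIntervalMillis;
    }

    <T> T execute(Supplier<T> action) {
        for (int attempt = 0; ; attempt++) {
            try {
                return action.get();
            } catch (LockConflictException e) {
                if (attempt >= retryTimes) {
                    throw e; // give up; the caller rolls back its local transaction
                }
                try {
                    Thread.sleep(retryIntervalMillis); // give the lock holder time to finish
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    throw e;
                }
            }
        }
    }
}

class LockRetryDemo {
    public static void main(String[] args) {
        int[] attempts = {0};
        String result = new LockRetrySketch(3, 10).execute(() -> {
            if (++attempts[0] < 3) throw new LockConflictException("row t_order:1 is locked");
            return "committed";
        });
        System.out.println(result + " after " + attempts[0] + " attempts");
    }
}
```

Bounding the retries matters: an unbounded wait on a lock held by a long-running global transaction would pin the local connection indefinitely.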


V. Class Diagram

classDiagram
    class ResourceManager {
        <<interface>>
        +Long branchRegister(...)
        +void branchReport(...)
        +BranchStatus branchCommit(...)
        +BranchStatus branchRollback(...)
        +boolean lockQuery(...)
        +void registerResource(...)
        +void unregisterResource(...)
        +Map getManagedResources()
        +BranchType getBranchType()
        +GlobalStatus getGlobalStatus(...)
    }
    class AbstractResourceManager {
        <<abstract>>
        +Long branchRegister(...)
        +void branchReport(...)
        +boolean lockQuery(...)
    }
    class DefaultResourceManager {
        +static Map resourceManagers
        +ResourceManager getResourceManager(BranchType)
        +Long branchRegister(...)
        +void branchReport(...)
        +BranchStatus branchCommit(...)
        +BranchStatus branchRollback(...)
        +boolean lockQuery(...)
    }
    class DataSourceManager {
        +Map dataSourceCache
        +BranchStatus branchRollback(...)
        +BranchStatus branchCommit(...)
    }
    class UndoLogManager {
        <<interface>>
        +void flushUndoLogs(...)
        +void undo(...)
        +void deleteUndoLog(...)
    }
    class RmNettyRemotingClient {
        +Object sendSyncRequest(...)
    }
    ResourceManager <|.. AbstractResourceManager
    ResourceManager <|.. DefaultResourceManager
    AbstractResourceManager <|-- DataSourceManager
    DefaultResourceManager o-- ResourceManager
    DataSourceManager ..> UndoLogManager
    AbstractResourceManager ..> RmNettyRemotingClient
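DefaultResourceManager does not do the work itself: to our understanding it loads the available ResourceManager implementations (via SPI) into a map keyed by BranchType and forwards each call to the matching one, for example DataSourceManager for AT branches. The routing idea can be sketched as follows; SketchBranchType, SketchResourceManager and ResourceManagerRouter are hypothetical, simplified stand-ins.

```java
import java.util.EnumMap;
import java.util.Map;

// Hypothetical branch types, named after Seata's BranchType values.
enum SketchBranchType { AT, TCC, SAGA, XA }

// Hypothetical, trimmed-down resource manager contract.
interface SketchResourceManager {
    SketchBranchType branchType();
    String branchRollback(String xid, long branchId);
}

// Routes each call to the manager registered for the branch type.
final class ResourceManagerRouter {
    private final Map<SketchBranchType, SketchResourceManager> managers = new EnumMap<>(SketchBranchType.class);

    void register(SketchResourceManager rm) {
        managers.put(rm.branchType(), rm);
    }

    String branchRollback(SketchBranchType type, String xid, long branchId) {
        SketchResourceManager rm = managers.get(type);
        if (rm == null) {
            throw new IllegalStateException("No ResourceManager for BranchType: " + type);
        }
        return rm.branchRollback(xid, branchId);
    }
}

class RouterDemo {
    public static void main(String[] args) {
        ResourceManagerRouter router = new ResourceManagerRouter();
        router.register(new SketchResourceManager() {
            public SketchBranchType branchType() { return SketchBranchType.AT; }
            public String branchRollback(String xid, long branchId) { return "AT undo " + xid + "/" + branchId; }
        });
        System.out.println(router.branchRollback(SketchBranchType.AT, "xid-1", 1L));
    }
}
```

Keeping the dispatch in one place lets AT, TCC, SAGA and XA resource managers evolve independently while the TC-facing entry point stays the same.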

VI. Summary and Practical Recommendations


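If you found this article helpful, feel free to like, bookmark, and follow! For a more detailed source walkthrough or other distributed transaction questions, leave a comment!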