前端 JTA与TCC

part3 · December 17, 2019 · 35 hits

简介

随着互联网行业的发展,一个非常欣喜的现象就是,越来越多的用户开始愿意为优质内容付费,这就逐渐去除了创业公司只能靠广告盈利的魔咒,可以想见,越来越多的公司会在项目中加入支付系统。那么对于程序员来讲,了解支付系统一些技术关键点就非常有必要了。

上年技术分享会上,总监问我 “两阶段提交” 的含义,竟然忘光光,衰!

事务

本小节内容是 IBM developerworks 上两篇文章的简略版,并附上了一些自己的总结。

JTA 即 Java Transaction API(JTA)。我们平时对事务的了解止步于 ACID

  1. 原子性,可以由代码实现,比如 try catch
  2. 一致性,一致性的概念是基于特定于业务的,比如转账。
  3. 隔离性,意味着一个事务的效果不影响正在同时执行的其他事务。从事务的角度讲,它意味着事务按顺序执行而不是并行执行。在数据库系统中,通常通过使用锁机制来实现隔离性。为了使应用程序获得最佳性能,有时也会对某些事务放松隔离性的要求。
  4. 持久性,意味着一旦成功完成某个事务,对应用程序状态所做的更改将 “经得起失败”。这个就需要我们对事务进行记录,比如日志(undo/redo log 等)或数据库中。

实现 ACID 特性需要多个参与者共同作用

  1. 应用程序
  2. 事务监视器,TPM 协调 RM 的活动,以确保事务的 “要么全有要么全无” 属性。
  3. 资源管理器(RM,即我们要操作的对象,诸如数据库之类)。并且根据管理 RM 个数的不同,事务分为 local Transaction 和 global Transaction(管理两个及以上 RM)

与 Java 程序进行类比,事务在应用程序级别所提供的一些优势与 catch 和 finally 块在方法级别所提供的优势相同;它们使我们不用编写很多错误复原代码,即可执行可靠的错误复原。

JTA 接口

java 提供一个javax.transaction包,定义了事务(包括分布式事务)的一些接口。这些接口定义的组件间的通信用到了 XA 协议(javax.transaction.xa),X/Open XA 接口是双向的系统接口,在事务管理器(Transaction Manager)以及一个或多个资源管理器(Resource Manager)之间形成通信桥梁。

JTA 提供了以下三个接口

1.javax.transaction.UserTransaction,是面向开发人员的接口,能够编程地控制事务处理。

2.javax.transaction.TransactionManager,允许应用程序服务器来控制代表正在管理的应用程序的事务。

3.javax.transaction.xa.XAResource,面向提供商的实现接口,是一个基于 X/Open CAE Specification 的行业标准 XA 接口的 Java 映射。提供商在提供访问自己资源的驱动时,必须实现这样的接口。

来一个经典的转账例子,假设我们要操作账户 a 和账户 b,本地事务处理(在一个数据库中)流程:

Connection conn = null; 
try{
     //若设置为 true 则数据库将会把每一次数据更新认定为一个事务并自动提交
    conn.setAutoCommit(false);
     // 将 A 账户中的金额减少 500 
     // 将 B 账户中的金额增加 500 
     conn.commit();
}catch(){
     conn.rollback();
}

如果账户 a 和 b 不在一个数据库中,可以应用 UserTransaction 接口:

 UserTransaction userTx = null; 
 Connection connA = null; 
 Connection connB = null; 
 try{
    userTx.begin();
     // 将 A 账户中的金额减少 500 
     // 将 B 账户中的金额增加 500 
     userTx.commit();
}catch(){
     userTx.rollback();
}

此时,connection 就得支持 XAResource 接口了。XAResource 和 Transaction 如何关联呢?增强 exec 方法。Connection 的 exec 方法除了处理数据之外,还包含和 Transaction 关联的操作。

public void execute(String sql) { 
     // 对于每次数据库操作都检查此会话所在的数据库连接是否已经被加入到事务中
     associateWithTransactionIfNecessary(); 
     try{ 
            // 处理数据库操作的代码
     } catch(SQLException sqle){ 
         // 处理异常代码
     } catch(Exception ne){ 
         e.printStackTrace(); 
     } 
 } 
public void associateWithTransactionIfNecessary(){    
    // 获得 TransactionManager 
    TransactionManager tm = getTransactionManager(); 
    Transaction tx = tm.getTransaction();
    // 检查当前线程是否有分布式事务
    if(tx != null){ 
        // 在分布式事务内,通过 tx 对象判断当前数据连接是否已经被包含在事务中,
        // 如果不是那么将此连接加入到事务中
        Connection conn = this.getConnection(); 
        //tx.hasCurrentResource, xaConn.getDataSource() 不是标准的 JTA 
        // 接口方法,是为了实现分布式事务而增加的自定义方法
        if(!tx.hasCurrentResource(conn)){ 
            XAConnection xaConn = (XAConnection)conn; 
            XADataSource xaSource = xaConn.getDataSource(); 
            // 调用 Transaction 的接口方法,将数据库事务资源加入到当前事务中
            tx.enListResource(xaSource.getXAResource(), 1);
        } 
    } 
}

TransactionManager 本身并不承担实际的事务处理功能,它更多的是充当用户接口和实现接口之间的桥梁。此接口中的大部分事务方法与 UserTransaction 和 Transaction 相同。

Transaction 代表了一个物理意义上的事务,在开发人员调用UserTransaction.begin()方法时 TransactionManager 会创建一个 Transaction 事务对象(标志着事务的开始)并把此对象通过 ThreadLocal 关联到当前线程。同样UserTransaction.commit()会调用 TransactionManager.commit(),方法将从当前线程下取出事务对象 Transaction 并把此对象所代表的事务提交.其它方法诸如 “rollback(),getStatus()” 也是如此。

我以前的文章有说过,ThreadLocal 算是在线程的方法间传递参数的一种方式。此处,TransactionManager 和 Transaction 的关系也非常值得学习,Transaction 负责实现接口操作,至于这些接口方法什么时候被调用,包括它从线程上被 “拿上” 还是 “拿下”,这个活儿由 TransactionManager 干。然后用户就可以不分线程的使用 TransactionManager,也无需知道自己用的是哪个 Transaction。

重新审视下段代码,可以翻译为:

// 创建一个Transaction,挂到当前线程上
UserTransaction userTx = null; 
Connection connA = null; 
Connection connB = null; 
try{
    userTx.begin();
    // 将Connection对应的XAResource挂到当前线程对应的Transaction
    connA.exec("xxx")
    connB.exec("xxx")
    // 找到Transaction关联的XAResource,让它们都提交
    userTx.commit();
}catch(){
    // 找到Transaction关联的XAResource,让它们都回滚
    userTx.rollback();
}

通过上述内容,我们可以得到一个链,UserTransaction ==> TransactionManager ==> Transaction ==> 与其关联的 XAResource。begin,commit,rollback 操作就是这样一步步传导下来,其中 Threadlocal 扮演了关键角色。

TCC 源码分析

了解了上述 JTA 的最简单实现,熟悉 Spring 编程的人,应该立刻会想到,我们可以通过 Spring 将上述代码大大简化,开发人员只需实现业务代码即可。

https://github.com/changmingxie/tcc-transaction.git是我司大牛的一个两阶段提交协议实现,背景可以看 github 上的介绍,以下简称 TCC。

先从例子 tcc-transaction-dubbo-sample 看起

假设有两个服务,分别提供不同业务 model 数据库操作。要搁以前,直接增删改查数据库就好了。但现在,要先让我们的服务支持 XAResource 接口。与数据库事务作类比,要想支持分布式事务,数据库驱动提供商就要提供支持 XA 协议的 Connection,那么在本示例服务中,服务间的通信使用的是 dubbo,相关特性的支持不能内嵌到 dubbo,而 mysql 数据库驱动也不支持,因此需要我们在代码中显式提供每个数据库操作对应的 confirm 和 cancel 方法。

对应例子里的 CapitalTradeOrderService 接口(此处以 captital 服务为例),它定义了void record(TransactionContext transactionContext,CapitalTradeOrderDto tradeOrderDto);在 CapitalTradeOrderService 实现类 CapitalTradeOrderServiceImpl 里,除了 record 方法的实现外,附带了两个方法(通过这种方式,将自己变成支持 xa 协议的 XAResource)

@Compensable(confirmMethod = "confirmRecord", cancelMethod = "cancelRecord")
public void record(TransactionContext transactionContext, CapitalTradeOrderDto tradeOrderDto) {
    System.out.println("capital try record called");
    CapitalAccount transferFromAccount = capitalAccountRepository.findByUserId(tradeOrderDto.getSelfUserId());
    transferFromAccount.transferFrom(tradeOrderDto.getAmount());
    capitalAccountRepository.save(transferFromAccount);
}
public void confirmRecord(TransactionContext transactionContext, CapitalTradeOrderDto tradeOrderDto) {
    System.out.println("capital confirm record called");
    CapitalAccount transferToAccount = capitalAccountRepository.findByUserId(tradeOrderDto.getOppositeUserId());
    transferToAccount.transferTo(tradeOrderDto.getAmount());
    capitalAccountRepository.save(transferToAccount);
}
public void cancelRecord(TransactionContext transactionContext, CapitalTradeOrderDto tradeOrderDto) {
    System.out.println("capital cancel record called");
    CapitalAccount capitalAccount = capitalAccountRepository.findByUserId(tradeOrderDto.getSelfUserId());
    capitalAccount.cancelTransfer(tradeOrderDto.getAmount());
    capitalAccountRepository.save(capitalAccount);
}

然后订单服务也有一个(负责操作 captital 和 redpacket 两个服务)

@Compensable(confirmMethod = "confirmMakePayment",cancelMethod = "cancelMakePayment")
public void makePayment(Order order, BigDecimal redPacketPayAmount, BigDecimal capitalPayAmount) {
    System.out.println("order try make payment called");

    order.pay(redPacketPayAmount, capitalPayAmount);
    orderRepository.updateOrder(order);

    capitalTradeOrderService.record(null, buildCapitalTradeOrderDto(order));
    redPacketTradeOrderService.record(null, buildRedPacketTradeOrderDto(order));
}

从中,我们可以建立几个假设

  1. tcc 整个流程跟 Compensable 注解关系很大(compensable,英文意思是可补偿的)
  2. 所谓 tcc,即 try confirm cancel,那么 record 方法在这里应该就是 try 的作用
  3. 一个标有 Compensable 注解的方法,如果内部包括多个标有 Compensable 注解的方法,就会启用两阶段提交,Compensable 方法是可以嵌套的。

可以知道,TCC 框架只是对我们编写函数提出了要求,比如一个数据库操作要有 try,confirm,cancel 三个方法(到底是本地事务一个方法省事儿),但这个三个方法的调用时机不用我们关心,我们只需要用注解标识一下,其它的由框架负责。

分析源码

整个项目分为三个部分

  1. tcc-transaction-api,定义了三个基本的 model 类。
  2. tcc-transaction-core,实现两阶段提交的业务流程
  3. tcc-transaction-spring,完成将 tcc-transaction-core spring 化的一些封装

从 Compensable 注解看起,Compensable 在 tcc-transaction-core 中,org.mengyun.tcctransaction包下有几个 model,提供了这个项目的 “数据结构”,通过分析这个几个 model,发现其传导流程是 TransactionManager(将 transaction 从线程上拿上拿下)==》Transaction ==》participant(应该是 JTA 规范的 XAResource)==》Terminator ==》两个 InvocationContext(分别是 confirm 和 cancel(类似于数据库中的 commit 和 rollback))

tcc-transaction-core 还应用了 Aop 的一些理念(使用 aspectj,跟 spring aop 的实现还不太一样),从两个方向上对代码进行增强:

  1. 整个操作开始和结束,实现 “Transaction 的创建和 commit,rollback” 的透明化。参见 CompensableTransactionInterceptor

    private void rootMethodProceed(ProceedingJoinPoint pjp) throws Throwable {
        transactionConfigurator.getTransactionManager().begin();
        try {
            //这是留给用户实现的部分
            pjp.proceed();
        } catch (Throwable tryingException) {
            logger.error("compensable transaction trying failed.", tryingException);
        try {
            transactionConfigurator.getTransactionManager().rollback();
        } catch (Throwable rollbackException) {
            logger.error("compensable transaction rollback failed.", rollbackException);
            throw rollbackException;
        }
            throw tryingException;
        }
        transactionConfigurator.getTransactionManager().commit();
    }
    
  2. 数据操作的开始和结束,实现 “创建 Participant,并将 participant 关联到线程绑定的 Transaction” 的透明化。参见 ResourceCoordinatorInterceptor

     private Participant generateAndEnlistRootParticipant(ProceedingJoinPoint pjp) {
         // 获取拦截的方法信息
         MethodSignature signature = (MethodSignature) pjp.getSignature();
         Method method = signature.getMethod();
         Compensable compensable = getCompensable(pjp);
         String confirmMethodName = compensable.confirmMethod();
         String cancelMethodName = compensable.cancelMethod();
         // 获取本线程绑定的transaction
         Transaction transaction = transactionConfigurator.getTransactionManager().getCurrentTransaction();
         TransactionXid xid = new TransactionXid(transaction.getXid().getGlobalTransactionId());
         // 根据拦截的方法创建participant
         Class targetClass = ReflectionUtils.getDeclaringType(pjp.getTarget().getClass(), method.getName(), method.getParameterTypes());
         InvocationContext confirmInvocation = new InvocationContext(targetClass,
                 confirmMethodName,
                 method.getParameterTypes(), pjp.getArgs());
         InvocationContext cancelInvocation = new InvocationContext(targetClass,
                 cancelMethodName,
                 method.getParameterTypes(), pjp.getArgs());     
         Participant participant =
                 new Participant(
                         xid,
                         new Terminator(confirmInvocation, cancelInvocation));
         // 将participant关联到transaction
         transaction.enlistParticipant(participant);
         // 更新transaction存储
         TransactionRepository transactionRepository = transactionConfigurator.getTransactionRepository();
         transactionRepository.update(transaction);
         return participant;
     }
    

除此之外,TransactionRepository,提供 Transaction 的存储,可以选择不同的存储介质。

TransactionRecoveryJob 和 TransactionRecovery,周期性的处理未完成的 Transaction,根据以重试次数决定放弃或重试。

问题

这个就跟数据库分布式事务数据操作的过程很像了,不过方法要分为好几种类型,不同的类型处理不一样,比如 Root 类型,provider 类型,这个是为什么?

引用

Java 理论与实践: 理解 JTS —— 事务简介

JTA 深度历险 - 原理与实现


var page_date = '2016-05-21 00:00:00 +0000'; var gitment = new Gitment({ id: page_date, owner: 'topsli', repo: 'topsli.github.io', oauth: { client_id: 'ddbaadab5f5d810805c3', client_secret: '13b91b306680f11a55713382529b83259ef56546', }, }); gitment.render('gitmentContainer');
No Reply at the moment.
You need to Sign in before reply, if you don't have an account, please Sign up first.