文档章节

从AnnotationTransactionAspect开始rushSpring事务

Aruforce
 Aruforce
发布于 09/17 18:58
字数 2208
阅读 30
收藏 1

0. Spring 事务 with LTW

0.1. Spring 事务 With LTW的原因:

Pure Proxy-base mode有缺陷,其失效原因分析及使用方法及运行机制(LoadTimeWeaverBeanDefinitionParserAspectJWeavingEnabler )已经写过了,不再多写;

0.2 JDBC事务是Connection级别的东西:

基本操作模板: setAutoCommit(false)-> setTransactionIsolation(isolation_level)->statement.excute()... —>commit()-> onException conn.rollBack()

0.3 Spring事务基于JDBC事务

  1. Spring 需要保证当前线程内所有的事务方法:获取到的Connection对象是同一个;
  2. 在最外层的事务开始之前:根据事务的定义,设置connectionC.autoCommit(false),setTransactionIsolation 等等;
  3. 在事务方法之间做好savepoint并根据嵌入的事务定义做savepoint;
  4. 在最外层事务之后做好rollback及commit;

0.4 一个事务失效的代码样例

说明:

  1. Spring并未拦截对SqlConnection的获取;
  2. 当你需要Spring事务支持的时候需要从TransactionSynchronizationManager获取各种线程级别资源或者把资源bind到这个类上,用于方法调用间的参数非侵入性传递(尤其是声明式事务时)
@Transactional(rollbackFor = Exception.class)
@Override
public void test() throws Exception {
   /**
    *  从数据源获取connection ,不会回滚
    */
//  Connection connection = masterDataSource.getConnection();
//  int i = connection.createStatement().executeUpdate("insert into test (name)values ('test2')");
//  if (i ==1){
//            throw new Exception("test");
//  }
   /**
    *  从同步资源管理器获取connection,会回滚
    */
   ConnectionHolder connectionHolder = (ConnectionHolder) TransactionSynchronizationManager.getResource(masterDataSource);
   int i = connectionHolder.getConnection().createStatement().executeUpdate("insert into test (name)values ('test2')");
   logger.info("after insert excute");
   if (i ==1){
      throw new Exception("test");
   }

   /**
    * mybatis dao 模板类,会回滚,也是从TransactionSynchronizationManager bind并获取的资源SQL session Factory,可以看下SqlSessionUtils.getSqlSession(SqlSessionFactory sessionFactory, ExecutorType executorType, PersistenceExceptionTranslator exceptionTranslator) 这段代码的实现,
    */
//   int i = testDao.deleteByPrimaryKey(3);
//   if (i == 1){
//       throw new Exception("test");
//   }
}

1. Spring是怎么拦截所有的事务方法的(以LTW AOP,@Transactional注解为例子)

1.1 aop.xml 配置文件

配置文件在spring-aspects.jar/META-INF/aop.xml,如下:

<?xml version="1.0"?>
<!--AspectJ load-time weaving config file to install common Spring aspects.-->
<aspectj>
    <!--<weaver options="-showWeaveInfo"/>-->
    <aspects>
        ...
        <aspect name="org.springframework.transaction.aspectj.AnnotationTransactionAspect"/>
        ...
    </aspects>
</aspectj>

1.2 切点:pointcut(AnnotationTransactionAspect)

/**匹配所有的所有被@Transactional注解标注的类及其子类的任何public method */
private pointcut executionOfAnyPublicMethodInAtTransactionalType() :execution(public * ((@Transactional *)+).*(..)) && within(@Transactional *);

/**
 * 匹配所有被@Transactional注解标注的方法
 */
private pointcut executionOfTransactionalMethod() :execution(@Transactional * *(..));

/**所有被Spring事务管理的其切面*/
protected pointcut transactionalMethodExecution(Object txObject) :(executionOfAnyPublicMethodInAtTransactionalType() || executionOfTransactionalMethod() ) && this(txObject);

1.2 增强:around advice(AbstractTransactionAspect)

@SuppressAjWarnings("adviceDidNotMatch")
Object around(final Object txObject): transactionalMethodExecution(txObject) {
    MethodSignature methodSignature = (MethodSignature) thisJoinPoint.getSignature();
    try {
        return invokeWithinTransaction(methodSignature.getMethod(), txObject.getClass(), new InvocationCallback() {
            public Object proceedWithInvocation() throws Throwable {
                return proceed(txObject); //继续执行被增强的方法,也就是我们自己写的业务逻辑
            }
        });
    }catch (RuntimeException ex) {
        throw ex;
    }catch (Error err) {
        throw err;
    }catch (Throwable thr) {
        Rethrower.rethrow(thr);
        throw new IllegalStateException("Should never get here", thr);
    }
}

结合上面这些代码,可以看到Spring将所有的事务方法执行全部拦截并实际转到了TransactionAspectSupport.invokeWithinTransaction方法内;

2. 事务的大体流程(TransactionAspectSupport)

2.1 Field

// 默认的transactionManager 缓存key
private static final Object DEFAULT_TRANSACTION_MANAGER_KEY = new Object();
// Named线程本地资源(resource not Inheritable from parent thread) 当前线程执行的事务
private static final ThreadLocal<TransactionInfo> transactionInfoHolder = new NamedThreadLocal<TransactionInfo>("Current aspect-driven transaction");
// 默认就是 transactionManager
private String transactionManagerBeanName;
// bean
private PlatformTransactionManager transactionManager;
// 事务管理的属性 就是个map<methodName,TransactionAttribute>:methodName就是一个事务方法的描述符号=全类名+方法名;TransactionAttribute 就是描述一个Spring事务的基本属性 隔离类型ISOLATION,传播类型PROPAGATION,什么情况要回滚,使用那个事务管理器etc;
private TransactionAttributeSource transactionAttributeSource;
// SpringBean Factory
private BeanFactory beanFactory;
// 事务管理器缓存(如果有多个的话)
private final ConcurrentMap<Object, PlatformTransactionManager> transactionManagerCache =new ConcurrentReferenceHashMap<Object, PlatformTransactionManager>(4);

2.2 整体事务流程:invokeWithinTransaction(Method method, Class<?> targetClass, final InvocationCallback invocation)

protected Object invokeWithinTransaction(Method method, Class<?> targetClass, final InvocationCallback invocation)throws Throwable {
    //获取当前方法事务方法的定义,参看默认的 AnnotationTransactionAttributeSource类就可以;
    final TransactionAttribute txAttr = getTransactionAttributeSource().getTransactionAttribute(method, targetClass);
    //根据方法的事务定义中指定的事务管理器name从beanFactory中获取事务管理器
    final PlatformTransactionManager tm = determineTransactionManager(txAttr);
    // 生成事务的Id
    final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);
    //如果不是事务方法,事务管理器也不是servletServer提供的话,不论DataSource还是Hibernate都不是这一类
    if (txAttr == null || !(tm instanceof CallbackPreferringPlatformTransactionManager)) {
        //创建事务
        TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification);
        Object retVal = null;
        try {
            //执行事务方法
            retVal = invocation.proceedWithInvocation();
        }catch (Throwable ex) {
            // 抛出异常之后决定是否要回滚
            completeTransactionAfterThrowing(txInfo, ex);
            throw ex;
        }finally {
            // 最终清理事务资源等等
            cleanupTransactionInfo(txInfo);
        }
        // 事务提交
        commitTransactionAfterReturning(txInfo);
        return retVal;
    }else {
    // 这里可以省略了;一般在使用应用服务器(比如weblogic,WebSphere)提供的事务管理时才会进入这个分支,在非土豪单位很少能使用这些服务器,而且既然用Spring了最好遵从Spring轻量级这个约定,不要把自己的代码和应用环境绑定在一块
    ....
    }
}

3. 开始事务需要做的准备工作

3.1 创建事务流程:`createTransactionIfNecessary(tm, txAttr, joinpointIdentification)

protected TransactionInfo createTransactionIfNecessary(PlatformTransactionManager tm, TransactionAttribute txAttr, final String joinpointIdentification) {
    if (txAttr != null && txAttr.getName() == null) {
        txAttr = new DelegatingTransactionAttribute(txAttr) {
            @Override
            public String getName() {
                return joinpointIdentification;
            }
        };
    }
    TransactionStatus status = null;
    if (txAttr != null) {
        if (tm != null) {
            status = tm.getTransaction(txAttr);
        }else {
            if (logger.isDebugEnabled()) {
                logger.debug("Skipping transactional joinpoint [" + joinpointIdentification + "] because no transaction manager has been configured");
            }
        }
    }
    return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
}

3.1.1 由事务管理创建事务:AbstractPlatformTransactionManager.getTransaction(txAttr)

public final TransactionStatus getTransaction(TransactionDefinition definition) throws TransactionException {
    // 这个需要细看一下,以DataSourceTransactionManager为例子;
    Object transaction = doGetTransaction();
    boolean debugEnabled = logger.isDebugEnabled();
    if (definition == null) {// 如果不是事务方法,生成一个默认的事务定义
        definition = new DefaultTransactionDefinition();
    }
    if (isExistingTransaction(transaction)) { // 当存在事务时,按照级别和传播等级的定义处理逻辑
        return handleExistingTransaction(definition, transaction, debugEnabled);
    }
    if (definition.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) { // 事务超时时间,并没什么用,默认不设置超时时间的
        throw new InvalidTimeoutException("Invalid transaction timeout", definition.getTimeout());
    }
    // 如果当前线程不存在事务,根据TransactionAttr的开始执行业务逻辑
    if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {//只支持已存在的事务,不存在的话抛异常
        throw new IllegalTransactionStateException("No existing transaction found for transaction marked with propagation 'mandatory'");
    }else if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
        //对于PROPAGATION_REQUIRED,PROPAGATION_REQUIRES_NEW,PROPAGATION_NESTED,那么创建或者加入当前事务
        SuspendedResourcesHolder suspendedResources = suspend(null);
        if (debugEnabled) {
            logger.debug("Creating new transaction with name [" + definition.getName() + "]: " + definition);
        }
        try {
            boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
            DefaultTransactionStatus status = newTransactionStatus(definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
            //完成 DataBaseConnection及事务信息与线程的绑定
            doBegin(transaction, definition);
            prepareSynchronization(status, definition);
            return status;
        }catch (RuntimeException ex) {
            resume(null, suspendedResources);
            throw ex;
        }catch (Error err) {
            resume(null, suspendedResources);
            throw err;
        }
    }else {
        // Create "empty" transaction: no actual transaction, but potentially synchronization.
        if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) {
            logger.warn("Custom isolation level specified but no actual transaction initiated; " +"isolation level will effectively be ignored: " + definition);
        }
        boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
        return prepareTransactionStatus(definition, null, true, newSynchronization, debugEnabled, null);
    }
}
3.1.1.1 创建新的事务定义对象 DataSourceTransactionManager.doGetTransaction()
protected Object doGetTransaction() {
    DataSourceTransactionObject txObject = new DataSourceTransactionObject();//数据源型的事务属性对象,是一个链表结构
    txObject.setSavepointAllowed(isNestedTransactionAllowed());//true.允许设置sql save_point
    ConnectionHolder conHolder =(ConnectionHolder) TransactionSynchronizationManager.getResource(this.dataSource);
    // 如果是新开启动事务的话,conHolder==null,TransactionSynchronizationManager的ThreadLocal resource在TransactionManger.dobegin()时才会赋值,而当当前事务方法是被其他事务方法调用时 这个conHolder调用方方法bind的ConnectionHolder
    txObject.setConnectionHolder(conHolder, false);
    return txObject;
}
3.1.1.2 开启新事务doBegin(transaction, definition);
DefaultTransactionStatus status = newTransactionStatus(definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
//主要是绑定资源
doBegin(transaction, definition);
prepareSynchronization(status, definition);
protected void doBegin(Object transaction, TransactionDefinition definition) {
  DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
  Connection con = null;
  try {
  //如果是新事务则从datasource获取一个connection
  if (!txObject.hasConnectionHolder() ||txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
    Connection newCon = this.dataSource.getConnection();
    if (logger.isDebugEnabled()) {
       logger.debug("Acquired Connection [" + newCon + "] for JDBC transaction");
    }
    txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
   }
   txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
   //如果是嵌套事务的话,使用外部事物的connection
   con = txObject.getConnectionHolder().getConnection();
   // 对connection 设定 超时时间,是否只读,以及事务隔离级别
   Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);
   txObject.setPreviousIsolationLevel(previousIsolationLevel);
   //如果是自动提交的话,设置为手动提交
   if (con.getAutoCommit()) {
      txObject.setMustRestoreAutoCommit(true);
      con.setAutoCommit(false);
   }
   //强制要求之制度的话,强制只读
   prepareTransactionalConnection(con, definition);
   txObject.getConnectionHolder().setTransactionActive(true);

   int timeout = determineTimeout(definition);
   if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) {
      txObject.getConnectionHolder().setTimeoutInSeconds(timeout);
    }
   // 把数据源和当前connection 绑定到线程上,ThreadLocal 系列的操作;这里就和3.1.1.1 创建新的事务定义对象对起来了
   if (txObject.isNewConnectionHolder()) {
      TransactionSynchronizationManager.bindResource(getDataSource(), txObject.getConnectionHolder());
   }
  }catch (Throwable ex) {
  if (txObject.isNewConnectionHolder()) {
      DataSourceUtils.releaseConnection(con, this.dataSource);
      txObject.setConnectionHolder(null, false);
   }
   throw new CannotCreateTransactionException("Could not open JDBC Connection for transaction", ex);
  }
}
3.1.1.3 如果已有事务isExistingTransaction(Object transaction);

这个就简写吧,主要是事务传播机制各种相应处理,是否加入老事务,savepoint的设置等等;还是要调用dobegin方法

3.2 事务信息绑定到当前线程 prepareTransactionInfo(PlatformTransactionManager tm,TransactionAttribute txAttr, String joinpointIdentification, TransactionStatus status)

TransactionInfo txInfo = new TransactionInfo(tm, txAttr, joinpointIdentification);
//绑定到线程,这个是链表结构
txInfo.bindToThread();

4. 事务提交TransactionAspectSupport.commitTransactionAfterReturning(txInfo)

protected void commitTransactionAfterReturning(TransactionInfo txInfo) {
	if (txInfo != null && txInfo.hasTransaction()) {
		txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
	}
}

5. 异常产生

5.1 回滚或者提交

protected void completeTransactionAfterThrowing(TransactionInfo txInfo, Throwable ex) {
   if (txInfo != null && txInfo.hasTransaction()) {
      if (txInfo.transactionAttribute.rollbackOn(ex)) {
         try {
         // 事务回滚
            txInfo.getTransactionManager().rollback(txInfo.getTransactionStatus());
         } catch (TransactionSystemException ex2) {
            logger.error("Application exception overridden by rollback exception", ex);
            ex2.initApplicationException(ex);
            throw ex2;
         }catch (RuntimeException ex2) {
            logger.error("Application exception overridden by rollback exception", ex);
            throw ex2;
         }catch (Error err) {
            logger.error("Application exception overridden by rollback error", ex);
            throw err;
         }
      }else {
         try {
            // 非回滚异常,commit
            txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
         } catch (TransactionSystemException ex2) {
            logger.error("Application exception overridden by commit exception", ex);
            ex2.initApplicationException(ex);
            throw ex2;
         }catch (RuntimeException ex2) {
            logger.error("Application exception overridden by commit exception", ex);
            throw ex2;
         }catch (Error err) {
            logger.error("Application exception overridden by commit error", ex);
            throw err;
         }
      }
   }
}

5.2 回滚或者提交造成异常 清理线程的事务资源资源cleanupTransactionInfo(txInfo)

protected void cleanupTransactionInfo(TransactionInfo txInfo) {
    if (txInfo != null) {
        txInfo.restoreThreadLocalStatus();
    }
}

© 著作权归作者所有

Aruforce

Aruforce

粉丝 3
博文 35
码字总数 49928
作品 0
朝阳
程序员
私信 提问
Spring事务用法示例与实现原理

关于事务,简单来说,就是为了保证数据完整性而存在的一种工具,其主要有四大特性:原子性,一致性,隔离性和持久性。对于Spring事务,其最终还是在数据库层面实现的,而Spring只是以一种比较...

爱宝贝丶
2018/08/28
8.6K
3
Oracle Prc C学习 五 事务

这个事务刚开始的时候我很不理解, 我以为是这个事务是一种结构,或者是一个package或者一个procedure, 后来我觉得我想明白点了,其实这个事务只是一件我要完成的事,这件事我不然就完成,要...

卜星星
2015/03/12
81
0
Spring嵌套事务解惑(重点)

http://www.iteye.com/topic/35907 PROPAGATION_REQUIRED -- 支持当前事务,如果当前没有事务,就新建一个事务。这是最常见的选择。 PROPAGATION_SUPPORTS -- 支持当前事务,如果当前没有事务...

码代码的小司机
2018/07/01
38
1
【58沈剑 架构师之路】InnoDB,快照读,在RR和RC下有何差异?

快照读(Snapshot Read) MySQL数据库,InnoDB存储引擎,为了提高并发,使用MVCC机制,在并发事务时,通过读取数据行的历史数据版本,不加锁,来提高并发的一种不加锁一致性读(Consistent Non...

张锦飞
2018/12/07
44
0
事务和SQL Server 2014中的内存OLTP

【IT168 技术】SQL Server内存OLTP中的事务是非常直接的。虽然我们并不会讨论一些可能的优化,但是它的基础设计模式是相当容易理解的,并且能够在其他的项目中重用。   SQL Server内存OLT...

InfoQ
2013/10/15
0
0

没有更多内容

加载失败,请刷新页面

加载更多

云栖干货回顾 | 更强大的实时数仓构建能力!分析型数据库PostgreSQL 6.0新特性解读

阿里云 AnalyticDB for PostgreSQL 为采用MPP架构的分布式集群数据库,完备支持SQL 2003,部分兼容Oracle语法,支持PL/SQL存储过程,触发器,支持标准数据库事务ACID。AnalyticDB PG通过行存...

大涛学弟
14分钟前
3
0
TL138/1808/6748-EasyEVM开发板硬件CPU、FLASH、RAM

TL138/1808/6748-EasyEVM是广州创龙基于SOM-TL138/SOM-TL1808/SOM-TL6748核心板开发的一款开发板。由于SOM-TL138/SOM-TL1808/SOM-TL6748核心板管脚兼容,所以此三个核心板共用同一个底板。开...

Tronlong创龙
18分钟前
2
0
开普勒平台开源版

https://github.com/kplcloud/kplcloud

perofu
22分钟前
3
0
昨天,这项阿里技术再获世界级科技大奖!

第六届世界互联网大会来了!千年水乡古镇乌镇又一次吸引了全世界的目光。 昨天,阿里云自研数据库POLARDB 在会上当选世界互联网领先科技成果。POLARDB解决了企业在云时代的数据库难题,帮助企...

阿里云官方博客
22分钟前
3
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部