spring 详细事务源码解析 —— 事务怎么挂起的?传播等级怎么实现的?保存点、内嵌事务怎么做回滚和提交?

原创
2020/01/10 09:19
阅读数 3.4K

零、引言

spring 事务怎么挂起的?传播等级怎么实现的?保存点、内嵌事务怎么做回滚和提交?如果你对这些感兴趣,或者想要了解它的运行机制,则可以参考此文章 ~

一、spring 事务大局纵览

上篇文章主要提到了 spring 如何去管理 mybatis 的事务,实际上就是借助 TransactionSynchronizationManager 来保存线程级别的连接对象,sqlSession对象,但我们没有过多提及 spring 本身的实现如何,我们先回顾一下几个主要的点。

  • 在创建 MapperProxy 时,spring 为其注入了一个 sqlSession 用于 sql执行,但是这个 sqlSession 是一个代理对象,叫做 sqlSessionTemplate,它会自动选择我们该使用哪个 sqlSession 去执行
  • 在执行时,spring 切面在执行事务之前,会创建一个叫做 TransactionInfo 的对象,此对象会根据事务传播等级来控制是否创建新连接,是否挂起上一个连接,将信息保存在 TransactionSynchronizationManager
  • 到了真正需要创建或者获取 sqlSession 时,spring 重写的 TransactionFactory 会优先去 TransactionSynchronizationManager 中拿连接对象。

上篇文章我们也零零碎碎讲到了 spring 事务管理的几个重要的接口,我们先从 TransactionInfo 这个类,它就是事务执行的上下文,TransactionInfo 的重要程度不言而喻。它包含了这么几个对象:

  • PlatformTransactionManager 事务进行几乎所有进行时操作的核心逻辑就在这里面,它有几个子类,我们主要还是讨论 DataSourceTransactionManager 这个子类,也就是对 JDBC 连接的管理。
  • TransactionDefition 事务在开启之前的一些配置信息,比如配置传播等级,隔离级别,超时控制等等。
  • TransactionStatus 事务的运行时信息基本都在这里面,比如连接信息,挂起的事务的信息(保存点),事务跑没跑完等等。
  • TransactionInfo - old ,每个 info 会包含它上个挂起事务的引用,可能为空。
  • Joinpointxxx 这个无所谓,debug 用的,包含了事务切面的信息的一个字符串。

还是从代码入手:也就是事务的执行纵览:

-- 代码位于 org.springframework.transaction.interceptor.TransactionAspectSupport#invokeWithinTransaction --
@Nullable
	protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass,
			final InvocationCallback invocation) throws Throwable {

		// If the transaction attribute is null, the method is non-transactional.
		TransactionAttributeSource tas = getTransactionAttributeSource();
		final TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method, targetClass) : null);
		final TransactionManager tm = determineTransactionManager(txAttr);
		PlatformTransactionManager ptm = asPlatformTransactionManager(tm);
		final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);

		if (txAttr == null || !(ptm instanceof CallbackPreferringPlatformTransactionManager)) {
			// Standard transaction demarcation with getTransaction and commit/rollback calls.
			TransactionInfo txInfo = createTransactionIfNecessary(ptm, txAttr, joinpointIdentification); // 创建或者拿到当前事务信息

			Object retVal;
			try {
				// This is an around advice: Invoke the next interceptor in the chain.
				// This will normally result in a target object being invoked.
				retVal = invocation.proceedWithInvocation();// 执行切面
			}
			catch (Throwable ex) {
				// target invocation exception
				completeTransactionAfterThrowing(txInfo, ex);// 回滚逻辑
				throw ex;
			}
			finally {
				cleanupTransactionInfo(txInfo);// 拿回上一个事务
			}
			
			commitTransactionAfterReturning(txInfo);// 提交当前事务
			return retVal;
		}
	}

上述代码逻辑还是很清晰的

  • 先从 TransactionDefinition(TransactionAttribute) 开始,看看当前这个切面需不需要执行事务,
  • 如果能获取到,则通过其获取到合适的 PlatformTransactionManager,创建或者获取、封装到我们的 TransactionInfo
  • 执行切面,再根据切面情况选择提交或者回滚。

我们知道事务最终都会创建一个 TransactionInfo,这个对象创建的最后一步挺有意思:

-- 代码位于 org.springframework.transaction.interceptor.TransactionAspectSupport#prepareTransactionInfo --
-- 是方法 createTransactionIfNecessary 的最后一步 --

	protected TransactionInfo prepareTransactionInfo(@Nullable PlatformTransactionManager tm,
			@Nullable TransactionAttribute txAttr, String joinpointIdentification,
			@Nullable TransactionStatus status) {

		TransactionInfo txInfo = new TransactionInfo(tm, txAttr, joinpointIdentification);
		// We always bind the TransactionInfo to the thread, even if we didn't create
		// a new transaction here. This guarantees that the TransactionInfo stack
		// will be managed correctly even if no transaction was created by this aspect.
		txInfo.bindToThread();
		return txInfo;
	}

	private static final ThreadLocal<TransactionInfo> transactionInfoHolder =
			new NamedThreadLocal<>("Current aspect-driven transaction");

	private void bindToThread() {
		// Expose current TransactionStatus, preserving any existing TransactionStatus
		// for restoration after this transaction is complete.
		this.oldTransactionInfo = transactionInfoHolder.get();
		transactionInfoHolder.set(this);
	}

我们发现它这里有一个 ThreadLocal,它会将之前已经保存在这里的 TransactionInfo 拿出来,放到刚才上面提到的 old 里面持有,而当前这个 TransactionInfo 对象则会扔进这个 ThreadLocal 里面,然后在执行完切面后把 old 放回去,形成了一个事务栈:

	protected void cleanupTransactionInfo(@Nullable TransactionInfo txInfo) {
		if (txInfo != null) {
			txInfo.restoreThreadLocalStatus();
		}
	}

	private void restoreThreadLocalStatus() {
		// Use stack to restore old transaction TransactionInfo.
		// Will be null if none was set.
		transactionInfoHolder.set(this.oldTransactionInfo);
	}

二、TransactionStatus 源码解析

TransactionDefinition 基于配置,或者我们注解获取到的,其中又定义了该使用哪个 PlatformTransactionManager,这些相对来说都比较简单。而 TransactionStatus 事务运行时这个最重要的对象的创建与使用却是比较复杂的。

它的创建大体可以分为如下流程:

2.1、创建

它的创建是比较简单的,没有用工厂,直接创建即可。

	public DefaultTransactionStatus(
			@Nullable Object transaction, boolean newTransaction, boolean newSynchronization,
			boolean readOnly, boolean debug, @Nullable Object suspendedResources) {

		this.transaction = transaction;
		this.newTransaction = newTransaction;
		this.newSynchronization = newSynchronization;
		this.readOnly = readOnly;
		this.debug = debug;
		this.suspendedResources = suspendedResources;
	}

transaction 是里面一个比较重要的对象,它内置了连接对象,下面是 Transaction 对象的获取过程:

	DataSourceTransactionObject txObject = new DataSourceTransactionObject();
	txObject.setSavepointAllowed(isNestedTransactionAllowed());
	ConnectionHolder conHolder =
		(ConnectionHolder) TransactionSynchronizationManager.getResource(obtainDataSource());
	txObject.setConnectionHolder(conHolder, false);
	return txObject;

我们可以看到核心的就是 ConnectionHolder,这个对象尤为重要,注意,这里并没有建立连接,而且也可能获取到空的 connectionHolder,连接的包裹对象 ConnectionHolder 可以控制两个事物到底是不是用的同一个连接(同一个真正的事务,一起rollback 一起commit 的那种)。

此对象存放在一个叫做 TransactionSynchronizationManager 的工具单例类里面,它内部持有非常多个 ThreadLocal 来存放事务信息。

2.2、连接初始化

上面说了没有真正建立连接,建立连接是在 doBegin() 的方法里做的:

	/**
	 * This implementation sets the isolation level but ignores the timeout.
	 */
	@Override
	protected void doBegin(Object transaction, TransactionDefinition definition) {
		DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
		Connection con = null;
		
		// 第一步,根据是否受事务管理,或者有没有 connectionHolder 对象去创建新连接,是否获取一个新连接
		if (!txObject.hasConnectionHolder() ||
				txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
			Connection newCon = obtainDataSource().getConnection();
			if (logger.isDebugEnabled()) {
				logger.debug("Acquired Connection [" + newCon + "] for JDBC transaction");
			}
			txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
		}

		// 设置此事务已经受事务管理
		txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
		con = txObject.getConnectionHolder().getConnection();

		// 设置一些 `TransactionDefinition` 过来的属性,比如隔离等级,传播等级,是否只读等等,并将其设置为可用。
		Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);
		txObject.setPreviousIsolationLevel(previousIsolationLevel);
		txObject.setReadOnly(definition.isReadOnly());

		// Switch to manual commit if necessary. This is very expensive in some JDBC drivers,
		// so we don't want to do it unnecessarily (for example if we've explicitly
		// configured the connection pool to set it already).
		if (con.getAutoCommit()) {
			txObject.setMustRestoreAutoCommit(true);
			if (logger.isDebugEnabled()) {
				logger.debug("Switching JDBC Connection [" + con + "] to manual commit");
			}
			con.setAutoCommit(false);
		}

		prepareTransactionalConnection(con, definition);
		txObject.getConnectionHolder().setTransactionActive(true);

		int timeout = determineTimeout(definition);
		if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) {
			txObject.getConnectionHolder().setTimeoutInSeconds(timeout);
		}
		
		//--------------------------------------------------------------------------------------

		// Bind the connection holder to the thread.
		if (txObject.isNewConnectionHolder()) {
			TransactionSynchronizationManager.bindResource(obtainDataSource(), txObject.getConnectionHolder());
		}
	}
	}

代码很长但是很简单,从上到下依次为:

  1. 如果没有 TransactionHolder 或者不属于其他事务管理,则创建一个新连接 obtainDataSource().getConnection();,并且把新创建的连接放入 connectionHoler
  2. 将其设置为已经受事务同步管理 setSynchronizedWithTransaction(true)
  3. 设置一些 TransactionDefinition 过来的属性,比如隔离等级,传播等级,是否只读等等,并将其设置为可用。
  4. 把它塞入或者塞回 TransactionSynchronizationManagerThreadLocal 里面,也就是将 connectionHolder 与当前线程绑定。

2.3、对 TransactionStatus 进行配置

方法如下,非常简单,但是十分重要:

	protected void prepareSynchronization(DefaultTransactionStatus status, TransactionDefinition definition) {
		if (status.isNewSynchronization()) {
			TransactionSynchronizationManager.setActualTransactionActive(status.hasTransaction());
			TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(
					definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT ?
							definition.getIsolationLevel() : null);
			TransactionSynchronizationManager.setCurrentTransactionReadOnly(definition.isReadOnly());
			TransactionSynchronizationManager.setCurrentTransactionName(definition.getName());
			TransactionSynchronizationManager.initSynchronization();
		}
	}

我们发现又是 TransactionSynchronizationManager,这个事务信息保存对象不仅仅保存 connectionHolder,还保存它是否是一个事务,setActualTransactionActive,比如我们有些传播等级是有事务则使用事务,没有就不适用,spring 还是会给它准备一个 TransactionInfo,但是里面的实现基本上是空的。

后面是设置一些传播等级,名字,是否只读之类的属性,最后在 initSynchronization() 方法里面,创建了一个叫做 synchronizationslinkedHashSet<TransactionSynchronization>划重点,这个东西类似一个监听器,在触发事务的某些行为时会被调用,比如被挂起,被重新使用等等:

	/**
	 * Suspend this synchronization.
	 * Supposed to unbind resources from TransactionSynchronizationManager if managing any.
	 * @see TransactionSynchronizationManager#unbindResource
	 */
	default void suspend() {
	}

	/**
	 * Resume this synchronization.
	 * Supposed to rebind resources to TransactionSynchronizationManager if managing any.
	 * @see TransactionSynchronizationManager#bindResource
	 */
	default void resume() {
	}

	/**
	 * Flush the underlying session to the datastore, if applicable:
	 * for example, a Hibernate/JPA session.
	 * @see org.springframework.transaction.TransactionStatus#flush()
	 */
	@Override
	default void flush() {
	}
	
	/**
	 * Invoked before transaction commit (before "beforeCompletion").
	 * Can e.g. flush transactional O/R Mapping sessions to the database.
	 * <p>This callback does <i>not</i> mean that the transaction will actually be committed.
	 * A rollback decision can still occur after this method has been called. This callback
	 * is rather meant to perform work that's only relevant if a commit still has a chance
	 * to happen, such as flushing SQL statements to the database.
	 * <p>Note that exceptions will get propagated to the commit caller and cause a
	 * rollback of the transaction.
	 * @param readOnly whether the transaction is defined as read-only transaction
	 * @throws RuntimeException in case of errors; will be <b>propagated to the caller</b>
	 * (note: do not throw TransactionException subclasses here!)
	 * @see #beforeCompletion
	 */
	default void beforeCommit(boolean readOnly) {
	}

	/**
	 * Invoked before transaction commit/rollback.
	 * Can perform resource cleanup <i>before</i> transaction completion.
	 * <p>This method will be invoked after {@code beforeCommit}, even when
	 * {@code beforeCommit} threw an exception. This callback allows for
	 * closing resources before transaction completion, for any outcome.
	 * @throws RuntimeException in case of errors; will be <b>logged but not propagated</b>
	 * (note: do not throw TransactionException subclasses here!)
	 * @see #beforeCommit
	 * @see #afterCompletion
	 */
	default void beforeCompletion() {
	}

三、TransactionStatus + TransactionSynchronizationManager 的使用以及事务传播等级

前面讲了 TransactionStatus 是怎么创建的,创建后发生了什么,创建后怎么复用连接,何时复用连接,实际上就是事务传播等级,我们一个一个看:

-- 代码位于 org.springframework.transaction.support.AbstractPlatformTransactionManager#getTransaction --
	
		Object transaction = doGetTransaction();
		boolean debugEnabled = logger.isDebugEnabled();

		if (isExistingTransaction(transaction)) {
			// Existing transaction found -> check propagation behavior to find out how to behave.
			return handleExistingTransaction(def, transaction, debugEnabled);
		}

我们刚才讲的是,获取不到 ConnectionHolder 的流程,那么如果获取到了,就是我们这里要讲的,传播等级。

3.1 PROPAGATION_NEVER

		if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) {
			throw new IllegalTransactionStateException(
					"Existing transaction found for transaction marked with propagation 'never'");
		}

既然是 never,那就不可能有 connectionHolder,所以这里直接抛出异常。

3.2 PROPAGATION_NOT_SUPPORTED

		if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) {
			if (debugEnabled) {
				logger.debug("Suspending current transaction");
			}
			Object suspendedResources = suspend(transaction);
			boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
			return prepareTransactionStatus(
					definition, null, false, newSynchronization, debugEnabled, suspendedResources);
		}

可以看到第一步,suspend(trx) 挂了了一个事务,并且在创建 TransactionStatus 时,没有放入 transaction 对象,也就是连接对象。

3.3 PROPAGATION_REQUIRES_NEW

		if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) {
			if (debugEnabled) {
				logger.debug("Suspending current transaction, creating new transaction with name [" +
						definition.getName() + "]");
			}
			SuspendedResourcesHolder suspendedResources = suspend(transaction);
			try {
				boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
				DefaultTransactionStatus status = newTransactionStatus(
						definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
				doBegin(transaction, definition);
				prepareSynchronization(status, definition);
				return status;
			}
			catch (RuntimeException | Error beginEx) {
				resumeAfterBeginException(transaction, suspendedResources, beginEx);
				throw beginEx;
			}
		}

这里很纯粹,我们知道 PROPAGATION_REQUIRES_NEW 就是不管怎么样,都创建新的事务,使用新的连接,所以这里直接挂起事务,并且 newTransactionStatus + doBegin +prepareSynchronization 三部曲,就是我们上个章节讲的 TransactionStatus 的初始化。

3.3 PROPAGATION_NESTED 与 SAVEPOINT保存点

		if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
			if (!isNestedTransactionAllowed()) {
				throw new NestedTransactionNotSupportedException(
						"Transaction manager does not allow nested transactions by default - " +
						"specify 'nestedTransactionAllowed' property with value 'true'");
			}
			if (debugEnabled) {
				logger.debug("Creating nested transaction with name [" + definition.getName() + "]");
			}
			if (useSavepointForNestedTransaction()) {
				// Create savepoint within existing Spring-managed transaction,
				// through the SavepointManager API implemented by TransactionStatus.
				// Usually uses JDBC 3.0 savepoints. Never activates Spring synchronization.
				DefaultTransactionStatus status =
						prepareTransactionStatus(definition, transaction, false, false, debugEnabled, null);
				status.createAndHoldSavepoint();
				return status;
			}
			else {
				// Nested transaction through nested begin and commit/rollback calls.
				// Usually only for JTA: Spring synchronization might get activated here
				// in case of a pre-existing JTA transaction.
				boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
				DefaultTransactionStatus status = newTransactionStatus(
						definition, transaction, true, newSynchronization, debugEnabled, null);
				doBegin(transaction, definition);
				prepareSynchronization(status, definition);
				return status;
			}
		}

默认的传播级别,也就是有事务就用这个事务,没有就新建一个。注意这里有一个叫做 savePoint 保存点的东西:savePoint 有个计数器,保存在 ConnectionHolder 里面,如果需要创建保存点,就会通过它操作 Connection 去启用保存点,如果我们报了错,可以回滚到此保存点,比如这一小段代码 SAVEPOINT_1,里面有一个内嵌事务, SAVAPOINT_2,执行这个内嵌出错,我们可以回滚 SAVAPOINT_2,而 SAVEPOINT_1 不受影响。

没有使用 savePoint 则还是三部曲,但是这个三部曲,由于我们已经有一个现有的连接,所以会创建一个 TransactionStatus,但是他们的连接也就是 connectionHolder 使用的是同一个对象。

它的调用过程也是简单,我们的 PlatformTransactionManager 有一个叫做 rollback 的接口,它的实现有这么一段代码:

	if (status.hasSavepoint()) {
		if (status.isDebug()) {
			logger.debug("Rolling back transaction to savepoint");
		}
		status.rollbackToHeldSavepoint();
	}
	else if (status.isNewTransaction()) {
		if (status.isDebug()) {
			logger.debug("Initiating transaction rollback");
		}
		doRollback(status);
	}

3.4 suspend 和 resume?怎么实现的?

上面那些东西实际上都比较简单,就是判断是否获取新的连接,然后创建一个 TransactionInfo 对象,但是里面有一个稍微复杂点的东西,就是事务的挂起和恢复,是怎么做的?

我们先看挂起:

3.4.1 suspend 挂起事务
	protected final SuspendedResourcesHolder suspend(@Nullable Object transaction) throws TransactionException {
		if (TransactionSynchronizationManager.isSynchronizationActive()) {
			List<TransactionSynchronization> suspendedSynchronizations = doSuspendSynchronization();
			try {
				Object suspendedResources = null;
				if (transaction != null) {
					suspendedResources = doSuspend(transaction);
				}
				String name = TransactionSynchronizationManager.getCurrentTransactionName();
				TransactionSynchronizationManager.setCurrentTransactionName(null);
				boolean readOnly = TransactionSynchronizationManager.isCurrentTransactionReadOnly();
				TransactionSynchronizationManager.setCurrentTransactionReadOnly(false);
				Integer isolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
				TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(null);
				boolean wasActive = TransactionSynchronizationManager.isActualTransactionActive();
				TransactionSynchronizationManager.setActualTransactionActive(false);
				return new SuspendedResourcesHolder(
						suspendedResources, suspendedSynchronizations, name, readOnly, isolationLevel, wasActive);
			}
			catch (RuntimeException | Error ex) {
				// doSuspend failed - original transaction is still active...
				doResumeSynchronization(suspendedSynchronizations);
				throw ex;
			}
		}
		else if (transaction != null) {
			// Transaction active but no synchronization active.
			Object suspendedResources = doSuspend(transaction);
			return new SuspendedResourcesHolder(suspendedResources);
		}
		else {
			// Neither transaction nor synchronization active.
			return null;
		}
	}

当一个事务被挂起,且处于同步状态(前面说的,调用过 prepare 最后一步 initSynchronization,搞了一个 linkedHashSet<TransactionSynchronization>),还划了重点的那个

  • 此时会先 doSuspendSynchronization(); ,里面很简单,就是把我们注册的那些 TransactionSynchronizationsuspend() 方法都跑一遍,并且把它们全部清除避免被跑两次以上,并且拿到这部分注册的 linkedHashSet<TransactionSynchronization> : suspendedSynchronizations 如下:
	private List<TransactionSynchronization> doSuspendSynchronization() {
		List<TransactionSynchronization> suspendedSynchronizations =
				TransactionSynchronizationManager.getSynchronizations();
		for (TransactionSynchronization synchronization : suspendedSynchronizations) {
			synchronization.suspend();
		}
		TransactionSynchronizationManager.clearSynchronization();
		return suspendedSynchronizations;
	}

紧接着调用 PlatfromTransactionManagerdoSuspend(),代码也是很简单,就是把刚才来来回回讲的那个 transactionHolder 移除掉,返回的是我们的事务对象,也就是包含了 connection 的那个

	protected Object doSuspend(Object transaction) {
		DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
		txObject.setConnectionHolder(null);
		return TransactionSynchronizationManager.unbindResource(obtainDataSource());
	}

后续的操作也很容易看懂,我们把它保存在 TransactionSynchronizationManager 的那些 ThreadLocal 全部拿到原值,并把现有的值清除掉,并且我们把上面说的那些乱七八糟的包括 suspendedSynchronizations ,以及 transaction 一并放到一个叫做 SuspendedResourcesHolder 的对象里面。

这样,这个事务就和当前线程没有半毛钱关系了,这些变量全部被保存在 SuspendedResourcesHolder 里面,这个保存着事务信息的对象会保存在我们新的 TransactionStatus 里面,那么问题来了,怎么恢复?实际上你都知道怎么挂起了,还不知道怎么恢复吗?

3.4.2 resume 恢复事务

当然是反过来操作!

resume() 的调用入口如下:

AbstractPlatformTransactionManager
getTransaction(TransactionDefinition)
resumeAfterBeginException(Object, SuspendedResourcesHolder, Throwable)
cleanupAfterCompletion(DefaultTransactionStatus)

很简单,分两种,第一种是当前事务报错了会恢复到上一个事务。第二种是当前事务执行完毕了,会调用恢复。代码很简单,如下:我就不啰嗦了,是上面的反操作。

-- 代码位于 org.springframework.transaction.support.AbstractPlatformTransactionManager#resume --

	protected final void resume(@Nullable Object transaction, @Nullable SuspendedResourcesHolder resourcesHolder)
			throws TransactionException {

		if (resourcesHolder != null) {
			Object suspendedResources = resourcesHolder.suspendedResources;
			if (suspendedResources != null) {
				doResume(transaction, suspendedResources);
			}
			List<TransactionSynchronization> suspendedSynchronizations = resourcesHolder.suspendedSynchronizations;
			if (suspendedSynchronizations != null) {
				TransactionSynchronizationManager.setActualTransactionActive(resourcesHolder.wasActive);
				TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(resourcesHolder.isolationLevel);
				TransactionSynchronizationManager.setCurrentTransactionReadOnly(resourcesHolder.readOnly);
				TransactionSynchronizationManager.setCurrentTransactionName(resourcesHolder.name);
				doResumeSynchronization(suspendedSynchronizations);
			}
		}
	}

3.5 TransactionStatus 总结

我们再回顾以及总结一下 TransactionStatus 这个 spring 事务运行时的重要对象,它的重要的方法以及成员变量如下所示,我们的内嵌事务,传播等级,事务是不是同一个事物,无非就是来操作这几个方法:

四、剩余流程(提交与回滚)解析

我们已经把最重要对象 TransactionStatus 的源码解析的七七八八了,剩下的事务控制实际上已经很简单了,获取到 TransactionInfo (主要是内部那个 TransactionStatus 以及对 TransactionSychronizationManager 的操作):

后续的操作我们看到,就是三部分:报错做什么、最终做什么、怎么提交。


		if (txAttr == null || !(ptm instanceof CallbackPreferringPlatformTransactionManager)) {
			// Standard transaction demarcation with getTransaction and commit/rollback calls.
			TransactionInfo txInfo = createTransactionIfNecessary(ptm, txAttr, joinpointIdentification); // 创建或者拿到当前事务信息

			Object retVal;
			try {
				// This is an around advice: Invoke the next interceptor in the chain.
				// This will normally result in a target object being invoked.
				retVal = invocation.proceedWithInvocation();// 执行切面
			}
			catch (Throwable ex) {
				// target invocation exception
				completeTransactionAfterThrowing(txInfo, ex);// 回滚逻辑
				throw ex;
			}
			finally {
				cleanupTransactionInfo(txInfo);// 拿回上一个事务
			}
			
			commitTransactionAfterReturning(txInfo);// 提交当前事务
			return retVal;
		}

cleanupTransactionInfo() 我们已经讲过,就是 TransactionInfo 里有一个上一个事务 TransactionInfo 的引用,只是把它找回来,没什么太多的东西。

4.1 事务怎么做回滚

回滚的核心方法在这里:

			if (txInfo.transactionAttribute != null && txInfo.transactionAttribute.rollbackOn(ex)) {
				try {
					txInfo.getTransactionManager().rollback(txInfo.getTransactionStatus());
				}
				catch (TransactionSystemException ex2) {
					logger.error("Application exception overridden by rollback exception", ex);
					ex2.initApplicationException(ex);
					throw ex2;
				}
				catch (RuntimeException | Error ex2) {
					logger.error("Application exception overridden by rollback exception", ex);
					throw ex2;
				}
			}

我们可以看到,回滚的时候,会先保证处于我们标识的异常里,默认是所有的 ErrorRuntimeException注意这点哦,如果你对非 RuntimeException 进行了抛出,是不会回滚的!或者你重写了 Throwable,也不会!

回滚的方法比较简单,但是又几个要注意的点:

				triggerBeforeCompletion(status);

				if (status.hasSavepoint()) {
					if (status.isDebug()) {
						logger.debug("Rolling back transaction to savepoint");
					}
					status.rollbackToHeldSavepoint();
				}
				else if (status.isNewTransaction()) {
					if (status.isDebug()) {
						logger.debug("Initiating transaction rollback");
					}
					doRollback(status);
				}
				else {
					// Participating in larger transaction
					if (status.hasTransaction()) {
						if (status.isLocalRollbackOnly() || isGlobalRollbackOnParticipationFailure()) {
							if (status.isDebug()) {
								logger.debug("Participating transaction failed - marking existing transaction as rollback-only");
							}
							doSetRollbackOnly(status);
						}
						else {
							if (status.isDebug()) {
								logger.debug("Participating transaction failed - letting transaction originator decide on rollback");
							}
						}
					}
					else {
						logger.debug("Should roll back transaction but cannot - no transaction available");
					}
					// Unexpected rollback only matters here if we're asked to fail early
					if (!isFailEarlyOnGlobalRollbackOnly()) {
						unexpectedRollback = false;
					}
				}
			}
			catch (RuntimeException | Error ex) {
				triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
				throw ex;
			}

			triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);

我们可以看到有很多的触发(trigger),这些触发实际上都是我们刚才讲的那个 Set<TransactionSynchronization> 这里就不多说了。

这里的回滚分为几种情况:

  1. 有保存点的,刚才讲过了,看漏的回去翻翻 3.3 PROPAGATION_NESTED 与 SAVEPOINT保存点 - 3.4.2 resume 恢复事务
  2. 崭新的事务!这是最简单的,就是调用 connection.rollback()
	protected void doRollback(DefaultTransactionStatus status) {
		DataSourceTransactionObject txObject = (DataSourceTransactionObject) status.getTransaction();
		Connection con = txObject.getConnectionHolder().getConnection();
		if (status.isDebug()) {
			logger.debug("Rolling back JDBC transaction on Connection [" + con + "]");
		}
		try {
			con.rollback();
		}
	}
  1. 如果有事务,则通过 TransactionStatusConnectionHolder 的变量 rollbackOnly 设置为真

是的,到这里你发现除了保存点的回滚,这里没有做任何实质的回滚操作!它只是设置了一下 rollbackOnly,然后抛出异常,不过这个异常比如 mybatis 会捕获,并调用 sqlSession.rollback() 真正做回滚,不过这是 mybatis 的操作了。

4.2 事务怎么做提交

提交也和回滚一样的道理,入口如下:

	protected void commitTransactionAfterReturning(@Nullable TransactionInfo txInfo) {
		if (txInfo != null && txInfo.getTransactionStatus() != null) {
			if (logger.isTraceEnabled()) {
				logger.trace("Completing transaction for [" + txInfo.getJoinpointIdentification() + "]");
			}
			txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
		}
	}

它的实现如下:

	public final void commit(TransactionStatus status) throws TransactionException {
		if (status.isCompleted()) {
			throw new IllegalTransactionStateException(
					"Transaction is already completed - do not call commit or rollback more than once per transaction");
		}

		DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status;
		if (defStatus.isLocalRollbackOnly()) {
			if (defStatus.isDebug()) {
				logger.debug("Transactional code has requested rollback");
			}
			processRollback(defStatus, false);
			return;
		}

		if (!shouldCommitOnGlobalRollbackOnly() && defStatus.isGlobalRollbackOnly()) {
			if (defStatus.isDebug()) {
				logger.debug("Global transaction is marked as rollback-only but transactional code requested commit");
			}
			processRollback(defStatus, true);
			return;
		}

		processCommit(defStatus);
	}

在这里,它会判断有没有其他地方给 ConnectionHolder 设置了 rollbackOnly,如果设置了,也会进行回滚操作,和 4.1 一样,最终抛出异常:

如果没有则正常调用 processCommit(),,这里不细讲,因为和 processRollback() 没什么太大区别,就是几个触发(trigger),然后如果有保存点,就释放一下保存点,最终来到 PlatformTransactionManager#docommit()

值得注意的是,只有崭新的事务( newTransaction = true )才会调用 docommit(),内嵌事务就不是崭新的事务,怎么判断崭新事务,就是那个创建了 ConnectionHolder,开启了连接的那一个事务,凡是拿了已有的 ConnectionHolder 进行操作的事务都不是崭新的事务:

	protected void doCommit(DefaultTransactionStatus status) {
		DataSourceTransactionObject txObject = (DataSourceTransactionObject) status.getTransaction();
		Connection con = txObject.getConnectionHolder().getConnection();
		if (status.isDebug()) {
			logger.debug("Committing JDBC transaction on Connection [" + con + "]");
		}
		try {
			con.commit();
		}
	}

代码一样很简单,这里就不啰嗦了。


参考源码:spring-boot 2.2.2,即 spring 5.2.2
展开阅读全文
加载中

作者的其它热门文章

打赏
0
0 收藏
分享
打赏
0 评论
0 收藏
0
分享
返回顶部
顶部