分布式事务系列(1.1)Spring事务管理器PlatformTransactionManager

原创
2015/05/15 07:28
阅读数 13.9W

#1 系列目录

#2 jdbc事务

##2.1 例子

public void save(User user) throws SQLException{
	Connection conn=jdbcDao.getConnection();
	conn.setAutoCommit(false);
	try {
		PreparedStatement ps=conn.prepareStatement("insert into user(name,age) value(?,?)");
		ps.setString(1,user.getName());
		ps.setInt(2,user.getAge());
		ps.execute();
		conn.commit();
	} catch (Exception e) {
		e.printStackTrace();
		conn.rollback();
	}finally{
		conn.close();
	}
}

##2.2 分析

  • 怎么使用事务

    将自动提交设置为false,即conn.setAutoCommit(false),然后手动来conn.commit()、或者conn.rollback()。

  • 一个重要意识

    conn.commit();conn.rollback()等这些属于事务代码,其他执行sql的代码属于业务代码

    只有事务代码和业务代码使用的是同一个Connection的时候,事务的回滚和提交才能正常执行,所以如果我们要实现事务代码和业务代码的分离,必须要保证他们使用的是同一个Connection。

  • 谁在执行事务

    我们可以看到,可以通过操作Connection连接来执行事务,这并不代表Connection具有事务功能,而是使用了数据库自身的事务功能,Connection仅仅是把一些命令如commit、rollback传递给数据库

##2.3 存在的问题

  • 1 业务代码都要嵌套在try catch事务模板代码中

  • 2 当存在多个类似save(User user)的业务逻辑时,没法保证他们的原子性

    login(user);
    save(user);
    

    这两个业务逻辑都是相似的代码,获取Connection连接,然后执行sql语句。没法保证它们的原子性,是因为它们使用的不是同一个连接,不在同一个事务内。

#3 Hibernate的事务

##3.1 例子

public void save(User user){
	Session session=hibernateDao.openSession();
	Transaction tx=null;
	try {
		tx=session.beginTransaction();  
        session.save(user);  
        tx.commit();  
	} catch (Exception e) {
		if(tx!=null){
			tx.rollback();
		}
	}finally{
		session.close();
	}
}

##3.2 分析

  • 事务功能和业务功能的分离

    jdbc事务中Connection负担了两方面的功能,事务功能和执行sql的功能。这里的Transaction是Hibernate自己定义的事务,Hibernate则把这两者的功能单独开来,将事务功能交给了Transaction,使得职责更加分工明确。

  • 事务的原理

    其实Session、Transaction内部会有一个相同的Connection,这样就保证了 业务代码和事务代码使用的是同一个Connection,Transaction事务的回滚都是依托内部的Connection来完成的,如下:

    • 事务的开始,设置自动提交为false

    事务的开始

    • 事务的提交,通过connection的commit方法来提交

    事务的提交

    事务的回滚等操作,不再列举

#4 Spring事务功能的总体接口设计

由于上述各家实现事务功能的方式各不相同,Spring进行了统一的抽象,形成了PlatformTransactionManager事务管理器接口,事务的提交、回滚等操作全部交给它来实现。Spring的事务体系也是在PlatformTransactionManager事务管理器接口上开展开来的,所以先来了解下PlatformTransactionManager事务管理器。

##4.1 事务功能的总体接口设计

先来看下三大接口

  • PlatformTransactionManager : 事务管理器

  • TransactionDefinition : 事务的一些基础信息,如超时时间、隔离级别、传播属性等

  • TransactionStatus : 事务的一些状态信息,如是否是一个新的事务、是否已被标记为回滚

看下PlatformTransactionManager如何来操作事务:

public interface PlatformTransactionManager {

	//根据事务定义TransactionDefinition,获取事务
	TransactionStatus getTransaction(TransactionDefinition definition);

	//提交事务
	void commit(TransactionStatus status);

	//回滚事务
	void rollback(TransactionStatus status);
}

##4.2 接口对应的实现

###4.2.1 事务定义接口TransactionDefinition 事务定义接口

  • 红线上方是一些常量定义(事务的隔离级别和事务的传播属性,具体不再说,网上一大堆)
  • 事务的定义包含:事务的隔离级别、事务的传播属性、超时时间设置、是否只读

要明白的地方:

事务的隔离级别是数据库本身的事务功能,然而事务的传播属性则是Spring自己为我们提供的功能,数据库事务没有事务的传播属性这一说法。

该接口的实现DefaultTransactionDefinition:默认的事务定义

public class DefaultTransactionDefinition implements TransactionDefinition, Serializable {
	private int propagationBehavior = PROPAGATION_REQUIRED;
	private int isolationLevel = ISOLATION_DEFAULT;
	private int timeout = TIMEOUT_DEFAULT;
	private boolean readOnly = false;
	//略
}
  • 事务的传播属性为PROPAGATION_REQUIRED,即当前没有事务的时候,创建一个,如果有则使用当前事务
  • 事务的隔离级别采用底层数据库默认的隔离级别
  • 超时时间采用底层数据库默认的超时时间
  • 是否只读为false

###4.2.2 事务接口定义TransactionStatus

先引出Connection连接中的保存点功能:

//创建一个保存点
conn.setSavepoint(name);
//回滚到某个保存点
conn.rollback(savepoint);
//释放某个保存点
conn.releaseSavepoint(savepoint);

TransactionStatus它继承了SavepointManager接口,SavepointManager是对事务中上述保存点功能的封装,如下:

public interface SavepointManager {
	Object createSavepoint() throws TransactionException;
	void rollbackToSavepoint(Object savepoint) throws TransactionException;
	void releaseSavepoint(Object savepoint) throws TransactionException;
}

Spring利用保存点功能实现了事务的嵌套功能。后面会详细说明。

TransactionStatus本身更多存储的是事务的一些状态信息:

事务状态信息

  • 是否是一个新的事物
  • 是否有保存点
  • 是否已被标记为回滚

常用的TransactionStatus接口实现为DefaultTransactionStatus:

默认的事务定义

目前jdbc事务是通过Connection来实现事务的,Hibernate是通过它自己定义的Transaction来实现的,所以各家的事务都不同,所以Spring只能以Object transaction的形式来表示各家的事务,事务的回滚和提交等操作都会最终委托给上述Object transaction来完成。

Object transaction的职责就是提交回滚事务,这个transaction的选择可能如下:

  • DataSourceTransactionObject
  • HibernateTransactionObject
  • JpaTransactionObject(之后再详细说)

详细信息分别如下:

  • 对于DataSourceTransactionObject:

    我们使用了dataSource来获取连接,要想实现事务功能,必然需要使用Connection,所以它中肯定有一个Connection来执行事务的操作。

    DataSourceTransactionObject中有一个ConnectionHolder,它封装了一个Connection。

  • 对于HibernateTransactionObject:

    我们使用了hibenrate,此时要想实现事务功能,必然需要通过hibernate自己定义的Transaction来实现。

    HibernateTransactionObject中含有一个SessionHolder,和上面的ConnectionHolder一样,它封装了一个Session,有了Session,我们就可以通过Session来产生一个Hibernate的Transaction,从而实现事务操作。

###4.2.3 事务管理器接口定义PlatformTransactionManager

类图关系如下:

默认的事务定义

重点来说下

  • AbstractPlatformTransactionManager
    • DataSourceTransactionManager
    • HibernateTransactionManager
    • JpaTransactionManager(之后详细再说)

这就需要来看看事务管理器的接口,上述的他们都是怎么实现的:

  • 1 第一个接口:TransactionStatus getTransaction(TransactionDefinition definition) 根据事务定义获取事务状态

    大体内容就是先获取上述说明的Object transaction,判断当前事务是否已存在,如果存在则进行事务的传播属性处理,后面详细说明,如果不存在new DefaultTransactionStatus,新创建一个事务,同时使用Object transaction开启事务。 分成了几个过程:

    • 1.1 获取Object transaction:

    不同的事务管理器获取不同的Object transaction

    • DataSourceTransactionManager就是获取上述的DataSourceTransactionObject

      从当前线程中获取绑定的ConnectionHolder,可能为null,如果为null,则会在下一个开启事务的过程中,从dataSource中获取一个Connection,封装成ConnectionHolder,然后再绑定到当前线程

      然后我们new 一个DataSourceTransactionObject了,具体过程如下:

    DataSourceTransactionManager获取一个事务

    • HibernateTransactionManager获取HibernateTransactionObject

      从当前线程中获取绑定的SessionHolder,可能为null,如果为null,则会在下一个开启事务的过程中从sessionFactory中获取一个session,然后封装成SessionHolder,然后再绑定到当前线程

      然后我们就可以new 一个HibernateTransactionObject了,具体过程如下:

HibernateTransactionManager获取一个事务

  • 1.2 构建DefaultTransactionStatus,使用Object transaction开启事务

    • DataSourceTransactionManager的DataSourceTransactionObject开启过程如下:

      首先判断之前的获取当前线程绑定的ConnectionHolder是否为null,如果为null,从dataSource中获取一个Connection,封装成ConnectionHolder,然后再绑定到当前线程

      因为开启了一个事务,则必须要关闭DataSourceTransactionObject中Connection的自动提交,代码如下(省略一些):

      	protected void doBegin(Object transaction, TransactionDefinition definition) {
      		DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
      		Connection con = null;
      
      		//如果ConnectionHolder是否为null,从新获取
      		if (txObject.getConnectionHolder() == null ||
      				txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
      			Connection newCon = this.dataSource.getConnection();
      			txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
      		}
      		con = txObject.getConnectionHolder().getConnection();
      
      		Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);
      		txObject.setPreviousIsolationLevel(previousIsolationLevel);
      
      		//取消自动提交
      		if (con.getAutoCommit()) {
      			txObject.setMustRestoreAutoCommit(true);
      			if (logger.isDebugEnabled()) {
      				logger.debug("Switching JDBC Connection [" + con + "] to manual commit");
      			}
      			con.setAutoCommit(false);
      		}
      		txObject.getConnectionHolder().setTransactionActive(true);
      
      
      		//如果是新增的ConnectionHolder,则绑定到当前线程
      		if (txObject.isNewConnectionHolder()) {
      			TransactionSynchronizationManager.bindResource(getDataSource(), txObject.getConnectionHolder());
      		}
      	}
      
  • HibernateTransactionManager的HibernateTransactionObject开启过程如下:

    也是同样的逻辑,如果SessionHolder为null,则从SessionFactory中获取一个Session,然后封装成SessionHolder,然后把这个SessionHolder绑定到当前线程

    		Session newSession = (entityInterceptor != null ?
    				getSessionFactory().withOptions().interceptor(entityInterceptor).openSession() :
    				getSessionFactory().openSession());
    		txObject.setSession(newSession);
    

    同时,使用上述session开启一个事务,把事务对象也保存到上述的SessionHolder中。

    		Transaction hibTx=session.beginTransaction();
    		txObject.getSessionHolder().setTransaction(hibTx);
    

    如果是新创建的SessionHolder,则绑定到当前线程

    		// Bind the session holder to the thread.
    		if (txObject.isNewSessionHolder()) {
    			TransactionSynchronizationManager.bindResource(getSessionFactory(), txObject.getSessionHolder());
    		}
    
  • 2 第二个接口:void rollback(TransactionStatus status) 回滚事务

    回滚,则还是利用DefaultTransactionStatus内部的Object transaction来执行回滚操作

    • DataSourceTransactionManager就是使用DataSourceTransactionObject中的Connection来进行回滚操作

      protected void doRollback(DefaultTransactionStatus status) {
      	DataSourceTransactionObject txObject = (DataSourceTransactionObject) status.getTransaction();
      	Connection con = txObject.getConnectionHolder().getConnection();
      	try {
      		con.rollback();
      	}
      	catch (SQLException ex) {
      		throw new TransactionSystemException("Could not roll back JDBC transaction", ex);
      	}
      }
      
    • HibernateTransactionManager就是使用HibernateTransactionObject中的SessionHolder中的Session创建的事务Transaction来进行回滚操作

      protected void doRollback(DefaultTransactionStatus status) {
      	HibernateTransactionObject txObject = (HibernateTransactionObject) status.getTransaction();
      	try {
      		txObject.getSessionHolder().getTransaction().rollback();
      	}
      }
      
  • 3 第三个接口: void commit(TransactionStatus status) 提交事务

    同理,DataSourceTransactionManager依托内部的Connection来完成提交操作

    HibernateTransactionManager依托内部的Transaction来完成提交操作

##4.3 事务的传播属性解析

可以参考这篇文章的案例说明Spring事务的传播行为和隔离级别,下面重点源码分析下Spring的事务传播属性:

对于事务的传播属性的代码如下:

事务传播入口

在获取Object transaction之后,先进行判断,是否是已存在的事务。因为这个Object transaction的获取过程就是直接从线程绑定的获取的,可能当前线程已经存在事务,具体判断如下:

  • DataSourceTransactionManager

    protected boolean isExistingTransaction(Object transaction) {
    	DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
    	return (txObject.getConnectionHolder() != null && txObject.getConnectionHolder().isTransactionActive());
    }
    

    就是依据和当前线程绑定的ConnectionHolder中是否已存在事务

  • HibernateTransactionManager

    public boolean hasSpringManagedTransaction() {
    	return (this.sessionHolder != null && this.sessionHolder.getTransaction() != null);
    }
    

    也是依据和当前线程绑定的SessionHolder是否已存在事务

如果是已存在事务:则需要对事务的传播属性进行处理,如下即上述截图中的的handleExistingTransaction方法:

  • 1 PROPAGATION_NEVER:不允许存在事务,如果存在抛出异常

    if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) {
    	throw new IllegalTransactionStateException(
    			"Existing transaction found for transaction marked with propagation 'never'");
    }
    
  • 2 PROPAGATION_NOT_SUPPORTED:不支持事务,如果存在事务,则需将事务挂起,保存起来,当执行完成之后,需要将挂起的事务继续恢复

    if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) {
    	if (debugEnabled) {
    		logger.debug("Suspending current transaction");
    	}
    	Object suspendedResources = suspend(transaction);
    	boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
    	return prepareTransactionStatus(
    			definition, null, false, newSynchronization, debugEnabled, suspendedResources);
    }
    

    挂起之后,产生一个Object transaction=null的事务,即不执行事务代码,同时把挂起的资源信息传递给新创建的事务,当这个事务执行完成之后,再把挂起的资源恢复过来

    • 2.1 对于DataSourceTransactionManager来说,事务的挂起,就是把当前线程关联的ConnectionHolder解除绑定:

      protected Object doSuspend(Object transaction) {
      	DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
      	txObject.setConnectionHolder(null);
      	ConnectionHolder conHolder = (ConnectionHolder)
      			TransactionSynchronizationManager.unbindResource(this.dataSource);
      	return conHolder;
      }
      

      同理事务的恢复就是把上述ConnectionHolder再重新绑定到当前线程,继续执行该事务

    • 2.2 对于HibernateTransactionManager来说,事务的挂起,就是把当前线程关联的SessionHolder解除绑定

      protected Object doSuspend(Object transaction) {
      	HibernateTransactionObject txObject = (HibernateTransactionObject) transaction;
      	txObject.setSessionHolder(null);
      	SessionHolder sessionHolder =
      			(SessionHolder) TransactionSynchronizationManager.unbindResource(getSessionFactory());
      	txObject.setConnectionHolder(null);
      	ConnectionHolder connectionHolder = null;
      	if (getDataSource() != null) {
      		connectionHolder = (ConnectionHolder) TransactionSynchronizationManager.unbindResource(getDataSource());
      	}
      	return new SuspendedResourcesHolder(sessionHolder, connectionHolder);
      }
      

      同理事务的恢复就是把上述SessionHolder再重新绑定到当前线程,继续执行该事务

  • 3 PROPAGATION_REQUIRES_NEW:开启一个新的事务,如果当前存在事务则把当前事务挂起来

    if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) {
    	SuspendedResourcesHolder suspendedResources = suspend(transaction);
    	try {
    		boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
    		DefaultTransactionStatus status = newTransactionStatus(
    				definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
    		doBegin(transaction, definition);
    		prepareSynchronization(status, definition);
    		return status;
    	}
    }
    

    可以看到,创建新的事务,就会调用doBegin(transaction, definition);方法,将事务开启。

  • 4 PROPAGATION_NESTED : 原理很多人已详细说明,可以参考简单理解Spring中的PROPAGATION_NESTED,源码证实有待继续研究

    if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
    	if (useSavepointForNestedTransaction()) {
    		DefaultTransactionStatus status =
    				prepareTransactionStatus(definition, transaction, false, false, debugEnabled, null);
    		status.createAndHoldSavepoint();
    		return status;
    	}
    }
    

    这里使用了Object transaction来创建了SavePoint,仍旧使用原事务

  • 5 PROPAGATION_SUPPORTS 和 PROPAGATION_REQUIRED : 如果当前存在事务,则仍旧使用该事物

至此事务管理器接口就简略说完了,还有很多细节的东西,需要各位再去仔细研究。 #5 结束语

了解了PlatformTransactionManager事务管理器,下面就要开展Spring的编程式事务、声明式事务,所以下一篇文章内容如下:

  • TransactionTemplate可以实现编程式事务
  • Spring使用AOP来实现声明式事务

欢迎关注微信公众号:乒乓狂魔

微信公众号

展开阅读全文
加载中
点击加入讨论🔥(21) 发布并加入讨论🔥
21 评论
216 收藏
35
分享
返回顶部
顶部