Spring 从源码看事务——PlatformTransactionManager事务管理器

Spring 事务的传播方式实现

TransactionDefinition:用于描述隔离级别、超时时间、是否为只读事务和事务传播规则
TransactionStatus:代表一个事务的具体运行状态、以及还原点
PlatformTransactionManager:一个高层次的接口,包含3个方法。commit、rollback和getTransaction

	//#org.springframework.transaction.support.AbstractPlatformTransactionManager
	@Override
	public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition)
			throws TransactionException {

		// Use defaults if no transaction definition given.
		TransactionDefinition def = (definition != null ? definition : TransactionDefinition.withDefaults());

		// 获取事务对象
		Object transaction = doGetTransaction();
		boolean debugEnabled = logger.isDebugEnabled();

		// 如果当前事务已存在
		if (isExistingTransaction(transaction)) {
			// Existing transaction found -> check propagation behavior to find out how to behave.


			/* 出现事务传播行为
			NEVER:抛出异常
			NOT_SUPPORTED:则调用suspend(transaction)挂起当前事务,将被挂起的资源suspendedResources放入事务状态里。
			suspend:
			REQUIRES_NEW,则调用suspend(transaction)挂起当前事务,将事务对象transaction和被挂起的资源suspendedResources放入事务状态里。
			然后调用doBegin(transaction, definition)方法去真正打开事务。最后调用prepareSynchronization(status, definition)方法准备事务同步
			 */
			return handleExistingTransaction(def, transaction, debugEnabled);
		}

		// Check definition settings for new transaction.
		if (def.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {
			throw new InvalidTimeoutException("Invalid transaction timeout", def.getTimeout());
		}

		// No existing transaction found -> check propagation behavior to find out how to proceed.
		//如果事务传播方式是MANDATORY、当前事务不存在抛异常
		if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
			throw new IllegalTransactionStateException(
					"No existing transaction found for transaction marked with propagation 'mandatory'");
		}
		//如果事务传播方式是REQUIRED、REQUIRED_NEW、NESTED 创建新的事务
		else if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
				def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
				def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
			SuspendedResourcesHolder suspendedResources = suspend(null);
			if (debugEnabled) {
				logger.debug("Creating new transaction with name [" + def.getName() + "]: " + def);
			}
			try {
				return startTransaction(def, transaction, debugEnabled, suspendedResources);
			}
			catch (RuntimeException | Error ex) {
				resume(null, suspendedResources);
				throw ex;
			}
		}
		//SUPPORTS NOT_SUPPORTED NEVER 以非事务的方式运行
		else {
			// Create "empty" transaction: no actual transaction, but potentially synchronization.
			if (def.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) {
				logger.warn("Custom isolation level specified but no actual transaction initiated; " +
						"isolation level will effectively be ignored: " + def);
			}
			boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
			return prepareTransactionStatus(def, null, true, newSynchronization, debugEnabled, null);
		}
	}

/*
suspend挂起当前事务@ transaction
返回一个包含挂起的资源的对象
*/
protected final SuspendedResourcesHolder suspend(@Nullable Object transaction) throws TransactionException{}

image.png
image.png

1、 可以看到事务对象DataSourceTransactionObject最主要的字段就是connectionHolder;
2、 获取它的方式是TransactionSynchronizationManager.getResource(obtainDataSource();
image.png

	@Nullable
	private static Object doGetResource(Object actualKey) {
		/*  JDK1.8
			threadlocal<T> get()返回当前线程相关的 threadlocal 中变量,如果变量为 null,则返回 setInitialValue() 线程相关的初始值(null)
			get()方法调用时 先通过getMap(Thread t)获取当前线程的ThreadLocalMap!=null;ThreadLocalMap.Entry e = map.getEntry(this); return e.value;
			threadlocalMap是每一个线程实例的结构体,没有threadlocal变量时为null

		 */
		Map<Object, Object> map = resources.get();
		if (map == null) {
			return null;
		}
		Object value = map.get(actualKey);
		// Transparently remove ResourceHolder that was marked as void...
		if (value instanceof ResourceHolder && ((ResourceHolder) value).isVoid()) {
			map.remove(actualKey);
			// Remove entire ThreadLocal if empty...
			if (map.isEmpty()) {
				resources.remove();
			}
			value = null;
		}
		return value;
	}

3、再来看ConnectionHolder的结构体
image.png
我们主要关注首先它有一个引用计数(referenceCount),来指示它被引用次数,表示当前有多少个逻辑事务关联到它(在单数据源时并没有使用该字段);
二是它包含一个数据库链接(currentConnection),因此可以认为它就是表示一个物理事务。

4、回到#org.springframework.jdbc.datasource.DataSourceTransactionManager,Spring判断一个事务是否存在的方法如下:

	@Override
	protected boolean isExistingTransaction(Object transaction) {
		DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
//判断当前事务有ConnectionHolder对象(关联了一个物理事务)并且当前事务活跃中
		return (txObject.hasConnectionHolder() && txObject.getConnectionHolder().isTransactionActive());
	}

5、suspend挂起事务的操作

	@Nullable
	protected final SuspendedResourcesHolder suspend(@Nullable Object transaction) throws TransactionException {
		if (TransactionSynchronizationManager.isSynchronizationActive()) {
			// 挂起当前线程中所有同步的事务
			List<TransactionSynchronization> suspendedSynchronizations = doSuspendSynchronization();
			try {
				Object suspendedResources = null;
				// 要挂起的事务不为空 交由事务管理器挂起
				if (transaction != null) {
					// 将DataSourceTransactionObject关联的ConnectionHolder清空,
					// 并且去线程的ThreadlocalMap删除<DataSource,ConnectionHolder>键值对 然后将ConnectionHolder返回
					suspendedResources = doSuspend(transaction);
				}
				String name = TransactionSynchronizationManager.getCurrentTransactionName();
				TransactionSynchronizationManager.setCurrentTransactionName(null);
				boolean readOnly = TransactionSynchronizationManager.isCurrentTransactionReadOnly();
				TransactionSynchronizationManager.setCurrentTransactionReadOnly(false);
				Integer isolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
				TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(null);
				boolean wasActive = TransactionSynchronizationManager.isActualTransactionActive();
				TransactionSynchronizationManager.setActualTransactionActive(false);
				// 保存当前线程挂起的事务的信息
				return new SuspendedResourcesHolder(
						suspendedResources, suspendedSynchronizations, name, readOnly, isolationLevel, wasActive);
			}
			catch (RuntimeException | Error ex) {
				// doSuspend failed - original transaction is still active...
				doResumeSynchronization(suspendedSynchronizations);
				throw ex;
			}
		}
		else if (transaction != null) {
			// Transaction active but no synchronization active.
			Object suspendedResources = doSuspend(transaction);
			return new SuspendedResourcesHolder(suspendedResources);
		}
		else {
			// Neither transaction nor synchronization active.
			return null;
		}
	}

总的而言分以下几种情况:
1、当前事务同步是激活的,即物理事务绑定到了当前线程
调用doSuspend方法,将事务的ConnectionHolder设为null,获取当前线程的ThreadLocalMap,删除<datasource,connectionHolder>的键值对,然后将当前线程绑定的物理事务connectionHolder返回。然后保存当前事务的相关信息到SuspendResourceHolder实例中(这个类是被final修饰的),存储挂起的资源。
2、当前事务活动中 但同步未开启(物理事务活动中,但没绑定到线程)
只需保存事务状态 不必重置和事务相关的ThreadLocal变量
3、没有事务活动中
不做处理

6、commit提交事务的处理

	@Override
	public final void commit(TransactionStatus status) throws TransactionException {
		//  事务已完成 不允许commit/rollback 抛出异常
		if (status.isCompleted()) {
			throw new IllegalTransactionStateException(
					"Transaction is already completed - do not call commit or rollback more than once per transaction");
		}

		DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status;
		// 事务状态是回滚,进入回滚逻辑
		if (defStatus.isLocalRollbackOnly()) {
			if (defStatus.isDebug()) {
				logger.debug("Transactional code has requested rollback");
			}
			processRollback(defStatus, false);
			return;
		}

		//如果事务没有被标记为回滚时提交,且事务状态是全局回滚 
		if (!shouldCommitOnGlobalRollbackOnly() && defStatus.isGlobalRollbackOnly()) {
			if (defStatus.isDebug()) {
				logger.debug("Global transaction is marked as rollback-only but transactional code requested commit");
			}
			processRollback(defStatus, true);
			return;
		}
		//进入真正处理提交的逻辑
		processCommit(defStatus);
	}
private void processCommit(DefaultTransactionStatus status) throws TransactionException {
		try {
			boolean beforeCompletionInvoked = false;

			try {
				//准备工作 由具体事务管理器实现
				boolean unexpectedRollback = false;
				prepareForCommit(status);
				triggerBeforeCommit(status);
				triggerBeforeCompletion(status);
				beforeCompletionInvoked = true;
				
				//如果存在保存点 就释放掉(commit以后无用了)
				if (status.hasSavepoint()) {
					if (status.isDebug()) {
						logger.debug("Releasing transaction savepoint");
					}
					unexpectedRollback = status.isGlobalRollbackOnly();
					status.releaseHeldSavepoint();
				}
				/*只有当前的逻辑事务新打开了物理事务时才提交。
只是参与到已存在的物理事务中时不提交(因为这个物理事务还对应了其它没有执行完的逻辑事务)。*/
				else if (status.isNewTransaction()) {
					if (status.isDebug()) {
						logger.debug("Initiating transaction commit");
					}
					unexpectedRollback = status.isGlobalRollbackOnly();
					//提交事务(去调用sql原生操作了 connection.commit()类似这样)
					doCommit(status);
				}
				//全局回滚失败
				else if (isFailEarlyOnGlobalRollbackOnly()) {
					unexpectedRollback = status.isGlobalRollbackOnly();
				}

				// Throw UnexpectedRollbackException if we have a global rollback-only
				// marker but still didn't get a corresponding exception from commit.
				if (unexpectedRollback) {
					throw new UnexpectedRollbackException(
							"Transaction silently rolled back because it has been marked as rollback-only");
				}
			}
			catch (UnexpectedRollbackException ex) {
				// can only be caused by doCommit
				triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);
				throw ex;
			}
			catch (TransactionException ex) {
				// can only be caused by doCommit
				if (isRollbackOnCommitFailure()) {
					doRollbackOnCommitException(status, ex);
				}
				else {
					triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
				}
				throw ex;
			}
			catch (RuntimeException | Error ex) {
				//如果是在完成前抛异常
				if (!beforeCompletionInvoked) {
				//触发完成前的回调方法				
					triggerBeforeCompletion(status);
				}
//回滚处理				doRollbackOnCommitException(status, ex);
				throw ex;
			}

			// Trigger afterCommit callbacks, with an exception thrown there
			// propagated to callers but the transaction still considered as committed.
			try {
//触发提交后的回调方法
				triggerAfterCommit(status);
			}
			finally {
				triggerAfterCompletion(status, TransactionSynchronization.STATUS_COMMITTED);
			}

		}
		finally {
//完成后清除事务相关状态
			cleanupAfterCompletion(status);
		}
	}

7、rollback回滚事务的处理

	@Override
	public final void rollback(TransactionStatus status) throws TransactionException {
		if (status.isCompleted()) {
			throw new IllegalTransactionStateException(
					"Transaction is already completed - do not call commit or rollback more than once per transaction");
		}

		DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status;
		processRollback(defStatus, false);
	}
/*
这段比较简单,事务已完成则抛异常;否则进入回滚处理逻辑
*/

private void processRollback(DefaultTransactionStatus status) {  
        try {  
            try {  
                //触发完成前的回调操作  
                triggerBeforeCompletion(status);  
                //嵌套事务回滚处理  
                if (status.hasSavepoint()) {  
                    if (status.isDebug()) {  
                        logger.debug("Rolling back transaction to savepoint");  
                    }  
                    //回滚挂起在保存点的事务  
                    status.rollbackToHeldSavepoint();  
                }  
                //当前事务中创建新事务的回滚操作  
                else if (status.isNewTransaction()) {  
                    if (status.isDebug()) {  
                        logger.debug("Initiating transaction rollback");  
                    }  
                    //回滚处理,由具体的事务处理器实现  
                    doRollback(status);  
                }  
                //如果在当前事务中没有新建事务  
                else if (status.hasTransaction()) {  
                    //如果当前事务状态为本地回滚,或全局回滚失败  
                    if (status.isLocalRollbackOnly() || isGlobalRollbackOnParticipationFailure()) {  
                        if (status.isDebug()) {  
                            logger.debug(  
                                    "Participating transaction failed - marking existing transaction as rollback-only");  
                        }  
                        //设置当前事务状态为回滚  
                        doSetRollbackOnly(status);  
                    }  
                    //当前事务状态没有设置为本地回滚,且没有产生全局回滚失败,则  
//由线程中的前一个事务来处理回滚,这个步骤任何处理  
                    else {  
                        if (status.isDebug()) {  
                            logger.debug(  
                                    "Participating transaction failed - letting transaction originator decide on rollback");  
                        }  
                    }  
                }  
                //如果当前线程没有事务  
                else {  
                    logger.debug("Should roll back transaction but cannot - no transaction available");  
                }  
            }  
            //对回滚操作过程中的运行时异常和错误的处理  
            catch (RuntimeException ex) {  
                triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);  
                throw ex;  
            }  
            catch (Error err) {  
                triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);  
                throw err;  
            }  
            //回滚操作完成后,触发回滚之后回调操作  
            triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);  
        }  
        //清除回滚之后事务状态信息  
        finally {  
            cleanupAfterCompletion(status);  
        }  
    }  
 

Spring管理事务流程图

小结(个人理解)

Spring管理事务的过程中,commit、rollback操作仍然由JDBC和数据库去支持,例如connection.commit();但Spring将事务@Transaction嵌入业务逻辑代码或者说将业务逻辑代码包装成事务的方式是通过AOP实现的。
另外,Spring的事务对象DataSourceTransactionObject最主要的字段就是connectionHolder,而一个connectionHolder又有currentConnection字段,因此可以认为它就是一个物理事务。
image.png
DataSourceTransactionManager的主要属性包括dataSource(具体的数据库连接实体),主要方法包括doGetTransactionisExistingTransactiondoBegindoSuspenddoCommitdoRollback等,事务管理器通过doGetTransaction获取到一个DataSourceTransactionObject事务对象
事务管理器