# Spring 事务的传播方式实现
TransactionDefinition:用于描述隔离级别、超时时间、是否为只读事务和事务传播规则
TransactionStatus:代表一个事务的具体运行状态、以及还原点
PlatformTransactionManager:一个高层次的接口,包含3个方法。commit、rollback和getTransaction
```java
//#org.springframework.transaction.support.AbstractPlatformTransactionManager
@Override
public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition)
throws TransactionException {
// Use defaults if no transaction definition given.
TransactionDefinition def = (definition != null ? definition : TransactionDefinition.withDefaults());
// 获取事务对象
Object transaction = doGetTransaction();
boolean debugEnabled = logger.isDebugEnabled();
// 如果当前事务已存在
if (isExistingTransaction(transaction)) {
// Existing transaction found -> check propagation behavior to find out how to behave.
/* 出现事务传播行为
NEVER:抛出异常
NOT_SUPPORTED:则调用suspend(transaction)挂起当前事务,将被挂起的资源suspendedResources放入事务状态里。
suspend:
REQUIRES_NEW,则调用suspend(transaction)挂起当前事务,将事务对象transaction和被挂起的资源suspendedResources放入事务状态里。
然后调用doBegin(transaction, definition)方法去真正打开事务。最后调用prepareSynchronization(status, definition)方法准备事务同步
*/
return handleExistingTransaction(def, transaction, debugEnabled);
}
// Check definition settings for new transaction.
if (def.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {
throw new InvalidTimeoutException("Invalid transaction timeout", def.getTimeout());
}
// No existing transaction found -> check propagation behavior to find out how to proceed.
//如果事务传播方式是MANDATORY、当前事务不存在抛异常
if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
throw new IllegalTransactionStateException(
"No existing transaction found for transaction marked with propagation 'mandatory'");
}
//如果事务传播方式是REQUIRED、REQUIRED_NEW、NESTED 创建新的事务
else if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
SuspendedResourcesHolder suspendedResources = suspend(null);
if (debugEnabled) {
logger.debug("Creating new transaction with name [" + def.getName() + "]: " + def);
}
try {
return startTransaction(def, transaction, debugEnabled, suspendedResources);
}
catch (RuntimeException | Error ex) {
resume(null, suspendedResources);
throw ex;
}
}
//SUPPORTS NOT_SUPPORTED NEVER 以非事务的方式运行
else {
// Create "empty" transaction: no actual transaction, but potentially synchronization.
if (def.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) {
logger.warn("Custom isolation level specified but no actual transaction initiated; " +
"isolation level will effectively be ignored: " + def);
}
boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
return prepareTransactionStatus(def, null, true, newSynchronization, debugEnabled, null);
}
}
```
```java
/*
suspend挂起当前事务@ transaction
返回一个包含挂起的资源的对象
*/
protected final SuspendedResourcesHolder suspend(@Nullable Object transaction) throws TransactionException{}
```
![image.png](https://i.loli.net/2021/03/21/JIMjXqkwyQzUu9g.png)
![image.png](https://i.loli.net/2021/03/22/lKGgHdwCMpu6hn7.png)
1、 可以看到事务对象DataSourceTransactionObject最主要的字段就是connectionHolder;
2、 获取它的方式是`TransactionSynchronizationManager.getResource(obtainDataSource();`
![image.png](https://i.loli.net/2021/03/22/ilA3TfYbvRUVZ5c.png)
```java
@Nullable
private static Object doGetResource(Object actualKey) {
/* JDK1.8
threadlocal<T> get()返回当前线程相关的 threadlocal 中变量,如果变量为 null,则返回 setInitialValue() 线程相关的初始值(null)
get()方法调用时 先通过getMap(Thread t)获取当前线程的ThreadLocalMap!=null;ThreadLocalMap.Entry e = map.getEntry(this); return e.value;
threadlocalMap是每一个线程实例的结构体,没有threadlocal变量时为null
*/
Map<Object, Object> map = resources.get();
if (map == null) {
return null;
}
Object value = map.get(actualKey);
// Transparently remove ResourceHolder that was marked as void...
if (value instanceof ResourceHolder && ((ResourceHolder) value).isVoid()) {
map.remove(actualKey);
// Remove entire ThreadLocal if empty...
if (map.isEmpty()) {
resources.remove();
}
value = null;
}
return value;
}
```
3、再来看`ConnectionHolder`的结构体
![image.png](https://i.loli.net/2021/03/22/6N1OhfrL8nZu7Pc.png)
我们主要关注首先它有一个引用计数(referenceCount),来指示它被引用次数,表示当前有多少个逻辑事务关联到它(在单数据源时并没有使用该字段);
二是它包含一个数据库链接(currentConnection),因此可以认为它就是表示一个物理事务。
4、回到`#org.springframework.jdbc.datasource.DataSourceTransactionManager`,Spring判断一个事务是否存在的方法如下:
```java
@Override
protected boolean isExistingTransaction(Object transaction) {
DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
//判断当前事务有ConnectionHolder对象(关联了一个物理事务)并且当前事务活跃中
return (txObject.hasConnectionHolder() && txObject.getConnectionHolder().isTransactionActive());
}
```
# 5、**suspend挂起事务的操作**
```java
@Nullable
protected final SuspendedResourcesHolder suspend(@Nullable Object transaction) throws TransactionException {
if (TransactionSynchronizationManager.isSynchronizationActive()) {
// 挂起当前线程中所有同步的事务
List<TransactionSynchronization> suspendedSynchronizations = doSuspendSynchronization();
try {
Object suspendedResources = null;
// 要挂起的事务不为空 交由事务管理器挂起
if (transaction != null) {
// 将DataSourceTransactionObject关联的ConnectionHolder清空,
// 并且去线程的ThreadlocalMap删除<DataSource,ConnectionHolder>键值对 然后将ConnectionHolder返回
suspendedResources = doSuspend(transaction);
}
String name = TransactionSynchronizationManager.getCurrentTransactionName();
TransactionSynchronizationManager.setCurrentTransactionName(null);
boolean readOnly = TransactionSynchronizationManager.isCurrentTransactionReadOnly();
TransactionSynchronizationManager.setCurrentTransactionReadOnly(false);
Integer isolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(null);
boolean wasActive = TransactionSynchronizationManager.isActualTransactionActive();
TransactionSynchronizationManager.setActualTransactionActive(false);
// 保存当前线程挂起的事务的信息
return new SuspendedResourcesHolder(
suspendedResources, suspendedSynchronizations, name, readOnly, isolationLevel, wasActive);
}
catch (RuntimeException | Error ex) {
// doSuspend failed - original transaction is still active...
doResumeSynchronization(suspendedSynchronizations);
throw ex;
}
}
else if (transaction != null) {
// Transaction active but no synchronization active.
Object suspendedResources = doSuspend(transaction);
return new SuspendedResourcesHolder(suspendedResources);
}
else {
// Neither transaction nor synchronization active.
return null;
}
}
```
> 总的而言分以下几种情况:
1、当前事务同步是激活的,即物理事务绑定到了当前线程
调用doSuspend方法,将事务的ConnectionHolder设为null,获取当前线程的ThreadLocalMap,删除<datasource,connectionHolder>的键值对,然后将当前线程绑定的物理事务connectionHolder返回。然后保存当前事务的相关信息到SuspendResourceHolder实例中(这个类是被final修饰的),存储挂起的资源。
2、当前事务活动中 但同步未开启(物理事务活动中,但没绑定到线程)
只需保存事务状态 不必重置和事务相关的ThreadLocal变量
3、没有事务活动中
不做处理
# 6、commit提交事务的处理
```java
@Override
public final void commit(TransactionStatus status) throws TransactionException {
// 事务已完成 不允许commit/rollback 抛出异常
if (status.isCompleted()) {
throw new IllegalTransactionStateException(
"Transaction is already completed - do not call commit or rollback more than once per transaction");
}
DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status;
// 事务状态是回滚,进入回滚逻辑
if (defStatus.isLocalRollbackOnly()) {
if (defStatus.isDebug()) {
logger.debug("Transactional code has requested rollback");
}
processRollback(defStatus, false);
return;
}
//如果事务没有被标记为回滚时提交,且事务状态是全局回滚
if (!shouldCommitOnGlobalRollbackOnly() && defStatus.isGlobalRollbackOnly()) {
if (defStatus.isDebug()) {
logger.debug("Global transaction is marked as rollback-only but transactional code requested commit");
}
processRollback(defStatus, true);
return;
}
//进入真正处理提交的逻辑
processCommit(defStatus);
}
```
```java
private void processCommit(DefaultTransactionStatus status) throws TransactionException {
try {
boolean beforeCompletionInvoked = false;
try {
//准备工作 由具体事务管理器实现
boolean unexpectedRollback = false;
prepareForCommit(status);
triggerBeforeCommit(status);
triggerBeforeCompletion(status);
beforeCompletionInvoked = true;
//如果存在保存点 就释放掉(commit以后无用了)
if (status.hasSavepoint()) {
if (status.isDebug()) {
logger.debug("Releasing transaction savepoint");
}
unexpectedRollback = status.isGlobalRollbackOnly();
status.releaseHeldSavepoint();
}
/*只有当前的逻辑事务新打开了物理事务时才提交。
只是参与到已存在的物理事务中时不提交(因为这个物理事务还对应了其它没有执行完的逻辑事务)。*/
else if (status.isNewTransaction()) {
if (status.isDebug()) {
logger.debug("Initiating transaction commit");
}
unexpectedRollback = status.isGlobalRollbackOnly();
//提交事务(去调用sql原生操作了 connection.commit()类似这样)
doCommit(status);
}
//全局回滚失败
else if (isFailEarlyOnGlobalRollbackOnly()) {
unexpectedRollback = status.isGlobalRollbackOnly();
}
// Throw UnexpectedRollbackException if we have a global rollback-only
// marker but still didn't get a corresponding exception from commit.
if (unexpectedRollback) {
throw new UnexpectedRollbackException(
"Transaction silently rolled back because it has been marked as rollback-only");
}
}
catch (UnexpectedRollbackException ex) {
// can only be caused by doCommit
triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);
throw ex;
}
catch (TransactionException ex) {
// can only be caused by doCommit
if (isRollbackOnCommitFailure()) {
doRollbackOnCommitException(status, ex);
}
else {
triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
}
throw ex;
}
catch (RuntimeException | Error ex) {
//如果是在完成前抛异常
if (!beforeCompletionInvoked) {
//触发完成前的回调方法
triggerBeforeCompletion(status);
}
//回滚处理 doRollbackOnCommitException(status, ex);
throw ex;
}
// Trigger afterCommit callbacks, with an exception thrown there
// propagated to callers but the transaction still considered as committed.
try {
//触发提交后的回调方法
triggerAfterCommit(status);
}
finally {
triggerAfterCompletion(status, TransactionSynchronization.STATUS_COMMITTED);
}
}
finally {
//完成后清除事务相关状态
cleanupAfterCompletion(status);
}
}
```
# 7、rollback回滚事务的处理
```java
@Override
public final void rollback(TransactionStatus status) throws TransactionException {
if (status.isCompleted()) {
throw new IllegalTransactionStateException(
"Transaction is already completed - do not call commit or rollback more than once per transaction");
}
DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status;
processRollback(defStatus, false);
}
/*
这段比较简单,事务已完成则抛异常;否则进入回滚处理逻辑
*/
private void processRollback(DefaultTransactionStatus status) {
try {
try {
//触发完成前的回调操作
triggerBeforeCompletion(status);
//嵌套事务回滚处理
if (status.hasSavepoint()) {
if (status.isDebug()) {
logger.debug("Rolling back transaction to savepoint");
}
//回滚挂起在保存点的事务
status.rollbackToHeldSavepoint();
}
//当前事务中创建新事务的回滚操作
else if (status.isNewTransaction()) {
if (status.isDebug()) {
logger.debug("Initiating transaction rollback");
}
//回滚处理,由具体的事务处理器实现
doRollback(status);
}
//如果在当前事务中没有新建事务
else if (status.hasTransaction()) {
//如果当前事务状态为本地回滚,或全局回滚失败
if (status.isLocalRollbackOnly() || isGlobalRollbackOnParticipationFailure()) {
if (status.isDebug()) {
logger.debug(
"Participating transaction failed - marking existing transaction as rollback-only");
}
//设置当前事务状态为回滚
doSetRollbackOnly(status);
}
//当前事务状态没有设置为本地回滚,且没有产生全局回滚失败,则
//由线程中的前一个事务来处理回滚,这个步骤任何处理
else {
if (status.isDebug()) {
logger.debug(
"Participating transaction failed - letting transaction originator decide on rollback");
}
}
}
//如果当前线程没有事务
else {
logger.debug("Should roll back transaction but cannot - no transaction available");
}
}
//对回滚操作过程中的运行时异常和错误的处理
catch (RuntimeException ex) {
triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
throw ex;
}
catch (Error err) {
triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
throw err;
}
//回滚操作完成后,触发回滚之后回调操作
triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);
}
//清除回滚之后事务状态信息
finally {
cleanupAfterCompletion(status);
}
}
```
# Spring管理事务流程图
![](https://cdn.zhezhi.press/springTransaction.png)
# 小结(个人理解)
Spring管理事务的过程中,commit、rollback操作仍然由JDBC和数据库去支持,例如`connection.commit()`;但Spring将事务`@Transaction`嵌入业务逻辑代码或者说将业务逻辑代码包装成事务的方式是通过**AOP**实现的。
另外,Spring的事务对象`DataSourceTransactionObject`最主要的字段就是`connectionHolder`,而一个`connectionHolder`又有`currentConnection`字段,因此可以认为它就是一个物理事务。
![image.png](https://i.loli.net/2021/03/22/RyWLXNAfjPYUFkO.png)
而`DataSourceTransactionManager`的主要属性包括`dataSource`(具体的数据库连接实体),主要方法包括`doGetTransaction`、`isExistingTransaction`、`doBegin`、`doSuspend`、`doCommit`、`doRollback`等,事务管理器通过`doGetTransaction`获取到一个DataSourceTransactionObject事务对象
![事务管理器](https://cdn.zhezhi.press/transactionManager.jpg)
Spring 从源码看事务——PlatformTransactionManager事务管理器