黄东旭解析 TiDB 的核心优势
398
2024-03-18
本篇文章将会对锁冲突场景下,常用的解决锁冲突的接口,进行个人对代码的理解与解析,希望对大家理解 TIKV
分布式事务有所帮助
这个接口的作用比较明显,主要是事务中,遇到锁冲突 (一般是悲观事务过程中加悲观锁,或者 prewrite
过程中加的锁),从锁信息中获取到其 Primary KEY
,进而通过这个接口来看 primary
key
的当前事务状态。事务状态可能是已经提交,已经回滚,或者还在事务中。
特别的,如果发现 Primary KEY
的锁已经过期了,CheckTxnStatus
还会主动将其进行保护模式的回滚。
假如目前有两个并发事务,t1
和 t2
t1
在事务过程中,发现了 t2
的锁,因此 t1
调用了 CheckTxnStatus
接口来查看 t2
当前的状态
primary_key
: 需要查看的 t2
主键 KEY
lock_ts
: t2
的 start_ts
caller_start_ts
: t1
的 start_ts
current_ts
: 当前的 ts
rollback_if_not_exist
: 如果没有发现任何提交记录或者回滚记录的时候,是直接回滚还是返回错误
force_sync_commit
: async_commit
的场景下,是否强制推进 async_commit
进程,否则返回 uncommitted
resolving_pessimistic_lock
:false
代表本意是想解析 prewrite lock
,true
代表本意是想解析悲观锁。
verify_is_primary
: 验证主键上面的锁是主键锁 ( issue 42937 ),目前默认开启该校验功能
和其他接口一样,首先需要获取 Primary
的锁信息
如果找到的锁是 符合预期 的 t2
的锁,那么调用 check_txn_status_lock_exists
特别地,如果悲观锁信息显示该 lock
是通过公平锁功能写入的,那么这个 lock
需要进一步进行检查验证,防止 issue 43540 ,进一步查看该 lock
对应的事务是已经提交或者回滚,防止其是 stale
的 lock
。如果确实没有任何提交记录或者回滚记录,那么可以才可以认为该悲观锁是可用的有效的。否则的话,直接清楚该悲观锁,返回事务状态即可。
如果悲观锁已经过期
如果悲观锁未过期,那么更新 lock
的 min_commit_ts
,并且返回 TxnStatus::uncommitted
如果预期检查的 lock
就是悲观锁,那么只需要清理悲观锁,返回 Ok(TxnStatus::PessimisticRollBack)
即可,无需回滚
如果预期检查的 lock
是 prewrite lock
,我们就需要清理悲观锁的同时,还需要留下回滚的记录 (非保护模式下,笔者目前不太了解为何还是非保护模式)
如果已经过期,那么直接回滚,返回 TxnStatus::TtlExpire
。(这里的回滚好像是非保护模式的?按理说应该对 primary lock
进行保护模式的回滚,笔者比较疑惑)
如果还未过期,那么更新 lock
的 min_commit_ts
,并且返回 TxnStatus::uncommitted
Corner Case
:但是存在一个特殊情况 (lock
是悲观锁 && resolving_pessimistic_lock
是 false
)。就是说本来想要解析的是 prewrite lock
,结果发现是悲观锁,而且锁的 primary
主键 key
还对应不上,这种场景下会网开一面并不会报错,而是会清理悲观锁,并且使用 check_txn_status_missing_lock
来进行进一步查看事务状态。这种场景的出现可能是因为悲观事务的 acquire_pessimistic_lock
接口被 stale
调用导致的
首先需要校验这个 lock
的合法性:如果 verify_is_primary
参数是 true
,结果发现这个锁信息中的 primary key
和请求参数的 primary
对应不上,那么需要返回错误 PrimaryMismatch
,这种情况可能是 primary key
被替换了。
如果是 use_async_commit
类型的 lock
,非强制模式下 ( force_sync_commit
为 false
),直接返回 Ok(TxnStatus::uncommitted)
后续可能会进行重试。否则的话,继续执行
如果 lock
是 prewrite lock
的话,是符合预期的进一步判断 lock
的过期时间
如果 lock
是悲观锁的话,需要使用 check_txn_status_from_pessimistic_primary_lock
进一步处理
如果没有找到锁,或者找到的锁是不是符合预期的 t2
的锁,属于非预期场景,那么调用 check_txn_status_missing_lock
check_txn_status_missing_lock
这个函数我们应该很熟悉了,这个函数在 rollback
、cleanup
函数中也会被调用,但是由于 MissingLockAction
的不同,逻辑稍微有些变化:
如果发现有本事务的 OverlappedRollback
的记录或者回滚记录 (SingleRecord::Rollback
),说明已经回滚完成,直接返回 OK
如果发现有本事务提交记录的话,返回 ErrorInner::Committed
如果没有找到任何本事务 write
记录的话,属于非预期场景
调用 mark_rollback_on_mismatching_lock
在这个 LOCK
上面添加回滚 LockTS
标记,这样这个 lock
所涉及的事务在提交后,如果发现自己的 commitTS
和 LockTS
重叠的话,需要设置一下 overlap
标记
调用 make_rollback
写入保护模式的 rollback
记录,确保这个回滚记录不会被删除
删除 collapse
以前的非保护rollback
记录
如果 rollback_if_not_exist
为 false
,那么直接返回 ErrorInner::TxnNotFound
如果 resolving_pessimistic_lock
参数为 true
的话,就是说目标是解析悲观锁,结果并没有发现该锁,这时候会返回 Ok(TxnStatus::LockNotExistDoNothing)
如果 rollback_if_not_exist
为 true
,那么需要进行保护模式的回滚操作:
fn process_write(self, snapshot: S, context: WriteContext<'_, L>) -> Result<WriteResult> { ... let (txn_status, released) = match reader.load_lock(&self.primary_key)? { Some(lock) if lock.ts == self.lock_ts => check_txn_status_lock_exists( &mut txn, &mut reader, self.primary_key, lock, self.current_ts, self.caller_start_ts, self.force_sync_commit, self.resolving_pessimistic_lock, self.verify_is_primary, self.rollback_if_not_exist, )?, l => ( check_txn_status_missing_lock( &mut txn, &mut reader, self.primary_key, l, MissingLockAction::rollback(self.rollback_if_not_exist), self.resolving_pessimistic_lock, )?, None, ), }; ... Ok(WriteResult { ... }) }pub fn check_txn_status_lock_exists( txn: &mut MvccTxn, reader: &mut SnapshotReader<impl Snapshot>, primary_key: Key, mut lock: Lock, current_ts: TimeStamp, caller_start_ts: TimeStamp, force_sync_commit: bool, resolving_pessimistic_lock: bool, verify_is_primary: bool, rollback_if_not_exist: bool, ) -> Result<(TxnStatus, Option<ReleasedLock>)> { if verify_is_primary && !primary_key.is_encoded_from(&lock.primary) { return match (resolving_pessimistic_lock, lock.is_pessimistic_lock()) { (false, true) => { ... let txn_status = check_txn_status_missing_lock( ... MissingLockAction::rollback(rollback_if_not_exist), resolving_pessimistic_lock, )?; Ok((txn_status, released)) } _ => { Err( ErrorInner::PrimaryMismatch... ) } }; } // Never rollback or push forward min_commit_ts in check_txn_status if it's // using async commit. Rollback of async-commit locks are done during // ResolveLock. if lock.use_async_commit { if force_sync_commit { ... } else { return Ok((TxnStatus::uncommitted(lock, false), None)); } } let is_pessimistic_txn = !lock.for_update_ts.is_zero(); if lock.is_pessimistic_lock() { let check_result = check_txn_status_from_pessimistic_primary_lock( ... resolving_pessimistic_lock, )?; ... } else if lock.ts.physical() + lock.ttl < current_ts.physical() { let released = rollback_lock(txn, reader, primary_key, &lock, is_pessimistic_txn, true)?; return Ok((TxnStatus::TtlExpire, released)); } if !lock.min_commit_ts.is_zero() && !caller_start_ts.is_max() // Push forward the min_commit_ts so that reading won't be blocked by locks. && caller_start_ts >= lock.min_commit_ts { lock.min_commit_ts = ... } Ok((TxnStatus::uncommitted(lock, min_commit_ts_pushed), None))} fn check_txn_status_from_pessimistic_primary_lock( txn: &mut MvccTxn, reader: &mut SnapshotReader<impl Snapshot>, primary_key: Key, lock: &Lock, current_ts: TimeStamp, resolving_pessimistic_lock: bool, ) -> Result<(Option<TxnStatus>, Option<ReleasedLock>)> { if lock.is_pessimistic_lock_with_conflict() { if let Some(txn_status) = check_determined_txn_status(reader, &primary_key)? { ... let released = txn.unlock_key(primary_key, true, TimeStamp::zero()); return Ok((Some(txn_status), released)); } } if lock.ts.physical() + lock.ttl < current_ts.physical() { return if resolving_pessimistic_lock { let released = txn.unlock_key(primary_key, true, TimeStamp::zero()); Ok((Some(TxnStatus::PessimisticRollBack), released)) } else { let released = rollback_lock(txn, reader, primary_key, lock, true, true)?; Ok((Some(TxnStatus::TtlExpire), released)) }; } Ok((None, None))}
CheckSecondaryLocks
接口主要是应用与 Async Commit
所用,用来查看异步 commit
的过程中,通过 primary lock
上面的 secondary
来查看所有的 prewrite lock
,进而分析事务到底是否提交。
/// Check secondary locks of an async commit transaction.
///
/// If all prewritten locks exist, the lock information is returned.
/// Otherwise, it returns the commit timestamp of the transaction.
///
/// If the lock does not exist or is a pessimistic lock, to prevent the
/// status being changed, a rollback may be written.
keys:事务涉及到被加锁的 keys
start_ts: 事务的开始 ts
对每个 key
查询所对应的 lock
如果通过某一个 key
发现了提交或者回滚记录,那么直接可以 break
,返回结果。
如果没有记录,也没有找到锁的话,那么就需要回滚,并且是以保护模式下进行回滚,然后 break
,返回结果。
否则的话需要遍历所有的 key
,收集 lock
信息
如果如预期一样查询到了事务的 lock
,那么就会使用 check_status_from_lock
进行进一步检查
和 checkTxnStatus
一样,如果 lock
是公平锁冲突加锁的话,需要进一步查看提交、回滚、无记录状态。如果是提交或者回滚状态,那么直接可以终止 CheckSecondaryLocks
返回结果。如果是无记录状态的话,可以将其当做普通的悲观锁
悲观锁是非预期状态,这个时候需要清理悲观锁,将其当做无记录也没有找到 lock
的场景来看,也就是执行回滚操作,然后终止 CheckSecondaryLocks
如果 lock
是悲观锁
如果 lock
是 prewrite lock
,符合预期,返回锁信息,继续检查其他 key
的状态
如果没有 lock
,或者没有查询到预期事务的 lock
,那么就会 check_determined_txn_status
进一步查询提交或者回滚的记录
fn process_write(self, snapshot: S, context: WriteContext<'_, L>) -> Result<WriteResult> { ... let mut released_locks = ReleasedLocks::new(); let mut result = SecondaryLocksStatus::Locked(Vec::new()); for key in self.keys { let mut released_lock = None; let mut mismatch_lock = None; // Checks whether the given secondary lock exists. let (status, need_rollback, rollback_overlapped_write) = match reader.load_lock(&key)? { // The lock exists, the lock information is returned. Some(lock) if lock.ts == self.start_ts => { let (status, need_rollback, rollback_overlapped_write, lock_released) = check_status_from_lock(&mut txn, &mut reader, lock, &key, region_id)?; released_lock = lock_released; (status, need_rollback, rollback_overlapped_write) } // Searches the write CF for the commit record of the lock and returns the commit // timestamp (0 if the lock is not committed). l => { mismatch_lock = l; check_determined_txn_status(&mut reader, &key)? } }; if need_rollback { if let Some(l) = mismatch_lock { txn.mark_rollback_on_mismatching_lock(&key, l, true); } // We must protect this rollback in case this rollback is collapsed and a stale // acquire_pessimistic_lock and prewrite succeed again. if let Some(write) = make_rollback(self.start_ts, true, rollback_overlapped_write) { txn.put_write(key.clone(), self.start_ts, write.as_ref().to_bytes()); collapse_prev_rollback(&mut txn, &mut reader, &key)?; } } released_locks.push(released_lock); match status { SecondaryLockStatus::Locked(lock) => { result.push(lock.into_lock_info(key.to_raw()?)); } SecondaryLockStatus::Committed(commit_ts) => { result = SecondaryLocksStatus::Committed(commit_ts); break; } SecondaryLockStatus::RolledBack => { result = SecondaryLocksStatus::RolledBack; break; } } } ... }} fn check_status_from_lock<S: Snapshot>( txn: &mut MvccTxn, reader: &mut ReaderWithStats<'_, S>, lock: Lock, key: &Key, region_id: u64, ) -> Result<( SecondaryLockStatus, bool, Option<OverlappedWrite>, Option<ReleasedLock>, )> { let mut overlapped_write = None; if lock.is_pessimistic_lock_with_conflict() { let (status, need_rollback, rollback_overlapped_write) = check_determined_txn_status(reader, key)?; if !need_rollback { let released_lock = txn.unlock_key(key.clone(), true, TimeStamp::zero()); return Ok(( ... )); } overlapped_write = rollback_overlapped_write; } if lock.is_pessimistic_lock() { let released_lock = txn.unlock_key(key.clone(), true, TimeStamp::zero()); let overlapped_write_res = if lock.is_pessimistic_lock_with_conflict() { overlapped_write } else { reader.get_txn_commit_record(key)?.unwrap_none(region_id) }; Ok(( ... )) } else { Ok((SecondaryLockStatus::Locked(lock), false, None, None)) }} fn check_determined_txn_status<S: Snapshot>( reader: &mut ReaderWithStats<'_, S>, key: &Key, ) -> Result<(SecondaryLockStatus, bool, Option<OverlappedWrite>)> { match reader.get_txn_commit_record(key)? { TxnCommitRecord::SingleRecord { commit_ts, write } => { let status = if write.write_type != WriteType::Rollback { SecondaryLockStatus::Committed(commit_ts) } else { SecondaryLockStatus::RolledBack }; // We needn't write a rollback once there is a write record for it: // If it's a committed record, it cannot be changed. // If it's a rollback record, it either comes from another // check_secondary_lock (thus protected) or the client stops commit // actively. So we don't need to make it protected again. Ok((status, false, None)) } TxnCommitRecord::OverlappedRollback { .. } => { Ok((SecondaryLockStatus::RolledBack, false, None)) } TxnCommitRecord::None { overlapped_write } => { Ok((SecondaryLockStatus::RolledBack, true, overlapped_write)) } } }
通过 checkTxnStatus
查询到 primary key
的事务状态后,就需要 ResolveLock
对 secondary key
进行提交或者回滚。如果 primary key
已经提交了,那么 ResolveLock
对 secondary key
进行提交。如果 primary key
已经回滚了,那么 ResolveLock
对 secondary key
进行回滚。
start_ts:事务的开始 ts
commit_ts: 事务的提交 ts。当需要回滚的时候,该值为 0;否则的话,该值不为 0
resolve_keys: 需要提交或者回滚的 secondary keys
代码非常简单了,直接调用提交或者回滚的函数即可。注意对于 secondary key
来说,回滚是非保护模式的。
impl<S: Snapshot, L: LockManager> WriteCommand<S, L> for ResolveLockLite { fn process_write(self, snapshot: S, context: WriteContext<'_, L>) -> Result<WriteResult> { ... for key in self.resolve_keys { released_locks.push(if !self.commit_ts.is_zero() { commit(&mut txn, &mut reader, key, self.commit_ts)? } else { cleanup(&mut txn, &mut reader, key, TimeStamp::zero(), false)? }); } Ok(WriteResult { ... }) }}
版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。