Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

storage: add MVCCCheckForAcquireLock and MVCCAcquireLock functions #110323

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions pkg/kv/kvserver/concurrency/lock_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -1093,6 +1093,9 @@ func (ulh *unreplicatedLockHolderInfo) rollbackIgnoredSeqNumbers(
if len(ignoredSeqNums) == 0 {
return
}
// NOTE: this logic differs slightly from replicated lock acquisition, where
// we don't rollback locks at ignored sequence numbers unless they are the
// same strength as the lock acquisition. See the comment in MVCCAcquireLock.
for strIdx, minSeqNumber := range ulh.strengths {
if minSeqNumber == -1 {
continue
Expand Down
1 change: 1 addition & 0 deletions pkg/storage/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ go_library(
"in_mem.go",
"intent_interleaving_iter.go",
"lock_table_iterator.go",
"lock_table_key_scanner.go",
"min_version.go",
"mvcc.go",
"mvcc_incremental_iterator.go",
Expand Down
307 changes: 307 additions & 0 deletions pkg/storage/lock_table_key_scanner.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,307 @@
// Copyright 2023 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package storage

import (
"sync"

"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/lock"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
)

// Fixed length slice for all supported lock strengths for replicated locks. May
// be used to iterate supported lock strengths in strength order (strongest to
// weakest).
var replicatedLockStrengths = [...]lock.Strength{lock.Intent, lock.Exclusive, lock.Shared}

func init() {
if replicatedLockStrengths[0] != lock.MaxStrength {
panic("replicatedLockStrengths[0] != lock.MaxStrength; update replicatedLockStrengths?")
}
}

// replicatedLockStrengthToIndexMap returns a mapping between (strength, index)
// pairs that can be used to index into the lockTableScanner.ownLocks array.
//
// Trying to use a lock strength that isn't supported with replicated locks to
// index into the lockTableScanner.ownLocks array will cause a runtime error.
var replicatedLockStrengthToIndexMap = func() (m [lock.MaxStrength + 1]int) {
// Initialize all to -1.
for str := range m {
m[str] = -1
}
// Set the indices of the valid strengths.
for i, str := range replicatedLockStrengths {
m[str] = i
}
return m
}()

// strongerOrEqualStrengths returns all supported lock strengths for replicated
// locks that are as strong or stronger than the provided strength. The returned
// slice is ordered from strongest to weakest.
func strongerOrEqualStrengths(str lock.Strength) []lock.Strength {
return replicatedLockStrengths[:replicatedLockStrengthToIndexMap[str]+1]
}

// minConflictLockStrength returns the minimum lock strength that conflicts with
// the provided lock strength.
func minConflictLockStrength(str lock.Strength) (lock.Strength, error) {
switch str {
case lock.Shared:
return lock.Exclusive, nil
case lock.Exclusive, lock.Intent:
return lock.Shared, nil
default:
return 0, errors.AssertionFailedf(
"lockTableKeyScanner: unexpected lock strength %s", str.String())
}
}

// lockTableKeyScanner is used to scan a single key in the replicated lock
// table. It searches for locks on the key that conflict with a (transaction,
// lock strength) pair and for locks that the transaction has already acquired
// on the key.
//
// The purpose of a lockTableKeyScanner is to determine whether a transaction
// can acquire a lock on a key or perform an MVCC mutation on a key, and if so,
// what lock table keys the transaction should write to perform the operation.
type lockTableKeyScanner struct {
iter *LockTableIterator
// The transaction attempting to acquire a lock. The ID will be zero if a
// non-transactional request is attempting to perform an MVCC mutation.
txnID uuid.UUID
// Stop adding conflicting locks and abort scan once the maxConflicts limit
// is reached. Ignored if zero.
maxConflicts int64

// Stores any error returned. If non-nil, iteration short circuits.
err error
// Stores any locks that conflict with the transaction and locking strength.
conflicts []roachpb.Lock
// Stores any locks that the transaction has already acquired.
ownLocks [len(replicatedLockStrengths)]*enginepb.MVCCMetadata

// Avoids heap allocations.
ltKeyBuf []byte
ltValue enginepb.MVCCMetadata
firstOwnLock enginepb.MVCCMetadata
}

var lockTableKeyScannerPool = sync.Pool{
New: func() interface{} { return new(lockTableKeyScanner) },
}

// newLockTableKeyScanner creates a new lockTableKeyScanner.
//
// txn is the transaction attempting to acquire locks. If txn is not nil, locks
// held by the transaction with any strength will be accumulated into the
// ownLocks array. Otherwise, if txn is nil, the request is non-transactional
// and no locks will be accumulated into the ownLocks array.
//
// str is the strength of the lock that the transaction (or non-transactional
// request) is attempting to acquire. The scanner will search for locks held by
// other transactions that conflict with this strength.
//
// maxConflicts is the maximum number of conflicting locks that the scanner
// should accumulate before returning an error. If maxConflicts is zero, the
// scanner will accumulate all conflicting locks.
func newLockTableKeyScanner(
reader Reader, txn *roachpb.Transaction, str lock.Strength, maxConflicts int64,
) (*lockTableKeyScanner, error) {
var txnID uuid.UUID
if txn != nil {
txnID = txn.ID
}
minConflictStr, err := minConflictLockStrength(str)
if err != nil {
return nil, err
}
iter, err := NewLockTableIterator(reader, LockTableIteratorOptions{
Prefix: true,
MatchTxnID: txnID,
MatchMinStr: minConflictStr,
})
if err != nil {
return nil, err
}
s := lockTableKeyScannerPool.Get().(*lockTableKeyScanner)
s.iter = iter
s.txnID = txnID
s.maxConflicts = maxConflicts
return s, nil
}

func (s *lockTableKeyScanner) close() {
s.iter.Close()
*s = lockTableKeyScanner{ltKeyBuf: s.ltKeyBuf}
lockTableKeyScannerPool.Put(s)
}

// scan scans the lock table at the provided key for locks held by other
// transactions that conflict with the configured locking strength and for locks
// of any strength that the configured transaction has already acquired.
func (s *lockTableKeyScanner) scan(key roachpb.Key) error {
s.resetScanState()
for ok := s.seek(key); ok; ok = s.getOneAndAdvance() {
}
return s.afterScan()
}

// resetScanState resets the scanner's state before a scan.
func (s *lockTableKeyScanner) resetScanState() {
s.err = nil
s.conflicts = nil
for i := range s.ownLocks {
s.ownLocks[i] = nil
}
s.ltValue.Reset()
s.firstOwnLock.Reset()
}

// afterScan returns any error encountered during the scan.
func (s *lockTableKeyScanner) afterScan() error {
if s.err != nil {
return s.err
}
if len(s.conflicts) != 0 {
return &kvpb.LockConflictError{Locks: s.conflicts}
}
return nil
}

// seek seeks the iterator to the first lock table key associated with the
// provided key. Returns true if the scanner should continue scanning, false
// if not.
func (s *lockTableKeyScanner) seek(key roachpb.Key) bool {
var ltKey roachpb.Key
ltKey, s.ltKeyBuf = keys.LockTableSingleKey(key, s.ltKeyBuf)
valid, err := s.iter.SeekEngineKeyGE(EngineKey{Key: ltKey})
if err != nil {
s.err = err
}
return valid
}

// getOneAndAdvance consumes the current lock table key and value and advances
// the iterator. Returns true if the scanner should continue scanning, false if
// not.
func (s *lockTableKeyScanner) getOneAndAdvance() bool {
ltKey, ok := s.getLockTableKey()
if !ok {
return false
}
ltValue, ok := s.getLockTableValue()
if !ok {
return false
}
if !s.consumeLockTableKeyValue(ltKey, ltValue) {
return false
}
return s.advance()
}

// advance advances the iterator to the next lock table key.
func (s *lockTableKeyScanner) advance() bool {
valid, err := s.iter.NextEngineKey()
if err != nil {
s.err = err
}
return valid
}

// getLockTableKey decodes the current lock table key.
func (s *lockTableKeyScanner) getLockTableKey() (LockTableKey, bool) {
ltEngKey, err := s.iter.UnsafeEngineKey()
if err != nil {
s.err = err
return LockTableKey{}, false
}
ltKey, err := ltEngKey.ToLockTableKey()
if err != nil {
s.err = err
return LockTableKey{}, false
}
return ltKey, true
}

// getLockTableValue decodes the current lock table values.
func (s *lockTableKeyScanner) getLockTableValue() (*enginepb.MVCCMetadata, bool) {
err := s.iter.ValueProto(&s.ltValue)
if err != nil {
s.err = err
return nil, false
}
return &s.ltValue, true
}

// consumeLockTableKeyValue consumes the current lock table key and value, which
// is either a conflicting lock or a lock held by the scanning transaction.
func (s *lockTableKeyScanner) consumeLockTableKeyValue(
ltKey LockTableKey, ltValue *enginepb.MVCCMetadata,
) bool {
if ltValue.Txn == nil {
s.err = errors.AssertionFailedf("unexpectedly found non-transactional lock: %v", ltValue)
return false
}
if ltKey.TxnUUID != ltValue.Txn.ID {
s.err = errors.AssertionFailedf("lock table key (%+v) and value (%+v) txn ID mismatch", ltKey, ltValue)
return false
}
if ltKey.TxnUUID == s.txnID {
return s.consumeOwnLock(ltKey, ltValue)
}
return s.consumeConflictingLock(ltKey, ltValue)
}

// consumeOwnLock consumes a lock held by the scanning transaction.
func (s *lockTableKeyScanner) consumeOwnLock(
ltKey LockTableKey, ltValue *enginepb.MVCCMetadata,
) bool {
var ltValueCopy *enginepb.MVCCMetadata
if s.firstOwnLock.Txn == nil {
// This is the first lock held by the transaction that we've seen, so
// we can avoid the heap allocation.
ltValueCopy = &s.firstOwnLock
} else {
ltValueCopy = new(enginepb.MVCCMetadata)
}
// NOTE: this will alias internal pointer fields of ltValueCopy with those
// in ltValue, but this will not lead to issues when ltValue is updated by
// the next call to getLockTableValue, because its internal fields will be
// reset by protoutil.Unmarshal before unmarshalling.
*ltValueCopy = *ltValue
s.ownLocks[replicatedLockStrengthToIndexMap[ltKey.Strength]] = ltValueCopy
return true
}

// consumeConflictingLock consumes a conflicting lock.
func (s *lockTableKeyScanner) consumeConflictingLock(
ltKey LockTableKey, ltValue *enginepb.MVCCMetadata,
) bool {
conflict := roachpb.MakeLock(ltValue.Txn, ltKey.Key.Clone(), ltKey.Strength)
s.conflicts = append(s.conflicts, conflict)
if s.maxConflicts != 0 && s.maxConflicts == int64(len(s.conflicts)) {
return false
}
return true
}

// foundOwn returns the lock table value for the provided strength if the
// transaction has already acquired a lock of that strength. Returns nil if not.
func (s *lockTableKeyScanner) foundOwn(str lock.Strength) *enginepb.MVCCMetadata {
return s.ownLocks[replicatedLockStrengthToIndexMap[str]]
}
Loading