Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
110323: storage: add MVCCCheckForAcquireLock and MVCCAcquireLock functions r=nvanbenschoten a=nvanbenschoten

Fixes #109646.
Informs #100193.

This PR adds and implements two new MVCC functions: `MVCCCheckForAcquireLock` and `MVCCAcquireLock`. The former scans the replicated lock table to determine whether a lock acquisition is permitted. It will be used by unreplicated lock acquisition. The latter does the same, but then also writes the lock to the replicated lock table if permitted. It will be used by replicated lock acquisition.

MVCCStats handling is left as a TODO for after #109645 is addressed.

----

The two functions are built using a new abstraction, the `lockTableKeyScanner`.

The `lockTableKeyScanner` uses a `LockTableIterator` to scan a single key in the replicated lock table. It searches for locks on the key that conflict with a (transaction, strength) pair and for locks that the transaction has already acquired on the key.

The purpose of a `lockTableKeyScanner` is to determine whether a transaction can acquire a lock on a key or perform an MVCC mutation on a key, and if so, what lock table keys the transaction should write to perform the operation. It is used by this commit to implement the two new MVCC functions. In a future commit, it will also start to be used by `mvccPutInternal`, the kernel of all MVCC mutations.

Release note: None

110652: kvserver: get Stats and GCHint under same RLock r=erikgrinaker a=pavelkalinnikov

Before this commit, `makeMVCCGCQueueScore` unlocked `Replica.mu` between reading `ReplicaState.Stats` and `ReplicaState.GCHint`. The GC scoring function uses both fields, we would like them to be in sync. The unlock makes it possible for `Stats` and `GCHint` to correspond to different states of the `Replica`.

This commit moves both reads under the same `RLock`/`RUnlock`.

Epic: none
Release note: none

Co-authored-by: Nathan VanBenschoten <[email protected]>
Co-authored-by: Pavel Kalinnikov <[email protected]>
  • Loading branch information
3 people committed Sep 14, 2023
3 parents c34dd5f + e329dbf + 44b6bd7 commit ee3765b
Show file tree
Hide file tree
Showing 9 changed files with 1,050 additions and 69 deletions.
3 changes: 3 additions & 0 deletions pkg/kv/kvserver/concurrency/lock_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -1093,6 +1093,9 @@ func (ulh *unreplicatedLockHolderInfo) rollbackIgnoredSeqNumbers(
if len(ignoredSeqNums) == 0 {
return
}
// NOTE: this logic differs slightly from replicated lock acquisition, where
// we don't rollback locks at ignored sequence numbers unless they are the
// same strength as the lock acquisition. See the comment in MVCCAcquireLock.
for strIdx, minSeqNumber := range ulh.strengths {
if minSeqNumber == -1 {
continue
Expand Down
7 changes: 4 additions & 3 deletions pkg/kv/kvserver/mvcc_gc_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,9 +288,10 @@ func makeMVCCGCQueueScore(
gcTTL time.Duration,
canAdvanceGCThreshold bool,
) mvccGCQueueScore {
repl.mu.Lock()
repl.mu.RLock()
ms := *repl.mu.state.Stats
repl.mu.Unlock()
hint := *repl.mu.state.GCHint
repl.mu.RUnlock()

if repl.store.cfg.TestingKnobs.DisableLastProcessedCheck {
lastGC = hlc.Timestamp{}
Expand All @@ -301,7 +302,7 @@ func makeMVCCGCQueueScore(
// trigger GC at the same time.
r := makeMVCCGCQueueScoreImpl(
ctx, int64(repl.RangeID), now, ms, gcTTL, lastGC, canAdvanceGCThreshold,
repl.GetGCHint(), gc.TxnCleanupThreshold.Get(&repl.ClusterSettings().SV),
hint, gc.TxnCleanupThreshold.Get(&repl.ClusterSettings().SV),
)
return r
}
Expand Down
1 change: 1 addition & 0 deletions pkg/storage/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ go_library(
"in_mem.go",
"intent_interleaving_iter.go",
"lock_table_iterator.go",
"lock_table_key_scanner.go",
"min_version.go",
"mvcc.go",
"mvcc_incremental_iterator.go",
Expand Down
307 changes: 307 additions & 0 deletions pkg/storage/lock_table_key_scanner.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,307 @@
// Copyright 2023 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package storage

import (
"sync"

"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/lock"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
)

// Fixed length slice for all supported lock strengths for replicated locks. May
// be used to iterate supported lock strengths in strength order (strongest to
// weakest).
var replicatedLockStrengths = [...]lock.Strength{lock.Intent, lock.Exclusive, lock.Shared}

func init() {
if replicatedLockStrengths[0] != lock.MaxStrength {
panic("replicatedLockStrengths[0] != lock.MaxStrength; update replicatedLockStrengths?")
}
}

// replicatedLockStrengthToIndexMap returns a mapping between (strength, index)
// pairs that can be used to index into the lockTableScanner.ownLocks array.
//
// Trying to use a lock strength that isn't supported with replicated locks to
// index into the lockTableScanner.ownLocks array will cause a runtime error.
var replicatedLockStrengthToIndexMap = func() (m [lock.MaxStrength + 1]int) {
// Initialize all to -1.
for str := range m {
m[str] = -1
}
// Set the indices of the valid strengths.
for i, str := range replicatedLockStrengths {
m[str] = i
}
return m
}()

// strongerOrEqualStrengths returns all supported lock strengths for replicated
// locks that are as strong or stronger than the provided strength. The returned
// slice is ordered from strongest to weakest.
func strongerOrEqualStrengths(str lock.Strength) []lock.Strength {
return replicatedLockStrengths[:replicatedLockStrengthToIndexMap[str]+1]
}

// minConflictLockStrength returns the minimum lock strength that conflicts with
// the provided lock strength.
func minConflictLockStrength(str lock.Strength) (lock.Strength, error) {
switch str {
case lock.Shared:
return lock.Exclusive, nil
case lock.Exclusive, lock.Intent:
return lock.Shared, nil
default:
return 0, errors.AssertionFailedf(
"lockTableKeyScanner: unexpected lock strength %s", str.String())
}
}

// lockTableKeyScanner is used to scan a single key in the replicated lock
// table. It searches for locks on the key that conflict with a (transaction,
// lock strength) pair and for locks that the transaction has already acquired
// on the key.
//
// The purpose of a lockTableKeyScanner is to determine whether a transaction
// can acquire a lock on a key or perform an MVCC mutation on a key, and if so,
// what lock table keys the transaction should write to perform the operation.
type lockTableKeyScanner struct {
iter *LockTableIterator
// The transaction attempting to acquire a lock. The ID will be zero if a
// non-transactional request is attempting to perform an MVCC mutation.
txnID uuid.UUID
// Stop adding conflicting locks and abort scan once the maxConflicts limit
// is reached. Ignored if zero.
maxConflicts int64

// Stores any error returned. If non-nil, iteration short circuits.
err error
// Stores any locks that conflict with the transaction and locking strength.
conflicts []roachpb.Lock
// Stores any locks that the transaction has already acquired.
ownLocks [len(replicatedLockStrengths)]*enginepb.MVCCMetadata

// Avoids heap allocations.
ltKeyBuf []byte
ltValue enginepb.MVCCMetadata
firstOwnLock enginepb.MVCCMetadata
}

var lockTableKeyScannerPool = sync.Pool{
New: func() interface{} { return new(lockTableKeyScanner) },
}

// newLockTableKeyScanner creates a new lockTableKeyScanner.
//
// txn is the transaction attempting to acquire locks. If txn is not nil, locks
// held by the transaction with any strength will be accumulated into the
// ownLocks array. Otherwise, if txn is nil, the request is non-transactional
// and no locks will be accumulated into the ownLocks array.
//
// str is the strength of the lock that the transaction (or non-transactional
// request) is attempting to acquire. The scanner will search for locks held by
// other transactions that conflict with this strength.
//
// maxConflicts is the maximum number of conflicting locks that the scanner
// should accumulate before returning an error. If maxConflicts is zero, the
// scanner will accumulate all conflicting locks.
func newLockTableKeyScanner(
reader Reader, txn *roachpb.Transaction, str lock.Strength, maxConflicts int64,
) (*lockTableKeyScanner, error) {
var txnID uuid.UUID
if txn != nil {
txnID = txn.ID
}
minConflictStr, err := minConflictLockStrength(str)
if err != nil {
return nil, err
}
iter, err := NewLockTableIterator(reader, LockTableIteratorOptions{
Prefix: true,
MatchTxnID: txnID,
MatchMinStr: minConflictStr,
})
if err != nil {
return nil, err
}
s := lockTableKeyScannerPool.Get().(*lockTableKeyScanner)
s.iter = iter
s.txnID = txnID
s.maxConflicts = maxConflicts
return s, nil
}

func (s *lockTableKeyScanner) close() {
s.iter.Close()
*s = lockTableKeyScanner{ltKeyBuf: s.ltKeyBuf}
lockTableKeyScannerPool.Put(s)
}

// scan scans the lock table at the provided key for locks held by other
// transactions that conflict with the configured locking strength and for locks
// of any strength that the configured transaction has already acquired.
func (s *lockTableKeyScanner) scan(key roachpb.Key) error {
s.resetScanState()
for ok := s.seek(key); ok; ok = s.getOneAndAdvance() {
}
return s.afterScan()
}

// resetScanState resets the scanner's state before a scan.
func (s *lockTableKeyScanner) resetScanState() {
s.err = nil
s.conflicts = nil
for i := range s.ownLocks {
s.ownLocks[i] = nil
}
s.ltValue.Reset()
s.firstOwnLock.Reset()
}

// afterScan returns any error encountered during the scan.
func (s *lockTableKeyScanner) afterScan() error {
if s.err != nil {
return s.err
}
if len(s.conflicts) != 0 {
return &kvpb.LockConflictError{Locks: s.conflicts}
}
return nil
}

// seek seeks the iterator to the first lock table key associated with the
// provided key. Returns true if the scanner should continue scanning, false
// if not.
func (s *lockTableKeyScanner) seek(key roachpb.Key) bool {
var ltKey roachpb.Key
ltKey, s.ltKeyBuf = keys.LockTableSingleKey(key, s.ltKeyBuf)
valid, err := s.iter.SeekEngineKeyGE(EngineKey{Key: ltKey})
if err != nil {
s.err = err
}
return valid
}

// getOneAndAdvance consumes the current lock table key and value and advances
// the iterator. Returns true if the scanner should continue scanning, false if
// not.
func (s *lockTableKeyScanner) getOneAndAdvance() bool {
ltKey, ok := s.getLockTableKey()
if !ok {
return false
}
ltValue, ok := s.getLockTableValue()
if !ok {
return false
}
if !s.consumeLockTableKeyValue(ltKey, ltValue) {
return false
}
return s.advance()
}

// advance advances the iterator to the next lock table key.
func (s *lockTableKeyScanner) advance() bool {
valid, err := s.iter.NextEngineKey()
if err != nil {
s.err = err
}
return valid
}

// getLockTableKey decodes the current lock table key.
func (s *lockTableKeyScanner) getLockTableKey() (LockTableKey, bool) {
ltEngKey, err := s.iter.UnsafeEngineKey()
if err != nil {
s.err = err
return LockTableKey{}, false
}
ltKey, err := ltEngKey.ToLockTableKey()
if err != nil {
s.err = err
return LockTableKey{}, false
}
return ltKey, true
}

// getLockTableValue decodes the current lock table values.
func (s *lockTableKeyScanner) getLockTableValue() (*enginepb.MVCCMetadata, bool) {
err := s.iter.ValueProto(&s.ltValue)
if err != nil {
s.err = err
return nil, false
}
return &s.ltValue, true
}

// consumeLockTableKeyValue consumes the current lock table key and value, which
// is either a conflicting lock or a lock held by the scanning transaction.
func (s *lockTableKeyScanner) consumeLockTableKeyValue(
ltKey LockTableKey, ltValue *enginepb.MVCCMetadata,
) bool {
if ltValue.Txn == nil {
s.err = errors.AssertionFailedf("unexpectedly found non-transactional lock: %v", ltValue)
return false
}
if ltKey.TxnUUID != ltValue.Txn.ID {
s.err = errors.AssertionFailedf("lock table key (%+v) and value (%+v) txn ID mismatch", ltKey, ltValue)
return false
}
if ltKey.TxnUUID == s.txnID {
return s.consumeOwnLock(ltKey, ltValue)
}
return s.consumeConflictingLock(ltKey, ltValue)
}

// consumeOwnLock consumes a lock held by the scanning transaction.
func (s *lockTableKeyScanner) consumeOwnLock(
ltKey LockTableKey, ltValue *enginepb.MVCCMetadata,
) bool {
var ltValueCopy *enginepb.MVCCMetadata
if s.firstOwnLock.Txn == nil {
// This is the first lock held by the transaction that we've seen, so
// we can avoid the heap allocation.
ltValueCopy = &s.firstOwnLock
} else {
ltValueCopy = new(enginepb.MVCCMetadata)
}
// NOTE: this will alias internal pointer fields of ltValueCopy with those
// in ltValue, but this will not lead to issues when ltValue is updated by
// the next call to getLockTableValue, because its internal fields will be
// reset by protoutil.Unmarshal before unmarshalling.
*ltValueCopy = *ltValue
s.ownLocks[replicatedLockStrengthToIndexMap[ltKey.Strength]] = ltValueCopy
return true
}

// consumeConflictingLock consumes a conflicting lock.
func (s *lockTableKeyScanner) consumeConflictingLock(
ltKey LockTableKey, ltValue *enginepb.MVCCMetadata,
) bool {
conflict := roachpb.MakeLock(ltValue.Txn, ltKey.Key.Clone(), ltKey.Strength)
s.conflicts = append(s.conflicts, conflict)
if s.maxConflicts != 0 && s.maxConflicts == int64(len(s.conflicts)) {
return false
}
return true
}

// foundOwn returns the lock table value for the provided strength if the
// transaction has already acquired a lock of that strength. Returns nil if not.
func (s *lockTableKeyScanner) foundOwn(str lock.Strength) *enginepb.MVCCMetadata {
return s.ownLocks[replicatedLockStrengthToIndexMap[str]]
}
Loading

0 comments on commit ee3765b

Please sign in to comment.