Skip to content

Commit

Permalink
kv: use correct strength for {Get,Scan,ReverseScan} for SKIP LOCKED
Browse files Browse the repository at this point in the history
TODO: add some tests for this.
TODO: polish.
  • Loading branch information
nvanbenschoten committed Nov 9, 2023
1 parent 8737933 commit 7bea05d
Show file tree
Hide file tree
Showing 6 changed files with 50 additions and 16 deletions.
7 changes: 6 additions & 1 deletion pkg/kv/kvserver/batcheval/cmd_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,11 @@ func Get(
h := cArgs.Header
reply := resp.(*kvpb.GetResponse)

var lockTableForSkipLocked storage.LockTableView
if h.WaitPolicy == lock.WaitPolicy_SkipLocked {
lockTableForSkipLocked = newRequestBoundLockTableView(cArgs.Concurrency, args.KeyLockingStrength)
}

getRes, err := storage.MVCCGet(ctx, readWriter, args.Key, h.Timestamp, storage.MVCCGetOptions{
Inconsistent: h.ReadConsistency != kvpb.CONSISTENT,
SkipLocked: h.WaitPolicy == lock.WaitPolicy_SkipLocked,
Expand All @@ -41,7 +46,7 @@ func Get(
ScanStats: cArgs.ScanStats,
Uncertainty: cArgs.Uncertainty,
MemoryAccount: cArgs.EvalCtx.GetResponseMemoryAccount(),
LockTable: cArgs.Concurrency,
LockTable: lockTableForSkipLocked,
DontInterleaveIntents: cArgs.DontInterleaveIntents,
MaxKeys: cArgs.Header.MaxSpanRequestKeys,
TargetBytes: cArgs.Header.TargetBytes,
Expand Down
7 changes: 6 additions & 1 deletion pkg/kv/kvserver/batcheval/cmd_reverse_scan.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,11 @@ func ReverseScan(
h := cArgs.Header
reply := resp.(*kvpb.ReverseScanResponse)

var lockTableForSkipLocked storage.LockTableView
if h.WaitPolicy == lock.WaitPolicy_SkipLocked {
lockTableForSkipLocked = newRequestBoundLockTableView(cArgs.Concurrency, args.KeyLockingStrength)
}

var res result.Result
var scanRes storage.MVCCScanResult
var err error
Expand All @@ -53,7 +58,7 @@ func ReverseScan(
FailOnMoreRecent: args.KeyLockingStrength != lock.None,
Reverse: true,
MemoryAccount: cArgs.EvalCtx.GetResponseMemoryAccount(),
LockTable: cArgs.Concurrency,
LockTable: lockTableForSkipLocked,
DontInterleaveIntents: cArgs.DontInterleaveIntents,
}

Expand Down
7 changes: 6 additions & 1 deletion pkg/kv/kvserver/batcheval/cmd_scan.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,11 @@ func Scan(
h := cArgs.Header
reply := resp.(*kvpb.ScanResponse)

var lockTableForSkipLocked storage.LockTableView
if h.WaitPolicy == lock.WaitPolicy_SkipLocked {
lockTableForSkipLocked = newRequestBoundLockTableView(cArgs.Concurrency, args.KeyLockingStrength)
}

var res result.Result
var scanRes storage.MVCCScanResult
var err error
Expand All @@ -53,7 +58,7 @@ func Scan(
FailOnMoreRecent: args.KeyLockingStrength != lock.None,
Reverse: false,
MemoryAccount: cArgs.EvalCtx.GetResponseMemoryAccount(),
LockTable: cArgs.Concurrency,
LockTable: lockTableForSkipLocked,
DontInterleaveIntents: cArgs.DontInterleaveIntents,
}

Expand Down
30 changes: 30 additions & 0 deletions pkg/kv/kvserver/batcheval/lock.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,3 +224,33 @@ func copyKey(k roachpb.Key) roachpb.Key {
copy(k2, k)
return k2
}

// txnBoundLockTableView is a transaction-bound view into an in-memory
// collections of key-level locks.
type txnBoundLockTableView interface {
IsKeyLockedByConflictingTxn(
roachpb.Key, lock.Strength,
) (bool, *enginepb.TxnMeta, error)
}

// requestBoundLockTableView combines a txnBoundLockTableView with the lock
// strength that an individual request is attempting to acquire.
type requestBoundLockTableView struct {
ltv txnBoundLockTableView
str lock.Strength
}

// newRequestBoundLockTableView creates a new requestBoundLockTableView.
func newRequestBoundLockTableView(
ltv txnBoundLockTableView, str lock.Strength,
) *requestBoundLockTableView {
return &requestBoundLockTableView{ltv: ltv, str: str}
}

// IsKeyLockedByConflictingTxn implements the storage.LockTableView interface.
func (ltv *requestBoundLockTableView) IsKeyLockedByConflictingTxn(
key roachpb.Key,
) (bool, *enginepb.TxnMeta, error) {
// TODO(nvanbenschoten): look for replicated lock conflicts.
return ltv.ltv.IsKeyLockedByConflictingTxn(key, ltv.str)
}
8 changes: 1 addition & 7 deletions pkg/storage/mvcc.go
Original file line number Diff line number Diff line change
Expand Up @@ -1190,13 +1190,7 @@ type LockTableView interface {
//
// This method is used by requests in conjunction with the SkipLocked wait
// policy to determine which keys they should skip over during evaluation.
//
// If the supplied lock strength is locking (!= lock.None), then any queued
// locking requests that came before the lockTableGuard will also be checked
// for conflicts. This helps prevent a stream of locking SKIP LOCKED requests
// from starving out regular locking requests. In such cases, true is
// returned, but so is nil.
IsKeyLockedByConflictingTxn(roachpb.Key, lock.Strength) (bool, *enginepb.TxnMeta, error)
IsKeyLockedByConflictingTxn(roachpb.Key) (bool, *enginepb.TxnMeta, error)
}

// MVCCGetOptions bundles options for the MVCCGet family of functions.
Expand Down
7 changes: 1 addition & 6 deletions pkg/storage/pebble_mvcc_scanner.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (

"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/lock"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/uncertainty"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
Expand Down Expand Up @@ -1805,11 +1804,7 @@ func (p *pebbleMVCCScanner) isKeyLockedByConflictingTxn(
p.err = err
return false, false
}
strength := lock.None
if p.failOnMoreRecent {
strength = lock.Exclusive
}
ok, txn, err := p.lockTable.IsKeyLockedByConflictingTxn(key, strength)
ok, txn, err := p.lockTable.IsKeyLockedByConflictingTxn(key)
if err != nil {
p.err = err
return false, false
Expand Down

0 comments on commit 7bea05d

Please sign in to comment.