Skip to content

Commit

Permalink
concurrency: bubble up errors from isKeyLockedByConflictingTxn
Browse files Browse the repository at this point in the history
Avoid a panic that would have previously crashed the node.

We don't expect locking skip locked requests to encounter waiting
requests from their own transactions during normal operation. However,
this could happen if they're being replayed for some reason. We
shouldn't crash the node in such cases.

Epic: none

Release note: None
  • Loading branch information
arulajmani committed Aug 18, 2023
1 parent 6342e6f commit d0d6a5d
Show file tree
Hide file tree
Showing 10 changed files with 55 additions and 22 deletions.
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/concurrency/concurrency_control.go
Original file line number Diff line number Diff line change
Expand Up @@ -787,7 +787,7 @@ type lockTableGuard interface {
// for conflicts. This helps prevent a stream of locking SKIP LOCKED requests
// from starving out regular locking requests. In such cases, true is
// returned, but so is nil.
IsKeyLockedByConflictingTxn(roachpb.Key, lock.Strength) (bool, *enginepb.TxnMeta)
IsKeyLockedByConflictingTxn(roachpb.Key, lock.Strength) (bool, *enginepb.TxnMeta, error)
}

// lockTableWaiter is concerned with waiting in lock wait-queues for locks held
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/concurrency/concurrency_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -783,7 +783,7 @@ func (g *Guard) CheckOptimisticNoLatchConflicts() (ok bool) {
// returned, but so is nil.
func (g *Guard) IsKeyLockedByConflictingTxn(
key roachpb.Key, strength lock.Strength,
) (bool, *enginepb.TxnMeta) {
) (bool, *enginepb.TxnMeta, error) {
return g.ltg.IsKeyLockedByConflictingTxn(key, strength)
}

Expand Down
6 changes: 5 additions & 1 deletion pkg/kv/kvserver/concurrency/concurrency_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -372,7 +372,11 @@ func TestConcurrencyManagerBasic(t *testing.T) {
var key string
d.ScanArgs(t, "key", &key)
strength := concurrency.ScanLockStrength(t, d)
if ok, txn := g.IsKeyLockedByConflictingTxn(roachpb.Key(key), strength); ok {
ok, txn, err := g.IsKeyLockedByConflictingTxn(roachpb.Key(key), strength)
if err != nil {
return err.Error()
}
if ok {
holder := "<nil>"
if txn != nil {
holder = txn.ID.String()
Expand Down
20 changes: 10 additions & 10 deletions pkg/kv/kvserver/concurrency/lock_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -678,32 +678,32 @@ func (g *lockTableGuardImpl) CheckOptimisticNoConflicts(

func (g *lockTableGuardImpl) IsKeyLockedByConflictingTxn(
key roachpb.Key, str lock.Strength,
) (bool, *enginepb.TxnMeta) {
) (bool, *enginepb.TxnMeta, error) {
iter := g.tableSnapshot.MakeIter()
iter.SeekGE(&lockState{key: key})
if !iter.Valid() || !iter.Cur().key.Equal(key) {
// No lock on key.
return false, nil
return false, nil, nil
}
l := iter.Cur()
l.mu.Lock()
defer l.mu.Unlock()
if l.isEmptyLock() {
// The lock is empty but has not yet been deleted.
return false, nil
return false, nil, nil
}

if l.alreadyHoldsLockAndIsAllowedToProceed(g, str) {
// If another request from this transaction has already locked this key with
// sufficient locking strength then there's no conflict; we can proceed.
return false, nil
return false, nil, nil
}

if l.isHeld() {
lockHolderTxn, _ := l.getLockHolder()
if !g.isSameTxn(lockHolderTxn) &&
lock.Conflicts(l.getLockMode(), makeLockMode(str, g.txn, g.ts), &g.lt.settings.SV) {
return true, l.holder.txn // they key is locked by some other transaction; return the holder
return true, l.holder.txn, nil // they key is locked by some other txn; return the holder
}
// We can be in either of 2 cases at this point:
// 1. All locks held on this key are at non-conflicting strengths (i.e,
Expand All @@ -719,7 +719,7 @@ func (g *lockTableGuardImpl) IsKeyLockedByConflictingTxn(
// preventing a stream of locking[1] SKIP LOCKED requests from starving out
// regular locking requests.
if str == lock.None { // [1] we only need to do this checking for locking requests
return false, nil
return false, nil, nil
}

for e := l.queuedWriters.Front(); e != nil; e = e.Next() {
Expand All @@ -731,15 +731,15 @@ func (g *lockTableGuardImpl) IsKeyLockedByConflictingTxn(
break
}
if g.isSameTxn(qqg.guard.txnMeta()) {
panic(errors.AssertionFailedf(
return false, nil, errors.AssertionFailedf(
"SKIP LOCKED request should not find another waiting request from the same transaction",
))
)
}
if lock.Conflicts(qqg.mode, makeLockMode(str, g.txn, g.ts), &g.lt.settings.SV) {
return true, nil // the conflict isn't with a lock holder, nil is returned
return true, nil, nil // the conflict isn't with a lock holder, nil is returned
}
}
return false, nil // no conflict
return false, nil, nil // no conflict
}

func (g *lockTableGuardImpl) notify() {
Expand Down
6 changes: 5 additions & 1 deletion pkg/kv/kvserver/concurrency/lock_table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -497,7 +497,11 @@ func TestLockTableBasic(t *testing.T) {
var key string
d.ScanArgs(t, "k", &key)
strength := ScanLockStrength(t, d)
if ok, txn := g.IsKeyLockedByConflictingTxn(roachpb.Key(key), strength); ok {
ok, txn, err := g.IsKeyLockedByConflictingTxn(roachpb.Key(key), strength)
if err != nil {
return err.Error()
}
if ok {
holder := "<nil>"
if txn != nil {
holder = txn.ID.String()
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/concurrency/lock_table_waiter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func (g *mockLockTableGuard) CheckOptimisticNoConflicts(*lockspanset.LockSpanSet
}
func (g *mockLockTableGuard) IsKeyLockedByConflictingTxn(
roachpb.Key, lock.Strength,
) (bool, *enginepb.TxnMeta) {
) (bool, *enginepb.TxnMeta, error) {
panic("unimplemented")
}
func (g *mockLockTableGuard) notify() { g.signal <- struct{}{} }
Expand Down
20 changes: 20 additions & 0 deletions pkg/kv/kvserver/concurrency/testdata/lock_table/skip_locked
Original file line number Diff line number Diff line change
Expand Up @@ -457,3 +457,23 @@ num=7
queued writers:
active: true req: 7, txn: 00000000-0000-0000-0000-000000000001
distinguished req: 7

# ---------------------------------------------------------------------------------
# Ensure the case where a locking skip locked request finds another request from
# its own transaction correctly returns an error.
# ---------------------------------------------------------------------------------

new-request r=req10 txn=txn1 ts=10,1 spans=exclusive@f skip-locked
----

scan r=req10
----
start-waiting: false

should-wait r=req10
----
false

is-key-locked-by-conflicting-txn r=req10 k=f strength=exclusive
----
SKIP LOCKED request should not find another waiting request from the same transaction
2 changes: 1 addition & 1 deletion pkg/storage/mvcc.go
Original file line number Diff line number Diff line change
Expand Up @@ -933,7 +933,7 @@ type LockTableView interface {
// for conflicts. This helps prevent a stream of locking SKIP LOCKED requests
// from starving out regular locking requests. In such cases, true is
// returned, but so is nil.
IsKeyLockedByConflictingTxn(roachpb.Key, lock.Strength) (bool, *enginepb.TxnMeta)
IsKeyLockedByConflictingTxn(roachpb.Key, lock.Strength) (bool, *enginepb.TxnMeta, error)
}

// MVCCGetOptions bundles options for the MVCCGet family of functions.
Expand Down
10 changes: 5 additions & 5 deletions pkg/storage/mvcc_history_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2475,18 +2475,18 @@ type mockLockTableView struct {

func (lt *mockLockTableView) IsKeyLockedByConflictingTxn(
k roachpb.Key, s lock.Strength,
) (bool, *enginepb.TxnMeta) {
) (bool, *enginepb.TxnMeta, error) {
holder, ok := lt.locks[string(k)]
if !ok {
return false, nil
return false, nil, nil
}
if lt.txn != nil && lt.txn.ID == holder.ID {
return false, nil
return false, nil, nil
}
if s == lock.None && lt.ts.Less(holder.WriteTimestamp) {
return false, nil
return false, nil, nil
}
return true, &holder.TxnMeta
return true, &holder.TxnMeta, nil
}

func (e *evalCtx) visitWrappedIters(fn func(it storage.SimpleMVCCIterator) (done bool)) {
Expand Down
7 changes: 6 additions & 1 deletion pkg/storage/pebble_mvcc_scanner.go
Original file line number Diff line number Diff line change
Expand Up @@ -1809,7 +1809,12 @@ func (p *pebbleMVCCScanner) isKeyLockedByConflictingTxn(
if p.failOnMoreRecent {
strength = lock.Exclusive
}
if ok, txn := p.lockTable.IsKeyLockedByConflictingTxn(key, strength); ok {
ok, txn, err := p.lockTable.IsKeyLockedByConflictingTxn(key, strength)
if err != nil {
p.err = err
return false, false
}
if ok {
// The key is locked or reserved, so ignore it.
if txn != nil && (p.maxIntents == 0 || int64(p.intents.Count()) < p.maxIntents) {
// However, if the key is locked, we return the lock holder separately
Expand Down

0 comments on commit d0d6a5d

Please sign in to comment.