Skip to content

Commit

Permalink
kv: use correct strength for {Get,Scan,ReverseScan} for SKIP LOCKED
Browse files Browse the repository at this point in the history
Previously, when calling into the in-memory lock table to determine
whether a key needs to be skipped because of the SKIP LOCKED wait
policy, we would infer the request's strength using the
`FailOnMoreRecent` option. The introduction of shared locks broke this
inference. For this reason, SELECT FOR SHARE SKIP LOCKED was disallowed.

This patch polishes and adds testing for 7bea05d, thereby enabling
SELECT FOR SHARE SKIP LOCKED.

Resolves #110743

Release note (sql change): SELECT FOR SHARE SKIP LOCKED, which was
previously disallowed, no works.
  • Loading branch information
arulajmani committed Jan 16, 2024
1 parent 6f0f2ae commit ed0ccb4
Show file tree
Hide file tree
Showing 16 changed files with 1,366 additions and 307 deletions.
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
echo
----
db0.GetForShareSkipLocked(ctx, tk(1)) // usage of shared locks in conjunction with skip locked wait policy is currently unsupported
db0.GetForShareSkipLocked(ctx, tk(1)) // @<ts> (v1, <nil>)
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
echo
----
db0.GetForShareSkipLockedGuaranteedDurability(ctx, tk(1)) // usage of shared locks in conjunction with skip locked wait policy is currently unsupported
db0.GetForShareSkipLockedGuaranteedDurability(ctx, tk(1)) // @<ts> (v1, <nil>)
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
echo
----
db0.ReverseScanForShareSkipLocked(ctx, tk(1), tk(2), 0) // usage of shared locks in conjunction with skip locked wait policy is currently unsupported
db0.ReverseScanForShareSkipLocked(ctx, tk(1), tk(2), 0) // @<ts> (/Table/100/"0000000000000001":v21, <nil>)
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
echo
----
db0.ReverseScanForShareSkipLockedGuaranteedDurability(ctx, tk(1), tk(2), 0) // usage of shared locks in conjunction with skip locked wait policy is currently unsupported
db0.ReverseScanForShareSkipLockedGuaranteedDurability(ctx, tk(1), tk(2), 0) // @<ts> (/Table/100/"0000000000000001":v21, <nil>)
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
echo
----
db0.ScanForShareSkipLocked(ctx, tk(1), tk(3), 0) // usage of shared locks in conjunction with skip locked wait policy is currently unsupported
db0.ScanForShareSkipLocked(ctx, tk(1), tk(3), 0) // @<ts> (/Table/100/"0000000000000001":v1, <nil>)
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
echo
----
db0.ScanForShareSkipLockedGuaranteedDurability(ctx, tk(1), tk(3), 0) // usage of shared locks in conjunction with skip locked wait policy is currently unsupported
db0.ScanForShareSkipLockedGuaranteedDurability(ctx, tk(1), tk(3), 0) // @<ts> (/Table/100/"0000000000000001":v1, <nil>)
7 changes: 4 additions & 3 deletions pkg/kv/kvserver/batcheval/cmd_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,9 @@ func Get(
h := cArgs.Header
reply := resp.(*kvpb.GetResponse)

if err := maybeDisallowSkipLockedRequest(h, args.KeyLockingStrength); err != nil {
return result.Result{}, err
var lockTableForSkipLocked storage.LockTableView
if h.WaitPolicy == lock.WaitPolicy_SkipLocked {
lockTableForSkipLocked = newRequestBoundLockTableView(cArgs.Concurrency, args.KeyLockingStrength)
}

getRes, err := storage.MVCCGet(ctx, readWriter, args.Key, h.Timestamp, storage.MVCCGetOptions{
Expand All @@ -45,7 +46,7 @@ func Get(
ScanStats: cArgs.ScanStats,
Uncertainty: cArgs.Uncertainty,
MemoryAccount: cArgs.EvalCtx.GetResponseMemoryAccount(),
LockTable: cArgs.Concurrency,
LockTable: lockTableForSkipLocked,
DontInterleaveIntents: cArgs.DontInterleaveIntents,
MaxKeys: cArgs.Header.MaxSpanRequestKeys,
TargetBytes: cArgs.Header.TargetBytes,
Expand Down
7 changes: 4 additions & 3 deletions pkg/kv/kvserver/batcheval/cmd_reverse_scan.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,9 @@ func ReverseScan(
h := cArgs.Header
reply := resp.(*kvpb.ReverseScanResponse)

if err := maybeDisallowSkipLockedRequest(h, args.KeyLockingStrength); err != nil {
return result.Result{}, err
var lockTableForSkipLocked storage.LockTableView
if h.WaitPolicy == lock.WaitPolicy_SkipLocked {
lockTableForSkipLocked = newRequestBoundLockTableView(cArgs.Concurrency, args.KeyLockingStrength)
}

var res result.Result
Expand All @@ -58,7 +59,7 @@ func ReverseScan(
FailOnMoreRecent: args.KeyLockingStrength != lock.None,
Reverse: true,
MemoryAccount: cArgs.EvalCtx.GetResponseMemoryAccount(),
LockTable: cArgs.Concurrency,
LockTable: lockTableForSkipLocked,
DontInterleaveIntents: cArgs.DontInterleaveIntents,
ReadCategory: readCategory,
}
Expand Down
23 changes: 4 additions & 19 deletions pkg/kv/kvserver/batcheval/cmd_scan.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,9 @@ func Scan(
h := cArgs.Header
reply := resp.(*kvpb.ScanResponse)

if err := maybeDisallowSkipLockedRequest(h, args.KeyLockingStrength); err != nil {
return result.Result{}, err
var lockTableForSkipLocked storage.LockTableView
if h.WaitPolicy == lock.WaitPolicy_SkipLocked {
lockTableForSkipLocked = newRequestBoundLockTableView(cArgs.Concurrency, args.KeyLockingStrength)
}

var res result.Result
Expand All @@ -61,7 +62,7 @@ func Scan(
FailOnMoreRecent: args.KeyLockingStrength != lock.None,
Reverse: false,
MemoryAccount: cArgs.EvalCtx.GetResponseMemoryAccount(),
LockTable: cArgs.Concurrency,
LockTable: lockTableForSkipLocked,
DontInterleaveIntents: cArgs.DontInterleaveIntents,
ReadCategory: readCategory,
}
Expand Down Expand Up @@ -157,22 +158,6 @@ func maybeInterceptDisallowedSkipLockedUsage(h kvpb.Header, err error) error {
return err
}

// maybeDisallowSkipLockedRequest returns an error if the skip locked wait
// policy is used in conjunction with shared locks.
//
// TODO(arul): this won't be needed once
// https://github.com/cockroachdb/cockroach/issues/110743 is addressed. Until
// then, we return unimplemented errors.
func maybeDisallowSkipLockedRequest(h kvpb.Header, str lock.Strength) error {
if h.WaitPolicy == lock.WaitPolicy_SkipLocked && str == lock.Shared {
return MarkSkipLockedUnsupportedError(errors.UnimplementedError(
errors.IssueLink{IssueURL: build.MakeIssueURL(110743)},
"usage of shared locks in conjunction with skip locked wait policy is currently unsupported",
))
}
return nil
}

// SkipLockedUnsupportedError is used to mark errors resulting from unsupported
// (currently unimplemented) uses of the skip locked wait policy.
type SkipLockedUnsupportedError struct{}
Expand Down
30 changes: 30 additions & 0 deletions pkg/kv/kvserver/batcheval/lock.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,3 +224,33 @@ func copyKey(k roachpb.Key) roachpb.Key {
copy(k2, k)
return k2
}

// txnBoundLockTableView is a transaction-bound view into an in-memory
// collections of key-level locks.
type txnBoundLockTableView interface {
IsKeyLockedByConflictingTxn(
roachpb.Key, lock.Strength,
) (bool, *enginepb.TxnMeta, error)
}

// requestBoundLockTableView combines a txnBoundLockTableView with the lock
// strength that an individual request is attempting to acquire.
type requestBoundLockTableView struct {
ltv txnBoundLockTableView
str lock.Strength
}

// newRequestBoundLockTableView creates a new requestBoundLockTableView.
func newRequestBoundLockTableView(
ltv txnBoundLockTableView, str lock.Strength,
) *requestBoundLockTableView {
return &requestBoundLockTableView{ltv: ltv, str: str}
}

// IsKeyLockedByConflictingTxn implements the storage.LockTableView interface.
func (ltv *requestBoundLockTableView) IsKeyLockedByConflictingTxn(
key roachpb.Key,
) (bool, *enginepb.TxnMeta, error) {
// TODO(arul): look for replicated lock conflicts.
return ltv.ltv.IsKeyLockedByConflictingTxn(key, ltv.str)
}
112 changes: 112 additions & 0 deletions pkg/kv/kvserver/batcheval/lock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,16 @@ import (

"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/isolation"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/lock"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -169,3 +172,112 @@ func TestCollectIntentsUsesSameIterator(t *testing.T) {
})
}
}

func TestRequestBoundLockTableView(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

lockHolderTxnID := uuid.MakeV4()
keyA := roachpb.Key("a")
keyB := roachpb.Key("b")
keyC := roachpb.Key("c")

m := newMockTxnBoundLockTableView(lockHolderTxnID)
m.addLock(keyA, lock.Shared)
m.addLock(keyB, lock.Exclusive)

// Non-locking request.
ltView := newRequestBoundLockTableView(m, lock.None)
locked, _, err := ltView.IsKeyLockedByConflictingTxn(keyA)
require.NoError(t, err)
require.False(t, locked)

locked, _, err = ltView.IsKeyLockedByConflictingTxn(keyB)
require.NoError(t, err)
require.False(t, locked)

locked, _, err = ltView.IsKeyLockedByConflictingTxn(keyC)
require.NoError(t, err)
require.False(t, locked)

// Shared locking request.
ltView = newRequestBoundLockTableView(m, lock.Shared)
locked, _, err = ltView.IsKeyLockedByConflictingTxn(keyA)
require.NoError(t, err)
require.False(t, locked)

locked, txn, err := ltView.IsKeyLockedByConflictingTxn(keyB)
require.NoError(t, err)
require.True(t, locked)
require.Equal(t, txn.ID, lockHolderTxnID)

locked, _, err = ltView.IsKeyLockedByConflictingTxn(keyC)
require.NoError(t, err)
require.False(t, locked)

// Exclusive locking request.
ltView = newRequestBoundLockTableView(m, lock.Exclusive)
locked, txn, err = ltView.IsKeyLockedByConflictingTxn(keyA)
require.NoError(t, err)
require.True(t, locked)
require.Equal(t, txn.ID, lockHolderTxnID)

locked, txn, err = ltView.IsKeyLockedByConflictingTxn(keyB)
require.NoError(t, err)
require.True(t, locked)
require.Equal(t, txn.ID, lockHolderTxnID)

locked, _, err = ltView.IsKeyLockedByConflictingTxn(keyC)
require.NoError(t, err)
require.False(t, locked)
}

// mockTxnBoundLockTableView is a mocked version of the txnBoundLockTableView
// interface.
type mockTxnBoundLockTableView struct {
locks map[string]lock.Strength
lockHolderTxnID uuid.UUID // txnID of all held locks
}

var _ txnBoundLockTableView = &mockTxnBoundLockTableView{}

// newMockTxnBoundLockTableView constructs and returns a
// mockTxnBoundLockTableView.
func newMockTxnBoundLockTableView(lockHolderTxnID uuid.UUID) *mockTxnBoundLockTableView {
return &mockTxnBoundLockTableView{
locks: make(map[string]lock.Strength),
lockHolderTxnID: lockHolderTxnID,
}
}

// addLock adds a lock on the supplied key with the given lock strength. The
// lock is held by m.TxnID.
func (m mockTxnBoundLockTableView) addLock(key roachpb.Key, str lock.Strength) {
m.locks[key.String()] = str
}

// IsKeyLockedByConflictingTxn implements the txnBoundLockTableView interface.
func (m mockTxnBoundLockTableView) IsKeyLockedByConflictingTxn(
key roachpb.Key, str lock.Strength,
) (bool, *enginepb.TxnMeta, error) {
lockStr, locked := m.locks[key.String()]
if !locked {
return false, nil, nil
}
var conflicts bool
switch str {
case lock.None:
conflicts = false
return false, nil, nil
case lock.Shared:
conflicts = lockStr == lock.Exclusive
case lock.Exclusive:
conflicts = true
default:
panic("unknown lock strength")
}
if conflicts {
return true, &enginepb.TxnMeta{ID: m.lockHolderTxnID}, nil
}
return false, nil, nil
}
59 changes: 56 additions & 3 deletions pkg/sql/logictest/testdata/logic_test/select_for_share
Original file line number Diff line number Diff line change
Expand Up @@ -107,12 +107,65 @@ COMMIT
statement ok
SET enable_shared_locking_for_serializable = true

statement error usage of shared locks in conjunction with skip locked wait policy is currently unsupported
statement ok
BEGIN

query I
SELECT * FROM t WHERE a = 2 FOR SHARE
----
2

user testuser2

statement ok
SET enable_shared_locking_for_serializable = true

query TTTTTTTBB colnames,retry,rowsort
SELECT database_name, schema_name, table_name, lock_key_pretty, lock_strength, durability, isolation_level, granted, contended FROM crdb_internal.cluster_locks
----
database_name schema_name table_name lock_key_pretty lock_strength durability isolation_level granted contended
test public t /Table/106/1/2/0 Shared Unreplicated SERIALIZABLE true false

statement ok
BEGIN

query I
SELECT * FROM t FOR SHARE SKIP LOCKED
----
2

user root

query TTTTTTTBB colnames,retry,rowsort
SELECT database_name, schema_name, table_name, lock_key_pretty, lock_strength, durability, isolation_level, granted, contended FROM crdb_internal.cluster_locks
----
database_name schema_name table_name lock_key_pretty lock_strength durability isolation_level granted contended
test public t /Table/106/1/2/0 Shared Unreplicated SERIALIZABLE true false
test public t /Table/106/1/2/0 Shared Unreplicated SERIALIZABLE true false

statement ok
BEGIN

query I
SELECT * FROM t FOR UPDATE SKIP LOCKED
----

statement ok
COMMIT

# Complete the open transactions.
user testuser

statement ok
COMMIT

user testuser2

statement ok
COMMIT


# TODO(arul): Add a test to show that the session setting doesn't apply to read
# committed transactions. We currently can't issue SELECT FOR SHARE statements
# in read committed transactions because durable locking hasn't been fully
# hooked up.


17 changes: 5 additions & 12 deletions pkg/storage/mvcc.go
Original file line number Diff line number Diff line change
Expand Up @@ -1167,7 +1167,7 @@ func MVCCBlindPutInlineWithPrev(
return err
}

// LockTableView is a transaction-bound view into an in-memory collections of
// LockTableView is a request-bound snapshot into an in-memory collections of
// key-level locks. The set of per-key locks stored in the in-memory lock table
// structure overlaps with those stored in the persistent lock table keyspace
// (i.e. intents produced by an MVCCKeyAndIntentsIterKind iterator), but one is
Expand All @@ -1176,20 +1176,13 @@ func MVCCBlindPutInlineWithPrev(
// table keyspace (i.e. replicated locks that have yet to be "discovered").
type LockTableView interface {
// IsKeyLockedByConflictingTxn returns whether the specified key is locked by
// a conflicting transaction in the lockTableGuard's snapshot of the lock
// table, given the caller's own desired locking strength. If so, true is
// returned and so is the lock holder. If the lock is held by the transaction
// itself, there's no conflict to speak of, so false is returned.
// a conflicting transaction in the request's snapshot of the lock table,
// given the request's own desired locking strength. If so, true is returned
// and so is the lock holder. Otherwise, false is returned.
//
// This method is used by requests in conjunction with the SkipLocked wait
// policy to determine which keys they should skip over during evaluation.
//
// If the supplied lock strength is locking (!= lock.None), then any queued
// locking requests that came before the lockTableGuard will also be checked
// for conflicts. This helps prevent a stream of locking SKIP LOCKED requests
// from starving out regular locking requests. In such cases, true is
// returned, but so is nil.
IsKeyLockedByConflictingTxn(roachpb.Key, lock.Strength) (bool, *enginepb.TxnMeta, error)
IsKeyLockedByConflictingTxn(roachpb.Key) (bool, *enginepb.TxnMeta, error)
}

// MVCCGetOptions bundles options for the MVCCGet family of functions.
Expand Down
Loading

0 comments on commit ed0ccb4

Please sign in to comment.