Skip to content

Commit

Permalink
Merge #118296
Browse files Browse the repository at this point in the history
118296: kv: make non-txn'al locking reads check for replicated lock conflicts r=nvanbenschoten a=arulajmani

Non-transactional locking requests cannot acquire locks that outlive their request's lifespan. However, they do conflict with concurrent transactional locking requests.

Previously, we would skip checking for conflicts with replicated locks entirely when dealing with non-transactional requests. This patch fixes that and adds a regression test.

Closes #117628

Release note: None

Co-authored-by: Arul Ajmani <[email protected]>
  • Loading branch information
craig[bot] and arulajmani committed Feb 9, 2024
2 parents 7a764e5 + 3f89727 commit c954900
Show file tree
Hide file tree
Showing 5 changed files with 286 additions and 22 deletions.
6 changes: 4 additions & 2 deletions pkg/kv/kvserver/batcheval/cmd_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,13 +92,15 @@ func Get(
}

var res result.Result
if args.KeyLockingStrength != lock.None && h.Txn != nil && getRes.Value != nil {
if args.KeyLockingStrength != lock.None && getRes.Value != nil {
acq, err := acquireLockOnKey(ctx, readWriter, h.Txn, args.KeyLockingStrength,
args.KeyLockingDurability, args.Key, cArgs.Stats, cArgs.EvalCtx.ClusterSettings())
if err != nil {
return result.Result{}, err
}
res.Local.AcquiredLocks = []roachpb.LockAcquisition{acq}
if !acq.Empty() {
res.Local.AcquiredLocks = []roachpb.LockAcquisition{acq}
}
}
res.Local.EncounteredIntents = intents
return res, err
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/batcheval/cmd_reverse_scan.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ func ReverseScan(
}
}

if args.KeyLockingStrength != lock.None && h.Txn != nil {
if args.KeyLockingStrength != lock.None {
acquiredLocks, err := acquireLocksOnKeys(
ctx, readWriter, h.Txn, args.KeyLockingStrength, args.KeyLockingDurability,
args.ScanFormat, &scanRes, cArgs.Stats, cArgs.EvalCtx.ClusterSettings())
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/batcheval/cmd_scan.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ func Scan(
}
}

if args.KeyLockingStrength != lock.None && h.Txn != nil {
if args.KeyLockingStrength != lock.None {
acquiredLocks, err := acquireLocksOnKeys(
ctx, readWriter, h.Txn, args.KeyLockingStrength, args.KeyLockingDurability,
args.ScanFormat, &scanRes, cArgs.Stats, cArgs.EvalCtx.ClusterSettings())
Expand Down
77 changes: 59 additions & 18 deletions pkg/kv/kvserver/batcheval/lock.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,14 +106,26 @@ func readProvisionalVal(

}

// acquireLocksOnKeys acquires locks on each of the keys in the result of a
// {,Reverse}ScanRequest. The locks are held by the specified transaction with
// the supplied locks strength and durability. The list of LockAcquisitions is
// returned to the caller, which the caller must accumulate in its result set.
// acquireLocksOnKeys checks for conflicts, and if none are found, acquires
// locks on each of the keys in the result of a {,Reverse}ScanRequest. The
// acquired locks are held by the specified transaction[1] with the supplied
// lock strength and durability. The list of LockAcquisitions is returned to the
// caller, which the caller must accumulate in its result set.
//
// It is possible to run into a lock conflict error when trying to acquire a
// lock on one of the keys. In such cases, a LockConflictError is returned to
// Even though the function is called post evaluation, at which point requests
// have already sequenced with all locks in the in-memory lock table, there may
// still be (currently undiscovered) replicated locks. This is because the
// in-memory lock table only has a partial view of all locks for a range.
// Therefore, the first thing we do is check for replicated lock conflicts that
// may have been missed. If any are found, a LockConflictError is returned to
// the caller.
//
// [1] The caller is allowed to pass in a nil transaction; this means that
// acquireLocksOnKeys can be called on behalf of non-transactional requests.
// Non-transactional requests are not allowed to hold locks that outlive the
// lifespan of their request. As such, an empty list is returned for them.
// However, non-transactional requests do conflict with locks held by concurrent
// transactional requests, so they may return a LockConflictError.
func acquireLocksOnKeys(
ctx context.Context,
readWriter storage.ReadWriter,
Expand All @@ -125,32 +137,34 @@ func acquireLocksOnKeys(
ms *enginepb.MVCCStats,
settings *cluster.Settings,
) ([]roachpb.LockAcquisition, error) {
acquiredLocks := make([]roachpb.LockAcquisition, scanRes.NumKeys)
acquiredLocks := make([]roachpb.LockAcquisition, 0, scanRes.NumKeys)
switch scanFmt {
case kvpb.BATCH_RESPONSE:
var i int
err := storage.MVCCScanDecodeKeyValues(scanRes.KVData, func(key storage.MVCCKey, _ []byte) error {
k := copyKey(key.Key)
acq, err := acquireLockOnKey(ctx, readWriter, txn, str, dur, k, ms, settings)
if err != nil {
return err
}
acquiredLocks[i] = acq
i++
if !acq.Empty() {
acquiredLocks = append(acquiredLocks, acq)
}
return nil
})
if err != nil {
return nil, err
}
return acquiredLocks, nil
case kvpb.KEY_VALUES:
for i, row := range scanRes.KVs {
for _, row := range scanRes.KVs {
k := copyKey(row.Key)
acq, err := acquireLockOnKey(ctx, readWriter, txn, str, dur, k, ms, settings)
if err != nil {
return nil, err
}
acquiredLocks[i] = acq
if !acq.Empty() {
acquiredLocks = append(acquiredLocks, acq)
}
}
return acquiredLocks, nil
case kvpb.COL_BATCH_RESPONSE:
Expand All @@ -160,13 +174,26 @@ func acquireLocksOnKeys(
}
}

// acquireLockOnKey acquires a lock on the specified key. The lock is acquired
// by the specified transaction with the supplied lock strength and durability.
// The resultant lock acquisition struct is returned, which the caller must
// accumulate in its result set.
// acquireLockOnKey checks for conflicts, and if non are found, acquires a lock
// on the specified key. The lock is acquired by the specified transaction[1]
// with the supplied lock strength and durability. The resultant lock
// acquisition struct is returned, which the caller must accumulate in its
// result set.
//
// It is possible for lock acquisition to run into a lock conflict error, in
// which case a LockConflictError is returned to the caller.
// Even though the function is called post evaluation, at which point requests
// have already sequenced with all locks in the in-memory lock table, there may
// still be (currently undiscovered) replicated locks. This is because the
// in-memory lock table only has a partial view of all locks for a range.
// Therefore, the first thing we do is check for replicated lock conflicts that
// may have been missed. If any are found, a LockConflictError is returned to
// the caller.
//
// [1] The caller is allowed to pass in a nil transaction; this means that
// acquireLockOnKey can be called on behalf of non-transactional requests.
// Non-transactional requests are not allowed to hold locks that outlive the
// lifespan of their request. As such, an empty lock acquisition is returned for
// them. However, non-transactional requests do conflict with locks held by
// concurrent transactional requests, so they may return a LockConflictError.
func acquireLockOnKey(
ctx context.Context,
readWriter storage.ReadWriter,
Expand All @@ -178,6 +205,20 @@ func acquireLockOnKey(
settings *cluster.Settings,
) (roachpb.LockAcquisition, error) {
maxLockConflicts := storage.MaxConflictsPerLockConflictError.Get(&settings.SV)
if txn == nil {
// Non-transactional requests are not allowed to acquire locks that outlive
// the request's lifespan. However, they may conflict with locks held by
// other concurrent transactional requests. Evaluation up until this point
// has only scanned for (and not found any) conflicts with locks in the
// in-memory lock table. This includes all unreplicated locks and contended
// replicated locks. We haven't considered conflicts with un-contended
// replicated locks -- do so now.
//
// NB: The supplied durability is insignificant for non-transactional
// requests.
return roachpb.LockAcquisition{},
storage.MVCCCheckForAcquireLock(ctx, readWriter, txn, str, key, maxLockConflicts)
}
switch dur {
case lock.Unreplicated:
// Evaluation up until this point has only scanned for (and not found any)
Expand Down
221 changes: 221 additions & 0 deletions pkg/kv/kvserver/client_replica_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/isolation"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/lock"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness"
Expand Down Expand Up @@ -4975,6 +4976,226 @@ func setupDBAndWriteAAndB(t *testing.T) (serverutils.TestServerInterface, *kv.DB
return s, db
}

// TestNonTransactionalLockingRequestsConflictWithReplicated locks ensures that
// non-transactional locking requests check for conflicts with replicated locks
// even though they cannot acquire locks that outlive their request.
//
// Regression test for https://github.com/cockroachdb/cockroach/issues/117628.
func TestNonTransactionalLockingRequestsConflictWithReplicatedLocks(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()
s, db := setupDBAndWriteAAndB(t)
defer s.Stopper().Stop(ctx)

keyA, keyB, keyC := roachpb.Key("a"), roachpb.Key("b"), roachpb.Key("c")

var wg sync.WaitGroup
wg.Add(1)

locksHeld := make(chan struct{})
canReleaseLocks := make(chan struct{})

// Set up the test by acquiring a replicated exclusive lock on keyA and a
// replicated shared lock on KeyB. We'll hold these locks for the duration of
// the test.
go func() {
defer wg.Done()

txn1 := db.NewTxn(ctx, "txn1")
_, err := txn1.GetForUpdate(ctx, keyA, kvpb.GuaranteedDurability)
if err != nil {
t.Error(err)
}
_, err = txn1.GetForShare(ctx, keyB, kvpb.GuaranteedDurability)
if err != nil {
t.Error(err)
}

close(locksHeld)
<-canReleaseLocks // block for all test cases

err = txn1.Commit(ctx)
if err != nil {
t.Error(err)
}
}()

<-locksHeld

for i, tc := range []struct {
setup func(*kvpb.BatchRequest, bool)
expBlock bool
}{
// 1. Get requests.
// 1a. Exclusive locking.
{
setup: func(ba *kvpb.BatchRequest, repl bool) {
dur := lock.Unreplicated
if repl {
dur = lock.Replicated
}
req := getArgs(keyA)
req.KeyLockingStrength = lock.Exclusive
req.KeyLockingDurability = dur
ba.Add(req)
},
expBlock: true,
},
{
setup: func(ba *kvpb.BatchRequest, repl bool) {
dur := lock.Unreplicated
if repl {
dur = lock.Replicated
}
req := getArgs(keyB)
req.KeyLockingStrength = lock.Exclusive
req.KeyLockingDurability = dur
ba.Add(req)
},
expBlock: true,
},
{
setup: func(ba *kvpb.BatchRequest, repl bool) {
dur := lock.Unreplicated
if repl {
dur = lock.Replicated
}
req := getArgs(keyC)
req.KeyLockingStrength = lock.Exclusive
req.KeyLockingDurability = dur
ba.Add(req)
},
expBlock: false,
},
// 1b. Shared locking.
{
setup: func(ba *kvpb.BatchRequest, repl bool) {
dur := lock.Unreplicated
if repl {
dur = lock.Replicated
}
req := getArgs(keyA)
req.KeyLockingStrength = lock.Shared
req.KeyLockingDurability = dur
ba.Add(req)
},
expBlock: true,
},
{
setup: func(ba *kvpb.BatchRequest, repl bool) {
dur := lock.Unreplicated
if repl {
dur = lock.Replicated
}
req := getArgs(keyB)
req.KeyLockingStrength = lock.Shared
req.KeyLockingDurability = dur
ba.Add(req)
},
expBlock: false,
},
{
setup: func(ba *kvpb.BatchRequest, repl bool) {
dur := lock.Unreplicated
if repl {
dur = lock.Replicated
}
req := getArgs(keyC)
req.KeyLockingStrength = lock.Shared
req.KeyLockingDurability = dur
ba.Add(req)
},
expBlock: false,
},
// 2. Scan requests.
// 2a. Exclusive locking.
{
setup: func(ba *kvpb.BatchRequest, repl bool) {
dur := lock.Unreplicated
if repl {
dur = lock.Replicated
}
req := scanArgs(keyA, keyC)
req.KeyLockingStrength = lock.Exclusive
req.KeyLockingDurability = dur
ba.Add(req)
},
expBlock: true,
},
// 2b. Shared locking.
{
setup: func(ba *kvpb.BatchRequest, repl bool) {
dur := lock.Unreplicated
if repl {
dur = lock.Replicated
}
req := scanArgs(keyA, keyC)
req.KeyLockingStrength = lock.Shared
req.KeyLockingDurability = dur
ba.Add(req)
},
expBlock: true,
},
// 3. ReverseScan requests.
// 3a. Exclusive locking.
{
setup: func(ba *kvpb.BatchRequest, repl bool) {
dur := lock.Unreplicated
if repl {
dur = lock.Replicated
}
req := revScanArgs(keyA, keyC)
req.KeyLockingStrength = lock.Exclusive
req.KeyLockingDurability = dur
ba.Add(req)
},
expBlock: true,
},
// 3b. Shared locking.
{
setup: func(ba *kvpb.BatchRequest, repl bool) {
dur := lock.Unreplicated
if repl {
dur = lock.Replicated
}
req := revScanArgs(keyA, keyC)
req.KeyLockingStrength = lock.Shared
req.KeyLockingDurability = dur
ba.Add(req)
},
expBlock: true,
},
} {
testutils.RunTrueAndFalse(t, "replicated", func(t *testing.T, repl bool) {
t.Run(fmt.Sprintf("#%d", i), func(t *testing.T) {
ba := &kvpb.BatchRequest{}
// Having the request return an error instead of blocking on conflict
// makes the test easier.
ba.WaitPolicy = lock.WaitPolicy_Error
tc.setup(ba, repl)

store, err := s.GetStores().(*kvserver.Stores).GetStore(s.GetFirstStoreID())
require.NoError(t, err)
_, pErr := store.TestSender().Send(ctx, ba)

if tc.expBlock {
require.NotNil(t, pErr)
lcErr := new(kvpb.WriteIntentError)
require.True(t, errors.As(pErr.GoError(), &lcErr))
require.Equal(t, kvpb.WriteIntentError_REASON_WAIT_POLICY, lcErr.Reason)
} else {
require.Nil(t, pErr.GoError())
}
})
})
}

close(canReleaseLocks)
wg.Wait()
}

// TestSharedLocksBasic tests basic shared lock semantics. In particular, it
// tests multiple shared locks are compatible with each other, but exclusive
// locks aren't.
Expand Down

0 comments on commit c954900

Please sign in to comment.