From 3f897279d5fc5c853f9956984334a6506ebcc176 Mon Sep 17 00:00:00 2001 From: Arul Ajmani Date: Wed, 24 Jan 2024 17:06:59 -0500 Subject: [PATCH] kv: make non-txn'al locking reads check for replicated lock conflicts Non-transactional locking requests cannot acquire locks that outlive their request's lifespan. However, they do conflict with concurrent transactional locking requests. Previously, we would skip checking for conflicts with replicated locks entirely when dealing with non-transactional requests. This patch fixes that and adds a regression test. Closes #117628 Release note: None --- pkg/kv/kvserver/batcheval/cmd_get.go | 6 +- pkg/kv/kvserver/batcheval/cmd_reverse_scan.go | 2 +- pkg/kv/kvserver/batcheval/cmd_scan.go | 2 +- pkg/kv/kvserver/batcheval/lock.go | 77 ++++-- pkg/kv/kvserver/client_replica_test.go | 221 ++++++++++++++++++ 5 files changed, 286 insertions(+), 22 deletions(-) diff --git a/pkg/kv/kvserver/batcheval/cmd_get.go b/pkg/kv/kvserver/batcheval/cmd_get.go index 6e8bf738f861..ed8ab144c376 100644 --- a/pkg/kv/kvserver/batcheval/cmd_get.go +++ b/pkg/kv/kvserver/batcheval/cmd_get.go @@ -92,13 +92,15 @@ func Get( } var res result.Result - if args.KeyLockingStrength != lock.None && h.Txn != nil && getRes.Value != nil { + if args.KeyLockingStrength != lock.None && getRes.Value != nil { acq, err := acquireLockOnKey(ctx, readWriter, h.Txn, args.KeyLockingStrength, args.KeyLockingDurability, args.Key, cArgs.Stats, cArgs.EvalCtx.ClusterSettings()) if err != nil { return result.Result{}, err } - res.Local.AcquiredLocks = []roachpb.LockAcquisition{acq} + if !acq.Empty() { + res.Local.AcquiredLocks = []roachpb.LockAcquisition{acq} + } } res.Local.EncounteredIntents = intents return res, err diff --git a/pkg/kv/kvserver/batcheval/cmd_reverse_scan.go b/pkg/kv/kvserver/batcheval/cmd_reverse_scan.go index 3ab728e14968..c5327b4658b0 100644 --- a/pkg/kv/kvserver/batcheval/cmd_reverse_scan.go +++ b/pkg/kv/kvserver/batcheval/cmd_reverse_scan.go @@ -119,7 +119,7 @@ func ReverseScan( } } - if args.KeyLockingStrength != lock.None && h.Txn != nil { + if args.KeyLockingStrength != lock.None { acquiredLocks, err := acquireLocksOnKeys( ctx, readWriter, h.Txn, args.KeyLockingStrength, args.KeyLockingDurability, args.ScanFormat, &scanRes, cArgs.Stats, cArgs.EvalCtx.ClusterSettings()) diff --git a/pkg/kv/kvserver/batcheval/cmd_scan.go b/pkg/kv/kvserver/batcheval/cmd_scan.go index 262ca974c4e5..1e7c71501210 100644 --- a/pkg/kv/kvserver/batcheval/cmd_scan.go +++ b/pkg/kv/kvserver/batcheval/cmd_scan.go @@ -120,7 +120,7 @@ func Scan( } } - if args.KeyLockingStrength != lock.None && h.Txn != nil { + if args.KeyLockingStrength != lock.None { acquiredLocks, err := acquireLocksOnKeys( ctx, readWriter, h.Txn, args.KeyLockingStrength, args.KeyLockingDurability, args.ScanFormat, &scanRes, cArgs.Stats, cArgs.EvalCtx.ClusterSettings()) diff --git a/pkg/kv/kvserver/batcheval/lock.go b/pkg/kv/kvserver/batcheval/lock.go index 9c8f54745c62..d021fd51b48d 100644 --- a/pkg/kv/kvserver/batcheval/lock.go +++ b/pkg/kv/kvserver/batcheval/lock.go @@ -106,14 +106,26 @@ func readProvisionalVal( } -// acquireLocksOnKeys acquires locks on each of the keys in the result of a -// {,Reverse}ScanRequest. The locks are held by the specified transaction with -// the supplied locks strength and durability. The list of LockAcquisitions is -// returned to the caller, which the caller must accumulate in its result set. +// acquireLocksOnKeys checks for conflicts, and if none are found, acquires +// locks on each of the keys in the result of a {,Reverse}ScanRequest. The +// acquired locks are held by the specified transaction[1] with the supplied +// lock strength and durability. The list of LockAcquisitions is returned to the +// caller, which the caller must accumulate in its result set. // -// It is possible to run into a lock conflict error when trying to acquire a -// lock on one of the keys. In such cases, a LockConflictError is returned to +// Even though the function is called post evaluation, at which point requests +// have already sequenced with all locks in the in-memory lock table, there may +// still be (currently undiscovered) replicated locks. This is because the +// in-memory lock table only has a partial view of all locks for a range. +// Therefore, the first thing we do is check for replicated lock conflicts that +// may have been missed. If any are found, a LockConflictError is returned to // the caller. +// +// [1] The caller is allowed to pass in a nil transaction; this means that +// acquireLocksOnKeys can be called on behalf of non-transactional requests. +// Non-transactional requests are not allowed to hold locks that outlive the +// lifespan of their request. As such, an empty list is returned for them. +// However, non-transactional requests do conflict with locks held by concurrent +// transactional requests, so they may return a LockConflictError. func acquireLocksOnKeys( ctx context.Context, readWriter storage.ReadWriter, @@ -125,18 +137,18 @@ func acquireLocksOnKeys( ms *enginepb.MVCCStats, settings *cluster.Settings, ) ([]roachpb.LockAcquisition, error) { - acquiredLocks := make([]roachpb.LockAcquisition, scanRes.NumKeys) + acquiredLocks := make([]roachpb.LockAcquisition, 0, scanRes.NumKeys) switch scanFmt { case kvpb.BATCH_RESPONSE: - var i int err := storage.MVCCScanDecodeKeyValues(scanRes.KVData, func(key storage.MVCCKey, _ []byte) error { k := copyKey(key.Key) acq, err := acquireLockOnKey(ctx, readWriter, txn, str, dur, k, ms, settings) if err != nil { return err } - acquiredLocks[i] = acq - i++ + if !acq.Empty() { + acquiredLocks = append(acquiredLocks, acq) + } return nil }) if err != nil { @@ -144,13 +156,15 @@ func acquireLocksOnKeys( } return acquiredLocks, nil case kvpb.KEY_VALUES: - for i, row := range scanRes.KVs { + for _, row := range scanRes.KVs { k := copyKey(row.Key) acq, err := acquireLockOnKey(ctx, readWriter, txn, str, dur, k, ms, settings) if err != nil { return nil, err } - acquiredLocks[i] = acq + if !acq.Empty() { + acquiredLocks = append(acquiredLocks, acq) + } } return acquiredLocks, nil case kvpb.COL_BATCH_RESPONSE: @@ -160,13 +174,26 @@ func acquireLocksOnKeys( } } -// acquireLockOnKey acquires a lock on the specified key. The lock is acquired -// by the specified transaction with the supplied lock strength and durability. -// The resultant lock acquisition struct is returned, which the caller must -// accumulate in its result set. +// acquireLockOnKey checks for conflicts, and if non are found, acquires a lock +// on the specified key. The lock is acquired by the specified transaction[1] +// with the supplied lock strength and durability. The resultant lock +// acquisition struct is returned, which the caller must accumulate in its +// result set. // -// It is possible for lock acquisition to run into a lock conflict error, in -// which case a LockConflictError is returned to the caller. +// Even though the function is called post evaluation, at which point requests +// have already sequenced with all locks in the in-memory lock table, there may +// still be (currently undiscovered) replicated locks. This is because the +// in-memory lock table only has a partial view of all locks for a range. +// Therefore, the first thing we do is check for replicated lock conflicts that +// may have been missed. If any are found, a LockConflictError is returned to +// the caller. +// +// [1] The caller is allowed to pass in a nil transaction; this means that +// acquireLockOnKey can be called on behalf of non-transactional requests. +// Non-transactional requests are not allowed to hold locks that outlive the +// lifespan of their request. As such, an empty lock acquisition is returned for +// them. However, non-transactional requests do conflict with locks held by +// concurrent transactional requests, so they may return a LockConflictError. func acquireLockOnKey( ctx context.Context, readWriter storage.ReadWriter, @@ -178,6 +205,20 @@ func acquireLockOnKey( settings *cluster.Settings, ) (roachpb.LockAcquisition, error) { maxLockConflicts := storage.MaxConflictsPerLockConflictError.Get(&settings.SV) + if txn == nil { + // Non-transactional requests are not allowed to acquire locks that outlive + // the request's lifespan. However, they may conflict with locks held by + // other concurrent transactional requests. Evaluation up until this point + // has only scanned for (and not found any) conflicts with locks in the + // in-memory lock table. This includes all unreplicated locks and contended + // replicated locks. We haven't considered conflicts with un-contended + // replicated locks -- do so now. + // + // NB: The supplied durability is insignificant for non-transactional + // requests. + return roachpb.LockAcquisition{}, + storage.MVCCCheckForAcquireLock(ctx, readWriter, txn, str, key, maxLockConflicts) + } switch dur { case lock.Unreplicated: // Evaluation up until this point has only scanned for (and not found any) diff --git a/pkg/kv/kvserver/client_replica_test.go b/pkg/kv/kvserver/client_replica_test.go index a47cf5e46b68..f2c360023c63 100644 --- a/pkg/kv/kvserver/client_replica_test.go +++ b/pkg/kv/kvserver/client_replica_test.go @@ -32,6 +32,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/isolation" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/lock" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness" @@ -4975,6 +4976,226 @@ func setupDBAndWriteAAndB(t *testing.T) (serverutils.TestServerInterface, *kv.DB return s, db } +// TestNonTransactionalLockingRequestsConflictWithReplicated locks ensures that +// non-transactional locking requests check for conflicts with replicated locks +// even though they cannot acquire locks that outlive their request. +// +// Regression test for https://github.com/cockroachdb/cockroach/issues/117628. +func TestNonTransactionalLockingRequestsConflictWithReplicatedLocks(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + s, db := setupDBAndWriteAAndB(t) + defer s.Stopper().Stop(ctx) + + keyA, keyB, keyC := roachpb.Key("a"), roachpb.Key("b"), roachpb.Key("c") + + var wg sync.WaitGroup + wg.Add(1) + + locksHeld := make(chan struct{}) + canReleaseLocks := make(chan struct{}) + + // Set up the test by acquiring a replicated exclusive lock on keyA and a + // replicated shared lock on KeyB. We'll hold these locks for the duration of + // the test. + go func() { + defer wg.Done() + + txn1 := db.NewTxn(ctx, "txn1") + _, err := txn1.GetForUpdate(ctx, keyA, kvpb.GuaranteedDurability) + if err != nil { + t.Error(err) + } + _, err = txn1.GetForShare(ctx, keyB, kvpb.GuaranteedDurability) + if err != nil { + t.Error(err) + } + + close(locksHeld) + <-canReleaseLocks // block for all test cases + + err = txn1.Commit(ctx) + if err != nil { + t.Error(err) + } + }() + + <-locksHeld + + for i, tc := range []struct { + setup func(*kvpb.BatchRequest, bool) + expBlock bool + }{ + // 1. Get requests. + // 1a. Exclusive locking. + { + setup: func(ba *kvpb.BatchRequest, repl bool) { + dur := lock.Unreplicated + if repl { + dur = lock.Replicated + } + req := getArgs(keyA) + req.KeyLockingStrength = lock.Exclusive + req.KeyLockingDurability = dur + ba.Add(req) + }, + expBlock: true, + }, + { + setup: func(ba *kvpb.BatchRequest, repl bool) { + dur := lock.Unreplicated + if repl { + dur = lock.Replicated + } + req := getArgs(keyB) + req.KeyLockingStrength = lock.Exclusive + req.KeyLockingDurability = dur + ba.Add(req) + }, + expBlock: true, + }, + { + setup: func(ba *kvpb.BatchRequest, repl bool) { + dur := lock.Unreplicated + if repl { + dur = lock.Replicated + } + req := getArgs(keyC) + req.KeyLockingStrength = lock.Exclusive + req.KeyLockingDurability = dur + ba.Add(req) + }, + expBlock: false, + }, + // 1b. Shared locking. + { + setup: func(ba *kvpb.BatchRequest, repl bool) { + dur := lock.Unreplicated + if repl { + dur = lock.Replicated + } + req := getArgs(keyA) + req.KeyLockingStrength = lock.Shared + req.KeyLockingDurability = dur + ba.Add(req) + }, + expBlock: true, + }, + { + setup: func(ba *kvpb.BatchRequest, repl bool) { + dur := lock.Unreplicated + if repl { + dur = lock.Replicated + } + req := getArgs(keyB) + req.KeyLockingStrength = lock.Shared + req.KeyLockingDurability = dur + ba.Add(req) + }, + expBlock: false, + }, + { + setup: func(ba *kvpb.BatchRequest, repl bool) { + dur := lock.Unreplicated + if repl { + dur = lock.Replicated + } + req := getArgs(keyC) + req.KeyLockingStrength = lock.Shared + req.KeyLockingDurability = dur + ba.Add(req) + }, + expBlock: false, + }, + // 2. Scan requests. + // 2a. Exclusive locking. + { + setup: func(ba *kvpb.BatchRequest, repl bool) { + dur := lock.Unreplicated + if repl { + dur = lock.Replicated + } + req := scanArgs(keyA, keyC) + req.KeyLockingStrength = lock.Exclusive + req.KeyLockingDurability = dur + ba.Add(req) + }, + expBlock: true, + }, + // 2b. Shared locking. + { + setup: func(ba *kvpb.BatchRequest, repl bool) { + dur := lock.Unreplicated + if repl { + dur = lock.Replicated + } + req := scanArgs(keyA, keyC) + req.KeyLockingStrength = lock.Shared + req.KeyLockingDurability = dur + ba.Add(req) + }, + expBlock: true, + }, + // 3. ReverseScan requests. + // 3a. Exclusive locking. + { + setup: func(ba *kvpb.BatchRequest, repl bool) { + dur := lock.Unreplicated + if repl { + dur = lock.Replicated + } + req := revScanArgs(keyA, keyC) + req.KeyLockingStrength = lock.Exclusive + req.KeyLockingDurability = dur + ba.Add(req) + }, + expBlock: true, + }, + // 3b. Shared locking. + { + setup: func(ba *kvpb.BatchRequest, repl bool) { + dur := lock.Unreplicated + if repl { + dur = lock.Replicated + } + req := revScanArgs(keyA, keyC) + req.KeyLockingStrength = lock.Shared + req.KeyLockingDurability = dur + ba.Add(req) + }, + expBlock: true, + }, + } { + testutils.RunTrueAndFalse(t, "replicated", func(t *testing.T, repl bool) { + t.Run(fmt.Sprintf("#%d", i), func(t *testing.T) { + ba := &kvpb.BatchRequest{} + // Having the request return an error instead of blocking on conflict + // makes the test easier. + ba.WaitPolicy = lock.WaitPolicy_Error + tc.setup(ba, repl) + + store, err := s.GetStores().(*kvserver.Stores).GetStore(s.GetFirstStoreID()) + require.NoError(t, err) + _, pErr := store.TestSender().Send(ctx, ba) + + if tc.expBlock { + require.NotNil(t, pErr) + lcErr := new(kvpb.WriteIntentError) + require.True(t, errors.As(pErr.GoError(), &lcErr)) + require.Equal(t, kvpb.WriteIntentError_REASON_WAIT_POLICY, lcErr.Reason) + } else { + require.Nil(t, pErr.GoError()) + } + }) + }) + } + + close(canReleaseLocks) + wg.Wait() +} + // TestSharedLocksBasic tests basic shared lock semantics. In particular, it // tests multiple shared locks are compatible with each other, but exclusive // locks aren't.